From 17fbb29957e46ce5c37ccbe13dcac58c079443c8 Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 14 Oct 2023 15:29:38 +0800 Subject: [PATCH] mq --- src/main/java/org/redkale/mq/MessageAgent.java | 6 ++---- src/main/java/org/redkale/mq/MessageClient.java | 11 ++++++----- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 9c70dd0d4..e773305e5 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -110,8 +110,8 @@ public abstract class MessageAgent implements Resourcable { this.workExecutor = threads > 0 ? WorkThread.createExecutor(threads, "Redkale-MessageConsumerThread-[" + name + "]-%s") : WorkThread.createWorkExecutor(Utility.cpus(), "Redkale-MessageConsumerThread-[" + name + "]-%s"); } - this.httpMessageClient = new MessageClient(this, this.httpAppRespTopic); - this.sncpMessageClient = new MessageClient(this, this.sncpAppRespTopic); + this.httpMessageClient = new MessageClient("http", this, this.httpAppRespTopic); + this.sncpMessageClient = new MessageClient("sncp", this, this.sncpAppRespTopic); String coderType = config.getValue("coder", ""); if (!coderType.trim().isEmpty()) { @@ -198,8 +198,6 @@ public abstract class MessageAgent implements Resourcable { this.messageConsumerList.clear(); this.messageConsumerMap.clear(); //-------------- MessageClient -------------- - this.httpMessageClient.stop(); - this.sncpMessageClient.stop(); if (this.messageClientProducer != null) { this.messageClientProducer.stop(); } diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index cde302dad..b053b7c35 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -48,12 +48,15 @@ public class MessageClient implements ClusterRpcClient messageProcessors = new HashMap<>(); final ConcurrentHashMap respQueue = new ConcurrentHashMap<>(); - protected MessageClient(MessageAgent messageAgent, String appRespTopic) { + protected MessageClient(String protocol, MessageAgent messageAgent, String appRespTopic) { + this.protocol = protocol; this.messageAgent = messageAgent; this.appRespTopic = appRespTopic; this.msgSeqno = messageAgent.msgSeqno; @@ -206,10 +209,8 @@ public class MessageClient implements ClusterRpcClient