diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 9c70dd0d4..e773305e5 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -110,8 +110,8 @@ public abstract class MessageAgent implements Resourcable { this.workExecutor = threads > 0 ? WorkThread.createExecutor(threads, "Redkale-MessageConsumerThread-[" + name + "]-%s") : WorkThread.createWorkExecutor(Utility.cpus(), "Redkale-MessageConsumerThread-[" + name + "]-%s"); } - this.httpMessageClient = new MessageClient(this, this.httpAppRespTopic); - this.sncpMessageClient = new MessageClient(this, this.sncpAppRespTopic); + this.httpMessageClient = new MessageClient("http", this, this.httpAppRespTopic); + this.sncpMessageClient = new MessageClient("sncp", this, this.sncpAppRespTopic); String coderType = config.getValue("coder", ""); if (!coderType.trim().isEmpty()) { @@ -198,8 +198,6 @@ public abstract class MessageAgent implements Resourcable { this.messageConsumerList.clear(); this.messageConsumerMap.clear(); //-------------- MessageClient -------------- - this.httpMessageClient.stop(); - this.sncpMessageClient.stop(); if (this.messageClientProducer != null) { this.messageClientProducer.stop(); } diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index cde302dad..b053b7c35 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -48,12 +48,15 @@ public class MessageClient implements ClusterRpcClient messageProcessors = new HashMap<>(); final ConcurrentHashMap respQueue = new ConcurrentHashMap<>(); - protected MessageClient(MessageAgent messageAgent, String appRespTopic) { + protected MessageClient(String protocol, MessageAgent messageAgent, String appRespTopic) { + this.protocol = protocol; this.messageAgent = messageAgent; this.appRespTopic = appRespTopic; this.msgSeqno = messageAgent.msgSeqno; @@ -206,10 +209,8 @@ public class MessageClient implements ClusterRpcClient