diff --git a/src/org/redkale/mq/HttpMessageClient.java b/src/org/redkale/mq/HttpMessageClient.java index 8388da0b6..3efa0573d 100644 --- a/src/org/redkale/mq/HttpMessageClient.java +++ b/src/org/redkale/mq/HttpMessageClient.java @@ -136,4 +136,9 @@ public class HttpMessageClient extends MessageClient { message.userid(userid).groupid(groupid); return sendMessage(message, needresp, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } + + @Override + protected MessageProducer getProducer() { + return messageAgent.getHttpProducer(); + } } diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index b12e317bd..41fb3968e 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -38,7 +38,9 @@ public abstract class MessageAgent { protected AnyValue config; - protected MessageProducer producer; + protected MessageProducer httpProducer; + + protected MessageProducer sncpProducer; protected HttpMessageClient httpMessageClient; @@ -87,7 +89,8 @@ public abstract class MessageAgent { this.httpMessageClient.close().join(); this.sncpMessageClient.close().join(); if (this.timeoutExecutor != null) this.timeoutExecutor.shutdown(); - if (this.producer != null) this.producer.shutdown().join(); + if (this.sncpProducer != null) this.sncpProducer.shutdown().join(); + if (this.httpProducer != null) this.httpProducer.shutdown().join(); } public String getName() { @@ -122,16 +125,28 @@ public abstract class MessageAgent { } //获取指定topic的生产处理器 - public MessageProducer getProducer() { - if (this.producer == null) { + public MessageProducer getSncpProducer() { + if (this.sncpProducer == null) { synchronized (this) { - if (this.producer == null) { - this.producer = createProducer(); - this.producer.startup().join(); + if (this.sncpProducer == null) { + this.sncpProducer = createProducer(); + this.sncpProducer.startup().join(); } } } - return this.producer; + return this.sncpProducer; + } + + public MessageProducer getHttpProducer() { + if (this.httpProducer == null) { + synchronized (this) { + if (this.httpProducer == null) { + this.httpProducer = createProducer(); + this.httpProducer.startup().join(); + } + } + } + return this.httpProducer; } //创建指定topic的生产处理器 @@ -156,7 +171,7 @@ public abstract class MessageAgent { String[] topics = generateHttpReqTopics(service); String consumerid = generateHttpConsumerid(topics, service); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); - HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet); + HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getHttpProducer(), ns, service, servlet); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor))); } @@ -164,7 +179,7 @@ public abstract class MessageAgent { String topic = generateSncpReqTopic(service); String consumerid = generateSncpConsumerid(topic, service); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); - SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet); + SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getSncpProducer(), ns, service, servlet); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor))); } diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index 0993a9473..caeca6337 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -19,7 +19,7 @@ import org.redkale.convert.ConvertType; * * @since 2.1.0 */ -public class MessageClient { +public abstract class MessageClient { protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); @@ -84,7 +84,7 @@ public class MessageClient { message.setResptopic(respTopic); } if (counter != null) counter.incrementAndGet(); - messageAgent.getProducer().apply(message); + getProducer().apply(message); if (needresp) { MessageRespFutureNode node = new MessageRespFutureNode(message.getSeqid(), respNodes, counter, future); respNodes.put(message.getSeqid(), node); @@ -99,4 +99,6 @@ public class MessageClient { return future; } } + + protected abstract MessageProducer getProducer(); } diff --git a/src/org/redkale/mq/SncpMessageClient.java b/src/org/redkale/mq/SncpMessageClient.java index fe84cd09f..f151bf67a 100644 --- a/src/org/redkale/mq/SncpMessageClient.java +++ b/src/org/redkale/mq/SncpMessageClient.java @@ -23,4 +23,9 @@ public class SncpMessageClient extends MessageClient { this.respTopic = messageAgent.generateSncpRespTopic(); this.convertType = ConvertType.BSON; } + + @Override + protected MessageProducer getProducer() { + return messageAgent.getSncpProducer(); + } }