From cac8662c013e723f227696cec1ec3584de44e69c Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sat, 30 May 2020 09:31:38 +0800 Subject: [PATCH] --- src/org/redkale/mq/MessageAgent.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index c9270c4fc..1bd6e829f 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -28,6 +28,8 @@ public abstract class MessageAgent { protected AnyValue config; + protected MessageProducer producer; + //本地Service消息接收处理器, key:topic protected Map localConsumers; @@ -58,6 +60,19 @@ public abstract class MessageAgent { return name; } + //获取指定topic的生产处理器 + public synchronized MessageProducer getProducer() { + if (this.producer == null) { + this.producer = createProducer(); + this.producer.start(); + this.producer.waitFor(); + } + return this.producer; + } + + //创建指定topic的生产处理器 + protected abstract MessageProducer createProducer(); + //创建topic,如果已存在则跳过 public abstract boolean createTopic(String... topics); @@ -70,9 +85,6 @@ public abstract class MessageAgent { //创建指定topic的消费处理器 public abstract MessageConsumer createConsumer(String topic, Consumer processor); - //创建指定topic的生产处理器 - public abstract MessageProducer createProducer(); - //格式: sncp:req:user protected static String generateSncpReqTopic(NodeServer ns, Service service) { String resname = Sncp.getResourceName(service);