diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index c9270c4fc..1bd6e829f 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -28,6 +28,8 @@ public abstract class MessageAgent { protected AnyValue config; + protected MessageProducer producer; + //本地Service消息接收处理器, key:topic protected Map localConsumers; @@ -58,6 +60,19 @@ public abstract class MessageAgent { return name; } + //获取指定topic的生产处理器 + public synchronized MessageProducer getProducer() { + if (this.producer == null) { + this.producer = createProducer(); + this.producer.start(); + this.producer.waitFor(); + } + return this.producer; + } + + //创建指定topic的生产处理器 + protected abstract MessageProducer createProducer(); + //创建topic,如果已存在则跳过 public abstract boolean createTopic(String... topics); @@ -70,9 +85,6 @@ public abstract class MessageAgent { //创建指定topic的消费处理器 public abstract MessageConsumer createConsumer(String topic, Consumer processor); - //创建指定topic的生产处理器 - public abstract MessageProducer createProducer(); - //格式: sncp:req:user protected static String generateSncpReqTopic(NodeServer ns, Service service) { String resname = Sncp.getResourceName(service);