This commit is contained in:
Redkale
2020-05-30 09:31:38 +08:00
parent 197c58ef98
commit cac8662c01

View File

@@ -28,6 +28,8 @@ public abstract class MessageAgent {
protected AnyValue config;
protected MessageProducer producer;
//本地Service消息接收处理器 key:topic
protected Map<String, Service> localConsumers;
@@ -58,6 +60,19 @@ public abstract class MessageAgent {
return name;
}
//获取指定topic的生产处理器
public synchronized MessageProducer getProducer() {
if (this.producer == null) {
this.producer = createProducer();
this.producer.start();
this.producer.waitFor();
}
return this.producer;
}
//创建指定topic的生产处理器
protected abstract MessageProducer createProducer();
//创建topic如果已存在则跳过
public abstract boolean createTopic(String... topics);
@@ -70,9 +85,6 @@ public abstract class MessageAgent {
//创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, Consumer<MessageRecord> processor);
//创建指定topic的生产处理器
public abstract MessageProducer createProducer();
//格式: sncp:req:user
protected static String generateSncpReqTopic(NodeServer ns, Service service) {
String resname = Sncp.getResourceName(service);