This commit is contained in:
Redkale
2020-06-11 23:30:43 +08:00
parent 98f29d6a6e
commit 4cf160e5d3
4 changed files with 39 additions and 12 deletions

View File

@@ -136,4 +136,9 @@ public class HttpMessageClient extends MessageClient {
message.userid(userid).groupid(groupid); message.userid(userid).groupid(groupid);
return sendMessage(message, needresp, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); return sendMessage(message, needresp, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
} }
@Override
protected MessageProducer getProducer() {
return messageAgent.getHttpProducer();
}
} }

View File

@@ -38,7 +38,9 @@ public abstract class MessageAgent {
protected AnyValue config; protected AnyValue config;
protected MessageProducer producer; protected MessageProducer httpProducer;
protected MessageProducer sncpProducer;
protected HttpMessageClient httpMessageClient; protected HttpMessageClient httpMessageClient;
@@ -87,7 +89,8 @@ public abstract class MessageAgent {
this.httpMessageClient.close().join(); this.httpMessageClient.close().join();
this.sncpMessageClient.close().join(); this.sncpMessageClient.close().join();
if (this.timeoutExecutor != null) this.timeoutExecutor.shutdown(); if (this.timeoutExecutor != null) this.timeoutExecutor.shutdown();
if (this.producer != null) this.producer.shutdown().join(); if (this.sncpProducer != null) this.sncpProducer.shutdown().join();
if (this.httpProducer != null) this.httpProducer.shutdown().join();
} }
public String getName() { public String getName() {
@@ -122,16 +125,28 @@ public abstract class MessageAgent {
} }
//获取指定topic的生产处理器 //获取指定topic的生产处理器
public MessageProducer getProducer() { public MessageProducer getSncpProducer() {
if (this.producer == null) { if (this.sncpProducer == null) {
synchronized (this) { synchronized (this) {
if (this.producer == null) { if (this.sncpProducer == null) {
this.producer = createProducer(); this.sncpProducer = createProducer();
this.producer.startup().join(); this.sncpProducer.startup().join();
} }
} }
} }
return this.producer; return this.sncpProducer;
}
public MessageProducer getHttpProducer() {
if (this.httpProducer == null) {
synchronized (this) {
if (this.httpProducer == null) {
this.httpProducer = createProducer();
this.httpProducer.startup().join();
}
}
}
return this.httpProducer;
} }
//创建指定topic的生产处理器 //创建指定topic的生产处理器
@@ -156,7 +171,7 @@ public abstract class MessageAgent {
String[] topics = generateHttpReqTopics(service); String[] topics = generateHttpReqTopics(service);
String consumerid = generateHttpConsumerid(topics, service); String consumerid = generateHttpConsumerid(topics, service);
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet); HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getHttpProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor))); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor)));
} }
@@ -164,7 +179,7 @@ public abstract class MessageAgent {
String topic = generateSncpReqTopic(service); String topic = generateSncpReqTopic(service);
String consumerid = generateSncpConsumerid(topic, service); String consumerid = generateSncpConsumerid(topic, service);
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet); SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getSncpProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor))); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor)));
} }

View File

@@ -19,7 +19,7 @@ import org.redkale.convert.ConvertType;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public class MessageClient { public abstract class MessageClient {
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>();
@@ -84,7 +84,7 @@ public class MessageClient {
message.setResptopic(respTopic); message.setResptopic(respTopic);
} }
if (counter != null) counter.incrementAndGet(); if (counter != null) counter.incrementAndGet();
messageAgent.getProducer().apply(message); getProducer().apply(message);
if (needresp) { if (needresp) {
MessageRespFutureNode node = new MessageRespFutureNode(message.getSeqid(), respNodes, counter, future); MessageRespFutureNode node = new MessageRespFutureNode(message.getSeqid(), respNodes, counter, future);
respNodes.put(message.getSeqid(), node); respNodes.put(message.getSeqid(), node);
@@ -99,4 +99,6 @@ public class MessageClient {
return future; return future;
} }
} }
protected abstract MessageProducer getProducer();
} }

View File

@@ -23,4 +23,9 @@ public class SncpMessageClient extends MessageClient {
this.respTopic = messageAgent.generateSncpRespTopic(); this.respTopic = messageAgent.generateSncpRespTopic();
this.convertType = ConvertType.BSON; this.convertType = ConvertType.BSON;
} }
@Override
protected MessageProducer getProducer() {
return messageAgent.getSncpProducer();
}
} }