This commit is contained in:
redkale
2023-10-14 15:29:38 +08:00
parent c2e3636062
commit 17fbb29957
2 changed files with 8 additions and 9 deletions

View File

@@ -110,8 +110,8 @@ public abstract class MessageAgent implements Resourcable {
this.workExecutor = threads > 0 ? WorkThread.createExecutor(threads, "Redkale-MessageConsumerThread-[" + name + "]-%s")
: WorkThread.createWorkExecutor(Utility.cpus(), "Redkale-MessageConsumerThread-[" + name + "]-%s");
}
this.httpMessageClient = new MessageClient(this, this.httpAppRespTopic);
this.sncpMessageClient = new MessageClient(this, this.sncpAppRespTopic);
this.httpMessageClient = new MessageClient("http", this, this.httpAppRespTopic);
this.sncpMessageClient = new MessageClient("sncp", this, this.sncpAppRespTopic);
String coderType = config.getValue("coder", "");
if (!coderType.trim().isEmpty()) {
@@ -198,8 +198,6 @@ public abstract class MessageAgent implements Resourcable {
this.messageConsumerList.clear();
this.messageConsumerMap.clear();
//-------------- MessageClient --------------
this.httpMessageClient.stop();
this.sncpMessageClient.stop();
if (this.messageClientProducer != null) {
this.messageClientProducer.stop();
}

View File

@@ -48,12 +48,15 @@ public class MessageClient implements ClusterRpcClient<MessageRecord, MessageRec
protected final AtomicLong msgSeqno;
protected final String protocol;
//key: reqTopic
private final HashMap<String, MessageProcessor> messageProcessors = new HashMap<>();
final ConcurrentHashMap<Long, MessageRespFuture> respQueue = new ConcurrentHashMap<>();
protected MessageClient(MessageAgent messageAgent, String appRespTopic) {
protected MessageClient(String protocol, MessageAgent messageAgent, String appRespTopic) {
this.protocol = protocol;
this.messageAgent = messageAgent;
this.appRespTopic = appRespTopic;
this.msgSeqno = messageAgent.msgSeqno;
@@ -206,10 +209,8 @@ public class MessageClient implements ClusterRpcClient<MessageRecord, MessageRec
return ctype;
}
public void start() {
}
public void stop() {
public String getProtocol() {
return protocol;
}
public MessageAgent getMessageAgent() {