This commit is contained in:
Redkale
2020-06-14 01:01:38 +08:00
parent 13217a11c0
commit 0377dac71f

View File

@@ -65,17 +65,18 @@ public abstract class MessageClient {
synchronized (this) {
if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic;
if (this.consumer == null) {
MessageProcessor processor = (msg,callback) -> {
MessageProcessor processor = (msg, callback) -> {
MessageRespFutureNode node = respNodes.get(msg.getSeqid());
if (node == null) {
messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error");
messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error not found msgnode");
return;
}
if (node.getCounter() != null) node.getCounter().decrementAndGet();
node.future.complete(msg);
};
this.consumer = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor);
this.consumer.startup().join();
MessageConsumer one = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor);
one.startup().join();
this.consumer = one;
}
}
}