diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 9b5abc5ba..4d971bfc6 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -55,7 +55,7 @@ public abstract class MessageAgent { protected ThreadHashExecutor workExecutor; - //本地Service消息接收处理器, key:consumer + //本地Service消息接收处理器, key:respConsumer protected HashMap messageNodes = new LinkedHashMap<>(); public void init(AnyValue config) { @@ -104,9 +104,9 @@ public abstract class MessageAgent { protected List getAllMessageConsumer() { List consumers = new ArrayList<>(); - MessageConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.consumer; + MessageConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer; if (one != null) consumers.add(one); - one = this.sncpMessageClient == null ? null : this.sncpMessageClient.consumer; + one = this.sncpMessageClient == null ? null : this.sncpMessageClient.respConsumer; if (one != null) consumers.add(one); consumers.addAll(messageNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList())); return consumers; @@ -237,7 +237,7 @@ public abstract class MessageAgent { return "sncp.req." + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname)); } - //格式: consumer-sncp.req.user 不提供外部使用 + //格式: respConsumer-sncp.req.user 不提供外部使用 protected final String generateSncpConsumerid(String topic, Service service) { return "consumer-" + topic; } @@ -271,7 +271,7 @@ public abstract class MessageAgent { return new String[]{"http.req." + module + (resname.isEmpty() ? "" : ("-" + resname))}; } - //格式: consumer-http.req.user + //格式: respConsumer-http.req.user protected String generateHttpConsumerid(String[] topics, Service service) { String resname = Sncp.getResourceName(service); String key = Rest.getRestModule(service).toLowerCase(); diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index f805a3486..7b592daaf 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -25,7 +25,7 @@ public abstract class MessageClient { protected final MessageAgent messageAgent; - protected MessageConsumer consumer; + protected MessageConsumer respConsumer; protected String respTopic; @@ -38,29 +38,30 @@ public abstract class MessageClient { } protected CompletableFuture close() { - if (this.consumer == null) return CompletableFuture.completedFuture(null); - return this.consumer.shutdown(); + if (this.respConsumer == null) return CompletableFuture.completedFuture(null); + return this.respConsumer.shutdown(); } protected CompletableFuture sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) { CompletableFuture future = new CompletableFuture<>(); try { - if (this.consumer == null) { + if (this.respConsumer == null) { synchronized (this) { if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic; - if (this.consumer == null) { + if (this.respConsumer == null) { MessageProcessor processor = (msg, callback) -> { MessageRespFutureNode node = respNodes.remove(msg.getSeqid()); if (node == null) { messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error, not found msgnode"); return; } - if (node.getCounter() != null) node.getCounter().decrementAndGet(); + AtomicLong ncer = node.getCounter(); + if (ncer != null) ncer.decrementAndGet(); node.future.complete(msg); }; MessageConsumer one = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor); one.startup().join(); - this.consumer = one; + this.respConsumer = one; } } }