diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index fd55b25a0..ec04b2597 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -66,6 +66,7 @@ public abstract class MessageClient { messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error, not found mqmsg.respnode"); return; } + if (node.scheduledFuture != null) node.scheduledFuture.cancel(true); AtomicLong ncer = node.getCounter(); if (ncer != null) ncer.decrementAndGet(); node.future.complete(msg); @@ -91,10 +92,12 @@ public abstract class MessageClient { if (counter != null) counter.incrementAndGet(); getProducer().apply(message); if (needresp) { - MessageRespFutureNode node = new MessageRespFutureNode(message, respNodes, counter, future); + MessageRespFutureNode node = new MessageRespFutureNode(messageAgent.logger, message, respNodes, counter, future); respNodes.put(message.getSeqid(), node); ScheduledThreadPoolExecutor executor = messageAgent.timeoutExecutor; - if (executor != null) executor.schedule(node, 30, TimeUnit.SECONDS); + if (executor != null) { + node.scheduledFuture = executor.schedule(node, 30, TimeUnit.SECONDS); + } } else { future.complete(null); } diff --git a/src/org/redkale/mq/MessageRespFutureNode.java b/src/org/redkale/mq/MessageRespFutureNode.java index 08cbebd6d..e2cc2e42c 100644 --- a/src/org/redkale/mq/MessageRespFutureNode.java +++ b/src/org/redkale/mq/MessageRespFutureNode.java @@ -7,6 +7,7 @@ package org.redkale.mq; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.*; /** * MQ管理器 @@ -28,9 +29,17 @@ public class MessageRespFutureNode implements Runnable { protected final CompletableFuture future; + protected final Logger logger; + + protected final MessageRecord message; + protected final ConcurrentHashMap respNodes; - public MessageRespFutureNode(MessageRecord message, ConcurrentHashMap respNodes, AtomicLong counter, CompletableFuture future) { + protected ScheduledFuture scheduledFuture; + + public MessageRespFutureNode(Logger logger, MessageRecord message, ConcurrentHashMap respNodes, AtomicLong counter, CompletableFuture future) { + this.logger = logger; + this.message = message; this.seqid = message.getSeqid(); this.respNodes = respNodes; this.counter = counter; @@ -42,6 +51,7 @@ public class MessageRespFutureNode implements Runnable { public void run() { //timeout respNodes.remove(this.seqid); future.completeExceptionally(new TimeoutException()); + logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout"); } public long getSeqid() {