From 6057b8f90abc6d7c4735e7eb1aba673871be98bf Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sat, 26 Dec 2020 00:48:05 +0800 Subject: [PATCH] --- src/org/redkale/mq/MessageClient.java | 7 +++++-- src/org/redkale/mq/MessageRespFutureNode.java | 12 +++++++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index fd55b25a0..ec04b2597 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -66,6 +66,7 @@ public abstract class MessageClient { messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error, not found mqmsg.respnode"); return; } + if (node.scheduledFuture != null) node.scheduledFuture.cancel(true); AtomicLong ncer = node.getCounter(); if (ncer != null) ncer.decrementAndGet(); node.future.complete(msg); @@ -91,10 +92,12 @@ public abstract class MessageClient { if (counter != null) counter.incrementAndGet(); getProducer().apply(message); if (needresp) { - MessageRespFutureNode node = new MessageRespFutureNode(message, respNodes, counter, future); + MessageRespFutureNode node = new MessageRespFutureNode(messageAgent.logger, message, respNodes, counter, future); respNodes.put(message.getSeqid(), node); ScheduledThreadPoolExecutor executor = messageAgent.timeoutExecutor; - if (executor != null) executor.schedule(node, 30, TimeUnit.SECONDS); + if (executor != null) { + node.scheduledFuture = executor.schedule(node, 30, TimeUnit.SECONDS); + } } else { future.complete(null); } diff --git a/src/org/redkale/mq/MessageRespFutureNode.java b/src/org/redkale/mq/MessageRespFutureNode.java index 08cbebd6d..e2cc2e42c 100644 --- a/src/org/redkale/mq/MessageRespFutureNode.java +++ b/src/org/redkale/mq/MessageRespFutureNode.java @@ -7,6 +7,7 @@ package org.redkale.mq; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.*; /** * MQ管理器 @@ -28,9 +29,17 @@ public class MessageRespFutureNode implements Runnable { protected final CompletableFuture future; + protected final Logger logger; + + protected final MessageRecord message; + protected final ConcurrentHashMap respNodes; - public MessageRespFutureNode(MessageRecord message, ConcurrentHashMap respNodes, AtomicLong counter, CompletableFuture future) { + protected ScheduledFuture scheduledFuture; + + public MessageRespFutureNode(Logger logger, MessageRecord message, ConcurrentHashMap respNodes, AtomicLong counter, CompletableFuture future) { + this.logger = logger; + this.message = message; this.seqid = message.getSeqid(); this.respNodes = respNodes; this.counter = counter; @@ -42,6 +51,7 @@ public class MessageRespFutureNode implements Runnable { public void run() { //timeout respNodes.remove(this.seqid); future.completeExceptionally(new TimeoutException()); + logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout"); } public long getSeqid() {