This commit is contained in:
Redkale
2020-12-26 00:48:05 +08:00
parent 2847f761bb
commit 6057b8f90a
2 changed files with 16 additions and 3 deletions

View File

@@ -66,6 +66,7 @@ public abstract class MessageClient {
messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error not found mqmsg.respnode");
return;
}
if (node.scheduledFuture != null) node.scheduledFuture.cancel(true);
AtomicLong ncer = node.getCounter();
if (ncer != null) ncer.decrementAndGet();
node.future.complete(msg);
@@ -91,10 +92,12 @@ public abstract class MessageClient {
if (counter != null) counter.incrementAndGet();
getProducer().apply(message);
if (needresp) {
MessageRespFutureNode node = new MessageRespFutureNode(message, respNodes, counter, future);
MessageRespFutureNode node = new MessageRespFutureNode(messageAgent.logger, message, respNodes, counter, future);
respNodes.put(message.getSeqid(), node);
ScheduledThreadPoolExecutor executor = messageAgent.timeoutExecutor;
if (executor != null) executor.schedule(node, 30, TimeUnit.SECONDS);
if (executor != null) {
node.scheduledFuture = executor.schedule(node, 30, TimeUnit.SECONDS);
}
} else {
future.complete(null);
}

View File

@@ -7,6 +7,7 @@ package org.redkale.mq;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.*;
/**
* MQ管理器
@@ -28,9 +29,17 @@ public class MessageRespFutureNode implements Runnable {
protected final CompletableFuture<MessageRecord> future;
protected final Logger logger;
protected final MessageRecord message;
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes;
public MessageRespFutureNode(MessageRecord message, ConcurrentHashMap<Long, MessageRespFutureNode> respNodes, AtomicLong counter, CompletableFuture<MessageRecord> future) {
protected ScheduledFuture<?> scheduledFuture;
public MessageRespFutureNode(Logger logger, MessageRecord message, ConcurrentHashMap<Long, MessageRespFutureNode> respNodes, AtomicLong counter, CompletableFuture<MessageRecord> future) {
this.logger = logger;
this.message = message;
this.seqid = message.getSeqid();
this.respNodes = respNodes;
this.counter = counter;
@@ -42,6 +51,7 @@ public class MessageRespFutureNode implements Runnable {
public void run() { //timeout
respNodes.remove(this.seqid);
future.completeExceptionally(new TimeoutException());
logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout");
}
public long getSeqid() {