MessageAgent增加timeoutExecutor

This commit is contained in:
Redkale
2020-06-11 14:25:50 +08:00
parent bdf2fd21c3
commit 8840415739
4 changed files with 26 additions and 57 deletions

View File

@@ -44,6 +44,8 @@ public abstract class MessageAgent {
protected SncpMessageClient sncpMessageClient; protected SncpMessageClient sncpMessageClient;
protected ScheduledThreadPoolExecutor timeoutExecutor;
//本地Service消息接收处理器 key:consumer //本地Service消息接收处理器 key:consumer
protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>(); protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>();
@@ -51,6 +53,14 @@ public abstract class MessageAgent {
this.name = checkName(config.getValue("name", "")); this.name = checkName(config.getValue("name", ""));
this.httpMessageClient = new HttpMessageClient(this); this.httpMessageClient = new HttpMessageClient(this);
this.sncpMessageClient = new SncpMessageClient(this); this.sncpMessageClient = new SncpMessageClient(this);
// application (it doesn't execute completion handlers).
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r);
t.setName("MessageAgent-Timeout-Thread");
t.setDaemon(true);
return t;
});
this.timeoutExecutor.setRemoveOnCancelPolicy(true);
} }
public CompletableFuture<Map<String, Long>> start() { public CompletableFuture<Map<String, Long>> start() {
@@ -76,6 +86,7 @@ public abstract class MessageAgent {
public void destroy(AnyValue config) { public void destroy(AnyValue config) {
this.httpMessageClient.close().join(); this.httpMessageClient.close().join();
this.sncpMessageClient.close().join(); this.sncpMessageClient.close().join();
if (this.timeoutExecutor != null) this.timeoutExecutor.shutdown();
if (this.producer != null) this.producer.shutdown().join(); if (this.producer != null) this.producer.shutdown().join();
} }

View File

@@ -86,8 +86,10 @@ public class MessageClient {
if (counter != null) counter.incrementAndGet(); if (counter != null) counter.incrementAndGet();
messageAgent.getProducer().apply(message); messageAgent.getProducer().apply(message);
if (needresp) { if (needresp) {
MessageRespFutureNode node = new MessageRespFutureNode(message.getSeqid(), counter, future); MessageRespFutureNode node = new MessageRespFutureNode(message.getSeqid(), respNodes, counter, future);
respNodes.put(message.getSeqid(), node); respNodes.put(message.getSeqid(), node);
ScheduledThreadPoolExecutor executor = messageAgent.timeoutExecutor;
if (executor != null) executor.schedule(node, 6, TimeUnit.SECONDS);
} else { } else {
future.complete(null); future.complete(null);
} }

View File

@@ -5,7 +5,7 @@
*/ */
package org.redkale.mq; package org.redkale.mq;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@@ -18,7 +18,7 @@ import java.util.concurrent.atomic.AtomicLong;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public class MessageRespFutureNode { public class MessageRespFutureNode implements Runnable {
protected final long seqid; protected final long seqid;
@@ -28,13 +28,22 @@ public class MessageRespFutureNode {
protected final CompletableFuture<MessageRecord> future; protected final CompletableFuture<MessageRecord> future;
public MessageRespFutureNode(long seqid, AtomicLong counter, CompletableFuture<MessageRecord> future) { protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes;
public MessageRespFutureNode(long seqid, ConcurrentHashMap<Long, MessageRespFutureNode> respNodes, AtomicLong counter, CompletableFuture<MessageRecord> future) {
this.seqid = seqid; this.seqid = seqid;
this.respNodes = respNodes;
this.counter = counter; this.counter = counter;
this.future = future; this.future = future;
this.createtime = System.currentTimeMillis(); this.createtime = System.currentTimeMillis();
} }
@Override //超时后被timeoutExecutor调用
public void run() { //timeout
respNodes.remove(this.seqid);
future.completeExceptionally(new TimeoutException());
}
public long getSeqid() { public long getSeqid() {
return seqid; return seqid;
} }

View File

@@ -1,53 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.*;
/**
* MQ管理器
*
*
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class SncpRespProcessor implements MessageProcessor {
protected final Logger logger;
protected final MessageAgent messageAgent;
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>();
public SncpRespProcessor(Logger logger, MessageAgent messageAgent) {
this.logger = logger;
this.messageAgent = messageAgent;
}
@Override
public void process(MessageRecord message) {
MessageRespFutureNode node = respNodes.get(message.getSeqid());
if (node == null) {
logger.log(Level.WARNING, SncpRespProcessor.class.getSimpleName() + " process " + message + " error");
return;
}
if (node.getCounter() != null) node.getCounter().decrementAndGet();
node.future.complete(message);
}
public CompletableFuture<MessageRecord> createFuture2(long seqid, AtomicLong counter) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
MessageRespFutureNode node = new MessageRespFutureNode(seqid, counter, future);
respNodes.put(seqid, node);
return future;
}
}