From 88404157396ebbf48d586f38f92f64e7718538fb Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 11 Jun 2020 14:25:50 +0800 Subject: [PATCH] =?UTF-8?q?MessageAgent=E5=A2=9E=E5=8A=A0timeoutExecutor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/mq/MessageAgent.java | 11 ++++ src/org/redkale/mq/MessageClient.java | 4 +- src/org/redkale/mq/MessageRespFutureNode.java | 15 ++++-- src/org/redkale/mq/SncpRespProcessor.java | 53 ------------------- 4 files changed, 26 insertions(+), 57 deletions(-) delete mode 100644 src/org/redkale/mq/SncpRespProcessor.java diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 238b75ba9..86b151e27 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -44,6 +44,8 @@ public abstract class MessageAgent { protected SncpMessageClient sncpMessageClient; + protected ScheduledThreadPoolExecutor timeoutExecutor; + //本地Service消息接收处理器, key:consumer protected HashMap messageNodes = new LinkedHashMap<>(); @@ -51,6 +53,14 @@ public abstract class MessageAgent { this.name = checkName(config.getValue("name", "")); this.httpMessageClient = new HttpMessageClient(this); this.sncpMessageClient = new SncpMessageClient(this); + // application (it doesn't execute completion handlers). + this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { + Thread t = new Thread(r); + t.setName("MessageAgent-Timeout-Thread"); + t.setDaemon(true); + return t; + }); + this.timeoutExecutor.setRemoveOnCancelPolicy(true); } public CompletableFuture> start() { @@ -76,6 +86,7 @@ public abstract class MessageAgent { public void destroy(AnyValue config) { this.httpMessageClient.close().join(); this.sncpMessageClient.close().join(); + if (this.timeoutExecutor != null) this.timeoutExecutor.shutdown(); if (this.producer != null) this.producer.shutdown().join(); } diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index db2e7e5a8..2524e7103 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -86,8 +86,10 @@ public class MessageClient { if (counter != null) counter.incrementAndGet(); messageAgent.getProducer().apply(message); if (needresp) { - MessageRespFutureNode node = new MessageRespFutureNode(message.getSeqid(), counter, future); + MessageRespFutureNode node = new MessageRespFutureNode(message.getSeqid(), respNodes, counter, future); respNodes.put(message.getSeqid(), node); + ScheduledThreadPoolExecutor executor = messageAgent.timeoutExecutor; + if (executor != null) executor.schedule(node, 6, TimeUnit.SECONDS); } else { future.complete(null); } diff --git a/src/org/redkale/mq/MessageRespFutureNode.java b/src/org/redkale/mq/MessageRespFutureNode.java index a75845f54..185f2986d 100644 --- a/src/org/redkale/mq/MessageRespFutureNode.java +++ b/src/org/redkale/mq/MessageRespFutureNode.java @@ -5,7 +5,7 @@ */ package org.redkale.mq; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; /** @@ -18,7 +18,7 @@ import java.util.concurrent.atomic.AtomicLong; * * @since 2.1.0 */ -public class MessageRespFutureNode { +public class MessageRespFutureNode implements Runnable { protected final long seqid; @@ -28,13 +28,22 @@ public class MessageRespFutureNode { protected final CompletableFuture future; - public MessageRespFutureNode(long seqid, AtomicLong counter, CompletableFuture future) { + protected final ConcurrentHashMap respNodes; + + public MessageRespFutureNode(long seqid, ConcurrentHashMap respNodes, AtomicLong counter, CompletableFuture future) { this.seqid = seqid; + this.respNodes = respNodes; this.counter = counter; this.future = future; this.createtime = System.currentTimeMillis(); } + @Override //超时后被timeoutExecutor调用 + public void run() { //timeout + respNodes.remove(this.seqid); + future.completeExceptionally(new TimeoutException()); + } + public long getSeqid() { return seqid; } diff --git a/src/org/redkale/mq/SncpRespProcessor.java b/src/org/redkale/mq/SncpRespProcessor.java deleted file mode 100644 index ddf51a59e..000000000 --- a/src/org/redkale/mq/SncpRespProcessor.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.*; - -/** - * MQ管理器 - * - * - * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public class SncpRespProcessor implements MessageProcessor { - - protected final Logger logger; - - protected final MessageAgent messageAgent; - - protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); - - public SncpRespProcessor(Logger logger, MessageAgent messageAgent) { - this.logger = logger; - this.messageAgent = messageAgent; - } - - @Override - public void process(MessageRecord message) { - MessageRespFutureNode node = respNodes.get(message.getSeqid()); - if (node == null) { - logger.log(Level.WARNING, SncpRespProcessor.class.getSimpleName() + " process " + message + " error"); - return; - } - if (node.getCounter() != null) node.getCounter().decrementAndGet(); - node.future.complete(message); - } - - public CompletableFuture createFuture2(long seqid, AtomicLong counter) { - CompletableFuture future = new CompletableFuture<>(); - MessageRespFutureNode node = new MessageRespFutureNode(seqid, counter, future); - respNodes.put(seqid, node); - return future; - } - -}