From 8316fce151c7dfaed09797284289f2757f61f07b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=81=E6=98=BE=E4=BC=98?= <237809796@qq.com> Date: Mon, 17 Apr 2023 02:11:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E9=98=9F=E5=88=97=E7=9B=91=E6=8E=A7=EF=BC=8C=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E5=88=B0=E9=98=9F=E5=88=97=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E8=B6=85=E8=BF=875=E7=A7=92=EF=BC=8C=E6=8A=9B=E5=87=BA?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=8F=90=E9=86=92=EF=BC=8C=E5=B9=B6=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E6=96=B0=E7=9A=84=E6=89=A7=E8=A1=8C=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E5=90=8E=E7=BB=AD=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/com/zdemo/zhub/ZHubClient.java | 60 ++++++++++++++++++------------ 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index 2a7e6f8..a86b606 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -16,10 +16,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.BiConsumer; +import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; import java.util.logging.Level; @@ -44,11 +41,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG private final LinkedBlockingQueue sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG - private BiConsumer threadBuilder = (r, n) -> { + /*private BiConsumer threadBuilder = (r, n) -> { for (int i = 0; i < n; i++) { new Thread(() -> r.run()).start(); } - }; + };*/ /*private static boolean isFirst = true; private boolean isMain = false;*/ @@ -182,8 +179,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } }).start(); - // 定时调度事件 - threadBuilder.accept(() -> { + // 定时调度事件,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); while (true) { Timer timer = null; try { @@ -191,36 +189,46 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer return; } long start = System.currentTimeMillis(); - timer.runnable.run(); + pool.submit(timer.runnable).get(5, TimeUnit.SECONDS); long end = System.currentTimeMillis(); logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start)); } catch (InterruptedException e) { e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "timer [" + timer.name + "] time out: " + 5 + " S", e); + pool = Executors.newFixedThreadPool(1); } catch (Exception e) { logger.log(Level.WARNING, "timer [" + timer.name + "]", e); } } - }, 1); + }).start(); - // topic msg - threadBuilder.accept(() -> { + // topic msg,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); while (true) { Event event = null; try { if ((event = topicQueue.take()) == null) { continue; } - accept(event.topic, event.value); + + String topic = event.topic; + String value = event.value; + pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); + pool = Executors.newFixedThreadPool(1); } catch (Exception e) { logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e); } } - }, 16); + }).start(); - // rpc back - threadBuilder.accept(() -> { + // rpc back ,仅做数据解析,暂无耗时监控 + new Thread(() -> { while (true) { Event event = null; try { @@ -236,10 +244,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e); } } - }, 1); + }).start(); - // rpc call - threadBuilder.accept(() -> { + // rpc call,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); while (true) { Event event = null; try { @@ -247,17 +256,22 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer continue; } logger.finest(String.format("rpc-call:[%s] %s", event.topic, event.value)); - accept(event.topic, event.value); + String topic = event.topic; + String value = event.value; + pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); + pool = Executors.newFixedThreadPool(1); } catch (Exception e) { logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e); } } - }, 1); + }).start(); // send msg - threadBuilder.accept(() -> { + new Thread(() -> { while (true) { String msg = null; try { @@ -273,7 +287,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e); } } - }, 1); + }).start(); }