diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index 2a7e6f8..a86b606 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -16,10 +16,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.BiConsumer; +import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; import java.util.logging.Level; @@ -44,11 +41,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer private final LinkedBlockingQueue> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG private final LinkedBlockingQueue sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG - private BiConsumer threadBuilder = (r, n) -> { + /*private BiConsumer threadBuilder = (r, n) -> { for (int i = 0; i < n; i++) { new Thread(() -> r.run()).start(); } - }; + };*/ /*private static boolean isFirst = true; private boolean isMain = false;*/ @@ -182,8 +179,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } }).start(); - // 定时调度事件 - threadBuilder.accept(() -> { + // 定时调度事件,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); while (true) { Timer timer = null; try { @@ -191,36 +189,46 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer return; } long start = System.currentTimeMillis(); - timer.runnable.run(); + pool.submit(timer.runnable).get(5, TimeUnit.SECONDS); long end = System.currentTimeMillis(); logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start)); } catch (InterruptedException e) { e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "timer [" + timer.name + "] time out: " + 5 + " S", e); + pool = Executors.newFixedThreadPool(1); } catch (Exception e) { logger.log(Level.WARNING, "timer [" + timer.name + "]", e); } } - }, 1); + }).start(); - // topic msg - threadBuilder.accept(() -> { + // topic msg,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); while (true) { Event event = null; try { if ((event = topicQueue.take()) == null) { continue; } - accept(event.topic, event.value); + + String topic = event.topic; + String value = event.value; + pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); + pool = Executors.newFixedThreadPool(1); } catch (Exception e) { logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e); } } - }, 16); + }).start(); - // rpc back - threadBuilder.accept(() -> { + // rpc back ,仅做数据解析,暂无耗时监控 + new Thread(() -> { while (true) { Event event = null; try { @@ -236,10 +244,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e); } } - }, 1); + }).start(); - // rpc call - threadBuilder.accept(() -> { + // rpc call,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); while (true) { Event event = null; try { @@ -247,17 +256,22 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer continue; } logger.finest(String.format("rpc-call:[%s] %s", event.topic, event.value)); - accept(event.topic, event.value); + String topic = event.topic; + String value = event.value; + pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); + pool = Executors.newFixedThreadPool(1); } catch (Exception e) { logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e); } } - }, 1); + }).start(); // send msg - threadBuilder.accept(() -> { + new Thread(() -> { while (true) { String msg = null; try { @@ -273,7 +287,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e); } } - }, 1); + }).start(); }