diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index fb975ad..ee0e85f 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -42,9 +42,6 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume @Override public void addEventType(EventType... eventTypes) { super.addEventType(eventTypes); - - // 增加变更标记 - queue.add(() -> logger.info("KafakConsumer add new topic!")); } @Override diff --git a/src/com/zdemo/zdb/ZHubClient.java b/src/com/zdemo/zdb/ZHubClient.java index abdac9b..1754921 100644 --- a/src/com/zdemo/zdb/ZHubClient.java +++ b/src/com/zdemo/zdb/ZHubClient.java @@ -15,7 +15,9 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,20 +34,30 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, private int port = 1216; private ReentrantLock lock = new ReentrantLock(); - - private Socket client; private OutputStream writer; private BufferedReader reader; + private final LinkedBlockingQueue timerQueue = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue> topicQueue = new LinkedBlockingQueue<>(); + + private BiConsumer threadBuilder = (r, n) -> { + for (int i = 0; i < n; i++) { + new Thread(() -> r.run()).start(); + } + }; + @Override public void init(AnyValue config) { if (!preInit()) { return; } - boolean flag = initSocket(); + if (!initSocket()) { + return; + } + // 消息 事件接收 new Thread(() -> { - while (flag) { + while (true) { try { String readLine = reader.readLine(); if (readLine == null) { // 连接中断 处理 @@ -71,11 +83,8 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, reader.readLine(); //$n len(value) String value = reader.readLine(); // value - try { - accept(topic, value); - } catch (Exception e) { - logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e); - } + + topicQueue.put(Event.of(topic, value)); } // timer 消息 @@ -88,10 +97,7 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, reader.readLine(); //$n len(key) String topic = reader.readLine(); // name - Timer timer = timerMap.get(topic); - if (timer != null) { - timer.runnable.run(); - } + timerQueue.put(timerMap.get(topic)); } } catch (IOException e) { logger.log(Level.WARNING, "reconnection ", e.getMessage()); @@ -104,9 +110,49 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, } } } + } catch (InterruptedException e) { + e.printStackTrace(); } } }).start(); + + // 定时调度事件 + threadBuilder.accept(() -> { + while (true) { + Timer timer = null; + try { + if ((timer = timerQueue.take()) == null) { + return; + } + long start = System.currentTimeMillis(); + timer.runnable.run(); + long end = System.currentTimeMillis(); + logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start)); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + logger.log(Level.WARNING, "timer [" + timer.name + "]", e); + } + } + }, 2); + + threadBuilder.accept(() -> { + while (true) { + Event event = null; + try { + if ((event = topicQueue.take()) == null) { + continue; + } + + accept(event.topic, event.value); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e); + } + } + }, 1); + } // --------------------- @@ -226,7 +272,7 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, public void publish(String topic, V v) { send("publish", topic, toStr(v)); } - + public void broadcast(String topic, V v) { send("broadcast", topic, toStr(v)); } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 416ff2f..9728d32 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -28,6 +28,15 @@ public class AppTest { consumer.timer("a", () -> { System.out.println(Utility.now() + " timer a 执行了"); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + consumer.timer("b", () -> { + System.out.println(Utility.now() + " ----------------- timer b 执行了"); });