From c0a1c60cb699870507a9e6ecb8173e06cea86288 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=81=E6=98=BE=E4=BC=98?= <237809796@qq.com> Date: Thu, 25 May 2023 17:53:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A1=E3=80=81ZhubClient?= =?UTF-8?q?=20=E6=9E=84=E9=80=A0=E6=96=B9=E6=B3=95=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=20new=20=E7=9A=84=E6=96=B9=E5=BC=8F=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E5=AE=9E=E4=BE=8B=EF=BC=88=E4=B8=80=E8=88=AC=E5=9C=A8?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E6=97=B6=E4=BD=BF=E7=94=A8=EF=BC=892?= =?UTF-8?q?=E3=80=81initClient=20=E6=96=B9=E6=B3=95=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E5=BE=AE=E8=B0=83=EF=BC=8C=E7=A1=AE=E4=BF=9D=E5=AE=9E=E4=BE=8B?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E5=88=9D=E5=A7=8B=E5=8C=96=E5=9C=A8=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E9=98=9F=E5=88=97=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/net/tccn/zhub/ZHubClient.java | 369 +++++++++++++++--------------- 1 file changed, 190 insertions(+), 179 deletions(-) diff --git a/src/net/tccn/zhub/ZHubClient.java b/src/net/tccn/zhub/ZHubClient.java index de3150f..702e209 100644 --- a/src/net/tccn/zhub/ZHubClient.java +++ b/src/net/tccn/zhub/ZHubClient.java @@ -56,10 +56,19 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer };*/ private static Map mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient -/* + public ZHubClient() { - logger.info("ZHubClient:" + (application != null ? application.getName() : "NULL")); - }*/ + + } + + public ZHubClient(String name, Map attr) { + this.APP_NAME = name; + this.addr = attr.get("addr"); + this.groupid = attr.get("groupid"); + this.auth = attr.get("auth"); + + this.initClient(null); + } @Override public void init(AnyValue config) { @@ -99,215 +108,217 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer mainHub.put(addr, this); } - // 消息 事件接收 - new Thread(() -> { - if (!initSocket(0)) { - return; - } + CompletableFuture.runAsync(() -> { + // 消息 事件接收 + new Thread(() -> { + if (!initSocket(0)) { + return; + } - while (true) { - try { - String readLine = reader.readLine(); - if (readLine == null && initSocket(Integer.MAX_VALUE)) { // 连接中断 处理 - continue; - } - - String type = ""; - - // +ping - if ("+ping".equals(readLine)) { - send("+pong"); - continue; - } - - // 主题订阅消息 - if ("*3".equals(readLine)) { - readLine = reader.readLine(); // $7 len() - type = reader.readLine(); // message - if (!"message".equals(type)) { + while (true) { + try { + String readLine = reader.readLine(); + if (readLine == null && initSocket(Integer.MAX_VALUE)) { // 连接中断 处理 continue; } - reader.readLine(); //$n len(key) - String topic = reader.readLine(); // topic - String lenStr = reader.readLine();//$n len(value) - int clen = 0; - if (lenStr.startsWith("$")) { - clen = Integer.parseInt(lenStr.replace("$", "")); + String type = ""; + + // +ping + if ("+ping".equals(readLine)) { + send("+pong"); + continue; } - String value = ""; - do { - if (value.length() > 0) { - value += "\r\n"; + // 主题订阅消息 + if ("*3".equals(readLine)) { + readLine = reader.readLine(); // $7 len() + type = reader.readLine(); // message + if (!"message".equals(type)) { + continue; } - String s = reader.readLine(); - value += s; // value - } while (clen > 0 && clen > strLength(value)); + reader.readLine(); //$n len(key) + String topic = reader.readLine(); // topic - logger.finest("topic[" + topic + "]: " + value); + String lenStr = reader.readLine();//$n len(value) + int clen = 0; + if (lenStr.startsWith("$")) { + clen = Integer.parseInt(lenStr.replace("$", "")); + } - // lock msg - if ("lock".equals(topic)) { - Lock lock = lockTag.get(value); - if (lock != null) { - synchronized (lock) { - lock.notifyAll(); + String value = ""; + do { + if (value.length() > 0) { + value += "\r\n"; } + String s = reader.readLine(); + value += s; // value + } while (clen > 0 && clen > strLength(value)); + + logger.finest("topic[" + topic + "]: " + value); + + // lock msg + if ("lock".equals(topic)) { + Lock lock = lockTag.get(value); + if (lock != null) { + synchronized (lock) { + lock.notifyAll(); + } + } + continue; } - continue; - } - // rpc back msg - if (APP_NAME.equals(topic)) { - rpcBackQueue.add(Event.of(topic, value)); + // rpc back msg + if (APP_NAME.equals(topic)) { + rpcBackQueue.add(Event.of(topic, value)); + continue; + } + + // rpc call msg + if (rpcTopics.contains(topic)) { + rpcCallQueue.add(Event.of(topic, value)); + continue; + } + + // oth msg + topicQueue.add(Event.of(topic, value)); continue; } - // rpc call msg - if (rpcTopics.contains(topic)) { - rpcCallQueue.add(Event.of(topic, value)); + // timer 消息 + if ("*2".equals(readLine)) { + readLine = reader.readLine(); // $7 len() + type = reader.readLine(); // message + if (!"timer".equals(type)) { + continue; + } + reader.readLine(); //$n len(key) + String topic = reader.readLine(); // name + + logger.finest("timer[" + topic + "]: "); + timerQueue.add(timerMap.get(topic)); continue; } - // oth msg - topicQueue.add(Event.of(topic, value)); - continue; + logger.finest(readLine); + } catch (IOException e) { + if (e instanceof SocketException) { + initSocket(Integer.MAX_VALUE); + } + e.printStackTrace(); } + } + }).start(); + }).thenAcceptAsync(x -> { + // 定时调度事件,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); + while (true) { + Timer timer = null; + try { + if ((timer = timerQueue.take()) == null) { + return; + } + long start = System.currentTimeMillis(); + pool.submit(timer.runnable).get(5, TimeUnit.SECONDS); + long end = System.currentTimeMillis(); + logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start)); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "timer [" + timer.name + "] time out: " + 5 + " S", e); + pool = Executors.newFixedThreadPool(1); + } catch (Exception e) { + logger.log(Level.WARNING, "timer [" + timer.name + "]", e); + } + } + }).start(); - // timer 消息 - if ("*2".equals(readLine)) { - readLine = reader.readLine(); // $7 len() - type = reader.readLine(); // message - if (!"timer".equals(type)) { + // topic msg,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); + while (true) { + Event event = null; + try { + if ((event = topicQueue.take()) == null) { continue; } - reader.readLine(); //$n len(key) - String topic = reader.readLine(); // name - logger.finest("timer[" + topic + "]: "); - timerQueue.add(timerMap.get(topic)); - continue; + String topic = event.topic; + String value = event.value; + pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); + pool = Executors.newFixedThreadPool(1); + } catch (Exception e) { + logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e); } - - logger.finest(readLine); - } catch (IOException e) { - if (e instanceof SocketException) { - initSocket(Integer.MAX_VALUE); - } - e.printStackTrace(); } - } - }).start(); + }).start(); - // 定时调度事件,已加入耗时监控 - new Thread(() -> { - ExecutorService pool = Executors.newFixedThreadPool(1); - while (true) { - Timer timer = null; - try { - if ((timer = timerQueue.take()) == null) { - return; + // rpc back ,仅做数据解析,暂无耗时监控 + new Thread(() -> { + while (true) { + Event event = null; + try { + if ((event = rpcBackQueue.take()) == null) { + continue; + } + //if (event) + logger.finest(String.format("rpc-back:[%s]: %s", event.topic, event.value)); + rpcAccept(event.value); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e); } - long start = System.currentTimeMillis(); - pool.submit(timer.runnable).get(5, TimeUnit.SECONDS); - long end = System.currentTimeMillis(); - logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start)); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (TimeoutException e) { - logger.log(Level.SEVERE, "timer [" + timer.name + "] time out: " + 5 + " S", e); - pool = Executors.newFixedThreadPool(1); - } catch (Exception e) { - logger.log(Level.WARNING, "timer [" + timer.name + "]", e); } - } - }).start(); + }).start(); - // topic msg,已加入耗时监控 - new Thread(() -> { - ExecutorService pool = Executors.newFixedThreadPool(1); - while (true) { - Event event = null; - try { - if ((event = topicQueue.take()) == null) { - continue; + // rpc call,已加入耗时监控 + new Thread(() -> { + ExecutorService pool = Executors.newFixedThreadPool(1); + while (true) { + Event event = null; + try { + if ((event = rpcCallQueue.take()) == null) { + continue; + } + logger.finest(String.format("rpc-call:[%s] %s", event.topic, event.value)); + String topic = event.topic; + String value = event.value; + pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); + pool = Executors.newFixedThreadPool(1); + } catch (Exception e) { + logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e); } - - String topic = event.topic; - String value = event.value; - pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (TimeoutException e) { - logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); - pool = Executors.newFixedThreadPool(1); - } catch (Exception e) { - logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e); } - } - }).start(); + }).start(); - // rpc back ,仅做数据解析,暂无耗时监控 - new Thread(() -> { - while (true) { - Event event = null; - try { - if ((event = rpcBackQueue.take()) == null) { - continue; + // send msg + new Thread(() -> { + while (true) { + String msg = null; + try { + if ((msg = sendMsgQueue.take()) == null) { + continue; + } + // logger.log(Level.FINEST, "send-msg: [" + msg + "]"); + writer.write(msg.getBytes()); + writer.flush(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e); } - //if (event) - logger.finest(String.format("rpc-back:[%s]: %s", event.topic, event.value)); - rpcAccept(event.value); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e) { - logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e); } - } - }).start(); - - // rpc call,已加入耗时监控 - new Thread(() -> { - ExecutorService pool = Executors.newFixedThreadPool(1); - while (true) { - Event event = null; - try { - if ((event = rpcCallQueue.take()) == null) { - continue; - } - logger.finest(String.format("rpc-call:[%s] %s", event.topic, event.value)); - String topic = event.topic; - String value = event.value; - pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (TimeoutException e) { - logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e); - pool = Executors.newFixedThreadPool(1); - } catch (Exception e) { - logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e); - } - } - }).start(); - - // send msg - new Thread(() -> { - while (true) { - String msg = null; - try { - if ((msg = sendMsgQueue.take()) == null) { - continue; - } - // logger.log(Level.FINEST, "send-msg: [" + msg + "]"); - writer.write(msg.getBytes()); - writer.flush(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e) { - logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e); - } - } - }).start(); + }).start(); + }); return this; }