From 895df0e05a7fe350cd413a00595ca35525078f17 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Mon, 1 Mar 2021 18:50:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Azhub-client=20?= =?UTF-8?q?=E9=87=8D=E8=BF=9E=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/application.xml | 9 +- src/com/zdemo/IProducer.java | 2 +- src/com/zdemo/kafak/KafakProducer.java | 3 +- src/com/zdemo/redis/RedisProducer.java | 7 +- src/com/zdemo/zhub/ZHubClient.java | 111 +++++++++++++------------ test/com/zdemo/test/AppTest.java | 10 +++ 6 files changed, 77 insertions(+), 65 deletions(-) diff --git a/conf/application.xml b/conf/application.xml index 1501daf..c11fbfe 100644 --- a/conf/application.xml +++ b/conf/application.xml @@ -2,16 +2,15 @@ + + + + - - - - - diff --git a/src/com/zdemo/IProducer.java b/src/com/zdemo/IProducer.java index 082da0a..b5c8329 100644 --- a/src/com/zdemo/IProducer.java +++ b/src/com/zdemo/IProducer.java @@ -5,5 +5,5 @@ import java.util.logging.Logger; public interface IProducer { Logger logger = Logger.getLogger(IProducer.class.getSimpleName()); - void publish(String topic, V v); + boolean publish(String topic, Object v); } diff --git a/src/com/zdemo/kafak/KafakProducer.java b/src/com/zdemo/kafak/KafakProducer.java index 6b3ba45..b96d620 100644 --- a/src/com/zdemo/kafak/KafakProducer.java +++ b/src/com/zdemo/kafak/KafakProducer.java @@ -40,8 +40,9 @@ public class KafakProducer implements IProducer, Service { } @Override - public void publish(String topic, V v) { + public boolean publish(String topic, Object v) { producer.send(new ProducerRecord(topic, toStr(v))); + return true; } @Override diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java index ef5a4d3..4bc3e1c 100644 --- a/src/com/zdemo/redis/RedisProducer.java +++ b/src/com/zdemo/redis/RedisProducer.java @@ -39,17 +39,18 @@ public class RedisProducer implements IProducer, Service { } @Override - public void publish(String topic, V v) { + public boolean publish(String topic, Object v) { try { osw.write("PUBLISH " + topic + " '" + toStr(v) + "' \r\n"); osw.flush(); + return true; } catch (IOException e) { logger.log(Level.WARNING, "", e); - } + return false; } - private String toStr(V v) { + private String toStr(Object v) { if (v instanceof String) { return (String) v; } diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index a354ed2..203235a 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -65,7 +65,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer groupid = config.getValue("groupid", groupid); } - if (!initSocket()) { + if (!initSocket(0)) { return; } // 消息 事件接收 @@ -73,14 +73,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer while (true) { try { String readLine = reader.readLine(); - if (readLine == null) { // 连接中断 处理 - while (!initSocket()) { - try { - Thread.sleep(1000 * 5); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + if (readLine == null && initSocket(Integer.MAX_VALUE)) { // 连接中断 处理 + continue; } String type = ""; @@ -113,15 +107,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer timerQueue.put(timerMap.get(topic)); } } catch (IOException e) { - logger.log(Level.WARNING, "reconnection ", e.getMessage()); if (e instanceof SocketException) { - while (!initSocket()) { - try { - Thread.sleep(1000 * 5); - } catch (InterruptedException interruptedException) { - interruptedException.printStackTrace(); - } - } + initSocket(Integer.MAX_VALUE); } } catch (InterruptedException e) { e.printStackTrace(); @@ -170,7 +157,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer // --------------------- // 消息发送 - private void send(String... data) { + private boolean send(String... data) { try { lock.lock(); if (data.length == 1) { @@ -183,54 +170,68 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } } writer.flush(); + return true; } catch (IOException e) { logger.log(Level.WARNING, "", e); } finally { lock.unlock(); } + return false; } - private String toStr(V v) { + private String toStr(Object v) { if (v instanceof String) { return (String) v; } return JsonConvert.root().convertTo(v); } - protected boolean initSocket() { - try { - client = new Socket(); - client.connect(new InetSocketAddress(host, port)); - client.setKeepAlive(true); + protected boolean initSocket(int retry) { + for (int i = 0; i <= retry; i++) { + try { + client = new Socket(); + client.connect(new InetSocketAddress(host, port)); + client.setKeepAlive(true); - writer = client.getOutputStream(); - reader = new BufferedReader(new InputStreamReader(client.getInputStream())); + writer = client.getOutputStream(); + reader = new BufferedReader(new InputStreamReader(client.getInputStream())); - String groupid = getGroupid(); - if (groupid == null || groupid.isEmpty()) { - throw new RuntimeException("ZHubClient groupid can not is empty"); + String groupid = getGroupid(); + if (groupid == null || groupid.isEmpty()) { + throw new RuntimeException("ZHubClient groupid can not is empty"); + } + send("groupid " + groupid); + + StringBuffer buf = new StringBuffer("subscribe"); + for (String topic : getTopics()) { + buf.append(" ").append(topic); + } + send(buf.toString()); + + // 重连 timer 订阅 + timerMap.forEach((name, timer) -> { + send("timer", name); + }); + if (retry > 0) { + logger.log(Level.WARNING, String.format("ZHubClient[%s][%s] %s Succeed!", getGroupid(), i + 1, retry > 0 ? "reconnection" : "init")); + } else { + logger.log(Level.FINE, String.format("ZHubClient[%s] %s Succeed!", getGroupid(), retry > 0 ? "reconnection" : "init")); + } + return true; + } catch (Exception e) { + if (retry == 0 || i > 0) { + logger.log(Level.WARNING, String.format("ZHubClient[%s] %s Failed 初始化失败!", getGroupid(), retry == 0 ? "init" : "reconnection"), e); + } else { + logger.log(Level.WARNING, String.format("ZHubClient[%s][%s] reconnection Failed!", getGroupid(), i + 1)); + try { + Thread.sleep(1000 * 5); + } catch (InterruptedException interruptedException) { + interruptedException.printStackTrace(); + } + } } - send("groupid " + groupid); - - StringBuffer buf = new StringBuffer("subscribe"); - for (String topic : getTopics()) { - buf.append(" ").append(topic); - } - send(buf.toString()); - - // 重连 timer 订阅 - timerMap.forEach((name, timer) -> { - send("timer", name); - }); - } catch (IOException e) { - logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e); - return false; - } catch (Exception e) { - logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e); - return false; } - - return true; + return false; } @Override @@ -239,21 +240,21 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer super.removeEventType(topic); } - public void publish(String topic, V v) { - send("publish", topic, toStr(v)); + public boolean publish(String topic, Object v) { + return send("publish", topic, toStr(v)); } - public void broadcast(String topic, V v) { + public void broadcast(String topic, Object v) { send("broadcast", topic, toStr(v)); } // 发送 publish 主题消息,若多次发送的 topic + "-" + value 相同,将会做延时重置 - public void delay(String topic, V v, int delay) { + public void delay(String topic, Object v, int delay) { send("delay", topic, toStr(v), String.valueOf(delay)); } // 表达式支持:d+[d,H,m,s] - public void delay(String topic, V v, String delayExpr) { + public void delay(String topic, Object v, String delayExpr) { String endchar = ""; int delay; if (delayExpr.matches("^\\d+[d,H,m,s]$")) { @@ -326,6 +327,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer } public void reloadTimer() { - send("cmd", "reload-timer-config"); + send("cmd", "reload-timer"); } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index ffe5c33..0d0ad40 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -1,12 +1,15 @@ package com.zdemo.test; +import com.zdemo.Event; import com.zdemo.IProducer; import org.junit.Test; import org.redkale.boot.Application; +import org.redkale.convert.json.JsonConvert; import org.redkale.util.Utility; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -305,4 +308,11 @@ public class AppTest { } return 0; } + + @Test + public void testxx() { + Event of = Event.of("A", Map.of("b", 1)); + + System.out.println(JsonConvert.root().convertTo(of)); + } }