From d2e13df56e98944c01a1f35cae236eda414374b8 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Fri, 8 Jan 2021 19:51:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9Aztimer=20=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{ZdbConsumer.java => ZHubConsumer.java} | 85 ++++++++++++++----- .../{ZdbProducer.java => ZHubProducer.java} | 38 ++++++--- test/com/zdemo/test/AppTest.java | 13 +-- test/com/zdemo/test/MyConsumer.java | 4 +- 4 files changed, 96 insertions(+), 44 deletions(-) rename src/com/zdemo/zdb/{ZdbConsumer.java => ZHubConsumer.java} (65%) rename src/com/zdemo/zdb/{ZdbProducer.java => ZHubProducer.java} (61%) diff --git a/src/com/zdemo/zdb/ZdbConsumer.java b/src/com/zdemo/zdb/ZHubConsumer.java similarity index 65% rename from src/com/zdemo/zdb/ZdbConsumer.java rename to src/com/zdemo/zdb/ZHubConsumer.java index af34976..8a6bbd2 100644 --- a/src/com/zdemo/zdb/ZdbConsumer.java +++ b/src/com/zdemo/zdb/ZHubConsumer.java @@ -14,9 +14,11 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; -public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service { +public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer, Service { @Resource(name = "property.zdb.host") private String host = "39.108.56.246"; @@ -25,8 +27,11 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service @Resource(name = "property.zdb.port") private int port = 1216; + private ReentrantLock lock = new ReentrantLock(); + + private Socket client; - private OutputStream os; + private OutputStream writer; private BufferedReader reader; @Override @@ -39,6 +44,8 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service try { String readLine = reader.readLine(); String type = ""; + + // 主题订阅消息 if ("*3".equals(readLine)) { readLine = reader.readLine(); // $7 len() type = reader.readLine(); // message @@ -52,6 +59,20 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service value = reader.readLine(); // value accept(topic, value); } + + // timer 消息 + if ("*2".equals(readLine)) { + readLine = reader.readLine(); // $7 len() + type = reader.readLine(); // message + if (!"timer".equals(type)) { + continue; + } + reader.readLine(); //$n len(key) + topic = reader.readLine(); // name + + + accept(topic, value); + } } catch (IOException e) { logger.log(Level.WARNING, "reconnection ", e.getMessage()); if (e instanceof SocketException) { @@ -70,23 +91,48 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service }).start(); } + // --------------------- + // 消息发送类 + private void send(String... data) { + try { + lock.lock(); + if (data.length == 1) { + writer.write((data[0] + "\r\n").getBytes()); + } else if (data.length > 1) { + writer.write(("*" + data.length + "\r\n").getBytes()); + for (String d : data) { + writer.write(("$" + d.length() + "\r\n").getBytes()); + writer.write((d + "\r\n").getBytes()); + } + } + writer.flush(); + } catch (IOException e) { + logger.log(Level.WARNING, "", e); + } finally { + lock.unlock(); + } + } + public boolean initSocket() { try { client = new Socket(); client.connect(new InetSocketAddress(host, port)); client.setKeepAlive(true); - os = client.getOutputStream(); + writer = client.getOutputStream(); + reader = new BufferedReader(new InputStreamReader(client.getInputStream())); + + send("groupid " + getGroupid()); StringBuffer buf = new StringBuffer("subscribe"); for (String topic : getTopics()) { buf.append(" ").append(topic); } buf.append("\r\n"); - os.write(buf.toString().getBytes()); - os.flush(); - reader = new BufferedReader(new InputStreamReader(client.getInputStream())); + // todo: 重连 timer 订阅, 需要 + + send(buf.toString()); } catch (IOException e) { logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e); return false; @@ -95,11 +141,6 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service return true; } - @Override - public String getGroupid() { - return null; - } - @Override public void addEventType(EventType... eventType) { for (EventType type : eventType) { @@ -111,23 +152,21 @@ public class ZdbConsumer extends AbstractConsumer implements IConsumer, Service eventMap.put(topic, type); //新增订阅 - try { - os.write(("subscribe " + topic + "\r\n").getBytes()); - os.flush(); - } catch (IOException e) { - logger.log(Level.WARNING, "", e); - } + send("subscribe " + topic); } } } @Override public void unsubscribe(String topic) { - try { - os.write(("unsubscribe " + topic + "\r\n").getBytes()); - os.flush(); - } catch (IOException e) { - logger.log(Level.WARNING, "", e); - } + send("unsubscribe " + topic); + } + + // timer + private ConcurrentHashMap timerMap = new ConcurrentHashMap(); + + public void timer(String name, String expr, Runnable run) { + timerMap.put(name, run); + send("timer", name, expr); } } diff --git a/src/com/zdemo/zdb/ZdbProducer.java b/src/com/zdemo/zdb/ZHubProducer.java similarity index 61% rename from src/com/zdemo/zdb/ZdbProducer.java rename to src/com/zdemo/zdb/ZHubProducer.java index 98bc2d3..5b4c691 100644 --- a/src/com/zdemo/zdb/ZdbProducer.java +++ b/src/com/zdemo/zdb/ZHubProducer.java @@ -11,9 +11,10 @@ import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; -public class ZdbProducer implements IProducer, Service { +public class ZHubProducer implements IProducer, Service { @Resource(name = "property.zdb.host") private String host = "39.108.56.246"; @@ -22,6 +23,8 @@ public class ZdbProducer implements IProducer, Service { @Resource(name = "property.zdb.port") private int port = 1216; + private ReentrantLock lock = new ReentrantLock(); + private OutputStream os; @Override @@ -38,22 +41,29 @@ public class ZdbProducer implements IProducer, Service { @Override public void send(T t) { - try { - String v = JsonConvert.root().convertTo(t.value); - if (v.startsWith("\"") && v.endsWith("\"")) { - v = v.substring(1, v.length() - 1); - } + String v = JsonConvert.root().convertTo(t.value); + if (v.startsWith("\"") && v.endsWith("\"")) { + v = v.substring(1, v.length() - 1); + } + send("publish", t.topic, v); + } - os.write("*3\r\n".getBytes()); - os.write("$7\r\n".getBytes()); - os.write("publish\r\n".getBytes()); - os.write(("$" + t.topic.length() + "\r\n").getBytes()); - os.write((t.topic + "\r\n").getBytes()); - os.write(("$" + v.length() + "\r\n").getBytes()); - os.write((v + "\r\n").getBytes()); - os.flush(); + private void send(String... data) { + try { + lock.lock(); + if (data.length == 1) { + os.write((data[0] + "\r\n").getBytes()); + } else if (data.length > 1) { + os.write(("*" + data.length + "\r\n").getBytes()); + for (String d : data) { + os.write(("$" + d.length() + "\r\n").getBytes()); + os.write((d + "\r\n").getBytes()); + } + } } catch (IOException e) { logger.log(Level.WARNING, "", e); + } finally { + lock.unlock(); } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 6313252..2fdfa0b 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -4,7 +4,7 @@ import com.zdemo.Event; import com.zdemo.EventType; import com.zdemo.IConsumer; import com.zdemo.IProducer; -import com.zdemo.zdb.ZdbProducer; +import com.zdemo.zdb.ZHubProducer; import org.junit.Test; import org.redkale.boot.Application; @@ -26,7 +26,7 @@ public class AppTest { public void runConsumer() { try { //启动并开启消费监听 - IConsumer consumer = Application.singleton(MyConsumer.class); + MyConsumer consumer = Application.singleton(MyConsumer.class); consumer.addEventType( EventType.of("a", str -> { @@ -38,6 +38,10 @@ public class AppTest { }) ); + consumer.timer("a", "* * * * * *", () -> { + System.out.println("timer a 执行了"); + }); + Thread.sleep(60_000 * 60); } catch (Exception e) { @@ -48,7 +52,7 @@ public class AppTest { @Test public void runProducer() { try { - IProducer producer = Application.singleton(ZdbProducer.class); + IProducer producer = Application.singleton(ZHubProducer.class); // 发送不同的 事件 float v0 = 1f; @@ -79,7 +83,6 @@ public class AppTest { @Test public void t() { - List list = new ArrayList<>(); list.toArray(String[]::new); @@ -129,7 +132,7 @@ public class AppTest { public void yy() { IProducer producer = null; try { - producer = Application.singleton(ZdbProducer.class); + producer = Application.singleton(ZHubProducer.class); for (int i = 0; i < 100; i++) { diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index 920728d..e57013e 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,8 +1,8 @@ package com.zdemo.test; -import com.zdemo.zdb.ZdbConsumer; +import com.zdemo.zdb.ZHubConsumer; -public class MyConsumer extends ZdbConsumer { +public class MyConsumer extends ZHubConsumer { public String getGroupid() { return "group-test"; //消费组名称