From 1d6ae86cbd86a2ed70f881ab214127f0045a7769 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Mon, 11 Jan 2021 18:23:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9Aztimer-cli?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/config.properties | 7 +++- src/com/zdemo/zdb/ZHubConsumer.java | 60 +++++++++++++++++++++++------ test/com/zdemo/test/AppTest.java | 13 +++---- 3 files changed, 60 insertions(+), 20 deletions(-) diff --git a/conf/config.properties b/conf/config.properties index d256608..ff1a7b4 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -7,5 +7,8 @@ redis.port=6064 pulsar.serviceurl=pulsar://47.113.228.247:6650 # zdb -zdb.host = 127.0.0.1 -zdb.port = 1216 \ No newline at end of file +# zdb.host = 127.0.0.1 +# zdb.port = 1216 + +zhub.host = 47.111.150.118 +zhub.port = 6066 \ No newline at end of file diff --git a/src/com/zdemo/zdb/ZHubConsumer.java b/src/com/zdemo/zdb/ZHubConsumer.java index 8a6bbd2..0fa8e1f 100644 --- a/src/com/zdemo/zdb/ZHubConsumer.java +++ b/src/com/zdemo/zdb/ZHubConsumer.java @@ -20,11 +20,11 @@ import java.util.logging.Level; public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer, Service { - @Resource(name = "property.zdb.host") - private String host = "39.108.56.246"; - @Resource(name = "property.zdb.password") + @Resource(name = "property.zhub.host") + private String host = "127.0.0.1"; + @Resource(name = "property.zhub.password") private String password = ""; - @Resource(name = "property.zdb.port") + @Resource(name = "property.zhub.port") private int port = 1216; private ReentrantLock lock = new ReentrantLock(); @@ -70,8 +70,10 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer reader.readLine(); //$n len(key) topic = reader.readLine(); // name - - accept(topic, value); + Timer timer = timerMap.get(topic); + if (timer != null) { + timer.runnable.run(); + } } } catch (IOException e) { logger.log(Level.WARNING, "reconnection ", e.getMessage()); @@ -130,8 +132,10 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer } buf.append("\r\n"); - // todo: 重连 timer 订阅, 需要 - + // 重连 timer 订阅 + timerMap.forEach((name, timer) -> { + send("timer", name, timer.expr, timer.single ? "a" : "x"); + }); send(buf.toString()); } catch (IOException e) { logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e); @@ -163,10 +167,44 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer } // timer - private ConcurrentHashMap timerMap = new ConcurrentHashMap(); + private ConcurrentHashMap timerMap = new ConcurrentHashMap(); public void timer(String name, String expr, Runnable run) { - timerMap.put(name, run); - send("timer", name, expr); + timerMap.put(name, new Timer(name, expr, run, false)); + send("timer", name, expr, "x"); + } + + public void timerSingle(String name, String expr, Runnable run) { + send("timer", name, expr, "a"); + timerMap.put(name, new Timer(name, expr, run, true)); + } + + class Timer { + String name; + String expr; + Runnable runnable; + boolean single; + + public String getName() { + return name; + } + + public String getExpr() { + return expr; + } + + public Runnable getRunnable() { + return runnable; + } + + public boolean isSingle() { + return single; + } + + public Timer(String name, String expr, Runnable runnable, boolean single) { + this.name = name; + this.runnable = runnable; + this.single = single; + } } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 2fdfa0b..373a8f1 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -1,12 +1,11 @@ package com.zdemo.test; import com.zdemo.Event; -import com.zdemo.EventType; -import com.zdemo.IConsumer; import com.zdemo.IProducer; import com.zdemo.zdb.ZHubProducer; import org.junit.Test; import org.redkale.boot.Application; +import org.redkale.util.Utility; import java.util.ArrayList; import java.util.List; @@ -28,18 +27,18 @@ public class AppTest { //启动并开启消费监听 MyConsumer consumer = Application.singleton(MyConsumer.class); - consumer.addEventType( - EventType.of("a", str -> { + /*consumer.addEventType( + EventType.of("ax", str -> { System.out.println("我收到了消息 a 事件:" + str); }) , EventType.of("bx", str -> { System.out.println("我收到了消息 主题bx 事件:" + str); }) - ); + );*/ - consumer.timer("a", "* * * * * *", () -> { - System.out.println("timer a 执行了"); + consumer.timerSingle("a", "*/10 * * * * *", () -> { + System.out.println(Utility.now() + "timer a 执行了"); });