diff --git a/src/com/zdemo/zdb/ZHubConsumer.java b/src/com/zdemo/zdb/ZHubConsumer.java index 8a6bbd2..0fa8e1f 100644 --- a/src/com/zdemo/zdb/ZHubConsumer.java +++ b/src/com/zdemo/zdb/ZHubConsumer.java @@ -20,11 +20,11 @@ import java.util.logging.Level; public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer, Service { - @Resource(name = "property.zdb.host") - private String host = "39.108.56.246"; - @Resource(name = "property.zdb.password") + @Resource(name = "property.zhub.host") + private String host = "127.0.0.1"; + @Resource(name = "property.zhub.password") private String password = ""; - @Resource(name = "property.zdb.port") + @Resource(name = "property.zhub.port") private int port = 1216; private ReentrantLock lock = new ReentrantLock(); @@ -70,8 +70,10 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer reader.readLine(); //$n len(key) topic = reader.readLine(); // name - - accept(topic, value); + Timer timer = timerMap.get(topic); + if (timer != null) { + timer.runnable.run(); + } } } catch (IOException e) { logger.log(Level.WARNING, "reconnection ", e.getMessage()); @@ -130,8 +132,10 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer } buf.append("\r\n"); - // todo: 重连 timer 订阅, 需要 - + // 重连 timer 订阅 + timerMap.forEach((name, timer) -> { + send("timer", name, timer.expr, timer.single ? "a" : "x"); + }); send(buf.toString()); } catch (IOException e) { logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e); @@ -163,10 +167,44 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer } // timer - private ConcurrentHashMap timerMap = new ConcurrentHashMap(); + private ConcurrentHashMap timerMap = new ConcurrentHashMap(); public void timer(String name, String expr, Runnable run) { - timerMap.put(name, run); - send("timer", name, expr); + timerMap.put(name, new Timer(name, expr, run, false)); + send("timer", name, expr, "x"); + } + + public void timerSingle(String name, String expr, Runnable run) { + send("timer", name, expr, "a"); + timerMap.put(name, new Timer(name, expr, run, true)); + } + + class Timer { + String name; + String expr; + Runnable runnable; + boolean single; + + public String getName() { + return name; + } + + public String getExpr() { + return expr; + } + + public Runnable getRunnable() { + return runnable; + } + + public boolean isSingle() { + return single; + } + + public Timer(String name, String expr, Runnable runnable, boolean single) { + this.name = name; + this.runnable = runnable; + this.single = single; + } } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 2fdfa0b..373a8f1 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -1,12 +1,11 @@ package com.zdemo.test; import com.zdemo.Event; -import com.zdemo.EventType; -import com.zdemo.IConsumer; import com.zdemo.IProducer; import com.zdemo.zdb.ZHubProducer; import org.junit.Test; import org.redkale.boot.Application; +import org.redkale.util.Utility; import java.util.ArrayList; import java.util.List; @@ -28,18 +27,18 @@ public class AppTest { //启动并开启消费监听 MyConsumer consumer = Application.singleton(MyConsumer.class); - consumer.addEventType( - EventType.of("a", str -> { + /*consumer.addEventType( + EventType.of("ax", str -> { System.out.println("我收到了消息 a 事件:" + str); }) , EventType.of("bx", str -> { System.out.println("我收到了消息 主题bx 事件:" + str); }) - ); + );*/ - consumer.timer("a", "* * * * * *", () -> { - System.out.println("timer a 执行了"); + consumer.timerSingle("a", "*/10 * * * * *", () -> { + System.out.println(Utility.now() + "timer a 执行了"); });