diff --git a/src/com/zdemo/zdb/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java similarity index 87% rename from src/com/zdemo/zdb/ZHubClient.java rename to src/com/zdemo/zhub/ZHubClient.java index ecada2d..5ef3446 100644 --- a/src/com/zdemo/zdb/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -1,4 +1,4 @@ -package com.zdemo.zdb; +package com.zdemo.zhub; import com.zdemo.*; import org.redkale.convert.json.JsonConvert; @@ -277,10 +277,44 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer, send("broadcast", topic, toStr(v)); } + // 发送 publish 主题消息,若多次发送的 topic + "-" + value 相同,将会做延时重置 public void delay(String topic, V v, int delay) { send("delay", topic, toStr(v), String.valueOf(delay)); } + // 表达式支持:d+[d,H,m,s] + public void delay(String topic, V v, String delayExpr) { + String endchar = ""; + int delay; + if (delayExpr.matches("^\\d+[d,H,m,s]$")) { + endchar = delayExpr.substring(delayExpr.length() - 1); + delay = Integer.parseInt(delayExpr.substring(0, delayExpr.length() - 1)); + } else { + if (!delayExpr.matches("^\\d+$")) { + throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", delayExpr)); + } + + delay = Integer.parseInt(delayExpr); + if (delay <= 0L) { + throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", delayExpr)); + } + } + + if ("M".equals(endchar)) { + delay *= (1000 * 60 * 60 * 24 * 30); + } else if ("d".equals(endchar)) { + delay *= (1000 * 60 * 60 * 24); + } else if ("H".equals(endchar)) { + delay *= (1000 * 60 * 60); + } else if ("m".equals(endchar)) { + delay *= (1000 * 60); + } else if ("s".equals(endchar)) { + delay *= 1000; + } + + delay(topic, v, delay); + } + @Override public void subscribe(String topic, Consumer consumer) { addEventType(EventType.of(topic, consumer)); diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 412120d..36cb9c8 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -25,6 +25,7 @@ public class AppTest { MyConsumer consumer = Application.singleton(MyConsumer.class); consumer.subscribe("a", str -> { + System.out.println(System.currentTimeMillis()); logger.info("我收到了消息 a 事件:" + str); }); @@ -40,9 +41,9 @@ public class AppTest { consumer.timer("b", () -> { System.out.println(Utility.now() + " ----------------- timer b 执行了"); }); - consumer.delay("a", "1", 3000); - consumer.delay("a", "1", 5000); - logger.info("----"); + //consumer.delay("a", "1", 200); + System.out.println(System.currentTimeMillis()); + consumer.delay("a", "1", "2000"); Thread.sleep(60_000 * 60); } catch (Exception e) { diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index a6110b0..5e8cf6a 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,6 +1,6 @@ package com.zdemo.test; -import com.zdemo.zdb.ZHubClient; +import com.zdemo.zhub.ZHubClient; public class MyConsumer extends ZHubClient {