新增:dalay 支持字符表达式
This commit is contained in:
parent
b08469963d
commit
873d86fd60
@ -1,4 +1,4 @@
|
|||||||
package com.zdemo.zdb;
|
package com.zdemo.zhub;
|
||||||
|
|
||||||
import com.zdemo.*;
|
import com.zdemo.*;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
@ -277,10 +277,44 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
|
|||||||
send("broadcast", topic, toStr(v));
|
send("broadcast", topic, toStr(v));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 发送 publish 主题消息,若多次发送的 topic + "-" + value 相同,将会做延时重置
|
||||||
public <V> void delay(String topic, V v, int delay) {
|
public <V> void delay(String topic, V v, int delay) {
|
||||||
send("delay", topic, toStr(v), String.valueOf(delay));
|
send("delay", topic, toStr(v), String.valueOf(delay));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 表达式支持:d+[d,H,m,s]
|
||||||
|
public <V> void delay(String topic, V v, String delayExpr) {
|
||||||
|
String endchar = "";
|
||||||
|
int delay;
|
||||||
|
if (delayExpr.matches("^\\d+[d,H,m,s]$")) {
|
||||||
|
endchar = delayExpr.substring(delayExpr.length() - 1);
|
||||||
|
delay = Integer.parseInt(delayExpr.substring(0, delayExpr.length() - 1));
|
||||||
|
} else {
|
||||||
|
if (!delayExpr.matches("^\\d+$")) {
|
||||||
|
throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", delayExpr));
|
||||||
|
}
|
||||||
|
|
||||||
|
delay = Integer.parseInt(delayExpr);
|
||||||
|
if (delay <= 0L) {
|
||||||
|
throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", delayExpr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ("M".equals(endchar)) {
|
||||||
|
delay *= (1000 * 60 * 60 * 24 * 30);
|
||||||
|
} else if ("d".equals(endchar)) {
|
||||||
|
delay *= (1000 * 60 * 60 * 24);
|
||||||
|
} else if ("H".equals(endchar)) {
|
||||||
|
delay *= (1000 * 60 * 60);
|
||||||
|
} else if ("m".equals(endchar)) {
|
||||||
|
delay *= (1000 * 60);
|
||||||
|
} else if ("s".equals(endchar)) {
|
||||||
|
delay *= 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
delay(topic, v, delay);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void subscribe(String topic, Consumer<String> consumer) {
|
public void subscribe(String topic, Consumer<String> consumer) {
|
||||||
addEventType(EventType.of(topic, consumer));
|
addEventType(EventType.of(topic, consumer));
|
@ -25,6 +25,7 @@ public class AppTest {
|
|||||||
MyConsumer consumer = Application.singleton(MyConsumer.class);
|
MyConsumer consumer = Application.singleton(MyConsumer.class);
|
||||||
|
|
||||||
consumer.subscribe("a", str -> {
|
consumer.subscribe("a", str -> {
|
||||||
|
System.out.println(System.currentTimeMillis());
|
||||||
logger.info("我收到了消息 a 事件:" + str);
|
logger.info("我收到了消息 a 事件:" + str);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -40,9 +41,9 @@ public class AppTest {
|
|||||||
consumer.timer("b", () -> {
|
consumer.timer("b", () -> {
|
||||||
System.out.println(Utility.now() + " ----------------- timer b 执行了");
|
System.out.println(Utility.now() + " ----------------- timer b 执行了");
|
||||||
});
|
});
|
||||||
consumer.delay("a", "1", 3000);
|
//consumer.delay("a", "1", 200);
|
||||||
consumer.delay("a", "1", 5000);
|
System.out.println(System.currentTimeMillis());
|
||||||
logger.info("----");
|
consumer.delay("a", "1", "2000");
|
||||||
|
|
||||||
Thread.sleep(60_000 * 60);
|
Thread.sleep(60_000 * 60);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package com.zdemo.test;
|
package com.zdemo.test;
|
||||||
|
|
||||||
import com.zdemo.zdb.ZHubClient;
|
import com.zdemo.zhub.ZHubClient;
|
||||||
|
|
||||||
public class MyConsumer extends ZHubClient {
|
public class MyConsumer extends ZHubClient {
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user