修改:ztimer-cli

This commit is contained in:
lxy 2021-01-11 18:23:54 +08:00
parent 26a1fc4971
commit c457e8094d
2 changed files with 55 additions and 18 deletions

View File

@ -20,11 +20,11 @@ import java.util.logging.Level;
public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer, Service {
@Resource(name = "property.zdb.host")
private String host = "39.108.56.246";
@Resource(name = "property.zdb.password")
@Resource(name = "property.zhub.host")
private String host = "127.0.0.1";
@Resource(name = "property.zhub.password")
private String password = "";
@Resource(name = "property.zdb.port")
@Resource(name = "property.zhub.port")
private int port = 1216;
private ReentrantLock lock = new ReentrantLock();
@ -70,8 +70,10 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
reader.readLine(); //$n len(key)
topic = reader.readLine(); // name
accept(topic, value);
Timer timer = timerMap.get(topic);
if (timer != null) {
timer.runnable.run();
}
}
} catch (IOException e) {
logger.log(Level.WARNING, "reconnection ", e.getMessage());
@ -130,8 +132,10 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
}
buf.append("\r\n");
// todo: 重连 timer 订阅 需要
// 重连 timer 订阅
timerMap.forEach((name, timer) -> {
send("timer", name, timer.expr, timer.single ? "a" : "x");
});
send(buf.toString());
} catch (IOException e) {
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
@ -163,10 +167,44 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
}
// timer
private ConcurrentHashMap<String, Runnable> timerMap = new ConcurrentHashMap();
private ConcurrentHashMap<String, Timer> timerMap = new ConcurrentHashMap();
public void timer(String name, String expr, Runnable run) {
timerMap.put(name, run);
send("timer", name, expr);
timerMap.put(name, new Timer(name, expr, run, false));
send("timer", name, expr, "x");
}
public void timerSingle(String name, String expr, Runnable run) {
send("timer", name, expr, "a");
timerMap.put(name, new Timer(name, expr, run, true));
}
class Timer {
String name;
String expr;
Runnable runnable;
boolean single;
public String getName() {
return name;
}
public String getExpr() {
return expr;
}
public Runnable getRunnable() {
return runnable;
}
public boolean isSingle() {
return single;
}
public Timer(String name, String expr, Runnable runnable, boolean single) {
this.name = name;
this.runnable = runnable;
this.single = single;
}
}
}

View File

@ -1,12 +1,11 @@
package com.zdemo.test;
import com.zdemo.Event;
import com.zdemo.EventType;
import com.zdemo.IConsumer;
import com.zdemo.IProducer;
import com.zdemo.zdb.ZHubProducer;
import org.junit.Test;
import org.redkale.boot.Application;
import org.redkale.util.Utility;
import java.util.ArrayList;
import java.util.List;
@ -28,18 +27,18 @@ public class AppTest {
//启动并开启消费监听
MyConsumer consumer = Application.singleton(MyConsumer.class);
consumer.addEventType(
EventType.of("a", str -> {
/*consumer.addEventType(
EventType.of("ax", str -> {
System.out.println("我收到了消息 a 事件:" + str);
})
, EventType.of("bx", str -> {
System.out.println("我收到了消息 主题bx 事件:" + str);
})
);
);*/
consumer.timer("a", "* * * * * *", () -> {
System.out.println("timer a 执行了");
consumer.timerSingle("a", "*/10 * * * * *", () -> {
System.out.println(Utility.now() + "timer a 执行了");
});