修改:ztimer-cli
This commit is contained in:
parent
d2e13df56e
commit
1d6ae86cbd
@ -7,5 +7,8 @@ redis.port=6064
|
|||||||
pulsar.serviceurl=pulsar://47.113.228.247:6650
|
pulsar.serviceurl=pulsar://47.113.228.247:6650
|
||||||
|
|
||||||
# zdb
|
# zdb
|
||||||
zdb.host = 127.0.0.1
|
# zdb.host = 127.0.0.1
|
||||||
zdb.port = 1216
|
# zdb.port = 1216
|
||||||
|
|
||||||
|
zhub.host = 47.111.150.118
|
||||||
|
zhub.port = 6066
|
@ -20,11 +20,11 @@ import java.util.logging.Level;
|
|||||||
|
|
||||||
public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer, Service {
|
public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer, Service {
|
||||||
|
|
||||||
@Resource(name = "property.zdb.host")
|
@Resource(name = "property.zhub.host")
|
||||||
private String host = "39.108.56.246";
|
private String host = "127.0.0.1";
|
||||||
@Resource(name = "property.zdb.password")
|
@Resource(name = "property.zhub.password")
|
||||||
private String password = "";
|
private String password = "";
|
||||||
@Resource(name = "property.zdb.port")
|
@Resource(name = "property.zhub.port")
|
||||||
private int port = 1216;
|
private int port = 1216;
|
||||||
|
|
||||||
private ReentrantLock lock = new ReentrantLock();
|
private ReentrantLock lock = new ReentrantLock();
|
||||||
@ -70,8 +70,10 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
|
|||||||
reader.readLine(); //$n len(key)
|
reader.readLine(); //$n len(key)
|
||||||
topic = reader.readLine(); // name
|
topic = reader.readLine(); // name
|
||||||
|
|
||||||
|
Timer timer = timerMap.get(topic);
|
||||||
accept(topic, value);
|
if (timer != null) {
|
||||||
|
timer.runnable.run();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.log(Level.WARNING, "reconnection ", e.getMessage());
|
logger.log(Level.WARNING, "reconnection ", e.getMessage());
|
||||||
@ -130,8 +132,10 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
|
|||||||
}
|
}
|
||||||
buf.append("\r\n");
|
buf.append("\r\n");
|
||||||
|
|
||||||
// todo: 重连 timer 订阅, 需要
|
// 重连 timer 订阅
|
||||||
|
timerMap.forEach((name, timer) -> {
|
||||||
|
send("timer", name, timer.expr, timer.single ? "a" : "x");
|
||||||
|
});
|
||||||
send(buf.toString());
|
send(buf.toString());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
|
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
|
||||||
@ -163,10 +167,44 @@ public abstract class ZHubConsumer extends AbstractConsumer implements IConsumer
|
|||||||
}
|
}
|
||||||
|
|
||||||
// timer
|
// timer
|
||||||
private ConcurrentHashMap<String, Runnable> timerMap = new ConcurrentHashMap();
|
private ConcurrentHashMap<String, Timer> timerMap = new ConcurrentHashMap();
|
||||||
|
|
||||||
public void timer(String name, String expr, Runnable run) {
|
public void timer(String name, String expr, Runnable run) {
|
||||||
timerMap.put(name, run);
|
timerMap.put(name, new Timer(name, expr, run, false));
|
||||||
send("timer", name, expr);
|
send("timer", name, expr, "x");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void timerSingle(String name, String expr, Runnable run) {
|
||||||
|
send("timer", name, expr, "a");
|
||||||
|
timerMap.put(name, new Timer(name, expr, run, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
class Timer {
|
||||||
|
String name;
|
||||||
|
String expr;
|
||||||
|
Runnable runnable;
|
||||||
|
boolean single;
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getExpr() {
|
||||||
|
return expr;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Runnable getRunnable() {
|
||||||
|
return runnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isSingle() {
|
||||||
|
return single;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Timer(String name, String expr, Runnable runnable, boolean single) {
|
||||||
|
this.name = name;
|
||||||
|
this.runnable = runnable;
|
||||||
|
this.single = single;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,11 @@
|
|||||||
package com.zdemo.test;
|
package com.zdemo.test;
|
||||||
|
|
||||||
import com.zdemo.Event;
|
import com.zdemo.Event;
|
||||||
import com.zdemo.EventType;
|
|
||||||
import com.zdemo.IConsumer;
|
|
||||||
import com.zdemo.IProducer;
|
import com.zdemo.IProducer;
|
||||||
import com.zdemo.zdb.ZHubProducer;
|
import com.zdemo.zdb.ZHubProducer;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.redkale.boot.Application;
|
import org.redkale.boot.Application;
|
||||||
|
import org.redkale.util.Utility;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -28,18 +27,18 @@ public class AppTest {
|
|||||||
//启动并开启消费监听
|
//启动并开启消费监听
|
||||||
MyConsumer consumer = Application.singleton(MyConsumer.class);
|
MyConsumer consumer = Application.singleton(MyConsumer.class);
|
||||||
|
|
||||||
consumer.addEventType(
|
/*consumer.addEventType(
|
||||||
EventType.of("a", str -> {
|
EventType.of("ax", str -> {
|
||||||
System.out.println("我收到了消息 a 事件:" + str);
|
System.out.println("我收到了消息 a 事件:" + str);
|
||||||
})
|
})
|
||||||
|
|
||||||
, EventType.of("bx", str -> {
|
, EventType.of("bx", str -> {
|
||||||
System.out.println("我收到了消息 主题bx 事件:" + str);
|
System.out.println("我收到了消息 主题bx 事件:" + str);
|
||||||
})
|
})
|
||||||
);
|
);*/
|
||||||
|
|
||||||
consumer.timer("a", "* * * * * *", () -> {
|
consumer.timerSingle("a", "*/10 * * * * *", () -> {
|
||||||
System.out.println("timer a 执行了");
|
System.out.println(Utility.now() + "timer a 执行了");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user