Compare commits

...

57 Commits

Author SHA1 Message Date
94b1ac4822 . 2024-04-24 20:26:25 +08:00
3fe08e92a8 修改:包结构名称 2024-04-23 19:20:16 +08:00
a0126582bd . 2024-04-22 00:21:54 +08:00
0bedcf366e 优化:1、rpc调用优先调用本地订阅
2、本地模式返回结果与目标类型不一致的转换
2024-04-09 02:06:23 +08:00
af44804282 新增:IType(SHORT、LONG、DOUBLE) 2024-04-08 22:35:55 +08:00
7ce9b4012a 修改:支持jdk17,@PostConstruct 高版本与低版本不兼容 2024-04-08 01:10:25 +08:00
f4771aadf2 新增:trylock 尝试获取锁,并立即返回加锁结果 2023-10-21 13:07:22 +08:00
8e2779f2d8 新增:springboot 组件初始化 zhub 实例 2023-10-21 10:23:59 +08:00
094d3fc5a2 . 2023-07-18 03:55:07 +08:00
9a3314e0e6 修改:工程结构变更为 标准 maven 工程 2023-06-25 02:27:57 +08:00
99cb1ea3c8 修改:topic 消息接收日志打印级别 2023-04-03 19:45:33 +08:00
46082b5461 新增:topic 接收日志, zhub 消息发送日志 2023-04-03 19:25:47 +08:00
35e4bebb6d 修改:断线重连 bug 2023-03-06 21:35:11 +08:00
144b99c3d2 修改:包名称 2023-03-05 19:36:00 +08:00
2296a7fb15 新增:zhub 验权处理 2022-06-23 19:45:11 +08:00
90378c5daf 修改:jdk-api 使用修改,支持 jdk8 语法 2022-01-13 18:21:00 +08:00
0df9e254e6 修改:删除多余依赖调整,代码风格调整 2021-12-31 12:55:01 +08:00
3debcee7d2 新增:zhub-cli 独立分支 2021-12-31 11:53:25 +08:00
6995d103f0 . 2021-11-17 19:04:59 +08:00
ab7e9536a7 修改: 合并 zhub 配置,addr="host:port", 去除原有 port参数 2021-11-17 18:33:18 +08:00
c135ef8925 修改:eventMap 使用 ConcurrentHashMap 2021-11-12 10:36:08 +08:00
a16e448438 修改:日志级别 2021-11-11 17:28:47 +08:00
92ef169b6d . 2021-11-04 19:03:23 +08:00
89d474284c 新增:1.redtimer 代码包暂时合并到此工程
2.新增Timers.tryDelay、Timers.delay 方法
2021-11-03 19:56:27 +08:00
lxy
257e840e7e 新增:rpc 超时处理 2021-10-12 19:00:06 +08:00
lxy
0bb543d1a9 修改:rpc 方法 2021-08-29 19:44:04 +08:00
lxy
a812e2d78d 新增:zhub 服务 ping 回应 2021-08-05 10:00:24 +08:00
lxy
3f2db60432 修改:主题消息内容多行文本支持 2021-07-28 18:16:39 +08:00
lxy
38c023a503 修改:rpc 调用端订阅 topic 逻辑修改 2021-07-07 12:56:04 +08:00
lxy
f7ef96d0f6 修改:延时时间数值类型为long 2021-05-22 16:19:43 +08:00
lxy
fc1f91af04 修改:rpc 调用端带返回结果反序化bug 2021-04-11 22:09:52 +08:00
lxy
c6940cef2b 修改:1、rpc 消费端异常捕获 2、代码风格修改 2021-04-07 16:27:19 +08:00
lxy
ee7154990b 新增:1、lock 锁客户端API 2、rpc调用请求/接收端;
修改:消息发送优化
2021-04-04 23:40:28 +08:00
lxy
3d0d2f8f81 . 2021-03-05 10:59:13 +08:00
lxy
895df0e05a 修改:zhub-client 重连逻辑 2021-03-01 18:50:15 +08:00
lxy
57c216c791 新增:总线消息插件模式注入 2021-02-03 19:14:01 +08:00
lxy
c4639fcabf . 2021-02-03 16:45:42 +08:00
lxy
7acacc2ce3 重构:代码组合逻辑 2021-02-03 11:01:50 +08:00
lxy
80fe44d5d7 新增:redis 缓存(从 redkalex-plugin 迁移至此) 2021-02-02 13:49:51 +08:00
lxy
9652c60dcd 新增:dalay 支持字符表达式 2021-02-02 09:38:21 +08:00
lxy
e5b5c66835 新增:delay 延时事件api 2021-02-01 19:40:22 +08:00
lxy
3c1c38207f 修改:定时调度执行异常和 任务堆叠导致丢失任务bug 2021-01-25 12:50:53 +08:00
lxy
f6c56c62df 删除:producer 的 send 方法 2021-01-23 19:01:50 +08:00
lxy
91e4e48aa2 新增:主题发布标准方法 publish 替换原 send 方法 2021-01-23 11:03:20 +08:00
lxy
361ef58d98 新增:zhub 广播 broadcast 使用主题同订阅发布 2021-01-23 11:01:47 +08:00
lxy
69be5ec3d7 新增:订阅 subscribe 方法 2021-01-22 17:53:38 +08:00
lxy
d83dcd37c2 修改:1.定时调度调整 2.zhub 服务 producer、consumer 合并为 ZHubClient 2021-01-12 19:00:00 +08:00
lxy
f4089d4118 修改:timer api 2021-01-12 17:29:22 +08:00
lxy
d70d9ad244 新增:ZHub consumer 网络连接中断处理 2021-01-12 11:50:21 +08:00
lxy
c457e8094d 修改:ztimer-cli 2021-01-11 18:23:54 +08:00
lxy
26a1fc4971 新增:ztimer 客户端 2021-01-08 19:51:36 +08:00
lxy
4d5fb83b3c . 2021-01-08 18:17:14 +08:00
lxy
5dd58d1fab 新增:zdb 客户端程序实现,其他修改 2021-01-08 15:37:18 +08:00
lxy
a37f1b9564 新增:主题退订 2020-12-08 19:31:15 +08:00
lxy
10472078b0 . 2020-11-30 09:25:23 +08:00
lxy
9138790fd2 . 2020-11-13 18:23:23 +08:00
lxy
1603140c1c 新增:pulsar 消费订阅实现 2020-10-29 12:29:08 +08:00
33 changed files with 2222 additions and 529 deletions

View File

@ -1,20 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<application port="2001">
<resources>
<properties load="redis.properties"></properties>
</resources>
<server protocol="HTTP" port="80">
<request>
<remoteaddr value="request.headers.X-Real-IP"/>
</request>
<rest autoload="true" path=""/>
<services autoload="true"/>
<servlets path="/platf"/>
</server>
</application>

View File

@ -1,19 +0,0 @@
# Producer
#bootstrap.servers=47.111.150.118:6062
#bootstrap.servers=121.196.17.55:6062
bootstrap.servers=39.108.56.246:9092
#bootstrap.servers=122.112.180.156:6062
acks=all
retries=0
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer
enable.auto.commit=true
auto.commit.interval.ms=1000
group.id=
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

View File

@ -1,4 +0,0 @@
# redis
redis.host=47.111.150.118
redis.password=*Zhong9307!
redis.port=6064

70
pom.xml Normal file
View File

@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>dev.zhub</groupId>
<artifactId>zhub-client-spring</artifactId>
<version>8.0.0424.dev</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<!--<version>4.13.1</version>-->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<!--<version>1.18.30</version>-->
<scope>provided</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>maven-release</id>
<name>maven-nexus</name>
<url>https://nexus.1216.top/repository/maven-public/</url>
</repository>
</repositories>
<distributionManagement>
<repository>
<id>mvn-release</id>
<name>mvn-release</name>
<url>https://nexus.1216.top/repository/maven-releases/</url>
</repository>
</distributionManagement>
<!--<distributionManagement>
<repository>
<id>mvn-recloud</id>
<name>mvn-release</name>
<url>http://nexus.recloud.cn/repository/maven-releases/</url>
</repository>
</distributionManagement>-->
</project>

View File

@ -1,55 +0,0 @@
package com.zdemo;
import org.redkale.convert.json.JsonConvert;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* @author Liang
* @data 2020-09-05 23:18
*/
public abstract class AbstractConsumer implements IConsumer {
public final Map<String, EventType> eventMap = new HashMap<>();
public abstract String getGroupid();
public boolean preInit() {
return true;
}
public void addEventType(EventType... eventType) {
for (EventType type : eventType) {
String[] topics = type.topic.split(",");
for (String topic : topics) {
if (topic.isEmpty()) {
continue;
}
eventMap.put(topic, type);
}
}
}
@Override
public final Collection<String> getSubscribes() {
return eventMap.keySet();
}
@Override
public final void accept(String topic, String value) {
EventType eventType = eventMap.get(topic);
Object data = null;
if ("java.lang.String".equals(eventType.typeToken.getType().getTypeName())) {
data = value;
} else {
data = JsonConvert.root().convertFrom(eventType.typeToken.getType(), value);
}
eventType.accept(data);
}
}

View File

@ -1,19 +0,0 @@
package com.zdemo;
import org.redkale.util.TypeToken;
import java.util.Collection;
import java.util.logging.Logger;
public interface IConsumer<T extends Event> {
TypeToken<String> TYPE_TOKEN_STRING = new TypeToken<String>() {
};
TypeToken<Integer> TYPE_TOKEN_INT = new TypeToken<Integer>() {
};
Logger logger = Logger.getLogger(IConsumer.class.getSimpleName());
Collection<String> getSubscribes();
<T> void accept(String topic, String record);
}

View File

@ -1,101 +0,0 @@
package com.zdemo.kafak;
import com.zdemo.AbstractConsumer;
import com.zdemo.EventType;
import com.zdemo.IConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.redkale.net.http.RestService;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import static java.util.Arrays.asList;
/**
* 消费
*/
@RestService
public abstract class KafakConsumer extends AbstractConsumer implements IConsumer, Service {
@Resource(name = "APP_HOME")
protected File APP_HOME;
protected Properties props;
// 0:none 1:restart -1:stop
//private int cmd = -1;
public abstract String getGroupid();
private final LinkedBlockingQueue<EventType> queue = new LinkedBlockingQueue<>();
@Override
public void addEventType(EventType... eventTypes) {
super.addEventType(eventTypes);
try {
for (EventType eventType : eventTypes) {
queue.put(eventType);
}
} catch (InterruptedException e) {
logger.log(Level.WARNING, "", e);
}
}
@Override
public final void init(AnyValue config) {
if (!preInit()) {
return;
}
try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) {
props = new Properties();
props.load(fis);
if (logger.isLoggable(Level.INFO)) logger.info(getGroupid() + " consumer started!");
new Thread(() -> {
try {
props.put("group.id", getGroupid());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(asList("_"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));
records.forEach(record -> {
String topic = record.topic();
long offset = record.offset();
String value = record.value();
try {
accept(topic, value);
} catch (Exception e) {
logger.log(Level.WARNING, String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value), e);
}
});
// 动态新增订阅
while (!queue.isEmpty()) {
queue.clear();
consumer.unsubscribe();
consumer.subscribe(getSubscribes());
}
}
} catch (WakeupException ex) {
System.out.println("WakeupException !!!!");
}
}, "thread-consumer-[" + getGroupid() + "]").start();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
}

View File

@ -1,56 +0,0 @@
package com.zdemo.kafak;
import com.zdemo.Event;
import com.zdemo.IProducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.RestService;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Level;
/**
* 生产
*
* @param <T>
*/
@RestService
public class KafakProducer<T extends Event> implements IProducer<T>, Service {
private KafkaProducer<String, String> producer;
@Resource(name = "APP_HOME")
protected File APP_HOME;
@Override
public void init(AnyValue config) {
File file = new File(APP_HOME, "conf/kafak.properties");
try (FileInputStream fis = new FileInputStream(file)) {
Properties props = new Properties();
props.load(fis);
producer = new KafkaProducer(props);
} catch (IOException e) {
logger.log(Level.WARNING, "未初始化kafak 生产者kafak发布消息不可用", e);
}
}
@Override
public void send(T t) {
String v = JsonConvert.root().convertTo(t.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
producer.send(new ProducerRecord(t.topic, v));
}
@Override
public void destroy(AnyValue config) {
producer.close();
}
}

View File

@ -1,104 +0,0 @@
package com.zdemo.redis;
import com.zdemo.AbstractConsumer;
import com.zdemo.EventType;
import com.zdemo.IConsumer;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.logging.Level;
public abstract class RedisConsumer extends AbstractConsumer implements IConsumer, Service {
@Resource(name = "property.redis.host")
private String host = "127.0.0.1";
@Resource(name = "property.redis.password")
private String password = "";
@Resource(name = "property.redis.port")
private int port = 6379;
private Socket client;
private OutputStreamWriter writer;
private BufferedReader reader;
@Override
public void init(AnyValue config) {
try {
client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
writer = new OutputStreamWriter(client.getOutputStream());
writer.write("AUTH " + password + "\r\n");
writer.flush();
StringBuffer buf = new StringBuffer("SUBSCRIBE");
for (String topic : getSubscribes()) {
buf.append(" ").append(topic);
}
buf.append(" _\r\n");
writer.write(buf.toString());
writer.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
} catch (IOException e) {
logger.log(Level.WARNING, "Redis Consumer 初始化失败!", e);
}
new Thread(() -> {
try {
while (true) {
String readLine = reader.readLine();
String type = "";
if ("*3".equals(readLine)) {
readLine = reader.readLine(); // $7 len()
type = reader.readLine(); // message
if (!"message".equals(type)) {
continue;
}
reader.readLine(); //$n len(key)
String topic = reader.readLine(); // topic
reader.readLine(); //$n len(value)
String value = reader.readLine(); // value
try {
accept(topic, value);
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e);
}
}
}
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}).start();
}
@Override
public void addEventType(EventType... eventType) {
for (EventType type : eventType) {
String[] topics = type.topic.split(",");
for (String topic : topics) {
if (topic.isEmpty()) {
continue;
}
eventMap.put(topic, type);
//新增订阅
try {
writer.write("SUBSCRIBE " + topic + "\r\n");
writer.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
}
}
}

View File

@ -1,52 +0,0 @@
package com.zdemo.redis;
import com.zdemo.Event;
import com.zdemo.IProducer;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.logging.Level;
public class RedisProducer<T extends Event> implements IProducer<T>, Service {
@Resource(name = "property.redis.host")
private String host = "127.0.0.1";
@Resource(name = "property.redis.password")
private String password = "";
@Resource(name = "property.redis.port")
private int port = 6379;
private OutputStreamWriter osw;
@Override
public void init(AnyValue config) {
try {
Socket client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
osw = new OutputStreamWriter(client.getOutputStream());
osw.write("AUTH " + password + "\r\n");
osw.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
@Override
public void send(T t) {
try {
osw.write("PUBLISH " + t.topic + " '" + JsonConvert.root().convertTo(t.value) + "' \r\n");
osw.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
}

View File

@ -0,0 +1,102 @@
package dev.zhub;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import dev.zhub.client.Rpc;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* @author Liang
* @data 2020-09-05 23:18
*/
public abstract class AbstractConsumer implements IConsumer {
public Gson gson = Rpc.gson;
protected Map<String, EventType<?>> eventMap = new ConcurrentHashMap<>();
protected abstract String getGroupid();
protected boolean preInit() {
return true;
}
protected final Set<String> getTopics() {
if (!eventMap.isEmpty()) {
return eventMap.keySet();
}
HashSet<String> set = new HashSet<>();
set.add("-");
return set;
}
// topic 消息消费前处理
protected void accept(String topic, String value) {
EventType eventType = eventMap.get(topic);
Object data = null;
if ("java.lang.String".equals(eventType.typeToken.getType().getTypeName())) {
data = value;
} else {
data = gson.fromJson(value, eventType.typeToken.getType());
}
eventType.accept(data);
}
// rpc 被调用端
protected <T> void rpcAccept(String topic, T value) {
EventType eventType = eventMap.get(topic);
/*// eventType T 的类型比较
if (!eventType.typeToken.getType().getTypeName().equals(value.getClass().getTypeName())) {
eventType.accept(toStr(value));
} else {
eventType.accept(value);
}*/
eventType.accept(value);
}
protected final void removeEventType(String topic) {
eventMap.remove(topic);
}
/**
* 不同组件的订阅事件 发送
*
* @param topic
*/
protected abstract void subscribe(String topic);
public void subscribe(String topic, Consumer<String> consumer) {
subscribe(topic, TYPE_TOKEN_STRING, consumer);
}
@Override
public <T> void subscribe(String topic, TypeToken<T> typeToken, Consumer<T> consumer) {
if (topic.contains(",")) {
for (String x : topic.split(",")) {
subscribe(x, typeToken, consumer);
}
} else {
eventMap.put(topic, EventType.of(topic, typeToken, consumer));
subscribe(topic);
}
}
protected String toStr(Object v) {
if (v instanceof String) {
return (String) v;
} else if (v == null) {
return null;
}
return gson.toJson(v);
}
}

View File

@ -1,4 +1,4 @@
package com.zdemo;
package dev.zhub;
/**
* 发布订阅 事件
@ -15,9 +15,8 @@ public class Event<V> {
this.value = value;
}
public static <V> Event of(String topic, V value) {
return new Event<V>(topic, value);
public static <V> Event<V> of(String topic, V value) {
return new Event<>(topic, value);
}
}

View File

@ -1,6 +1,7 @@
package com.zdemo;
package dev.zhub;
import org.redkale.util.TypeToken;
import com.google.gson.reflect.TypeToken;
import java.util.function.Consumer;
@ -9,7 +10,7 @@ public class EventType<T> {
public final TypeToken<T> typeToken;
private final Consumer<T> consumer;
private final static TypeToken<String> stringToken = new TypeToken<>() {
private final static TypeToken<String> stringToken = new TypeToken<String>() {
};
private EventType(String topic, TypeToken<T> typeToken, Consumer<T> consumer) {

View File

@ -0,0 +1,37 @@
package dev.zhub;
import com.google.gson.reflect.TypeToken;
import java.util.function.Consumer;
public interface IConsumer {
TypeToken<String> TYPE_TOKEN_STRING = new TypeToken<String>() {
};
TypeToken<Integer> TYPE_TOKEN_INT = new TypeToken<Integer>() {
};
/**
* 取消订阅
*
* @param topic
*/
void unsubscribe(String topic);
/**
* 订阅 接收数据类型 String
*
* @param topic
* @param consumer
*/
void subscribe(String topic, Consumer<String> consumer);
/**
* 订阅接收类型为 <T>
*
* @param topic
* @param typeToken
* @param consumer
* @param <T>
*/
<T> void subscribe(String topic, TypeToken<T> typeToken, Consumer<T> consumer);
}

View File

@ -1,10 +1,9 @@
package com.zdemo;
package dev.zhub;
import java.util.logging.Logger;
public interface IProducer<T extends Event> {
public interface IProducer {
Logger logger = Logger.getLogger(IProducer.class.getSimpleName());
void send(T t);
boolean publish(String topic, Object v);
}

View File

@ -0,0 +1,27 @@
package dev.zhub;
import com.google.gson.reflect.TypeToken;
import java.util.List;
import java.util.Map;
public interface IType {
TypeToken<String> STRING = new TypeToken<String>() {
};
TypeToken<Short> SHORT = new TypeToken<Short>() {
};
TypeToken<Integer> INT = new TypeToken<Integer>() {
};
TypeToken<Long> LONG = new TypeToken<Long>() {
};
TypeToken<Double> DOUBLE = new TypeToken<Double>() {
};
TypeToken<Map<String, String>> MAP = new TypeToken<Map<String, String>>() {
};
TypeToken<List<Map<String, String>>> LMAP = new TypeToken<List<Map<String, String>>>() {
};
}

View File

@ -0,0 +1,25 @@
package dev.zhub.client;
// ================================================== lock ==================================================
public class Lock {
protected String name;
protected String uuid;
protected int duration;
protected boolean success;
protected ZHubClient hubClient;
protected Lock(String name, String uuid, int duration, ZHubClient hubClient) {
this.name = name;
this.uuid = uuid;
this.duration = duration;
this.hubClient = hubClient;
}
public void unLock() {
hubClient.send("unlock", name, uuid);
}
public boolean success() {
return success;
}
}

View File

@ -0,0 +1,70 @@
package dev.zhub.client;
import com.google.gson.Gson;
import com.google.gson.annotations.Expose;
import com.google.gson.reflect.TypeToken;
import lombok.Getter;
import lombok.Setter;
import java.util.UUID;
@Getter
@Setter
public class Rpc<T> {
/*public final static Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();*/
public final static Gson gson = new Gson();
private String ruk; // request unique key:
private String topic; // call topic
private T value; // call paras
@Expose(deserialize = false, serialize = false)
private RpcResult rpcResult;
@Expose(deserialize = false, serialize = false)
private TypeToken typeToken;
/*public Rpc() {
}*/
protected Rpc(String appname, String topic, T value) {
this.ruk = appname + "::" + UUID.randomUUID().toString().replaceAll("-", "");
this.topic = topic;
this.value = value;
}
public String getBackTopic() {
return ruk.split("::")[0];
}
public <R> RpcResult<R> render() {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
return response;
}
public <R> RpcResult<R> render(R result) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setResult(result);
return response;
}
public <R> RpcResult<R> retError(String retinfo) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setRetcode(100);
response.setRetinfo(retinfo);
return response;
}
public <R> RpcResult<R> retError(int retcode, String retinfo) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setRetcode(retcode);
response.setRetinfo(retinfo);
return response;
}
}

View File

@ -0,0 +1,40 @@
package dev.zhub.client;
public class RpcResult<R> {
private String ruk;
private int retcode;
private String retinfo;
private R result;
public String getRuk() {
return ruk;
}
public void setRuk(String ruk) {
this.ruk = ruk;
}
public int getRetcode() {
return retcode;
}
public void setRetcode(int retcode) {
this.retcode = retcode;
}
public String getRetinfo() {
return retinfo;
}
public void setRetinfo(String retinfo) {
this.retinfo = retinfo;
}
public R getResult() {
return result;
}
public void setResult(R result) {
this.result = result;
}
}

View File

@ -0,0 +1,722 @@
package dev.zhub.client;
import com.google.gson.reflect.TypeToken;
import dev.zhub.*;
import dev.zhub.timer.Timers;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
@Component
public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer {
public Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName());
@Setter
@Value("${zhub.addr}")
private String addr = "127.0.0.1:1216";
@Setter
@Value("${zhub.groupid}")
private String groupid = "";
@Setter
@Value("${zhub.auth}")
private String auth = "";
@Setter
@Value("${zhub.appid}")
protected String appid = "";
@PostConstruct
public void init() {
init(null);
}
private Socket client;
private OutputStream writer;
private BufferedReader reader;
private final LinkedBlockingQueue<Timer> timerQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Event<String>> topicQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Event<Object>> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG [=> Object]
private final LinkedBlockingQueue<Event<Object>> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG [=> Object]
private final LinkedBlockingQueue<String> sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG
private static Map<String, ZHubClient> mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient
public ZHubClient() {
}
public ZHubClient(String addr, String groupid, String appid, String auth) {
this.addr = addr;
this.groupid = groupid;
this.appid = appid;
this.auth = auth;
init(null);
}
public void init(Map<String, String> config) {
if (!preInit()) {
return;
}
// 自动注入
if (config != null) {
addr = config.getOrDefault("addr", addr);
groupid = config.getOrDefault("groupid", groupid);
appid = config.getOrDefault("appname", appid);
}
// 设置第一个启动的 实例为主实例
if (!mainHub.containsKey(addr)) { // 确保同步执行此 init 逻辑
mainHub.put(addr, this);
}
CompletableFuture.runAsync(() -> {
if (!initSocket(0)) {
return;
}
// 消息 事件接收
new Thread(() -> {
while (true) {
try {
String readLine = reader.readLine();
if (readLine == null && initSocket(Integer.MAX_VALUE)) { // 连接中断 处理
continue;
}
String type = "";
// +ping
if ("+ping".equals(readLine)) {
send("+pong");
continue;
}
// 主题订阅消息
if ("*3".equals(readLine)) {
readLine = reader.readLine(); // $7 len()
type = reader.readLine(); // message
if (!"message".equals(type)) {
continue;
}
reader.readLine(); //$n len(key)
String topic = reader.readLine(); // topic
String lenStr = reader.readLine();//$n len(value)
int clen = 0;
if (lenStr.startsWith("$")) {
clen = Integer.parseInt(lenStr.replace("$", ""));
}
String value = "";
do {
if (!value.isEmpty()) {
value += "\r\n";
}
String s = reader.readLine();
value += s; // value
} while (clen > 0 && clen > strLength(value));
logger.finest("topic[" + topic + "]: " + value);
// lock msg
if ("lock".equals(topic)) {
Lock lock = lockTag.get(value);
if (lock != null) {
synchronized (lock) {
lock.success = true;
lock.notifyAll();
}
}
continue;
}
// trylock msg
if ("trylock".equals(topic)) {
Lock lock = lockTag.get(value);
if (lock != null) {
synchronized (lock) {
lock.success = false;
lock.notifyAll();
}
}
continue;
}
// rpc back msg
if (appid.equals(topic)) {
rpcBackQueue.add(Event.of(topic, value));
continue;
}
// rpc call msg
if (rpcTopics.contains(topic)) {
rpcCallQueue.add(Event.of(topic, value));
continue;
}
// oth msg
topicQueue.add(Event.of(topic, value));
continue;
}
// timer 消息
if ("*2".equals(readLine)) {
readLine = reader.readLine(); // $7 len()
type = reader.readLine(); // message
if (!"timer".equals(type)) {
continue;
}
reader.readLine(); //$n len(key)
String topic = reader.readLine(); // name
logger.finest("timer[" + topic + "]: ");
timerQueue.add(timerMap.get(topic));
continue;
}
logger.finest(readLine);
} catch (IOException e) {
if (e instanceof SocketException) {
initSocket(Integer.MAX_VALUE);
}
e.printStackTrace();
}
}
}).start();
}).thenAcceptAsync(x -> {
// 定时调度事件已加入耗时监控
new Thread(() -> {
ExecutorService pool = Executors.newFixedThreadPool(1);
while (true) {
Timer timer = null;
try {
timer = timerQueue.take();
long start = System.currentTimeMillis();
pool.submit(timer.runnable).get(5, TimeUnit.SECONDS);
long end = System.currentTimeMillis();
logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "timer [" + timer.name + "] time out: " + 5 + " S", e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "timer [" + timer.name + "]", e);
}
}
}).start();
// topic msg已加入耗时监控
new Thread(() -> {
ExecutorService pool = Executors.newFixedThreadPool(1);
while (true) {
Event<String> event = null;
try {
event = topicQueue.take();
String topic = event.topic;
String value = event.value;
pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + toStr(event.value), e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + toStr(event.value), e);
}
}
}).start();
// rpc back ,仅做数据解析暂无耗时监控
new Thread(() -> {
while (true) {
Event<Object> event = null;
try {
event = rpcBackQueue.take();
//if (event)
logger.finest(String.format("rpc-back:[%s]: %s", event.topic, toStr(event.value)));
rpcAccept(event.value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + toStr(event.value), e);
}
}
}).start();
// rpc call已加入耗时监控
new Thread(() -> {
ExecutorService pool = Executors.newFixedThreadPool(1);
while (true) {
Event<Object> event = null;
try {
event = rpcCallQueue.take();
logger.finest(String.format("rpc-call:[%s] %s", event.topic, toStr(event.value)));
String topic = event.topic;
Object value = event.value;
pool.submit(() -> rpcAccept(topic, value)).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + toStr(event.value), e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + toStr(event.value), e);
}
}
}).start();
// send msg
new Thread(() -> {
while (true) {
String msg = null;
try {
msg = sendMsgQueue.take();
// logger.log(Level.FINEST, "send-msg: [" + msg + "]");
writer.write(msg.getBytes());
writer.flush();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e);
}
}
}).start();
});
}
// ---------------------
// -- 消息发送 --
protected boolean send(String... data) {
if (data.length == 1) {
sendMsgQueue.add(data[0] + "\r\n");
} else if (data.length > 1) {
StringBuilder buf = new StringBuilder();
buf.append("*" + data.length + "\r\n");
for (String d : data) {
buf.append("$" + strLength(d) + "\r\n");
buf.append(d + "\r\n");
}
sendMsgQueue.add(buf.toString());
}
return true;
}
/*protected boolean send(String... data) {
try {
lock.lock();
if (data.length == 1) {
writer.write((data[0] + "\r\n").getBytes());
} else if (data.length > 1) {
writer.write(("*" + data.length + "\r\n").getBytes());
for (String d : data) {
writer.write(("$" + d.length() + "\r\n").getBytes());
writer.write((d + "\r\n").getBytes());
}
}
writer.flush();
return true;
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
} finally {
lock.unlock();
}
return false;
}*/
private int strLength(String str) {
str = str.replaceAll("[^\\x00-\\xff]", "*");
return str.length();
}
protected boolean initSocket(int retry) {
for (int i = 0; i <= retry; i++) {
try {
String[] hostPort = addr.split(":");
String host = hostPort[0];
int port = Integer.parseInt(hostPort[1]);
client = new Socket();
client.connect(new InetSocketAddress(host, port));
client.setKeepAlive(true);
writer = client.getOutputStream();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
String groupid = getGroupid();
if (groupid == null || groupid.isEmpty()) {
throw new RuntimeException("ZHubClient groupid can not is empty");
}
send("auth", auth);
send("groupid " + groupid);
StringBuilder buf = new StringBuilder("subscribe lock trylock");
if (mainHub.containsValue(this)) {
buf.append(" ").append(appid);
}
for (String topic : getTopics()) {
buf.append(" ").append(topic);
}
send(buf.toString());
// 重连 timer 订阅
timerMap.forEach((name, timer) -> send("timer", name));
if (retry > 0) {
logger.warning(String.format("ZHubClient[%s][%s] %s Succeed", getGroupid(), i + 1, "reconnection"));
} else {
logger.fine(String.format("ZHubClient[%s] %s Succeed", getGroupid(), "init"));
}
return true;
} catch (Exception e) {
if (i == 0) {
logger.log(Level.WARNING, String.format("ZHubClient[%s] %s Failed 初始化失败!", getGroupid(), retry == 0 ? "init" : "reconnection"), e);
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
} else {
logger.log(Level.WARNING, String.format("ZHubClient[%s][%s] reconnection Failed", getGroupid(), i + 1));
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
return false;
}
@Override
public void unsubscribe(String topic) {
send("unsubscribe " + topic);
super.removeEventType(topic);
}
public boolean publish(String topic, Object v) {
/*if (eventMap.containsKey(topic)) { // 本地调用
topicQueue.add(Event.of(topic, v));
return true;
}*/
return send("publish", topic, toStr(v));
}
public void broadcast(String topic, Object v) {
send("broadcast", topic, toStr(v)); // 广播必须走远端模式
}
// 发送 publish 主题消息若多次发送的 topic + "-" + value 相同将会做延时重置
public void delay(String topic, Object v, long millis) {
send("delay", topic, toStr(v), String.valueOf(millis));
}
// 表达式支持d+[d,H,m,s]
public void delay(String topic, Object v, String delayExpr) {
String endchar = "";
int delay;
if (delayExpr.matches("^\\d+[d,H,m,s]$")) {
endchar = delayExpr.substring(delayExpr.length() - 1);
delay = Integer.parseInt(delayExpr.substring(0, delayExpr.length() - 1));
} else {
if (!delayExpr.matches("^\\d+$")) {
throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", delayExpr));
}
delay = Integer.parseInt(delayExpr);
if (delay <= 0L) {
throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", delayExpr));
}
}
if ("M".equals(endchar)) {
delay *= (1000 * 60 * 60 * 24 * 30);
} else if ("d".equals(endchar)) {
delay *= (1000 * 60 * 60 * 24);
} else if ("H".equals(endchar)) {
delay *= (1000 * 60 * 60);
} else if ("m".equals(endchar)) {
delay *= (1000 * 60);
} else if ("s".equals(endchar)) {
delay *= 1000;
}
delay(topic, v, delay);
}
@Override
protected String getGroupid() {
return groupid;
}
@Override
protected void subscribe(String topic) {
send("subscribe " + topic); //新增订阅
}
// ================================================== lock ==================================================
private Map<String, Lock> lockTag = new ConcurrentHashMap<>();
/**
* 尝试加锁立即返回
*
* @param key
* @param duration
* @return Lock: lock.success 锁定是否成功标识
*/
public Lock tryLock(String key, int duration) {
return lock("trylock", key, duration);
}
public Lock lock(String key, int duration) {
return lock("lock", key, duration);
}
/**
* @param cmd lock|trylock
* @param key 加锁 key
* @param duration 锁定时长
* @return
*/
private Lock lock(String cmd, String key, int duration) {
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
Lock lock = new Lock(key, uuid, duration, this);
lockTag.put(uuid, lock);
try {
// c.send("lock", key, uuid, strconv.Itoa(duration))
send(cmd, key, uuid, String.valueOf(duration));
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return lock;
}
// ================================================== timer ==================================================
private ConcurrentHashMap<String, Timer> timerMap = new ConcurrentHashMap();
class Timer {
String name;
//String expr;
Runnable runnable;
//boolean single;
public String getName() {
return name;
}
public Runnable getRunnable() {
return runnable;
}
public Timer(String name, Runnable runnable) {
this.name = name;
this.runnable = runnable;
}
}
public void timer(String name, Runnable run) {
timerMap.put(name, new Timer(name, run));
send("timer", name);
}
@Deprecated
public void reloadTimer() {
send("cmd", "reload-timer");
}
// ================================================== rpc ==================================================
// -- 调用端 --
private static final Map<String, Rpc> rpcMap = new ConcurrentHashMap<>();
// private static final Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
// rpc call
public RpcResult<Void> rpc(String topic, Object v) {
return rpc(topic, v, null);
}
// rpc call
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken) {
return rpc(topic, v, typeToken, 0);
}
// rpc call
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken, long timeout) {
Rpc rpc = new Rpc<>(appid, topic, v);
rpc.setTypeToken(typeToken);
String ruk = rpc.getRuk();
rpcMap.put(ruk, rpc);
try {
if (eventMap.containsKey(topic)) { // 本地调用
rpcCallQueue.add(Event.of(topic, rpc));
} else {
rpc.setValue(toStr(rpc.getValue()));
publish(topic, rpc); // send("rpc", topic, toStr(rpc));
}
synchronized (rpc) {
if (timeout <= 0) {
timeout = 1000 * 15;
}
// call timeout default: 15s
Timers.delay(() -> {
synchronized (rpc) {
Rpc rpc1 = rpcMap.get(ruk);
if (rpc1 == null) {
return;
}
RpcResult rpcResult = rpc.retError(505, "请求超时");
rpc.setRpcResult(rpcResult);
logger.warning("rpc timeout: " + gson.toJson(rpc));
rpc.notify();
}
}, timeout);
rpc.wait();
rpcMap.remove(ruk);
}
} catch (InterruptedException e) {
e.printStackTrace();
// call error
RpcResult rpcResult = rpc.retError(501, "请求失败");
rpc.setRpcResult(rpcResult);
}
return rpc.getRpcResult();
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, null));
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, TypeToken<R> typeToken) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken));
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, long timeout) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, null, timeout));
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, TypeToken<R> typeToken, long timeout) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, typeToken, timeout));
}
// RpcResult: {ruk:xxx-xxxx, retcode:0}
// rpc call back consumer
private <T> void rpcAccept(T value) {
// 接收到 本地调用返回的 RpcResult
if (value instanceof RpcResult) {
String ruk = ((RpcResult) value).getRuk();
Rpc rpc = rpcMap.remove(ruk);
if (rpc == null) {
return;
}
// TODO, 本地模式下返回的数据对象类型需要和处理端一致不然会出现类型转换异常 - 解决办法当出现不一致的情况取数据做转换
TypeToken typeToken = rpc.getTypeToken();
if (typeToken.getType() != ((RpcResult<?>) value).getResult().getClass()) {
Object result = gson.fromJson(toStr(((RpcResult<?>) value).getResult()), typeToken.getType());
((RpcResult<Object>) value).setResult(result);
}
rpc.setRpcResult((RpcResult) value);
synchronized (rpc) {
rpc.notify();
}
return;
}
RpcResult resp = gson.fromJson((String) value, new TypeToken<RpcResult<String>>() {
}.getType());
String ruk = resp.getRuk();
Rpc rpc = rpcMap.remove(ruk);
if (rpc == null) {
return;
}
TypeToken typeToken = rpc.getTypeToken();
Object result = resp.getResult();
if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName()) && !"java.lang.Void".equals(typeToken.getType().getTypeName())) {
result = gson.fromJson((String) resp.getResult(), typeToken.getType());
}
resp.setResult(result);
rpc.setRpcResult(resp);
synchronized (rpc) {
rpc.notify();
}
}
// -- 订阅端 --
private Set<String> rpcTopics = new HashSet();
// rpc call consumer
public <R> void rpcSubscribe(String topic, Function<Rpc<String>, RpcResult<R>> fun) {
rpcSubscribe(topic, IType.STRING, fun);
}
// rpc call consumer
public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResult<R>> fun) {
Consumer<T> consumer = v -> {
Rpc<T> rpc = null;
try {
if (v instanceof String) {
rpc = gson.fromJson((String) v, new TypeToken<Rpc<String>>() {
}.getType());
} else {
rpc = (Rpc<T>) v;
}
// 参数转换
if (rpc.getValue() instanceof String && !"java.lang.String".equals(typeToken.getType().getTypeName())) {
T paras = gson.fromJson((String) rpc.getValue(), typeToken.getType());
rpc.setValue(paras);
}
RpcResult result = fun.apply(rpc);
if (appid.equals(rpc.getBackTopic())) {
rpcBackQueue.add(Event.of(topic, result));
} else {
result.setResult(toStr(result.getResult())); // 远程模式 结果转换
publish(rpc.getBackTopic(), result);
}
} catch (Exception e) {
logger.log(Level.WARNING, "rpc call consumer error: " + v, e);
publish(rpc.getBackTopic(), rpc.retError("服务调用失败!"));
}
// back
};
rpcTopics.add(topic);
subscribe(topic, typeToken, consumer);
}
public static void main(String[] args) {
System.out.println(IType.INT.getType());
Integer a = 1;
System.out.println(a.getClass());
}
}

View File

@ -0,0 +1,66 @@
package dev.zhub.timer;
import dev.zhub.timer.queue.TimerQueue;
import dev.zhub.timer.task.Task;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
/**
* @author: liangxianyou
*/
public class TimerExecutor {
private Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private TimerQueue queue = new TimerQueue();
private ExecutorService executor;
public TimerExecutor(int n) {
executor = Executors.newFixedThreadPool(n);
start();
}
public void add(Task... task) {
for (Task t : task) {
t.setTimerExecutor(this);
queue.push(t);
// logger.finest("add new task : " + t.getName());
}
}
protected void add(Task task, boolean upTime) {
task.setTimerExecutor(this);
if (upTime) task.nextTime();
queue.push(task);
}
public Task remove(String name) {
return queue.remove(name);
}
public Task get(String name) {
return queue.get(name);
}
public void start() {
new Thread(() -> {
while (true) {
try {
Task take = null;
try {
take = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
//执行调度
executor.execute(take);
//add(take, true); //继续添加任务到 队列
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread-Redtimer-0").start();
}
}

View File

@ -0,0 +1,108 @@
package dev.zhub.timer;
import dev.zhub.timer.scheduled.Scheduled;
import dev.zhub.timer.task.Job;
import dev.zhub.timer.task.Task;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
/**
* Created by liangxianyou at 2018/7/23 14:33.
*/
public class TimerTask implements Task {
private Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private long startTime = System.currentTimeMillis();
private AtomicInteger execCount = new AtomicInteger();
protected String name;
private long theTime;
private Scheduled scheduled;
private boolean isComplete;
private TimerExecutor timerExecutor;
private Job job;
public static Task by(String name, Scheduled scheduled, Job job) {
TimerTask task = new TimerTask();
task.name = name;
task.scheduled = scheduled;
task.job = job;
return task;
}
@Override
public String getName() {
return name;
}
@Override
public void setScheduled(Scheduled scheduled) {
this.scheduled = scheduled;
//this.theTime = Date.from(scheduled.theTime().atZone(ZoneId.systemDefault()).toInstant()).getTime();
}
@Override
public long nextTime() {
LocalDateTime next = scheduled.nextTime();
this.theTime = Date.from(next.atZone(ZoneId.systemDefault()).toInstant()).getTime();
/*SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("下次执行:"+ sdf.format(next.toInstant(ZoneOffset.of("+8")).toEpochMilli()));*/
return theTime;
}
@Override
public long theTime() {
LocalDateTime next = scheduled.theTime();
this.theTime = Date.from(next.atZone(ZoneId.systemDefault()).toInstant()).getTime();
return theTime;
}
@Override
public boolean isComplete() {
return isComplete;
}
public void setComplete(boolean complete) {
if (isComplete = complete)
timerExecutor.remove(name);
}
public int getExecCount() {
return execCount.get();
}
public TimerExecutor getTimerExecutor() {
return timerExecutor;
}
public void setTimerExecutor(TimerExecutor timerExecutor) {
this.timerExecutor = timerExecutor;
}
public long startTime() {
return startTime;
}
@Override
public void run() {
//没有完成任务继续执行
if (!isComplete) {
int count = execCount.incrementAndGet(); // 执行次数+1
// long start = System.currentTimeMillis();
job.execute(this);
// long end = System.currentTimeMillis();
// logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count));
if (!isComplete) {
timerExecutor.add(this, true);
}
}
}
}

View File

@ -0,0 +1,40 @@
package dev.zhub.timer;
import dev.zhub.timer.scheduled.ScheduledCycle;
import java.util.UUID;
import java.util.function.Supplier;
public class Timers {
private static final TimerExecutor timerExecutor = new TimerExecutor(1);
/**
* 本地延时重试
*/
public static void tryDelay(Supplier<Boolean> supplier, long millis, int maxCount) {
timerExecutor.add(TimerTask.by("try-delay-task-" + UUID.randomUUID().toString().replaceAll("-", ""), ScheduledCycle.of(0), task -> {
if (supplier.get() || task.getExecCount() == maxCount) {
task.setComplete(true);
}
if (task.getExecCount() == 1) {
task.setScheduled(ScheduledCycle.of(millis));
}
}));
}
/**
* 本地延时延时时间极短的场景下使用 1分钟内
*/
public static void delay(Runnable runnable, long millis) {
timerExecutor.add(TimerTask.by("delay-task-" + UUID.randomUUID().toString().replaceAll("-", ""), ScheduledCycle.of(millis), task -> {
runnable.run();
task.setComplete(true);
}));
}
}

View File

@ -0,0 +1,105 @@
package dev.zhub.timer.queue;
import dev.zhub.timer.task.Task;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by liangxianyou at 2018/7/23 14:07.
*/
public class TimerQueue {
private ReentrantLock lock = new ReentrantLock();
private Condition isEmpty = lock.newCondition();
private LinkedList<Task> queue = new LinkedList();
/**
* 新加调度任务
*
* @param task
*/
public void push(Task task) {
try {
lock.lock();
remove(task.getName());
int inx = queue.size();//目标坐标
while (inx > 0 && queue.get(inx - 1).theTime() > task.theTime()) {
inx--;
}
queue.add(inx, task);
isEmpty.signal();
} finally {
lock.unlock();
}
}
/**
* 调度等待执行的任务
*
* @return
* @throws InterruptedException
*/
public Task take() throws InterruptedException {
try {
lock.lock();
while (queue.size() == 0) {
isEmpty.await();
}
long currentTime = System.currentTimeMillis();
long nextTime = queue.getFirst().theTime();
if (currentTime >= nextTime) {
return queue.removeFirst();
} else {
isEmpty.await(nextTime - currentTime, TimeUnit.MILLISECONDS);
return take();
}
} finally {
lock.unlock();
}
}
/**
* 删除指定名称的任务
*
* @param name
* @return
*/
public Task remove(String name) {
return get(name, true);
}
/**
* 返回指定名称的任务
*
* @param name
* @return
*/
public Task get(String name) {
return get(name, false);
}
private Task get(String name, boolean remove) {
try {
lock.lock();
Task take = null;
for (int i = 0; i < queue.size(); i++) {
if (name.equals(queue.get(i).getName())) {
take = queue.get(i);
}
}
if (remove && take != null) {
queue.remove(take);
}
isEmpty.signal();
return take;
} finally {
lock.unlock();
}
}
}

View File

@ -0,0 +1,23 @@
package dev.zhub.timer.scheduled;
import java.time.LocalDateTime;
/**
* @author: liangxianyou at 2018/8/5 17:35.
*/
public interface Scheduled {
/**
* 下次执行时间
*
* @return
*/
LocalDateTime nextTime();
/**
* 当前执行时间
*
* @return
*/
LocalDateTime theTime();
}

View File

@ -0,0 +1,79 @@
package dev.zhub.timer.scheduled;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
/**
* @author: liangxianyou at 2018/8/5 18:05.
*/
public class ScheduledCycle implements Scheduled {
private LocalDateTime theTime;
private long period;
private TemporalUnit unit = ChronoUnit.MILLIS;
private ScheduledCycle() {
}
public static Scheduled of(String periodCfg) {
TemporalUnit unit = ChronoUnit.MILLIS;
String endchar = "";
long period;
if (periodCfg.matches("^\\d+[y,M,d,H,m,s]$")) {
endchar = periodCfg.substring(periodCfg.length() - 1);
period = Long.parseLong(periodCfg.substring(0, periodCfg.length() - 1));
} else if (periodCfg.matches("^\\d+$")) {
period = Long.parseLong(periodCfg);
if (period <= 0) {
throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", periodCfg));
}
} else {
throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", periodCfg));
}
if ("y".equals(endchar)) unit = ChronoUnit.YEARS;
else if ("M".equals(endchar)) unit = ChronoUnit.MONTHS;
else if ("d".equals(endchar)) unit = ChronoUnit.DAYS;
else if ("H".equals(endchar)) unit = ChronoUnit.HOURS;
else if ("m".equals(endchar)) unit = ChronoUnit.MINUTES;
else if ("s".equals(endchar)) unit = ChronoUnit.SECONDS;
return of(period, unit);
}
public static Scheduled of(long period) {
return of(period, ChronoUnit.MILLIS);
}
public static Scheduled of(long period, TemporalUnit unit) {
LocalDateTime theTime = LocalDateTime.now().plus(period, unit);
return of(theTime, period, unit);
}
public static Scheduled of(LocalDateTime startTime, long period) {
return of(startTime, period, ChronoUnit.MILLIS);
}
public static Scheduled of(LocalDateTime startTime, long period, TemporalUnit unit) {
ScheduledCycle scheduled = new ScheduledCycle();
scheduled.theTime = startTime;
scheduled.period = period;
scheduled.unit = unit;
return scheduled;
}
@Override
public LocalDateTime nextTime() {
if (theTime.isAfter(LocalDateTime.now())) {
return theTime;
}
return theTime = theTime.plus(period, unit);
}
@Override
public LocalDateTime theTime() {
return theTime;
}
}

View File

@ -0,0 +1,499 @@
package dev.zhub.timer.scheduled;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* 时间解析器
*
* @author: liangxianyou
*/
@SuppressWarnings("Duplicates")
public class ScheduledExpres implements Scheduled {
private int year;
private int month;
private int[] minutes;
private int[] hours;
private int[] days;
private int[] monthes;
private int[] weeks;
private String cfg;
private String[] cfgArr;
private LocalDateTime theTime;
private int _y, _M, _d, _H, _m;
@Deprecated
private ScheduledExpres(String cfg) {
this.cfg = cfg;
this.theTime = LocalDateTime.now();
initTheTime();
}
@Deprecated
private ScheduledExpres(final LocalDateTime startTime, String cfg) {
LocalDateTime now = LocalDateTime.now();
this.theTime = now.isAfter(startTime) ? now : startTime;
this.cfg = cfg;
initTheTime();
}
public static Scheduled of(String cfg) {
return new ScheduledExpres(cfg);
}
public static Scheduled of(final LocalDateTime startTime, String cfg) {
return new ScheduledExpres(startTime, cfg);
}
//寻找初始合法时间
public void initTheTime() {
year = theTime.getYear();
month = theTime.getMonthValue();
cfgArr = cfg.split(" ");
setWeeks();
setMonthes();
setDays();
setHours();
setMinutes();
_y = theTime.getYear();
_M = theTime.getMonthValue();
_d = theTime.getDayOfMonth();
_H = theTime.getHour();
_m = theTime.getMinute();
String cmd = "";//y M d H m
if (days.length == 0) cmd = "M";
do {
carry(cmd);
int inx;
if ((inx = nowOk(monthes, _M)) < 0) {
cmd = "y";
continue;
}
_M = monthes[inx];
if ((inx = nowOk(days, _d)) < 0) {
cmd = "M";
continue;
}
_d = days[inx];
if ((inx = nowOk(hours, _H)) < 0) {
cmd = "d";
continue;
}
_H = hours[inx];
if ((inx = nowOk(minutes, _m)) < 0) {
cmd = "H";
continue;
}
_m = minutes[inx];
break;
} while (true);
theTime = LocalDateTime.of(_y, _M, _d, _H, _m);
LocalDateTime now = LocalDateTime.now();
while (theTime.isBefore(now)) {
theTime = carry("m");
}
}
/**
* 下一次执行的时间
*
* @return
*/
@Override
public LocalDateTime nextTime() {
if (theTime.isAfter(LocalDateTime.now())) {
return theTime;
}
return theTime = carry("m");
}
@Override
public LocalDateTime theTime() {
return theTime;
}
/**
* 通过发送指令进行进位
*
* @param cmd 进位指令
*/
private LocalDateTime carry(String cmd) {
int inx;
while (!"".equals(cmd)) {
switch (cmd) {
case "y":
_y = this.year = ++_y;
_M = this.month = monthes[0];
setDays();
if (days.length == 0) {
cmd = "M";
continue;
}
_d = days[0];
_H = hours[0];
_m = minutes[0];
break;
case "M":
if (_M < monthes[0]) {
_M = monthes[0];
break;
}
inx = Arrays.binarySearch(monthes, _M);
if (inx < 0 || inx >= monthes.length - 1) {
cmd = "y";
continue;
}
_M = this.month = monthes[inx + 1];
setDays();
if (days.length == 0) {
cmd = "M";
continue;
}
_d = days[0];
_H = hours[0];
_m = minutes[0];
break;
case "d":
if (_d < days[0]) {
_d = days[0];
break;
}
inx = Arrays.binarySearch(days, _d);
if (inx < 0 || inx >= days.length - 1) {
cmd = "M";
continue;
}
_d = days[inx + 1];
_H = hours[0];
_m = minutes[0];
break;
case "H":
if (_H < hours[0]) {
_H = hours[0];
break;
}
inx = Arrays.binarySearch(hours, _H);
if (inx < 0 || inx >= hours.length - 1) {
cmd = "d";
continue;
}
_H = hours[inx + 1];
_m = minutes[0];
break;
case "m":
if (_m < minutes[0]) {
_m = minutes[0];
break;
}
inx = Arrays.binarySearch(minutes, _m);
if (inx < 0 || inx >= minutes.length - 1) {
cmd = "H";
continue;
}
_m = minutes[inx + 1];
break;
}
cmd = "";
}
return LocalDateTime.of(_y, _M, _d, _H, _m);
}
/**
* 得到初始合法时间的索引
*
* @param arr 合法时间序列
* @param n 初始选中值
* @return 合法时间的索引
*/
private int nowOk(int[] arr, int n) {
if (arr == null || arr.length == 0) return -1;
if (arr[0] > n)
return 0;
if (arr[arr.length - 1] < n)
return 0;
if (arr[arr.length - 1] == n)
return arr.length - 1;
for (int i = 0; i < arr.length - 1; i++) {
if ((arr[i] < n && arr[i + 1] > n) || arr[i] == n) {
return i;
}
}
return -1;
}
/**
* 以下为 初始化合法时间 weeksmonthesdayshourminutes 序列
*/
private void setMinutes() {
String cfg = cfgArr[0];
if ("*".equals(cfg)) {//*
minutes = new int[60];
for (int i = 0; i < 60; i++) {
minutes[i] = i;
}
} else if (cfg.matches("^[0-5]??[0-9]??$")) {//n
minutes = new int[1];
minutes[0] = Integer.parseInt(cfg);
} else if (cfg.matches("^[*]/[0-9]+$")) {// */5
String[] strArr = cfg.split("/");
int p = Integer.parseInt(strArr[1]);
minutes = new int[60 / p];
for (int i = 0; i < minutes.length; i++) {
minutes[i] = i * p;
}
} else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3
String[] strings = cfg.split(",");
minutes = new int[strings.length];
for (int i = 0; i < strings.length; i++) {
minutes[i] = Integer.parseInt(strings[i]);
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3
String[] split = cfg.split("-");
int s = Integer.parseInt(split[0]);
int e = Integer.parseInt(split[1]);
minutes = new int[e - s + 1];
for (int i = 0; i < minutes.length; i++) {
minutes[i] = s + i;
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5
String[] strArr = cfg.split("/");
String[] str2Arr = strArr[0].split("-");
int s = Integer.parseInt(str2Arr[0]);
int e = Integer.parseInt(str2Arr[1]);
int p = Integer.parseInt(strArr[1]);
minutes = new int[(e - s) / p];
for (int i = 0; i < minutes.length; i++) {
minutes[i] = s + i * p;
}
}
}
private void setHours() {
String cfg = cfgArr[1];
if ("*".equals(cfg)) {//*
hours = new int[24];
for (int i = 0; i < hours.length; i++) {
hours[i] = i;
}
} else if (cfg.matches("^[0-5]??[0-9]??$")) {//n
hours = new int[1];
hours[0] = Integer.parseInt(cfg);
} else if (cfg.matches("^[*]/[0-9]+$")) {// */5
String[] strArr = cfg.split("/");
int p = Integer.parseInt(strArr[1]);
hours = new int[24 / p];
for (int i = 0; i < hours.length; i++) {
hours[i] = i * p;
}
} else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3
String[] strArr = cfg.split(",");
hours = new int[strArr.length];
for (int i = 0; i < strArr.length; i++) {
hours[i] = Integer.parseInt(strArr[i]);
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3
String[] split = cfg.split("-");
int s = Integer.parseInt(split[0]);
int e = Integer.parseInt(split[1]);
hours = new int[e - s + 1];
for (int i = 0; i < hours.length; i++) {
hours[i] = s + i;
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5
String[] strArr = cfg.split("/");
String[] str2Arr = strArr[0].split("-");
int s = Integer.parseInt(str2Arr[0]);
int e = Integer.parseInt(str2Arr[1]);
int p = Integer.parseInt(strArr[1]);
hours = new int[(e - s) / p];
for (int i = 0; i < hours.length; i++) {
hours[i] = s + i * p;
}
}
}
private void setWeeks() {
String cfg = cfgArr[4];
if ("*".equals(cfg)) {//*
weeks = new int[7];
for (int i = 0; i < weeks.length; i++) {
weeks[i] = i + 1;
}
} else if (cfg.matches("^[0-5]??[0-9]??$")) {//n
weeks = new int[1];
weeks[0] = Integer.parseInt(cfg);
} else if (cfg.matches("^[*]/[0-9]+$")) {// */5
String[] strArr = cfg.split("/");
int p = Integer.parseInt(strArr[1]);
weeks = new int[7 / p];
for (int i = 0; i < weeks.length; i++) {
weeks[i] = i * p;
}
} else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3
String[] strArr = cfg.split(",");
weeks = new int[strArr.length];
for (int i = 0; i < strArr.length; i++) {
weeks[i] = Integer.parseInt(strArr[i]);
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3
String[] split = cfg.split("-");
int s = Integer.parseInt(split[0]);
int e = Integer.parseInt(split[1]);
weeks = new int[e - s + 1];
for (int i = 0; i < weeks.length; i++) {
weeks[i] = s + i;
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5
String[] strArr = cfg.split("/");
String[] str2Arr = strArr[0].split("-");
int s = Integer.parseInt(str2Arr[0]);
int e = Integer.parseInt(str2Arr[1]);
int p = Integer.parseInt(strArr[1]);
weeks = new int[(e - s) / p];
for (int i = 0; i < weeks.length; i++) {
weeks[i] = s + i * p;
}
}
}
private void setMonthes() {
String cfg = cfgArr[3];
if ("*".equals(cfg)) {//*
monthes = new int[12];
for (int i = 0; i < monthes.length; i++) {
monthes[i] = i + 1;
}
} else if (cfg.matches("^[0-5]??[0-9]??$")) {//n
monthes = new int[1];
monthes[0] = Integer.parseInt(cfg);
} else if (cfg.matches("^[*]/[0-9]+$")) {// */5
String[] strArr = cfg.split("/");
int p = Integer.parseInt(strArr[1]);
monthes = new int[12 / p];
for (int i = 0; i < monthes.length; i++) {
monthes[i] = i * p;
}
} else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3
String[] strArr = cfg.split(",");
monthes = new int[strArr.length];
for (int i = 0; i < strArr.length; i++) {
monthes[i] = Integer.parseInt(strArr[i]);
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3
String[] split = cfg.split("-");
int s = Integer.parseInt(split[0]);
int e = Integer.parseInt(split[1]);
monthes = new int[e - s + 1];
for (int i = 0; i < monthes.length; i++) {
monthes[i] = s + i;
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5
String[] strArr = cfg.split("/");
String[] str2Arr = strArr[0].split("-");
int s = Integer.parseInt(str2Arr[0]);
int e = Integer.parseInt(str2Arr[1]);
int p = Integer.parseInt(strArr[1]);
monthes = new int[(e - s) / p];
for (int i = 0; i < monthes.length; i++) {
monthes[i] = s + i * p;
}
}
}
private void setDays() {
String cfg = cfgArr[2];
//当前月份总天数
LocalDate firstDay = LocalDate.of(year, month, 1);
int lengthOfMonth = firstDay.lengthOfMonth();
if ("*".equals(cfg)) {//*
days = new int[lengthOfMonth];
for (int i = 0; i < days.length; i++) {
days[i] = i + 1;
}
} else if (cfg.matches("^[0-5]??[0-9]??$")) {//n
days = new int[1];
days[0] = Integer.parseInt(cfg);
} else if (cfg.matches("^[*]/[0-9]+$")) {// */5
String[] strArr = cfg.split("/");
int p = Integer.parseInt(strArr[1]);
days = new int[lengthOfMonth / p];
for (int i = 0; i < days.length; i++) {
days[i] = i * p;
}
} else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3
String[] strArr = cfg.split(",");
days = new int[strArr.length];
for (int i = 0; i < strArr.length; i++) {
days[i] = Integer.parseInt(strArr[i]);
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3
String[] split = cfg.split("-");
int s = Integer.parseInt(split[0]);
int e = Integer.parseInt(split[1]);
days = new int[e - s + 1];
for (int i = 0; i < days.length; i++) {
days[i] = s + i;
}
} else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5
String[] strArr = cfg.split("/");
String[] str2Arr = strArr[0].split("-");
int s = Integer.parseInt(str2Arr[0]);
int e = Integer.parseInt(str2Arr[1]);
int p = Integer.parseInt(strArr[1]);
days = new int[(e - s) / p];
for (int i = 0; i < days.length; i++) {
days[i] = s + i * p;
}
}
int firstWeek = firstDay.getDayOfWeek().getValue();
List<Integer> allDay = new ArrayList<>();
for (int i = 0; i < days.length; i++) {
//int week = 7 - Math.abs(i - firstWeek) % 7;//当前星期X
int week;
int d = days[i];
if (d + firstWeek <= 8) {
week = firstWeek + d - 1;
} else {
week = (d - (8 - firstWeek)) % 7;
if (week == 0) week = 7;
}
//System.out.printf("M:%s,d:%s,w:%s%n", month, d, week);
if (Arrays.binarySearch(weeks, week) > -1) {
allDay.add(d);//加入日期
}
}
days = new int[allDay.size()];
for (int i = 0; i < allDay.size(); i++) {
days[i] = allDay.get(i);
}
}
}

View File

@ -0,0 +1,14 @@
package dev.zhub.timer.task;
/**
* @author: liangxianyou at 2018/12/8 17:24.
*/
@FunctionalInterface
public interface Job {
/**
* 任务执行的内容
*/
void execute(Task task);
}

View File

@ -0,0 +1,70 @@
package dev.zhub.timer.task;
import dev.zhub.timer.TimerExecutor;
import dev.zhub.timer.scheduled.Scheduled;
/**
* @author: liangxianyou at 2018/8/5 19:32.
*/
public interface Task extends Runnable {
/**
* 得到任务名称
*
* @return
*/
String getName();
/**
* 设置任务执行计划
*
* @param scheduled
*/
void setScheduled(Scheduled scheduled);
/**
* 得到下一次执行计划的时间并设置thenTime
*
* @return
*/
long nextTime();
/**
* 任务即将执行的时间点
*
* @return
*/
long theTime();
/**
* 是否完成
*
* @return
*/
boolean isComplete();
/**
* 完成任务(结束标记)
*
* @param complete
*/
void setComplete(boolean complete);
/**
* 开始时间创建时间
*
* @return
*/
long startTime();
TimerExecutor getTimerExecutor();
void setTimerExecutor(TimerExecutor timerExecutor);
/**
* 得到总执行次数
*
* @return
*/
int getExecCount();
}

View File

@ -0,0 +1,7 @@
# zhub 配置
zhub:
appid: local_api
addr: 127.0.0.1:1216
groupid: hub-api
auth: token-12345

108
test/HelloService.java Normal file
View File

@ -0,0 +1,108 @@
import org.junit.Before;
import org.junit.Test;
import net.tccn.IType;
import net.tccn.zhub.Lock;
import net.tccn.zhub.ZHubClient;
// @RestService(automapping = true)
public class HelloService {
// @Resource(name = "zhub")
private ZHubClient zhub;
@Before
public void init() {
//zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "zchd@123456");
zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "token-12345");
zhub.subscribe("tv:test", x -> {
System.out.println(x);
});
Lock lock = zhub.tryLock("lock-a", 5);
System.out.println("lock-1: " + lock.success());
//zhub.init(Kv.of("host", "47.111.150.118", "port", "6066", "groupid", "g-dev", "appname", "DEV-LOCAL"));
// Function<Rpc<T>, RpcResult<R>> fun
/*zhub.rpcSubscribe("x", new TypeToken<String>() {
}, r -> {
return r.buildResp(Map.of("v", r.getValue().toUpperCase() + ": Ok"));
});
zhub.subscribe("sport:reqtime", x -> {
//System.out.println(x);
});
zhub.subscribe("abx", x -> {
System.out.println(x);
});
try {
Thread.sleep(010);
} catch (InterruptedException e) {
e.printStackTrace();
}
zhub.delay("sport:reqtime", "别✈人家的✦女娃子❤🤞🏻", 0);
zhub.delay("sport:reqtime", "别人家的女娃子➾🤞🏻", 0);
zhub.delay("sport:reqtime", "❤别人家✉<EFBFBD>的女娃子❤🤞🏻", 0);*/
/*zhub.delay("sport:reqtime", "中文特殊符号:『』 £ ♀ ‖ 「」\n" +
"英文:# + = & ﹉ .. ^ \"\" ·{ } % ' €\n" +
"数学:+× ° ± ℃ ㎡ ∑ ≥ ∫ ㏄ ⊥ ≯ ∠ ∴ ∈ ∧ ∵ ≮ ㎝ ㏑ ≌ ㎞ № § ℉ ÷ ‰ ㎎ ㎏ ㎜ ㏒ ⊙ ∮ ∝ ∞ º ¹ ² ³ ½ ¾ ¼ ≈ ≡ ≠ ≤ ≦ ≧ ∽ ∷ ∏ ∩ ⌒ √Ψ ¤ ‖ ¶\n" +
"特殊:♤ ♧ ♡ ♢ ♪ ♬ ♭ ✔ ✘ ♞ ♟ ↪ ↣ ♚ ♛ ♝ ☞ ☜ ⇔ ☆ ★ □ ■ ○ ● △ ▲ ▽ ▼ ◇ ◆ ♀ ♂ ※ ↓ ↑ ↔ ↖ ↙ ↗ ↘ ← → ♣ ♠ ♥ ◎ ◣ ◢ ◤ ◥ 卍 ℡ ⊙ ㊣ ® © ™ ㈱ 囍\n" +
"序号:①②③④⑤⑥⑦⑧⑨⑩㈠㈡㈢㈣㈤㈥㈦㈧㈨㈩⑴ ⑵ ⑶ ⑷ ⑸ ⑹ ⑺ ⑻ ⑼ ⑽ ⒈ ⒉ ⒊ ⒋ ⒌ ⒍ ⒎ ⒏ ⒐ ⒑ Ⅱ Ⅲ Ⅳ Ⅵ Ⅶ Ⅷ ⅨⅩ\n" +
"日文:アイウエオァィゥェォカキクケコガギグゲゴサシスセソザジズゼゾタチツテトダヂヅデドッナニヌネノハヒフヘホバビブベボパピプペポマミムメモャヤュユョラリヨルレロワヰヱヲンヴヵヶヽヾ゛゜ー、。「「あいうえおぁぃぅぇぉかきくけこがぎぐげごさしすせそざじずぜぞたちつてでどっなにぬねのはひふへ」」ほばびぶべぼぱぴぷぺぽまみむめもやゆよゃゅょらりるれろわをんゎ゛゜ー、。「」\n" +
"部首:犭 凵 巛 冖 氵 廴 讠 亻 钅 宀 亠 忄 辶 弋 饣 刂 阝 冫 卩 疒 艹 疋 豸 冂 匸 扌 丬 屮衤 礻 勹 彳 彡", 0);*/
}
@Test
public void rpcTest() {
//RpcResult<String> rpc = zhub.rpc("wx:users", Map.of("appId", "wxa554ec3ab3bf1fc7"), IConsumer.TYPE_TOKEN_STRING);
//RpcResult<String> rpc = zhub.rpc("a", "fa", IConsumer.TYPE_TOKEN_STRING);
zhub.publish("tv:test", "hello ym!");
zhub.subscribe("tv:abx", x -> {
System.out.println(x);
});
zhub.rpcSubscribe("rpc-x", IType.STRING, x -> {
return x.render(x.getValue().toUpperCase());
});
Lock lock = zhub.tryLock("lock-a", 5);
System.out.println("lock-2: " + lock.success());
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Lock lock2 = zhub.tryLock("lock-a", 5);
System.out.println("lock-3: " + lock2.success());
/*try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
Lock lock3 = zhub.tryLock("lock-a", 5);
System.out.println("lock-4: " + lock3.success());
try {
Thread.sleep(3000 * 30000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void lockTest() {
}
/*RpcResult<FileToken> x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() {
});*/
}

View File

@ -1,73 +0,0 @@
package com.zdemo.test;
import com.zdemo.Event;
import com.zdemo.EventType;
import com.zdemo.kafak.KafakProducer;
import org.junit.Test;
import org.redkale.boot.Application;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.TypeToken;
import java.util.List;
import java.util.Map;
import static java.util.Arrays.asList;
/**
* 消息发布订阅测试
*/
public class AppTest {
@Test
public void runConsumer() {
try {
//启动并开启消费监听
MyConsumer consumer = Application.singleton(MyConsumer.class);
consumer.addEventType(
EventType.of("a1", new TypeToken<Float>() {
}, r -> {
System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r));
}),
EventType.of("bx", str -> {
System.out.println("我收到了消息 主题bx 事件:" + str);
})
, EventType.of("game-update", str -> {
System.out.println("我收到了消息 主题game-update 事件:" + str);
})
);
Thread.sleep(60_000 * 60);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void runProducer() {
try {
KafakProducer producer = Application.singleton(KafakProducer.class);
// 发送不同的 事件
float v0 = 1f;
Map v1 = Map.of("k", "v");
List v2 = asList(1, 2, 3);
//producer.send(Event.of("a1", v0));
/*producer.send(Event.of("b1", v1));
producer.send(Event.of("c1", v2));*/
producer.send(Event.of("game-update", 23256));
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -1,15 +0,0 @@
package com.zdemo.test;
import com.zdemo.kafak.KafakConsumer;
public class MyConsumer extends KafakConsumer {
public String getGroupid() {
return "group-test"; //消费组名称
}
@Override
public boolean preInit() {
return true;
}
}