56 Commits

Author SHA1 Message Date
3fe08e92a8 修改:包结构名称 2024-04-23 19:20:16 +08:00
a0126582bd . 2024-04-22 00:21:54 +08:00
0bedcf366e 优化:1、rpc调用优先调用本地订阅
2、本地模式返回结果与目标类型不一致的转换
2024-04-09 02:06:23 +08:00
af44804282 新增:IType(SHORT、LONG、DOUBLE) 2024-04-08 22:35:55 +08:00
7ce9b4012a 修改:支持jdk17,@PostConstruct 高版本与低版本不兼容 2024-04-08 01:10:25 +08:00
f4771aadf2 新增:trylock 尝试获取锁,并立即返回加锁结果 2023-10-21 13:07:22 +08:00
8e2779f2d8 新增:springboot 组件初始化 zhub 实例 2023-10-21 10:23:59 +08:00
094d3fc5a2 . 2023-07-18 03:55:07 +08:00
9a3314e0e6 修改:工程结构变更为 标准 maven 工程 2023-06-25 02:27:57 +08:00
99cb1ea3c8 修改:topic 消息接收日志打印级别 2023-04-03 19:45:33 +08:00
46082b5461 新增:topic 接收日志, zhub 消息发送日志 2023-04-03 19:25:47 +08:00
35e4bebb6d 修改:断线重连 bug 2023-03-06 21:35:11 +08:00
144b99c3d2 修改:包名称 2023-03-05 19:36:00 +08:00
2296a7fb15 新增:zhub 验权处理 2022-06-23 19:45:11 +08:00
90378c5daf 修改:jdk-api 使用修改,支持 jdk8 语法 2022-01-13 18:21:00 +08:00
0df9e254e6 修改:删除多余依赖调整,代码风格调整 2021-12-31 12:55:01 +08:00
3debcee7d2 新增:zhub-cli 独立分支 2021-12-31 11:53:25 +08:00
6995d103f0 . 2021-11-17 19:04:59 +08:00
ab7e9536a7 修改: 合并 zhub 配置,addr="host:port", 去除原有 port参数 2021-11-17 18:33:18 +08:00
c135ef8925 修改:eventMap 使用 ConcurrentHashMap 2021-11-12 10:36:08 +08:00
a16e448438 修改:日志级别 2021-11-11 17:28:47 +08:00
92ef169b6d . 2021-11-04 19:03:23 +08:00
89d474284c 新增:1.redtimer 代码包暂时合并到此工程
2.新增Timers.tryDelay、Timers.delay 方法
2021-11-03 19:56:27 +08:00
lxy
257e840e7e 新增:rpc 超时处理 2021-10-12 19:00:06 +08:00
lxy
0bb543d1a9 修改:rpc 方法 2021-08-29 19:44:04 +08:00
lxy
a812e2d78d 新增:zhub 服务 ping 回应 2021-08-05 10:00:24 +08:00
lxy
3f2db60432 修改:主题消息内容多行文本支持 2021-07-28 18:16:39 +08:00
lxy
38c023a503 修改:rpc 调用端订阅 topic 逻辑修改 2021-07-07 12:56:04 +08:00
lxy
f7ef96d0f6 修改:延时时间数值类型为long 2021-05-22 16:19:43 +08:00
lxy
fc1f91af04 修改:rpc 调用端带返回结果反序化bug 2021-04-11 22:09:52 +08:00
lxy
c6940cef2b 修改:1、rpc 消费端异常捕获 2、代码风格修改 2021-04-07 16:27:19 +08:00
lxy
ee7154990b 新增:1、lock 锁客户端API 2、rpc调用请求/接收端;
修改:消息发送优化
2021-04-04 23:40:28 +08:00
lxy
3d0d2f8f81 . 2021-03-05 10:59:13 +08:00
lxy
895df0e05a 修改:zhub-client 重连逻辑 2021-03-01 18:50:15 +08:00
lxy
57c216c791 新增:总线消息插件模式注入 2021-02-03 19:14:01 +08:00
lxy
c4639fcabf . 2021-02-03 16:45:42 +08:00
lxy
7acacc2ce3 重构:代码组合逻辑 2021-02-03 11:01:50 +08:00
lxy
80fe44d5d7 新增:redis 缓存(从 redkalex-plugin 迁移至此) 2021-02-02 13:49:51 +08:00
lxy
9652c60dcd 新增:dalay 支持字符表达式 2021-02-02 09:38:21 +08:00
lxy
e5b5c66835 新增:delay 延时事件api 2021-02-01 19:40:22 +08:00
lxy
3c1c38207f 修改:定时调度执行异常和 任务堆叠导致丢失任务bug 2021-01-25 12:50:53 +08:00
lxy
f6c56c62df 删除:producer 的 send 方法 2021-01-23 19:01:50 +08:00
lxy
91e4e48aa2 新增:主题发布标准方法 publish 替换原 send 方法 2021-01-23 11:03:20 +08:00
lxy
361ef58d98 新增:zhub 广播 broadcast 使用主题同订阅发布 2021-01-23 11:01:47 +08:00
lxy
69be5ec3d7 新增:订阅 subscribe 方法 2021-01-22 17:53:38 +08:00
lxy
d83dcd37c2 修改:1.定时调度调整 2.zhub 服务 producer、consumer 合并为 ZHubClient 2021-01-12 19:00:00 +08:00
lxy
f4089d4118 修改:timer api 2021-01-12 17:29:22 +08:00
lxy
d70d9ad244 新增:ZHub consumer 网络连接中断处理 2021-01-12 11:50:21 +08:00
lxy
c457e8094d 修改:ztimer-cli 2021-01-11 18:23:54 +08:00
lxy
26a1fc4971 新增:ztimer 客户端 2021-01-08 19:51:36 +08:00
lxy
4d5fb83b3c . 2021-01-08 18:17:14 +08:00
lxy
5dd58d1fab 新增:zdb 客户端程序实现,其他修改 2021-01-08 15:37:18 +08:00
lxy
a37f1b9564 新增:主题退订 2020-12-08 19:31:15 +08:00
lxy
10472078b0 . 2020-11-30 09:25:23 +08:00
lxy
9138790fd2 . 2020-11-13 18:23:23 +08:00
lxy
1603140c1c 新增:pulsar 消费订阅实现 2020-10-29 12:29:08 +08:00
24 changed files with 287 additions and 1489 deletions

View File

@@ -1,13 +0,0 @@
redkale.name=zhub-dev
redkale.port=6560
redkale.server[0].protocol=HTTP
redkale.server[0].host=127.0.0.1
redkale.server[0].port=80
# redkale.server[0].root = root
redkale.server[0].rest.autoload=true
redkale.server[0].rest.path=
redkale.server[0].services[0].autoload=true
# zhub
redkale.cluster.zhub[hub].addr=47.111.150.118:6066
redkale.cluster.zhub[hub].auth=zchd@123456
redkale.cluster.zhub[hub].groupid=venue-zhub

View File

@@ -1,14 +0,0 @@
# redis
redis.host=47.111.150.118
redis.password=*Zhong9307!
redis.port=6064
# pulsar
pulsar.serviceurl=pulsar://47.113.228.247:6650
# zdb
# zdb.host = 127.0.0.1
# zdb.port = 1216
zhub.host = 127.0.0.1
zhub.port = 1216

View File

@@ -1,18 +0,0 @@
handlers=java.util.logging.ConsoleHandler
# handlers = java.util.logging.FileHandler
############################################################
.level=FINEST
java.level=INFO
javax.level=INFO
com.sun.level=INFO
sun.level=INFO
jdk.level=INFO
java.util.logging.FileHandler.level=FINER
#10M
java.util.logging.FileHandler.limit=10M
java.util.logging.FileHandler.count=20
java.util.logging.FileHandler.encoding=UTF-8
java.util.logging.FileHandler.pattern=${APP_HOME}/logs-%tY%tm/log-%tY%tm%td.log
java.util.logging.FileHandler.unusual=${APP_HOME}/logs-%tY%tm/log-warnerr-%tY%tm%td.log
java.util.logging.FileHandler.append=true
java.util.logging.ConsoleHandler.level=FINEST

View File

@@ -1,5 +0,0 @@
############ ClusterSource @Resource(name="hub") ############
# redkale.cluster.zhub[hub].addr = 47.111.150.118:6066
# redkale.cluster.zhub[hub].auth = zchd@123456
# redkale.cluster.zhub[hub].groupid = venue-zhub

58
pom.xml
View File

@@ -5,15 +5,46 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>dev.zhub</groupId> <groupId>dev.zhub</groupId>
<artifactId>zhub-client-redkale</artifactId> <artifactId>zhub-client-spring</artifactId>
<version>0.1.1.dev</version> <version>17.0.0423.dev</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.10</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties> <properties>
<maven.compiler.source>17</maven.compiler.source> <maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target> <maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<!--<version>4.13.1</version>-->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<!--<version>1.18.30</version>-->
<scope>provided</scope>
</dependency>
</dependencies>
<repositories> <repositories>
<repository> <repository>
<id>maven-release</id> <id>maven-release</id>
@@ -28,19 +59,12 @@
<url>https://nexus.1216.top/repository/maven-releases/</url> <url>https://nexus.1216.top/repository/maven-releases/</url>
</repository> </repository>
</distributionManagement> </distributionManagement>
<!--<distributionManagement>
<repository>
<id>mvn-recloud</id>
<name>mvn-release</name>
<url>http://nexus.recloud.cn/repository/maven-releases/</url>
</repository>
</distributionManagement>-->
<dependencies>
<dependency>
<groupId>org.redkale</groupId>
<artifactId>redkale</artifactId>
<version>2.8.0.dev</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project> </project>

View File

@@ -1,9 +1,10 @@
package dev.zhub; package dev.zhub;
import org.redkale.convert.json.JsonConvert; import com.google.gson.Gson;
import org.redkale.util.Resourcable; import com.google.gson.reflect.TypeToken;
import org.redkale.util.TypeToken; import dev.zhub.client.Rpc;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -13,22 +14,26 @@ import java.util.function.Consumer;
* @author Liang * @author Liang
* @data 2020-09-05 23:18 * @data 2020-09-05 23:18
*/ */
public abstract class AbstractConsumer extends ZhubAgentProvider implements IConsumer, Resourcable { public abstract class AbstractConsumer implements IConsumer {
protected JsonConvert convert = JsonConvert.root(); public Gson gson = Rpc.gson;
protected static String APP_NAME = "";
protected Map<String, EventType<?>> eventMap = new ConcurrentHashMap<>(); protected Map<String, EventType<?>> eventMap = new ConcurrentHashMap<>();
protected abstract String getGroupid(); protected abstract String getGroupid();
protected boolean preInit() {
return true;
}
protected final Set<String> getTopics() { protected final Set<String> getTopics() {
if (!eventMap.isEmpty()) { if (!eventMap.isEmpty()) {
return eventMap.keySet(); return eventMap.keySet();
} }
HashSet<String> set = new HashSet<>();
set.add("-");
return Set.of("-"); return set;
} }
// topic 消息消费前处理 // topic 消息消费前处理
@@ -39,7 +44,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
if ("java.lang.String".equals(eventType.typeToken.getType().getTypeName())) { if ("java.lang.String".equals(eventType.typeToken.getType().getTypeName())) {
data = value; data = value;
} else { } else {
data = convert.convertFrom(eventType.typeToken.getType(), value); data = gson.fromJson(value, eventType.typeToken.getType());
} }
eventType.accept(data); eventType.accept(data);
@@ -48,6 +53,13 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
// rpc 被调用端 // rpc 被调用端
protected <T> void rpcAccept(String topic, T value) { protected <T> void rpcAccept(String topic, T value) {
EventType eventType = eventMap.get(topic); EventType eventType = eventMap.get(topic);
/*// eventType 与 T 的类型比较、
if (!eventType.typeToken.getType().getTypeName().equals(value.getClass().getTypeName())) {
eventType.accept(toStr(value));
} else {
eventType.accept(value);
}*/
eventType.accept(value); eventType.accept(value);
} }
@@ -63,7 +75,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
protected abstract void subscribe(String topic); protected abstract void subscribe(String topic);
public void subscribe(String topic, Consumer<String> consumer) { public void subscribe(String topic, Consumer<String> consumer) {
subscribe(topic, IType.STRING, consumer); subscribe(topic, TYPE_TOKEN_STRING, consumer);
} }
@Override @Override
@@ -78,19 +90,13 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
} }
} }
// --------------
@Override
public String resourceName() {
return super.getName();
}
protected String toStr(Object v) { protected String toStr(Object v) {
if (v instanceof String) { if (v instanceof String) {
return (String) v; return (String) v;
} else if (v == null) { } else if (v == null) {
return null; return null;
} }
return convert.convertTo(v); return gson.toJson(v);
} }
} }

View File

@@ -19,5 +19,4 @@ public class Event<V> {
return new Event<>(topic, value); return new Event<>(topic, value);
} }
} }

View File

@@ -1,6 +1,7 @@
package dev.zhub; package dev.zhub;
import org.redkale.util.TypeToken;
import com.google.gson.reflect.TypeToken;
import java.util.function.Consumer; import java.util.function.Consumer;
@@ -9,7 +10,7 @@ public class EventType<T> {
public final TypeToken<T> typeToken; public final TypeToken<T> typeToken;
private final Consumer<T> consumer; private final Consumer<T> consumer;
private final static TypeToken<String> stringToken = new TypeToken<>() { private final static TypeToken<String> stringToken = new TypeToken<String>() {
}; };
private EventType(String topic, TypeToken<T> typeToken, Consumer<T> consumer) { private EventType(String topic, TypeToken<T> typeToken, Consumer<T> consumer) {

View File

@@ -1,10 +1,14 @@
package dev.zhub; package dev.zhub;
import org.redkale.util.TypeToken; import com.google.gson.reflect.TypeToken;
import java.util.function.Consumer; import java.util.function.Consumer;
public interface IConsumer { public interface IConsumer {
TypeToken<String> TYPE_TOKEN_STRING = new TypeToken<String>() {
};
TypeToken<Integer> TYPE_TOKEN_INT = new TypeToken<Integer>() {
};
/** /**
* 取消订阅 * 取消订阅

View File

@@ -1,33 +1,27 @@
package dev.zhub; package dev.zhub;
import org.redkale.util.TypeToken; import com.google.gson.reflect.TypeToken;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public interface IType { public interface IType {
TypeToken<String> STRING = new TypeToken<String>() { TypeToken<String> STRING = new TypeToken<>() {
}; };
TypeToken<Integer> INT = new TypeToken<Integer>() { TypeToken<Short> SHORT = new TypeToken<>() {
};
TypeToken<Integer> INT = new TypeToken<>() {
};
TypeToken<Long> LONG = new TypeToken<>() {
};
TypeToken<Double> DOUBLE = new TypeToken<>() {
}; };
TypeToken<Short> SHORT = new TypeToken<Short>() { TypeToken<Map<String, String>> MAP = new TypeToken<>() {
};
TypeToken<Long> LONG = new TypeToken<Long>() {
};
TypeToken<Map<String, String>> MAP = new TypeToken<Map<String, String>>() {
}; };
TypeToken<List<Map<String, String>>> LMAP = new TypeToken<List<Map<String, String>>>() { TypeToken<List<Map<String, String>>> LMAP = new TypeToken<List<Map<String, String>>>() {
}; };
TypeToken<List<String>> LSTRING = new TypeToken<List<String>>() {
};
TypeToken<List<Integer>> LINT = new TypeToken<List<Integer>>() {
};
} }

View File

@@ -1,392 +0,0 @@
package dev.zhub;
import org.junit.Test;
import org.redkale.net.AsyncIOGroup;
import org.redkale.util.AnyValue;
import org.redkale.util.ResourceFactory;
import org.redkalex.cache.redis.MyRedisCacheSource;
import static org.redkale.boot.Application.RESNAME_APP_CLIENT_ASYNCGROUP;
import static org.redkale.source.AbstractCacheSource.*;
public class RedisTest {
static MyRedisCacheSource source = new MyRedisCacheSource();
/**
* 3
*/
@Test
public void keyTest() {
source.set("a", 3);
System.out.println(source.get("a"));
source.del("a");
}
/**
* ax:false
* ax:true
* ax:false
*/
@Test
public void bitTest() {
boolean ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax); // false
source.setBit("ax", 6, true);
ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax); // true
source.setBit("ax", 6, false);
ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax); // false
source.del("ax");
}
@Test
public void setTest() {
source.del("setx");
source.sadd("setx", int.class, 1, 2, 3, 5, 6);
int setx = source.spop("setx", int.class);
System.out.println(setx);
setx = source.spop("setx", int.class);
System.out.println(setx);
source.del("setx");
source.srem("setx", int.class,213, 2312);
/*//source.sadd("setx", list.toArray(Integer[]::new));
List<Integer> list = List.of(2, 3, 5);
// source.sadd("setx", list.toArray(Integer[]::new));
source.sadd("setx", list.toArray(Integer[]::new));
source.sadd("setx", 12, 2312, 213);
source.sadd("setx", List.of(1011, 10222));*/
}
static { // redis://:*Zhong9307!@47.111.150.118:6064?db=2
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue().addValue(CACHE_SOURCE_MAXCONNS, "1");
conf.addValue(CACHE_SOURCE_NODES, "redis://:123456@127.0.0.1:6379?db=0");
final ResourceFactory factory = ResourceFactory.create();
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
asyncGroup.start();
factory.register(RESNAME_APP_CLIENT_ASYNCGROUP, asyncGroup);
factory.inject(source);
//source.defaultConvert = JsonFactory.root().getConvert();
source.init(conf);
/*
source.lock("lockx", 5000);
*/
source.keysStartsWith("more-hot").forEach(x -> {
System.out.println(x);
source.del(x);
int i = (short) 3;
});
//--------------------- set ------------------------------
/*
*/
/*
Collection<String> setx1 = source.getCollection("setx", String.class);
System.out.println(setx1);
//source.getexLong()
source.setHms("hmx", Map.of("a", "5", "b", "51", "c", "ads"));
List<Serializable> hmget = source.hmget("hmx", int.class, "a");
System.out.println(hmget);
Integer hm = source.getHm("hmx", int.class, "ads");
System.out.println(hm);
Map<String, String> hms = source.getHms("hmx", "a", "b");
System.out.println("hmx:" + hms);
*//*System.out.println("======================================================");
System.out.println(source.incrHm("hmx", "a", -6.0));
hms = source.getHms("hmx", "a", "b");
System.out.println("hmxa+1后的结果 " + hms);*//*
System.out.println("======================================================");
source.setHm("hmx", "c", 12);
hms = source.getHms("hmx", "a", "b", "c", "d", "a");
System.out.println("hmx设置 c=12 后的结果 " + hms);
System.out.println("======================================================");
Double c = source.getHm("hmx", double.class, "c");
System.out.println("hmx 中 c 值:" + c);*/
/*Map<String, Object> hmx = source.getHmall("hmx");
System.out.println("Hmall" + hmx);*/
/*AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "47.111.150.118").addValue("port", "6064").addValue("password", "*Zhong9307!").addValue("db", 2));
source.defaultConvert = JsonFactory.root().getConvert();
source.initValueType(String.class); //value用String类型
source.init(conf);*/
}
public static void main(String[] args) {
//source.setLong("a", 125);
/*long a = source.getLong("a", 0);
System.out.println(a);
List<String> keys = source.keys("farm*");
keys.forEach(x -> System.out.println(x));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
// ===========================================
//System.out.println(source.remove("a", "b"));
// bit
/*source.initValueType(Integer.class);
source.remove("a");
boolean a = source.getBit("a", 1);
System.out.println(a);
source.setBit("a", 1, true);
a = source.getBit("a", 1);
System.out.println("bit-a-1: " + a);
source.setBit("a", 1, false);
a = source.getBit("a", 1);
System.out.println("bit-a-1: " + a);*/
/*source.remove("a");
// setnx
System.out.println(source.setnx("a", 1));
source.remove("a");
System.out.println(source.setnx("a", 1));
// set
source.remove("abx1");
source.appendSetItems("abx1", "a", "b", "c");
List<String> list = source.srandomItems("abx1", 2);
String str = source.srandomItem("abx1"); //r
System.out.println(list);//[r1, r2] */
/*int[] arr = {0};
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10_0000);
for (int i = 0; i < 10_0000; i++) {
executor.submit(() -> {
try {
source.lock("a", 50000);
arr[0]++;
System.out.println("Thread: " + Thread.currentThread().getName());
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
source.unlock("a");
latch.countDown();
}
});
}
try {
latch.await();
System.out.println("n=" + arr[0]);
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}*/
/*List<String> list = (List) source.getCollection("gamerank-comment-stat");
System.out.println(list);*/
/*for (int i = 0; i < 10; i++) {
String brpop = source.brpop("z", 2);
System.out.println(brpop);
}*/
// key 测试
/*source.set("a", "123321");
System.out.println(source.get("a")); // 123321
System.out.println(source.getTtl("a")); // -1
System.out.println(source.getPttl("a")); // -1
System.out.println(source.getPttl("x")); // -2*/
// hashmap 测试
/*source.remove("sk");
source.setHm("sk", "a", "1");
source.setHm("sk", "b", "2");
System.out.println(source.getHm("sk", "a")); // 1
source.remove("sk");
source.setHms("sk", Map.of("b", "5", "c", "3", "a", "1"));
source.hdel("sk", "a");
Map map = source.getHms("sk", "a", "x", "b", "c", "f"); // {b=5, c=3}
System.out.println(map);
System.out.println(source.getHmall("sk")); //{b=5, c=3}
System.out.println(source.incrHm("sk", "b", 1.1d)); // b = 6.1
System.out.println(source.incrHm("sk", "c", 1)); // c = 4
System.out.println(source.getHmall("sk")); //{b=6.1, c=4}
System.out.println("--------------");
System.out.println(source.hexists("sk", "b")); // true
System.out.println(source.getCollectionSize("sk")); // 2*/
/*Map<String, String> hms = source.getHms("supportusers", "5-kfeu0f", "xxxx", "3-0kbt7u8t", "95q- ");
hms.forEach((k, v) -> {
System.out.println(k + " : " + v);
});*/
/*MyRedisCacheSource<String> source2 = new MyRedisCacheSource();
source2.defaultConvert = JsonFactory.root().getConvert();
source2.initValueType(String.class); //value用String类型
source2.init(conf);*/
/*Map<String, String> gcMap = source.getHmall("hot-gamecomment");
gcMap.forEach((k,v) -> {
System.out.println(k + " : " + v);
});*/
//Map<String, String> gameinfo = source.getHms("gameinfo", "22759", "22838", "10097", "22751", "22632", "22711", "22195", "15821", "10099", "16313", "11345", "10534", "22768", "22647", "22924", "18461", "15871", "17099", "22640", "22644", "10744", "10264", "18032", "22815", "13584", "10031", "22818", "22452", "22810", "10513", "10557", "15848", "11923", "15920", "22808", "20073", "22809", "15840", "12332", "15803", "10597", "22624", "17113", "19578", "22664", "22621", "20722", "16226", "10523", "12304", "10597","11923","10031");
//Map<String, String> gameinfo = source.getHms("gameinfo", "22759","22838","10097","22751","22632","22711","22195","15821","10099","16313","11345","10534","22768","22647","22924","18461","15871","17099","22363","22640","22644","10744","10264","18032","22815","13584","22818","22452","22810","10513","10557","15848","15920","22808","20073","22809","15840","12332","15803","10597","22624","17113","19578","22627","22664","22621","20722","16226","10523","12304");
/*gameinfo.forEach((k,v ) -> {
System.out.println(v);
});*/
/*source.queryKeysStartsWith("articlebean:").forEach(x -> {
System.out.println(x);
//source.remove(x);
//System.out.println(source.getHmall(x));
});*/
// list 测试
/*MyRedisCacheSource<Integer> sourceInt = new MyRedisCacheSource();
sourceInt.defaultConvert = JsonFactory.root().getConvert();
sourceInt.initValueType(Integer.class); //value用String类型
sourceInt.init(conf);
sourceInt.remove("list");
Collection<Integer> list = sourceInt.getCollection("list");
System.out.println(list);
for (int i = 1; i <= 10; i++) {
sourceInt.appendListItem("list", i);
}
System.out.println(sourceInt.getCollection("list")); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
sourceInt.appendListItems("list", 11, 12, 13);
System.out.println(sourceInt.getCollection("list")); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
System.out.println(sourceInt.getCollection("list", 0, 5)); // [1, 2, 3, 4, 5]
System.out.println(sourceInt.getCollectionSize("list")); // 13
List<Integer> ids = new ArrayList<>(100);
for (int i = 0; i < 5000; i++) {
ids.add(i);
}
sourceInt.remove("abx");
sourceInt.appendListItems("abx", ids.toArray(Integer[]::new));
System.out.println(sourceInt.getCollection("abx"));*/
/*System.out.println(sourceInt.getCollectionSize("recommend-user-quality"));
Collection<Integer> uid = sourceInt.getCollection("recommend-user-quality");
System.out.println(uid);*/
// zset 测试
/*source.initValueType(String.class); //value用Integer类型
source.remove("zx");
source.zadd("zx", Map.of("a", 1, "b", 2));
source.zadd("zx", Map.of("a", 1, "c", 5L));
source.zadd("zx", Map.of("x", 20, "j", 3.5));
source.zadd("zx", Map.of("f", System.currentTimeMillis(), "c", 5L));
source.zadd("zx", Map.of("a", 1, "c", 5L));
System.out.println(source.zincr("zx", "a", 1.34)); // 2.34
System.out.println(source.getZsetDoubleScore("zx")); // {f=1592924555704, x=20, c=5, j=3, b=2.34, a=1}
source.zrem("zx", "b", "c", "e", "x");
System.out.println(source.getZsetLongScore("zx")); // {f=1592924555704, j=3, a=2}
System.out.println("--------------");
System.out.println(source.getZsetLongScore("zx", "f"));
System.out.println(source.getZrevrank("zx", "f")); // 0
System.out.println(source.getZrank("zx", "f")); // 2
System.out.println(source.getZrank("zx", "Y")); // -1
System.out.println(source.getCollectionSize("zx")); // 3
System.out.println(source.getZset("zx"));
System.out.println(source.zexists("zx", "f", "x", "a"));*/
/*LocalDate date = LocalDate.of(2019, 12, 31);
for (int i = 0; i < 60; i++) {
LocalDate localDate = date.plusDays(-i);
String day = localDate.format(DateTimeFormatter.ISO_LOCAL_DATE);
System.out.println(String.format("mkdir %s; mv *%s*.zip %s", day, day, day));
}*/
/*MyRedisCacheSource<UserDetail> source = new MyRedisCacheSource();
source.defaultConvert = JsonFactory.root().getConvert();
source.initValueType(UserDetail.class);
source.init(conf);
Map<String, UserDetail> map = source.getHmall("user-detail");
Integer[] array = map.values().stream().map(x -> x.getUserid()).toArray(Integer[]::new);
System.out.println(JsonConvert.root().convertTo(array));
Map<Integer, UserDetail> hms = source.getHms("user-detail", 11746, 11988, 11504, 11987, 11745, 11503, 11748, 11506, 11747, 11989, 11505, 11508, 11507, 11509, 11980, 11740, 11982, 11981, 11984, 11742, 11500, 11983, 11741, 11502, 11744, 11986, 11985, 11501, 11743, 11999, 11757, 11515, 1, 11514, 11998, 11756, 2, 11517, 11516, 11758, 3, 11519, 4, 5, 11518, 6, 7, 11991, 8, 11990, 9, 11993, 11751, 11750, 11992, 11753, 11511, 11995, 11994, 11510, 11752, 11755, 11513, 11997, 11512, 11996, 11754, 11724, 11966, 11965, 11723, 11968, 11726, 11967, 11725, 11728, 11969, 11727, 11729, 11960, 11720, 11962, 11961, 11722, 11964, 11721);
System.out.println(hms.size());*/
/*source.getCollection("article-comment-list", 19, 1).forEach(x -> System.out.println(x));
while (true) {
System.out.println("---" + Utility.now() + "---");
source.getHmall("ck").forEach((k, v) -> {
System.out.println(k + ":" + v);
});
try {
Thread.sleep(60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}*/
}
}

View File

@@ -1,63 +0,0 @@
package dev.zhub;
import org.redkale.boot.Application;
import org.redkale.boot.NodeServer;
import org.redkale.cluster.CacheClusterAgent;
import org.redkale.cluster.ClusterAgent;
import org.redkale.service.Service;
import org.redkale.util.ResourceEvent;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public abstract class ZhubAgentProvider extends ClusterAgent {
@Override
public void onResourceChange(ResourceEvent[] events) {
}
@Override
public void register(Application application) {
}
@Override
public void deregister(Application application) {
}
@Override
public CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname) {
return null;
}
@Override
public CompletableFuture<Set<InetSocketAddress>> querySncpAddress(String protocol, String restype, String resname) {
return null;
}
@Override
protected CompletableFuture<Set<InetSocketAddress>> queryAddress(ClusterEntry entry) {
return null;
}
@Override
protected ClusterEntry register(NodeServer ns, String protocol, Service service) {
deregister(ns, protocol, service);
ClusterEntry clusterEntry = new ClusterEntry(ns, protocol, service);
CacheClusterAgent.AddressEntry entry = new CacheClusterAgent.AddressEntry();
entry.addr = clusterEntry.address;
entry.resname = clusterEntry.resourceName;
entry.nodeid = this.nodeid;
entry.time = System.currentTimeMillis();
//source.hset(clusterEntry.serviceName, clusterEntry.serviceid, CacheClusterAgent.AddressEntry.class, entry);
return clusterEntry;
}
@Override
protected void deregister(NodeServer ns, String protocol, Service service) {
}
}

View File

@@ -1,21 +0,0 @@
package dev.zhub;
import dev.zhub.client.ZHubClient;
import org.redkale.annotation.Priority;
import org.redkale.cluster.ClusterAgent;
import org.redkale.cluster.ClusterAgentProvider;
import org.redkale.util.AnyValue;
@Priority(1)
public class ZhubProvider implements ClusterAgentProvider {
@Override
public boolean acceptsConf(AnyValue config) {
return new ZHubClient().acceptsConf(config);
}
@Override
public ClusterAgent createInstance() {
return new ZHubClient();
}
}

View File

@@ -1,99 +1,50 @@
package dev.zhub.client; package dev.zhub.client;
import org.redkale.convert.ConvertColumn; import com.google.gson.Gson;
import org.redkale.service.RetResult; import com.google.gson.annotations.Expose;
import org.redkale.util.TypeToken; import com.google.gson.reflect.TypeToken;
import org.redkale.util.Utility; import lombok.Getter;
import lombok.Setter;
import java.util.UUID;
@Getter
@Setter
public class Rpc<T> { public class Rpc<T> {
/*public final static Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();*/
public final static Gson gson = new Gson();
private String ruk; // request unique key: private String ruk; // request unique key:
private String topic; // call topic private String topic; // call topic
private T value; // call paras private T value; // call paras
@Expose(deserialize = false, serialize = false)
private RpcResult rpcResult; private RpcResult rpcResult;
@Expose(deserialize = false, serialize = false)
private TypeToken typeToken; private TypeToken typeToken;
public Rpc() { /*public Rpc() {
} }*/
protected Rpc(String appname, String topic, T value) { protected Rpc(String appname, String topic, T value) {
this.ruk = appname + "::" + Utility.uuid(); this.ruk = appname + "::" + UUID.randomUUID().toString().replaceAll("-", "");
this.topic = topic; this.topic = topic;
this.value = value; this.value = value;
} }
public String getRuk() {
return ruk;
}
public void setRuk(String ruk) {
this.ruk = ruk;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
@ConvertColumn(ignore = true)
public RpcResult getRpcResult() {
return rpcResult;
}
public void setRpcResult(RpcResult rpcResult) {
this.rpcResult = rpcResult;
}
@ConvertColumn(ignore = true)
public TypeToken getTypeToken() {
return typeToken;
}
@ConvertColumn(ignore = true)
public void setTypeToken(TypeToken typeToken) {
this.typeToken = typeToken;
}
@ConvertColumn(ignore = true)
public String getBackTopic() { public String getBackTopic() {
return ruk.split("::")[0]; return ruk.split("::")[0];
} }
public <R> RpcResult<R> render(int retcode, String retinfo) {
RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk);
response.setRetcode(retcode);
response.setRetinfo(retinfo);
return response;
}
public <R> RpcResult<R> render() { public <R> RpcResult<R> render() {
RpcResult<R> response = new RpcResult<>(); RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk); response.setRuk(ruk);
return response; return response;
} }
public RpcResult render(RetResult result) {
RpcResult resp = new RpcResult<>();
resp.setRuk(ruk);
resp.setRetcode(result.getRetcode());
resp.setRetinfo(result.getRetinfo());
resp.setResult(result.getResult());
return resp;
}
public <R> RpcResult<R> render(R result) { public <R> RpcResult<R> render(R result) {
RpcResult<R> response = new RpcResult<>(); RpcResult<R> response = new RpcResult<>();
response.setRuk(ruk); response.setRuk(ruk);
@@ -116,5 +67,4 @@ public class Rpc<T> {
response.setRetinfo(retinfo); response.setRetinfo(retinfo);
return response; return response;
} }
} }

View File

@@ -1,14 +1,12 @@
package dev.zhub.client; package dev.zhub.client;
import com.google.gson.reflect.TypeToken;
import dev.zhub.*; import dev.zhub.*;
import jakarta.annotation.PostConstruct;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import dev.zhub.timer.Timers; import dev.zhub.timer.Timers;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceType;
import org.redkale.service.Local;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import org.redkale.util.Comment;
import org.redkale.util.TypeToken;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
@@ -24,23 +22,34 @@ import java.util.function.Function;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@Local @Component
@AutoLoad(false) public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer {
@ResourceType(ZHubClient.class)
public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service {
public Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName()); public Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName());
@Setter
@Value("${zhub.addr}")
private String addr = "127.0.0.1:1216"; private String addr = "127.0.0.1:1216";
private String auth = ""; @Setter
@Value("${zhub.groupid}")
private String groupid = ""; private String groupid = "";
@Setter
@Value("${zhub.auth}")
private String auth = "";
@Setter
@Value("${zhub.appid}")
protected String appid = "";
@PostConstruct
public void init() {
init(null);
}
//private ReentrantLock lock = new ReentrantLock();
private Socket client; private Socket client;
private OutputStream writer; private OutputStream writer;
private BufferedReader reader; private BufferedReader reader;
private final LinkedBlockingQueue<Timer> timerQueue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<Timer> timerQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Event<String>> topicQueue = new LinkedBlockingQueue<>(); // [=> Object] private final LinkedBlockingQueue<Event<String>> topicQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Event<Object>> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG [=> Object] private final LinkedBlockingQueue<Event<Object>> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG [=> Object]
private final LinkedBlockingQueue<Event<Object>> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG [=> Object] private final LinkedBlockingQueue<Event<Object>> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG [=> Object]
private final LinkedBlockingQueue<String> sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG private final LinkedBlockingQueue<String> sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG
@@ -51,46 +60,24 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
} }
public ZHubClient(String name, Map<String, String> attr) { public ZHubClient(String addr, String groupid, String appid, String auth) {
this.APP_NAME = name; this.addr = addr;
this.addr = attr.get("addr"); this.groupid = groupid;
this.groupid = attr.get("groupid"); this.appid = appid;
this.auth = attr.get("auth"); this.auth = auth;
init(null);
this.initClient(null);
} }
@Override public void init(Map<String, String> config) {
public void init(AnyValue config) { if (!preInit()) {
APP_NAME = application.getName();
/*if (!preInit()) {
return; return;
}*/
if (config == null) {
initClient(null);
} else {
Map<String, AnyValue> nodes = getNodes(config);
for (String rsName : nodes.keySet()) {
ZHubClient client = new ZHubClient().initClient(nodes.get(rsName));
application.getResourceFactory().register(rsName, client);
}
} }
}
private ZHubClient initClient(AnyValue config) {
// 自动注入 // 自动注入
if (config != null) { if (config != null) {
addr = config.getValue("addr", addr); addr = config.getOrDefault("addr", addr);
groupid = config.getOrDefault("groupid", groupid);
// 合并 addr = host:port, 做历史兼容 appid = config.getOrDefault("appname", appid);
int port = config.getIntValue("port", 0);
if (port != 0 && !addr.contains(":")) {
addr = addr + ":" + port;
}
auth = config.getOrDefault("auth", auth);
groupid = config.getValue("groupid", groupid);
} }
// 设置第一个启动的 实例为主实例 // 设置第一个启动的 实例为主实例
@@ -170,7 +157,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
} }
// rpc back msg // rpc back msg
if (APP_NAME.equals(topic)) { if (appid.equals(topic)) {
rpcBackQueue.add(Event.of(topic, value)); rpcBackQueue.add(Event.of(topic, value));
continue; continue;
} }
@@ -314,33 +301,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
} }
}).start(); }).start();
}); });
return this;
}
public boolean acceptsConf(AnyValue config) {
if (config == null) {
return false;
}
if (!getNodes(config).isEmpty()) {
return true;
}
return false;
}
private HashMap<String, AnyValue> getNodes(AnyValue config) {
AnyValue[] zhubs = config.getAnyValues("zhub");
HashMap<String, AnyValue> confMap = new HashMap<>();
for (AnyValue zhub : zhubs) {
String[] names = zhub.getNames();
for (String name : names) {
confMap.put(name, zhub.getAnyValue(name));
}
}
return confMap;
} }
// --------------------- // ---------------------
@@ -349,7 +309,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
if (data.length == 1) { if (data.length == 1) {
sendMsgQueue.add(data[0] + "\r\n"); sendMsgQueue.add(data[0] + "\r\n");
} else if (data.length > 1) { } else if (data.length > 1) {
StringBuffer buf = new StringBuffer(); StringBuilder buf = new StringBuilder();
buf.append("*" + data.length + "\r\n"); buf.append("*" + data.length + "\r\n");
for (String d : data) { for (String d : data) {
buf.append("$" + strLength(d) + "\r\n"); buf.append("$" + strLength(d) + "\r\n");
@@ -410,7 +370,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
StringBuilder buf = new StringBuilder("subscribe lock trylock"); StringBuilder buf = new StringBuilder("subscribe lock trylock");
if (mainHub.containsValue(this)) { if (mainHub.containsValue(this)) {
buf.append(" " + APP_NAME); buf.append(" ").append(appid);
} }
for (String topic : getTopics()) { for (String topic : getTopics()) {
buf.append(" ").append(topic); buf.append(" ").append(topic);
@@ -418,9 +378,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
send(buf.toString()); send(buf.toString());
// 重连 timer 订阅 // 重连 timer 订阅
timerMap.forEach((name, timer) -> { timerMap.forEach((name, timer) -> send("timer", name));
send("timer", name);
});
if (retry > 0) { if (retry > 0) {
logger.warning(String.format("ZHubClient[%s][%s] %s Succeed", getGroupid(), i + 1, "reconnection")); logger.warning(String.format("ZHubClient[%s][%s] %s Succeed", getGroupid(), i + 1, "reconnection"));
} else { } else {
@@ -591,21 +549,22 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// ================================================== rpc ================================================== // ================================================== rpc ==================================================
// -- 调用端 -- // -- 调用端 --
private static Map<String, Rpc> rpcMap = new ConcurrentHashMap<>(); private static final Map<String, Rpc> rpcMap = new ConcurrentHashMap<>();
// private static final Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
@Comment("rpc call") // rpc call
public RpcResult<Void> rpc(String topic, Object v) { public RpcResult<Void> rpc(String topic, Object v) {
return rpc(topic, v, null); return rpc(topic, v, null);
} }
@Comment("rpc call") // rpc call
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken) { public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken) {
return rpc(topic, v, typeToken, 0); return rpc(topic, v, typeToken, 0);
} }
@Comment("rpc call") // rpc call
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken, long timeout) { public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken, long timeout) {
Rpc rpc = new Rpc<>(APP_NAME, topic, v); Rpc rpc = new Rpc<>(appid, topic, v);
rpc.setTypeToken(typeToken); rpc.setTypeToken(typeToken);
String ruk = rpc.getRuk(); String ruk = rpc.getRuk();
@@ -629,9 +588,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
return; return;
} }
RpcResult rpcResult = rpc.render(505, "请求超时"); RpcResult rpcResult = rpc.retError(505, "请求超时");
rpc.setRpcResult(rpcResult); rpc.setRpcResult(rpcResult);
logger.warning("rpc timeout: " + convert.convertTo(rpc)); logger.warning("rpc timeout: " + gson.toJson(rpc));
rpc.notify(); rpc.notify();
} }
}, timeout); }, timeout);
@@ -642,7 +601,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
// call error // call error
RpcResult rpcResult = rpc.render(501, "请求失败"); RpcResult rpcResult = rpc.retError(501, "请求失败");
rpc.setRpcResult(rpcResult); rpc.setRpcResult(rpcResult);
} }
return rpc.getRpcResult(); return rpc.getRpcResult();
@@ -665,7 +624,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
} }
// RpcResult: {ruk:xxx-xxxx, retcode:0} // RpcResult: {ruk:xxx-xxxx, retcode:0}
@Comment("rpc call back consumer") // rpc call back consumer
private <T> void rpcAccept(T value) { private <T> void rpcAccept(T value) {
// 接收到 本地调用返回的 RpcResult // 接收到 本地调用返回的 RpcResult
if (value instanceof RpcResult) { if (value instanceof RpcResult) {
@@ -675,10 +634,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
return; return;
} }
// 本地模式下返回的数据对象类型需要和处理端一致,不然会出现类型转换异常 - 解决办法,当出现不一致的情况取数据做转换 // TODO, 本地模式下返回的数据对象类型需要和处理端一致,不然会出现类型转换异常 - 解决办法,当出现不一致的情况取数据做转换
TypeToken typeToken = rpc.getTypeToken(); TypeToken typeToken = rpc.getTypeToken();
if (typeToken.getType() != ((RpcResult<?>) value).getResult().getClass()) { if (typeToken.getType() != ((RpcResult<?>) value).getResult().getClass()) {
Object result = convert.convertFrom(typeToken.getType(), toStr(((RpcResult<?>) value).getResult())); Object result = gson.fromJson(toStr(((RpcResult<?>) value).getResult()), typeToken.getType());
((RpcResult<Object>) value).setResult(result); ((RpcResult<Object>) value).setResult(result);
} }
@@ -689,8 +648,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
return; return;
} }
RpcResult resp = convert.convertFrom(new TypeToken<RpcResult<String>>() { RpcResult resp = gson.fromJson((String) value, new TypeToken<RpcResult<String>>() {
}.getType(), (String) value); }.getType());
String ruk = resp.getRuk(); String ruk = resp.getRuk();
Rpc rpc = rpcMap.remove(ruk); Rpc rpc = rpcMap.remove(ruk);
@@ -701,7 +660,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
Object result = resp.getResult(); Object result = resp.getResult();
if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName()) && !"java.lang.Void".equals(typeToken.getType().getTypeName())) { if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName()) && !"java.lang.Void".equals(typeToken.getType().getTypeName())) {
result = convert.convertFrom(typeToken.getType(), (String) resp.getResult()); result = gson.fromJson((String) resp.getResult(), typeToken.getType());
} }
resp.setResult(result); resp.setResult(result);
@@ -714,31 +673,31 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// -- 订阅端 -- // -- 订阅端 --
private Set<String> rpcTopics = new HashSet(); private Set<String> rpcTopics = new HashSet();
@Comment("rpc call consumer") // rpc call consumer
public <R> void rpcSubscribe(String topic, Function<Rpc<String>, RpcResult<R>> fun) { public <R> void rpcSubscribe(String topic, Function<Rpc<String>, RpcResult<R>> fun) {
rpcSubscribe(topic, IType.STRING, fun); rpcSubscribe(topic, IType.STRING, fun);
} }
@Comment("rpc call consumer") // rpc call consumer
public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResult<R>> fun) { public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResult<R>> fun) {
Consumer<T> consumer = v -> { Consumer<T> consumer = v -> {
Rpc<T> rpc = null; Rpc<T> rpc = null;
try { try {
if (v instanceof String) { if (v instanceof String) {
rpc = convert.convertFrom(new TypeToken<Rpc<String>>() { rpc = gson.fromJson((String) v, new TypeToken<Rpc<String>>() {
}.getType(), (String) v); }.getType());
} else { } else {
rpc = (Rpc<T>) v; rpc = (Rpc<T>) v;
} }
// 参数转换 // 参数转换
if (rpc.getValue() instanceof String && !"java.lang.String".equals(typeToken.getType().getTypeName())) { if (rpc.getValue() instanceof String && !"java.lang.String".equals(typeToken.getType().getTypeName())) {
T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue()); T paras = gson.fromJson((String) rpc.getValue(), typeToken.getType());
rpc.setValue(paras); rpc.setValue(paras);
} }
RpcResult result = fun.apply(rpc); RpcResult result = fun.apply(rpc);
if (APP_NAME.equals(rpc.getBackTopic())) { if (appid.equals(rpc.getBackTopic())) {
rpcBackQueue.add(Event.of(topic, result)); rpcBackQueue.add(Event.of(topic, result));
} else { } else {
result.setResult(toStr(result.getResult())); // 远程模式 结果转换 result.setResult(toStr(result.getResult())); // 远程模式 结果转换
@@ -754,4 +713,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
rpcTopics.add(topic); rpcTopics.add(topic);
subscribe(topic, typeToken, consumer); subscribe(topic, typeToken, consumer);
} }
public static void main(String[] args) {
System.out.println(IType.INT.getType());
Integer a = 1;
System.out.println(a.getClass());
}
} }

View File

@@ -1,22 +1,19 @@
package dev.zhub.timer; package dev.zhub.timer;
import dev.zhub.timer.scheduled.ScheduledCycle; import dev.zhub.timer.scheduled.ScheduledCycle;
import org.redkale.util.Utility;
import java.util.UUID;
import java.util.function.Supplier; import java.util.function.Supplier;
public class Timers { public class Timers {
private static TimerExecutor timerExecutor = new TimerExecutor(1); private static final TimerExecutor timerExecutor = new TimerExecutor(1);
/** /**
* 本地延时重试 * 本地延时重试
* @param supplier
* @param millis
* @param maxCount
*/ */
public static void tryDelay(Supplier<Boolean> supplier, long millis, int maxCount) { public static void tryDelay(Supplier<Boolean> supplier, long millis, int maxCount) {
timerExecutor.add(TimerTask.by("try-delay-task-" + Utility.uuid(), ScheduledCycle.of(0), task -> { timerExecutor.add(TimerTask.by("try-delay-task-" + UUID.randomUUID().toString().replaceAll("-", ""), ScheduledCycle.of(0), task -> {
if (supplier.get() || task.getExecCount() == maxCount) { if (supplier.get() || task.getExecCount() == maxCount) {
task.setComplete(true); task.setComplete(true);
} }
@@ -31,11 +28,9 @@ public class Timers {
/** /**
* 本地延时:延时时间极短的场景下使用 1分钟内 * 本地延时:延时时间极短的场景下使用 1分钟内
* @param runnable
* @param millis
*/ */
public static void delay(Runnable runnable, long millis) { public static void delay(Runnable runnable, long millis) {
timerExecutor.add(TimerTask.by("delay-task-" + Utility.uuid(), ScheduledCycle.of(millis), task -> { timerExecutor.add(TimerTask.by("delay-task-" + UUID.randomUUID().toString().replaceAll("-", ""), ScheduledCycle.of(millis), task -> {
runnable.run(); runnable.run();
task.setComplete(true); task.setComplete(true);
})); }));

View File

@@ -63,6 +63,7 @@ public interface Task extends Runnable {
/** /**
* 得到总执行次数 * 得到总执行次数
*
* @return * @return
*/ */
int getExecCount(); int getExecCount();

View File

@@ -1 +0,0 @@
org.redkalex.cache.redis.RedisCacheSourceProvider

View File

@@ -0,0 +1,7 @@
# zhub 配置
zhub:
appid: local_api
addr: 127.0.0.1:1216
groupid: hub-api
auth: token-12345

108
test/HelloService.java Normal file
View File

@@ -0,0 +1,108 @@
import org.junit.Before;
import org.junit.Test;
import net.tccn.IType;
import net.tccn.zhub.Lock;
import net.tccn.zhub.ZHubClient;
// @RestService(automapping = true)
public class HelloService {
// @Resource(name = "zhub")
private ZHubClient zhub;
@Before
public void init() {
//zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "zchd@123456");
zhub = new ZHubClient("127.0.0.1:1216", "g-dev", "DEV-LOCAL", "token-12345");
zhub.subscribe("tv:test", x -> {
System.out.println(x);
});
Lock lock = zhub.tryLock("lock-a", 5);
System.out.println("lock-1: " + lock.success());
//zhub.init(Kv.of("host", "47.111.150.118", "port", "6066", "groupid", "g-dev", "appname", "DEV-LOCAL"));
// Function<Rpc<T>, RpcResult<R>> fun
/*zhub.rpcSubscribe("x", new TypeToken<String>() {
}, r -> {
return r.buildResp(Map.of("v", r.getValue().toUpperCase() + ": Ok"));
});
zhub.subscribe("sport:reqtime", x -> {
//System.out.println(x);
});
zhub.subscribe("abx", x -> {
System.out.println(x);
});
try {
Thread.sleep(010);
} catch (InterruptedException e) {
e.printStackTrace();
}
zhub.delay("sport:reqtime", "别✈人家的✦女娃子❤🤞🏻", 0);
zhub.delay("sport:reqtime", "别人家的女娃子➾🤞🏻", 0);
zhub.delay("sport:reqtime", "❤别人家✉<E5AEB6>的女娃子❤🤞🏻", 0);*/
/*zhub.delay("sport:reqtime", "中文特殊符号:『』 £ ♀ ‖ 「」\n" +
"英文:# + = & ﹉ .. ^ \"\" ·{ } % ' €\n" +
"数学:+× ° ± ℃ ㎡ ∑ ≥ ∫ ㏄ ⊥ ≯ ∠ ∴ ∈ ∧ ∵ ≮ ㎝ ㏑ ≌ ㎞ № § ℉ ÷ ‰ ㎎ ㎏ ㎜ ㏒ ⊙ ∮ ∝ ∞ º ¹ ² ³ ½ ¾ ¼ ≈ ≡ ≠ ≤ ≦ ≧ ∽ ∷ ∏ ∩ ⌒ √Ψ ¤ ‖ ¶\n" +
"特殊:♤ ♧ ♡ ♢ ♪ ♬ ♭ ✔ ✘ ♞ ♟ ↪ ↣ ♚ ♛ ♝ ☞ ☜ ⇔ ☆ ★ □ ■ ○ ● △ ▲ ▽ ▼ ◇ ◆ ♀ ♂ ※ ↓ ↑ ↔ ↖ ↙ ↗ ↘ ← → ♣ ♠ ♥ ◎ ◣ ◢ ◤ ◥ 卍 ℡ ⊙ ㊣ ® © ™ ㈱ 囍\n" +
"序号:①②③④⑤⑥⑦⑧⑨⑩㈠㈡㈢㈣㈤㈥㈦㈧㈨㈩⑴ ⑵ ⑶ ⑷ ⑸ ⑹ ⑺ ⑻ ⑼ ⑽ ⒈ ⒉ ⒊ ⒋ ⒌ ⒍ ⒎ ⒏ ⒐ ⒑ Ⅱ Ⅲ Ⅳ Ⅵ Ⅶ Ⅷ ⅨⅩ\n" +
"日文:アイウエオァィゥェォカキクケコガギグゲゴサシスセソザジズゼゾタチツテトダヂヅデドッナニヌネノハヒフヘホバビブベボパピプペポマミムメモャヤュユョラリヨルレロワヰヱヲンヴヵヶヽヾ゛゜ー、。「「あいうえおぁぃぅぇぉかきくけこがぎぐげごさしすせそざじずぜぞたちつてでどっなにぬねのはひふへ」」ほばびぶべぼぱぴぷぺぽまみむめもやゆよゃゅょらりるれろわをんゎ゛゜ー、。「」\n" +
"部首:犭 凵 巛 冖 氵 廴 讠 亻 钅 宀 亠 忄 辶 弋 饣 刂 阝 冫 卩 疒 艹 疋 豸 冂 匸 扌 丬 屮衤 礻 勹 彳 彡", 0);*/
}
@Test
public void rpcTest() {
//RpcResult<String> rpc = zhub.rpc("wx:users", Map.of("appId", "wxa554ec3ab3bf1fc7"), IConsumer.TYPE_TOKEN_STRING);
//RpcResult<String> rpc = zhub.rpc("a", "fa", IConsumer.TYPE_TOKEN_STRING);
zhub.publish("tv:test", "hello ym!");
zhub.subscribe("tv:abx", x -> {
System.out.println(x);
});
zhub.rpcSubscribe("rpc-x", IType.STRING, x -> {
return x.render(x.getValue().toUpperCase());
});
Lock lock = zhub.tryLock("lock-a", 5);
System.out.println("lock-2: " + lock.success());
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Lock lock2 = zhub.tryLock("lock-a", 5);
System.out.println("lock-3: " + lock2.success());
/*try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
Lock lock3 = zhub.tryLock("lock-a", 5);
System.out.println("lock-4: " + lock3.success());
try {
Thread.sleep(3000 * 30000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void lockTest() {
}
/*RpcResult<FileToken> x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() {
});*/
}

View File

@@ -1,412 +0,0 @@
package net.tccn.mq;
import net.tccn.Event;
import dev.zhub.timer.Timers;
import org.junit.Test;
import org.redkale.convert.json.JsonConvert;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 消息发布订阅测试
*/
public class AppTest {
Logger logger = Logger.getLogger("");
@Test
public void runConsumer() {
try {
// String str = ", response = {\"success\":true,\"retcode\":0,\"result\":{\"age\":0,\"explevel\":1,\"face\":\"https://aimg.woaihaoyouxi.com/haogame/202106/pic/20210629095545FmGt-v9NYqyNZ_Q6_y3zM_RMrDgd.jpg\",\"followed\":0,\"gender\":0,\"idenstatus\":0,\"matchcatelist\":[{\"catename\":\"足球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103556FoG5ICf_7BFx6Idyo3TYpJQ7tmfG.png\",\"matchcateid\":1},{\"catename\":\"篮球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103636FklsXTn1f6Jlsam8Jk-yFB7Upo3C.png\",\"matchcateid\":2}],\"matchcates\":\"2,1\",\"mobile\":\"18515190967\",\"regtime\":1624931714781,\"sessionid\":\"d1fc447753bd4700ad29674a753030fa\",\"status\":10,\"userid\":100463,\"username\":\"绝尘\",\"userno\":100463}}";
String str = "hello你好";
System.out.println(str.length());
/*consumer.timer("a", () -> {
System.out.println(Utility.now() + " timer a 执行了");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
consumer.timer("b", () -> {
System.out.println(Utility.now() + " ----------------- timer b 执行了");
});
//consumer.delay("a", "1", 200);
consumer.delay("a", "1", "2000");*/
/*Consumer<String> con = x -> {
logger.info("--->开始申请锁:" + System.currentTimeMillis());
Lock lock = consumer.tryLock("a", 20);
logger.info("===>成功申请锁:" + System.currentTimeMillis());
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(x + ":" + i);
}
lock.unLock();
};
new Thread(() -> con.accept("x")).start();
new Thread(() -> con.accept("y")).start();
new Thread(() -> con.accept("z")).start();*/
Thread.sleep(60_000 * 60);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void runProducer() {
}
private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue();
@Test
public void t() {
List<String> list = new ArrayList<>();
list.toArray(String[]::new);
new Thread(() -> {
while (true) {
System.out.println("accept:");
String peek = null;
try {
System.out.println(!queue.isEmpty());
peek = queue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(peek);
}
}).start();
for (int i = 0; i < 10; i++) {
try {
queue.put(i + "");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
System.out.println("---");
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void xx() {
Function<String, String> fun = x -> {
return x.toUpperCase();
};
System.out.println(fun.toString());
}
// (27+5*23)/(63-59)
// [27+5*23] [/] [63-59]
// [27] + [5*23] [/] [63-59]
/**
* 1. 按照优先级逐一拆解运算
* 括号, 乘除,加减
* 2. 逐一进行计算
*/
class C {
C A;
C B;
int c1; // 如果 A 只剩数字将c1 转为整数存放到 c1
int c2;
String x; // + - * /
}
@Test
public void x() {
// (27+5*23)/(63-59)
String str = "(27+5*23)/(63-59)";
str = "27+5*23";
str = "258/((35+17)/(5*3+18-29)+3)(138-134)*41-6+10+24";
str = "5*3+18-29";
//System.out.println("258/((35+17)/(5*3+18-29)+3)(138-134)*41-6+10+24".replaceAll("\\)\\(", ")*("));
List<String> parse = parse(str.replaceAll("\\)\\(", ")\\*("));
System.out.println(c(str));
}
// 因式分解:括号 -> 乘除 -> 加减
public List<String> parse(String str) {
// 找到一对
/*
258/()()
[258, /, (35+17)/(5*3+18-29)+3, 138-134, *, 41, -, 6, +, 10, +, 24]
*/
// 258/((35+17)/(5*3+18-29)+3)(138-134)*41-6+10+24
//str = "258 / (35+17)/(5*3+18-29)+3 138-134, * 41-6+10+24";
String[] strArr = str.split("");
// 一级括号、加、减、乘、除分解
List<String> arr = new ArrayList<>();
String tmp = "";
int n1 = 0; // 括号层级开始
int m1 = 0; // 括号层级结尾
for (String s : strArr) {
if (n1 > 0) { // 一级括号分解
if (")".equals(s) && (++m1) == n1) { // 一级括号结束
arr.add(tmp);
tmp = "";
n1 = 0;
m1 = 0;
} else {
if ("(".equals(s)) {
n1++;
}
tmp += s;
}
} else { // 无括号
if ("+".equals(s) || "-".equals(s) || "*".equals(s) || "/".equals(s)) {
if (!"".equals(tmp)) {
arr.add(tmp);
}
arr.add(s);
tmp = "";
} else if ("(".equals(s)) {
n1 = 1;
} else {
tmp += s;
}
}
}
if (!"".equals(tmp)) {
arr.add(tmp);
}
return arr;
}
public int c(String str) {
List<String> arr = parse(str); // 预期 length >= 3 基数的基数,如:[27+5*23, /, 63-59], [60, /, 2, *, 164-23*7]
System.out.println(arr);
if (arr == null || arr.size() < 3) {
return -1; // 错误码-1错误的计算式
}
List<String> _arr = new ArrayList<>();
// 按照优先级做合并计算 乘除优先
// 乘除
for (int i = 1; i < arr.size() - 1; i += 2) {
/*if ("*".equals(arr.get(i)) || "/".equals(arr.get(i))) {
if (_arr.size() > 0) {
_arr.remove(_arr.size() - 1);
}
int c = c(arr.get(i - 1), arr.get(i + 1), arr.get(i));
if (c < 0) {
return -1;
}
_arr.add(c + "");
} else {
_arr.add(arr.get(i - 1));
_arr.add(arr.get(i));
_arr.add(arr.get(i + 1));
}*/
if ("*".equals(arr.get(i)) || "/".equals(arr.get(i))) {
int c = c(arr.get(i - 1), arr.get(i + 1), arr.get(i));
if (c < 0) {
return c;
}
_arr.add(c + "");
} else {
}
}
if (_arr.size() == 1) { // 通过第一轮的 乘除计算,完成结果合并
return Integer.parseInt(_arr.get(0));
}
int c = 0;
for (int i = 1; i < _arr.size(); i += 2) {
int _c = c(_arr.get(i - 1), _arr.get(i + 1), _arr.get(i));
if (_c < 0) {
return _c;
}
c += _c;
}
return c;
}
public int c(String a, String b, String x) {
int _a = 0;
if (a.contains("(") || a.contains("+") || a.contains("-") || a.contains("*") || a.contains("/")) {
_a = c(a);
} else { // 预期 无 ( + - * / 的结果为标准数字
_a = Integer.parseInt(a);
}
int _b = 0;
if (b.contains("(") || b.contains("+") || b.contains("-") || b.contains("*") || b.contains("/")) {
_b = c(b);
} else { // 预期 无 ( + - * / 的结果为标准数字
_b = Integer.parseInt(b);
}
// 如果出现负数(错误码)直接返回对应的负数
if (_a < 0) {
return _a;
}
if (_b < 0) {
return _b;
}
// 定义错误标识: -1错误的计算式-2除不尽-3除数为0-4大于200,
if ("+".equals(x)) {
return _a + _b;
} else if ("-".equals(x)) {
return _a - _b;
} else if ("*".equals(x)) {
return _a * _b;
} else if ("/".equals(x)) {
return _a % _b > 0 ? -2 : _a / _b; // 除不尽
}
return 0;
}
@Test
public void testxx() {
Event of = Event.of("A", Map.of("b", 1));
System.out.println(JsonConvert.root().convertTo(of));
String str = "❦别人家的女娃子🤞🏻ꚢ";
/*
System.out.println("别人家的女娃子🤞🏻".length());*/
System.out.println(strLength(str));
System.out.println(getWordCount(str));
/*try {
System.out.println("别人家的女娃子🤞🏻".getBytes("UTF-8").length);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("系统默认编码方式:" + System.getProperty("file.encoding"));*/
}
public static int strLength(String value) {
int valueLength = 0;
String chinese = "[\u4e00-\u9fa5]";
for (int i = 0; i < value.length(); i++) {
String temp = value.substring(i, i + 1);
if (temp.matches(chinese)) {
valueLength += 2;
} else {
valueLength += 1;
}
}
return valueLength;
}
public int getWordCount(String str) {
str = str.replaceAll("[^\\x00-\\xff]", "*");
return str.length();
}
@Test
public void delay() {
DelayQueue<com.zdemo.zhub.Delays> delayQueue = new DelayQueue<>();
logger.info("加入延时任务1");
delayQueue.add(new com.zdemo.zhub.Delays(5000, () -> {
logger.info("任务1 延时任务执行了!");
}));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("加入延时任务2");
delayQueue.add(new com.zdemo.zhub.Delays(5000, () -> {
logger.info("任务2 延时任务执行了!");
}));
try {
while (true) {
com.zdemo.zhub.Delays delay = delayQueue.take();
delay.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void regTest() {
// 按指定模式在字符串查找
String line = "This order was placed for QT3000! OK?";
String pattern = "(\\D*)(\\d+)(.*)";
// 创建 Pattern 对象
Pattern r = Pattern.compile(pattern);
// 现在创建 matcher 对象
Matcher m = r.matcher(line);
if (m.find()) {
System.out.println("Found value: " + m.group(0));
System.out.println("Found value: " + m.group(1));
System.out.println("Found value: " + m.group(2));
System.out.println("Found value: " + m.group(3));
} else {
System.out.println("NO MATCH");
}
}
@Test
public void timersTest() {
Timers.tryDelay(() -> {
logger.info("xx:" + System.currentTimeMillis());
return true;
}, 1000, 5);
Timers.delay(() -> {
System.out.println("11");
}, 3000);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@@ -1,72 +0,0 @@
package com.zdemo.zhub;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Logger;
public class Delays implements Delayed, Runnable {
public Logger logger = Logger.getLogger(Delays.class.getSimpleName());
private long time; // 执行时间
private Runnable runnable; // 任务到时间执行 runnable
public Delays(long timeout, Runnable runnable) {
this.time = System.currentTimeMillis() + timeout;
this.runnable = runnable;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
if (other == this) { // compare zero ONLY if same object
return 0;
}
if (other instanceof Delays) {
Delays x = (Delays) other;
long diff = time - x.time;
if (diff < 0) {
return -1;
} else if (diff > 0) {
return 1;
}
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
@Override
public void run() {
runnable.run();
}
// ===========
public static DelayQueue<Delays> delayQueue = new DelayQueue<>();
public static void addDelay(long timeout, Runnable runnable) {
delayQueue.add(new Delays(timeout, runnable));
}
public static void tryDelay(Supplier<Boolean> supplier, long delayMillis, int maxCount) {
}
static {
new Thread(() -> {
try {
while (true) {
Delays delay = delayQueue.take();
delay.run(); //异常会导致延时队列失败
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

View File

@@ -1,244 +0,0 @@
package net.tccn.mq;
import net.tccn.IType;
import net.tccn.zhub.RpcResult;
import net.tccn.zhub.ZHubClient;
import org.redkale.net.http.RestMapping;
import org.redkale.net.http.RestService;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import org.redkale.util.TypeToken;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@RestService(automapping = true, name = "hello")
public class HelloService implements Service {
@Resource(name = "hub")
private ZHubClient zhub;
/*@Resource(name = "hubx")
private ZHubClient zhubx;
@Resource(name = "vvvvhub2")
private ZHubClient zhub2;*/
//private dev.zhub.client.ZHubClient zhubx = null;
@Override
public void init(AnyValue config) {
/*CompletableFuture.runAsync(() -> {
zhubx = new dev.zhub.client.ZHubClient("127.0.0.1", 1216, "g-dev", "DEV-LOCAL");
//zhubx = new dev.zhub.client.ZHubClient("47.111.150.118", 6066, "g-dev", "DEV-LOCAL");
});*/
// Function<Rpc<T>, RpcResult<R>> fun
/*zhub.rpcSubscribe("x", new TypeToken<String>() {
}, r -> {
return r.buildResp(Map.of("v", r.getValue().toUpperCase() + ": Ok"));
});*/
zhub.rpcSubscribe("y", new TypeToken<String>() {
}, r -> {
return r.render(Map.of("v", r.getValue().toUpperCase() + ": Ok"));
});
zhub.subscribe("sport:reqtime", x -> {
System.out.println(x);
});
zhub.subscribe("abx1", x -> {
System.out.println(x);
});
try {
Thread.sleep(010);
} catch (InterruptedException e) {
e.printStackTrace();
}
/*zhub.delay("sport:reqtime", "别✈人家的✦女娃子❤🤞🏻", 0);
zhub.delay("sport:reqtime", "别人家的女娃子➾🤞🏻", 0);
zhub.delay("sport:reqtime", "❤别人家✉<E5AEB6>的女娃子❤🤞🏻", 0);
zhub.delay("sport:reqtime", "中文特殊符号:『』 £ ♀ ‖ 「」\n" +
"英文:# + = & ﹉ .. ^ \"\" ·{ } % ' €\n" +
"数学:+× ° ± ℃ ㎡ ∑ ≥ ∫ ㏄ ⊥ ≯ ∠ ∴ ∈ ∧ ∵ ≮ ㎝ ㏑ ≌ ㎞ № § ℉ ÷ ‰ ㎎ ㎏ ㎜ ㏒ ⊙ ∮ ∝ ∞ º ¹ ² ³ ½ ¾ ¼ ≈ ≡ ≠ ≤ ≦ ≧ ∽ ∷ ∏ ∩ ⌒ √Ψ ¤ ‖ ¶\n" +
"特殊:♤ ♧ ♡ ♢ ♪ ♬ ♭ ✔ ✘ ♞ ♟ ↪ ↣ ♚ ♛ ♝ ☞ ☜ ⇔ ☆ ★ □ ■ ○ ● △ ▲ ▽ ▼ ◇ ◆ ♀ ♂ ※ ↓ ↑ ↔ ↖ ↙ ↗ ↘ ← → ♣ ♠ ♥ ◎ ◣ ◢ ◤ ◥ 卍 ℡ ⊙ ㊣ ® © ™ ㈱ 囍\n" +
"序号:①②③④⑤⑥⑦⑧⑨⑩㈠㈡㈢㈣㈤㈥㈦㈧㈨㈩⑴ ⑵ ⑶ ⑷ ⑸ ⑹ ⑺ ⑻ ⑼ ⑽ ⒈ ⒉ ⒊ ⒋ ⒌ ⒍ ⒎ ⒏ ⒐ ⒑ Ⅱ Ⅲ Ⅳ Ⅵ Ⅶ Ⅷ ⅨⅩ\n" +
"日文:アイウエオァィゥェォカキクケコガギグゲゴサシスセソザジズゼゾタチツテトダヂヅデドッナニヌネノハヒフヘホバビブベボパピプペポマミムメモャヤュユョラリヨルレロワヰヱヲンヴヵヶヽヾ゛゜ー、。「「あいうえおぁぃぅぇぉかきくけこがぎぐげごさしすせそざじずぜぞたちつてでどっなにぬねのはひふへ」」ほばびぶべぼぱぴぷぺぽまみむめもやゆよゃゅょらりるれろわをんゎ゛゜ー、。「」\n" +
"部首:犭 凵 巛 冖 氵 廴 讠 亻 钅 宀 亠 忄 辶 弋 饣 刂 阝 冫 卩 疒 艹 疋 豸 冂 匸 扌 丬 屮衤 礻 勹 彳 彡", 0);
*/
}
@RestMapping
public RpcResult x(String v) {
if (v == null) {
v = "";
}
List<CompletableFuture> list = new ArrayList();
for (int i = 0; i < 100; i++) {
long start = System.currentTimeMillis();
/*RpcResult<FileToken> x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() {
});*/
/*list.add(zhub.rpcAsync("x", v + i, new TypeToken<>() {
}));*/
zhub.publish("x", v + i);
System.out.println("time: " + (System.currentTimeMillis() - start) + " ms");
//System.out.println(x.getResult().get("v"));
}
return zhub.rpc("x", v, IType.STRING);
}
@RestMapping
public RpcResult<String> d(String v) {
RpcResult<String> rpc = zhub.rpc("x", v, IType.STRING);
return rpc;
}
@RestMapping
public String y(String v) {
if (v == null) {
v = "";
}
for (int i = 0; i < 100; i++) {
long start = System.currentTimeMillis();
/*RpcResult<FileToken> x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() {
});*/
/*RpcResult<Object> x = zhubx.rpc("y", v + i, new com.google.gson.reflect.TypeToken<>() {
});*/
System.out.println("time: " + (System.currentTimeMillis() - start) + " ms");
//System.out.println(x.getResult());
}
return "ok";
}
@RestMapping(name = "send")
public String send() {
zhub.publish("abx1", 1);
return "ok";
}
public static void main(String[] args) {
// "\"别人家的女娃子\uD83E\uDD1E\uD83C\uDFFB\""
/*String s = "别人家的女娃子\uD83E\uDD1E\uD83C\uDFFB";
System.out.println("别人家的女娃子🤞🏻".length());
byte[] bytes = "别人家的女娃子🤞🏻".getBytes();
System.out.println(bytes.length);
System.out.println(unicodeToUtf8(s));
System.out.println(utf8ToUnicode("别人家的女娃子🤞🏻"));
*/
//ExecutorService pool = Executors.newFixedThreadPool(5);
}
public static String utf8ToUnicode(String inStr) {
char[] myBuffer = inStr.toCharArray();
StringBuffer sb = new StringBuffer();
for (int i = 0; i < inStr.length(); i++) {
Character.UnicodeBlock ub = Character.UnicodeBlock.of(myBuffer[i]);
if (ub == Character.UnicodeBlock.BASIC_LATIN) {
//英文及数字等
sb.append(myBuffer[i]);
} else if (ub == Character.UnicodeBlock.HALFWIDTH_AND_FULLWIDTH_FORMS) {
//全角半角字符
int j = (int) myBuffer[i] - 65248;
sb.append((char) j);
} else {
//汉字
short s = (short) myBuffer[i];
String hexS = Integer.toHexString(s);
String unicode = "\\u" + hexS;
sb.append(unicode.toLowerCase());
}
}
return sb.toString();
}
public static String unicodeToUtf8(String theString) {
char aChar;
int len = theString.length();
StringBuffer outBuffer = new StringBuffer(len);
for (int x = 0; x < len; ) {
aChar = theString.charAt(x++);
if (aChar == '\\') {
aChar = theString.charAt(x++);
if (aChar == 'u') {
// Read the xxxx
int value = 0;
for (int i = 0; i < 4; i++) {
aChar = theString.charAt(x++);
switch (aChar) {
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
value = (value << 4) + aChar - '0';
break;
case 'a':
case 'b':
case 'c':
case 'd':
case 'e':
case 'f':
value = (value << 4) + 10 + aChar - 'a';
break;
case 'A':
case 'B':
case 'C':
case 'D':
case 'E':
case 'F':
value = (value << 4) + 10 + aChar - 'A';
break;
default:
throw new IllegalArgumentException(
"Malformed \\uxxxx encoding.");
}
}
outBuffer.append((char) value);
} else {
if (aChar == 't')
aChar = '\t';
else if (aChar == 'r')
aChar = '\r';
else if (aChar == 'n')
aChar = '\n';
else if (aChar == 'f')
aChar = '\f';
outBuffer.append(aChar);
}
} else
outBuffer.append(aChar);
}
return outBuffer.toString();
}
}