Compare commits

...

8 Commits

41 changed files with 292 additions and 3372 deletions

37
pom.xml
View File

@ -4,9 +4,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.tccn</groupId>
<groupId>dev.zhub</groupId>
<artifactId>zhub-client-redkale</artifactId>
<version>0.1.2-dev</version>
<version>0.1.1.dev</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
@ -14,30 +14,33 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.redkale</groupId>
<artifactId>redkale</artifactId>
<version>2.8.0-dev</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>maven-nexus</id>
<id>maven-release</id>
<name>maven-nexus</name>
<url>http://47.106.237.198:8081/repository/maven-public/</url>
<url>https://nexus.1216.top/repository/maven-public/</url>
</repository>
</repositories>
<distributionManagement>
<repository>
<id>mvn-release</id>
<name>mvn-release</name>
<url>http://47.106.237.198:8081/repository/maven-releases/</url>
<url>https://nexus.1216.top/repository/maven-releases/</url>
</repository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>org.redkale</groupId>
<artifactId>redkale</artifactId>
<version>2.8.0.dev</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,4 +1,4 @@
package net.tccn;
package dev.zhub;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Resourcable;
@ -19,7 +19,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
protected static String APP_NAME = "";
private Map<String, EventType> eventMap = new ConcurrentHashMap<>();
protected Map<String, EventType<?>> eventMap = new ConcurrentHashMap<>();
protected abstract String getGroupid();
@ -31,6 +31,7 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
return Set.of("-");
}
// topic 消息消费前处理
protected void accept(String topic, String value) {
EventType eventType = eventMap.get(topic);
@ -44,6 +45,12 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
eventType.accept(data);
}
// rpc 被调用端
protected <T> void rpcAccept(String topic, T value) {
EventType eventType = eventMap.get(topic);
eventType.accept(value);
}
protected final void removeEventType(String topic) {
eventMap.remove(topic);
}
@ -77,4 +84,13 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
public String resourceName() {
return super.getName();
}
protected String toStr(Object v) {
if (v instanceof String) {
return (String) v;
} else if (v == null) {
return null;
}
return convert.convertTo(v);
}
}

View File

@ -1,4 +1,4 @@
package net.tccn;
package dev.zhub;
/**
* 发布订阅 事件
@ -15,8 +15,8 @@ public class Event<V> {
this.value = value;
}
public static <V> Event of(String topic, V value) {
return new Event<V>(topic, value);
public static <V> Event<V> of(String topic, V value) {
return new Event<>(topic, value);
}

View File

@ -1,4 +1,4 @@
package net.tccn;
package dev.zhub;
import org.redkale.util.TypeToken;

View File

@ -1,4 +1,4 @@
package net.tccn;
package dev.zhub;
import org.redkale.util.TypeToken;

View File

@ -1,4 +1,4 @@
package net.tccn;
package dev.zhub;
import java.util.logging.Logger;

View File

@ -1,4 +1,4 @@
package net.tccn;
package dev.zhub;
import org.redkale.util.TypeToken;

View File

@ -1,5 +1,6 @@
package net.tccn;
package dev.zhub;
import org.junit.Test;
import org.redkale.net.AsyncIOGroup;
import org.redkale.util.AnyValue;
import org.redkale.util.ResourceFactory;
@ -12,9 +13,65 @@ public class RedisTest {
static MyRedisCacheSource source = new MyRedisCacheSource();
/**
* 3
*/
@Test
public void keyTest() {
source.set("a", 3);
System.out.println(source.get("a"));
source.del("a");
}
/**
* ax:false
* ax:true
* ax:false
*/
@Test
public void bitTest() {
boolean ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax); // false
source.setBit("ax", 6, true);
ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax); // true
source.setBit("ax", 6, false);
ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax); // false
source.del("ax");
}
@Test
public void setTest() {
source.del("setx");
source.sadd("setx", int.class, 1, 2, 3, 5, 6);
int setx = source.spop("setx", int.class);
System.out.println(setx);
setx = source.spop("setx", int.class);
System.out.println(setx);
source.del("setx");
source.srem("setx", int.class,213, 2312);
/*//source.sadd("setx", list.toArray(Integer[]::new));
List<Integer> list = List.of(2, 3, 5);
// source.sadd("setx", list.toArray(Integer[]::new));
source.sadd("setx", list.toArray(Integer[]::new));
source.sadd("setx", 12, 2312, 213);
source.sadd("setx", List.of(1011, 10222));*/
}
static { // redis://:*Zhong9307!@47.111.150.118:6064?db=2
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue().addValue(CACHE_SOURCE_MAXCONNS, "1");
conf.addValue(CACHE_SOURCE_NODE, new AnyValue.DefaultAnyValue().addValue(CACHE_SOURCE_URL, "redis://:123456@127.0.0.1:6379?db=0"));
conf.addValue(CACHE_SOURCE_NODES, "redis://:123456@127.0.0.1:6379?db=0");
final ResourceFactory factory = ResourceFactory.create();
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
asyncGroup.start();
@ -23,22 +80,6 @@ public class RedisTest {
//source.defaultConvert = JsonFactory.root().getConvert();
source.init(conf);
System.out.println(source.get("a"));
//--------------------- bit ------------------------------
/*boolean ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax);
source.setBit("ax", 6, true);
ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax);
source.setBit("ax", 6, false);
ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax);*/
//--------------------- bit ------------------------------
//--------------------- bit ------------------------------
/*
source.lock("lockx", 5000);
@ -52,67 +93,15 @@ public class RedisTest {
int i = (short) 3;
});
source.keysStartsWith("districtbeans").forEach(x -> {
System.out.println(x);
source.del(x);
});
source.keysStartsWith("oss-blind-users").forEach(x -> {
System.out.println(x);
source.del(x);
});
source.keysStartsWith("venue:drama-product-schedule-fee:rt-lbeuai84:20221217").forEach(x -> {
System.out.println(x);
source.del(x);
});
source.keysStartsWith("venue:site-hour-fee:54f6e9b74cd7416db8d605366c1f49c2:20220729").forEach(x -> {
System.out.println(x);
source.del(x);
});
source.keysStartsWith("rainbow").forEach(x -> {
System.out.println(x);
source.del(x);
});
source.keysStartsWith("run").forEach(x -> {
System.out.println(x);
source.del(x);
});
source.keysStartsWith("today").forEach(x -> {
System.out.println(x);
source.del(x);
});
source.keysStartsWith("user").forEach(x -> {
System.out.println(x);
source.del(x);
});
source.keysStartsWith("im:").forEach(x -> {
System.out.println(x);
source.del(x);
});
source.keysStartsWith("ios-").forEach(x -> {
System.out.println(x);
source.del(x);
});
//--------------------- set ------------------------------
/*source.del("setx");
*//*
int[] ints = {1, 2, 3};
source.sadd("setx", ints);
*//*
/*
*/
/*
//source.sadd("setx", list.toArray(Integer[]::new));
List<Integer> list = List.of(2, 3, 5);
// source.sadd("setx", list.toArray(Integer[]::new));
source.sadd("setx", list.toArray(Integer[]::new));
source.sadd("setx", 12, 2312, 213);
source.sadd("setx", List.of(1011, 10222));
source.keys("setx*").forEach(x -> {
System.out.println(x);
});
source.srem("setx", 213, 2312);
Collection<String> setx1 = source.getCollection("setx", String.class);

View File

@ -1,4 +1,4 @@
package net.tccn;
package dev.zhub;
import org.redkale.boot.Application;
import org.redkale.boot.NodeServer;
@ -8,7 +8,6 @@ import org.redkale.service.Service;
import org.redkale.util.ResourceEvent;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -29,11 +28,6 @@ public abstract class ZhubAgentProvider extends ClusterAgent {
}
@Override
public CompletableFuture<Map<String, Set<InetSocketAddress>>> queryMqtpAddress(String protocol, String module, String resname) {
return null;
}
@Override
public CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname) {
return null;

View File

@ -1,6 +1,6 @@
package net.tccn;
package dev.zhub;
import net.tccn.zhub.ZHubClient;
import dev.zhub.client.ZHubClient;
import org.redkale.annotation.Priority;
import org.redkale.cluster.ClusterAgent;
import org.redkale.cluster.ClusterAgentProvider;

View File

@ -1,11 +1,12 @@
package net.tccn.zhub;
package dev.zhub.client;
// ================================================== lock ==================================================
public class Lock {
private String name;
private String uuid;
private int duration;
private ZHubClient hubClient;
protected String name;
protected String uuid;
protected int duration;
protected boolean success;
protected ZHubClient hubClient;
protected Lock(String name, String uuid, int duration, ZHubClient hubClient) {
this.name = name;
@ -17,4 +18,8 @@ public class Lock {
public void unLock() {
hubClient.send("unlock", name, uuid);
}
public boolean success() {
return success;
}
}

View File

@ -1,8 +1,9 @@
package net.tccn.zhub;
package dev.zhub.client;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.RetResult;
import org.redkale.util.TypeToken;
import org.redkale.util.Utility;
public class Rpc<T> {
private String ruk; // request unique key:
@ -11,13 +12,15 @@ public class Rpc<T> {
private RpcResult rpcResult;
private TypeToken typeToken;
public Rpc() {
}
protected Rpc(String appname, String ruk, String topic, Object value) {
this.ruk = appname + "::" + ruk;
protected Rpc(String appname, String topic, T value) {
this.ruk = appname + "::" + Utility.uuid();
this.topic = topic;
this.value = (T) JsonConvert.root().convertTo(value);
this.value = value;
}
public String getRuk() {
@ -53,6 +56,16 @@ public class Rpc<T> {
this.rpcResult = rpcResult;
}
@ConvertColumn(ignore = true)
public TypeToken getTypeToken() {
return typeToken;
}
@ConvertColumn(ignore = true)
public void setTypeToken(TypeToken typeToken) {
this.typeToken = typeToken;
}
@ConvertColumn(ignore = true)
public String getBackTopic() {
return ruk.split("::")[0];

View File

@ -1,4 +1,4 @@
package net.tccn.zhub;
package dev.zhub.client;
public class RpcResult<R> {
private String ruk;

View File

@ -1,7 +1,7 @@
package net.tccn.zhub;
package dev.zhub.client;
import net.tccn.*;
import net.tccn.timer.Timers;
import dev.zhub.*;
import dev.zhub.timer.Timers;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceType;
import org.redkale.service.Local;
@ -9,7 +9,6 @@ import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import org.redkale.util.Comment;
import org.redkale.util.TypeToken;
import org.redkale.util.Utility;
import java.io.BufferedReader;
import java.io.IOException;
@ -18,10 +17,7 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
@ -44,17 +40,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
private BufferedReader reader;
private final LinkedBlockingQueue<Timer> timerQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Event<String>> topicQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Event<String>> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG
private final LinkedBlockingQueue<Event<String>> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG
private final LinkedBlockingQueue<Event<String>> topicQueue = new LinkedBlockingQueue<>(); // [=> Object]
private final LinkedBlockingQueue<Event<Object>> rpcBackQueue = new LinkedBlockingQueue<>(); // RPC BACK MSG [=> Object]
private final LinkedBlockingQueue<Event<Object>> rpcCallQueue = new LinkedBlockingQueue<>(); // RPC CALL MSG [=> Object]
private final LinkedBlockingQueue<String> sendMsgQueue = new LinkedBlockingQueue<>(); // SEND MSG
/*private BiConsumer<Runnable, Integer> threadBuilder = (r, n) -> {
for (int i = 0; i < n; i++) {
new Thread(() -> r.run()).start();
}
};*/
private static Map<String, ZHubClient> mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient
public ZHubClient() {
@ -109,12 +99,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
CompletableFuture.runAsync(() -> {
if (!initSocket(0)) {
return;
}
// 消息 事件接收
new Thread(() -> {
if (!initSocket(0)) {
return;
}
while (true) {
try {
String readLine = reader.readLine();
@ -148,7 +137,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
String value = "";
do {
if (value.length() > 0) {
if (!value.isEmpty()) {
value += "\r\n";
}
String s = reader.readLine();
@ -162,11 +151,24 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
Lock lock = lockTag.get(value);
if (lock != null) {
synchronized (lock) {
lock.success = true;
lock.notifyAll();
}
}
continue;
}
// trylock msg
if ("trylock".equals(topic)) {
Lock lock = lockTag.get(value);
if (lock != null) {
synchronized (lock) {
lock.success = false;
lock.notifyAll();
}
}
continue;
}
// rpc back msg
if (APP_NAME.equals(topic)) {
rpcBackQueue.add(Event.of(topic, value));
@ -215,9 +217,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
while (true) {
Timer timer = null;
try {
if ((timer = timerQueue.take()) == null) {
return;
}
timer = timerQueue.take();
long start = System.currentTimeMillis();
pool.submit(timer.runnable).get(5, TimeUnit.SECONDS);
long end = System.currentTimeMillis();
@ -239,9 +240,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
while (true) {
Event<String> event = null;
try {
if ((event = topicQueue.take()) == null) {
continue;
}
event = topicQueue.take();
String topic = event.topic;
String value = event.value;
@ -249,10 +248,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e);
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + toStr(event.value), e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + toStr(event.value), e);
}
}
}).start();
@ -260,18 +259,16 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// rpc back ,仅做数据解析暂无耗时监控
new Thread(() -> {
while (true) {
Event<String> event = null;
Event<Object> event = null;
try {
if ((event = rpcBackQueue.take()) == null) {
continue;
}
event = rpcBackQueue.take();
//if (event)
logger.finest(String.format("rpc-back:[%s]: %s", event.topic, event.value));
logger.finest(String.format("rpc-back:[%s]: %s", event.topic, toStr(event.value)));
rpcAccept(event.value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e);
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + toStr(event.value), e);
}
}
}).start();
@ -280,22 +277,21 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
new Thread(() -> {
ExecutorService pool = Executors.newFixedThreadPool(1);
while (true) {
Event<String> event = null;
Event<Object> event = null;
try {
if ((event = rpcCallQueue.take()) == null) {
continue;
}
logger.finest(String.format("rpc-call:[%s] %s", event.topic, event.value));
event = rpcCallQueue.take();
logger.finest(String.format("rpc-call:[%s] %s", event.topic, toStr(event.value)));
String topic = event.topic;
String value = event.value;
pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
Object value = event.value;
pool.submit(() -> rpcAccept(topic, value)).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e);
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + toStr(event.value), e);
pool = Executors.newFixedThreadPool(1);
} catch (Exception e) {
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + toStr(event.value), e);
}
}
}).start();
@ -305,9 +301,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
while (true) {
String msg = null;
try {
if ((msg = sendMsgQueue.take()) == null) {
continue;
}
msg = sendMsgQueue.take();
// logger.log(Level.FINEST, "send-msg: [" + msg + "]");
writer.write(msg.getBytes());
writer.flush();
@ -392,15 +387,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
return str.length();
}
private String toStr(Object v) {
if (v instanceof String) {
return (String) v;
} else if (v == null) {
return null;
}
return convert.convertTo(v);
}
protected boolean initSocket(int retry) {
for (int i = 0; i <= retry; i++) {
try {
@ -422,7 +408,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
send("auth", auth);
send("groupid " + groupid);
StringBuffer buf = new StringBuffer("subscribe lock");
StringBuilder buf = new StringBuilder("subscribe lock trylock");
if (mainHub.containsValue(this)) {
buf.append(" " + APP_NAME);
}
@ -436,9 +422,9 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
send("timer", name);
});
if (retry > 0) {
logger.warning(String.format("ZHubClient[%s][%s] %s Succeed", getGroupid(), i + 1, retry > 0 ? "reconnection" : "init"));
logger.warning(String.format("ZHubClient[%s][%s] %s Succeed", getGroupid(), i + 1, "reconnection"));
} else {
logger.fine(String.format("ZHubClient[%s] %s Succeed", getGroupid(), retry > 0 ? "reconnection" : "init"));
logger.fine(String.format("ZHubClient[%s] %s Succeed", getGroupid(), "init"));
}
return true;
} catch (Exception e) {
@ -469,11 +455,15 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
public boolean publish(String topic, Object v) {
/*if (eventMap.containsKey(topic)) { // 本地调用
topicQueue.add(Event.of(topic, v));
return true;
}*/
return send("publish", topic, toStr(v));
}
public void broadcast(String topic, Object v) {
send("broadcast", topic, toStr(v));
send("broadcast", topic, toStr(v)); // 广播必须走远端模式
}
// 发送 publish 主题消息若多次发送的 topic + "-" + value 相同将会做延时重置
@ -527,32 +517,35 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// ================================================== lock ==================================================
private Map<String, Lock> lockTag = new ConcurrentHashMap<>();
/**
* 尝试加锁立即返回
*
* @param key
* @param duration
* @return Lock: lock.success 锁定是否成功标识
*/
public Lock tryLock(String key, int duration) {
String uuid = Utility.uuid();
Lock lock = new Lock(key, uuid, duration, this);
lockTag.put(uuid, lock);
try {
// c.send("lock", key, uuid, strconv.Itoa(duration))
send("lock", key, uuid, String.valueOf(duration));
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return lock;
return lock("trylock", key, duration);
}
// 为替换 tryLock 方法做过度准确
public Lock lock(String key, int duration) {
String uuid = Utility.uuid();
return lock("lock", key, duration);
}
/**
* @param cmd lock|trylock
* @param key 加锁 key
* @param duration 锁定时长
* @return
*/
private Lock lock(String cmd, String key, int duration) {
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
Lock lock = new Lock(key, uuid, duration, this);
lockTag.put(uuid, lock);
try {
// c.send("lock", key, uuid, strconv.Itoa(duration))
send("lock", key, uuid, String.valueOf(duration));
send(cmd, key, uuid, String.valueOf(duration));
synchronized (lock) {
lock.wait();
}
@ -599,7 +592,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// ================================================== rpc ==================================================
// -- 调用端 --
private static Map<String, Rpc> rpcMap = new ConcurrentHashMap<>();
private static Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
@Comment("rpc call")
public RpcResult<Void> rpc(String topic, Object v) {
@ -613,14 +605,18 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
@Comment("rpc call")
public <T, R> RpcResult<R> rpc(String topic, T v, TypeToken<R> typeToken, long timeout) {
Rpc rpc = new Rpc<>(APP_NAME, Utility.uuid(), topic, v);
Rpc rpc = new Rpc<>(APP_NAME, topic, v);
rpc.setTypeToken(typeToken);
String ruk = rpc.getRuk();
rpcMap.put(ruk, rpc);
if (typeToken != null) {
rpcRetType.put(ruk, typeToken);
}
try {
publish(topic, rpc); // send("rpc", topic, toStr(rpc));
if (eventMap.containsKey(topic)) { // 本地调用
rpcCallQueue.add(Event.of(topic, rpc));
} else {
rpc.setValue(toStr(rpc.getValue()));
publish(topic, rpc); // send("rpc", topic, toStr(rpc));
}
synchronized (rpc) {
if (timeout <= 0) {
timeout = 1000 * 15;
@ -670,16 +666,38 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
// RpcResult: {ruk:xxx-xxxx, retcode:0}
@Comment("rpc call back consumer")
private void rpcAccept(String value) {
private <T> void rpcAccept(T value) {
// 接收到 本地调用返回的 RpcResult
if (value instanceof RpcResult) {
String ruk = ((RpcResult) value).getRuk();
Rpc rpc = rpcMap.remove(ruk);
if (rpc == null) {
return;
}
// 本地模式下返回的数据对象类型需要和处理端一致不然会出现类型转换异常 - 解决办法当出现不一致的情况取数据做转换
TypeToken typeToken = rpc.getTypeToken();
if (typeToken.getType() != ((RpcResult<?>) value).getResult().getClass()) {
Object result = convert.convertFrom(typeToken.getType(), toStr(((RpcResult<?>) value).getResult()));
((RpcResult<Object>) value).setResult(result);
}
rpc.setRpcResult((RpcResult) value);
synchronized (rpc) {
rpc.notify();
}
return;
}
RpcResult resp = convert.convertFrom(new TypeToken<RpcResult<String>>() {
}.getType(), value);
}.getType(), (String) value);
String ruk = resp.getRuk();
Rpc rpc = rpcMap.remove(ruk);
if (rpc == null) {
return;
}
TypeToken typeToken = rpcRetType.get(ruk);
TypeToken typeToken = rpc.getTypeToken();
Object result = resp.getResult();
if (result != null && typeToken != null && !"java.lang.String".equals(typeToken.getType().getTypeName()) && !"java.lang.Void".equals(typeToken.getType().getTypeName())) {
@ -703,18 +721,29 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
@Comment("rpc call consumer")
public <T, R> void rpcSubscribe(String topic, TypeToken<T> typeToken, Function<Rpc<T>, RpcResult<R>> fun) {
Consumer<String> consumer = v -> {
Consumer<T> consumer = v -> {
Rpc<T> rpc = null;
try {
rpc = convert.convertFrom(new TypeToken<Rpc<String>>() {
}.getType(), v);
if (v instanceof String) {
rpc = convert.convertFrom(new TypeToken<Rpc<String>>() {
}.getType(), (String) v);
} else {
rpc = (Rpc<T>) v;
}
// 参数转换
T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue());
rpc.setValue(paras);
if (rpc.getValue() instanceof String && !"java.lang.String".equals(typeToken.getType().getTypeName())) {
T paras = convert.convertFrom(typeToken.getType(), (String) rpc.getValue());
rpc.setValue(paras);
}
RpcResult result = fun.apply(rpc);
result.setResult(toStr(result.getResult()));
publish(rpc.getBackTopic(), result);
if (APP_NAME.equals(rpc.getBackTopic())) {
rpcBackQueue.add(Event.of(topic, result));
} else {
result.setResult(toStr(result.getResult())); // 远程模式 结果转换
publish(rpc.getBackTopic(), result);
}
} catch (Exception e) {
logger.log(Level.WARNING, "rpc call consumer error: " + v, e);
publish(rpc.getBackTopic(), rpc.retError("服务调用失败!"));
@ -723,6 +752,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
};
rpcTopics.add(topic);
subscribe(topic, consumer);
subscribe(topic, typeToken, consumer);
}
}

View File

@ -1,7 +1,7 @@
package net.tccn.timer;
package dev.zhub.timer;
import net.tccn.timer.queue.TimerQueue;
import net.tccn.timer.task.Task;
import dev.zhub.timer.queue.TimerQueue;
import dev.zhub.timer.task.Task;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -24,7 +24,7 @@ public class TimerExecutor {
for (Task t : task) {
t.setTimerExecutor(this);
queue.push(t);
logger.finest("add new task : " + t.getName());
// logger.finest("add new task : " + t.getName());
}
}

View File

@ -1,8 +1,8 @@
package net.tccn.timer;
package dev.zhub.timer;
import net.tccn.timer.scheduled.Scheduled;
import net.tccn.timer.task.Job;
import net.tccn.timer.task.Task;
import dev.zhub.timer.scheduled.Scheduled;
import dev.zhub.timer.task.Job;
import dev.zhub.timer.task.Task;
import java.time.LocalDateTime;
import java.time.ZoneId;
@ -93,10 +93,10 @@ public class TimerTask implements Task {
if (!isComplete) {
int count = execCount.incrementAndGet(); // 执行次数+1
long start = System.currentTimeMillis();
// long start = System.currentTimeMillis();
job.execute(this);
long end = System.currentTimeMillis();
logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count));
// long end = System.currentTimeMillis();
// logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count));
if (!isComplete) {
timerExecutor.add(this, true);

View File

@ -1,6 +1,6 @@
package net.tccn.timer;
package dev.zhub.timer;
import net.tccn.timer.scheduled.ScheduledCycle;
import dev.zhub.timer.scheduled.ScheduledCycle;
import org.redkale.util.Utility;
import java.util.function.Supplier;

View File

@ -1,6 +1,6 @@
package net.tccn.timer.queue;
package dev.zhub.timer.queue;
import net.tccn.timer.task.Task;
import dev.zhub.timer.task.Task;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

View File

@ -1,4 +1,4 @@
package net.tccn.timer.scheduled;
package dev.zhub.timer.scheduled;
import java.time.LocalDateTime;

View File

@ -1,4 +1,4 @@
package net.tccn.timer.scheduled;
package dev.zhub.timer.scheduled;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

View File

@ -1,4 +1,4 @@
package net.tccn.timer.scheduled;
package dev.zhub.timer.scheduled;
import java.time.LocalDate;
import java.time.LocalDateTime;

View File

@ -1,4 +1,4 @@
package net.tccn.timer.task;
package dev.zhub.timer.task;
/**
* @author: liangxianyou at 2018/12/8 17:24.

View File

@ -1,7 +1,7 @@
package net.tccn.timer.task;
package dev.zhub.timer.task;
import net.tccn.timer.TimerExecutor;
import net.tccn.timer.scheduled.Scheduled;
import dev.zhub.timer.TimerExecutor;
import dev.zhub.timer.scheduled.Scheduled;
/**
* @author: liangxianyou at 2018/8/5 19:32.

View File

@ -1,131 +0,0 @@
/*
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template
*/
package org.redkalex.cache.redis;
import org.redkale.annotation.Resource;
import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert;
import org.redkale.source.AbstractCacheSource;
import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.RedkaleException;
import org.redkale.util.ResourceFactory;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
/**
* @author zhangjx
* @since 2.8.0
*/
public abstract class AbstractRedisSource extends AbstractCacheSource {
public static final String CACHE_SOURCE_CRYPTOR = "cryptor";
protected String name;
@Resource(required = false)
protected ResourceFactory resourceFactory;
@Resource(required = false)
protected JsonConvert defaultConvert;
@Resource(name = "$_convert", required = false)
protected JsonConvert convert;
protected int db;
protected RedisCryptor cryptor;
protected AnyValue config;
@Override
public void init(AnyValue conf) {
this.config = conf;
super.init(conf);
this.name = conf.getValue("name", "");
if (this.convert == null) this.convert = this.defaultConvert;
if (conf != null) {
String cryptStr = conf.getValue(CACHE_SOURCE_CRYPTOR, "").trim();
if (!cryptStr.isEmpty()) {
try {
Class<RedisCryptor> cryptClass = (Class) getClass().getClassLoader().loadClass(cryptStr);
RedkaleClassLoader.putReflectionPublicConstructors(cryptClass, cryptClass.getName());
this.cryptor = cryptClass.getConstructor().newInstance();
} catch (ReflectiveOperationException e) {
throw new RedkaleException(e);
}
}
}
if (cryptor != null) {
if (resourceFactory != null) {
resourceFactory.inject(cryptor);
}
cryptor.init(conf);
}
}
@Override
public void destroy(AnyValue conf) {
super.destroy(conf);
if (cryptor != null) {
cryptor.destroy(conf);
}
}
@Override
public void close() throws Exception { // Application 关闭时调用
destroy(null);
}
@Override
public String resourceName() {
return name;
}
protected String decryptValue(String key, RedisCryptor cryptor, String value) {
return cryptor != null ? cryptor.decrypt(key, value) : value;
}
protected <T> T decryptValue(String key, RedisCryptor cryptor, Type type, byte[] bs) {
return decryptValue(key, cryptor, convert, type, bs);
}
protected <T> T decryptValue(String key, RedisCryptor cryptor, Convert c, Type type, byte[] bs) {
if (bs == null) return null;
if (type == byte[].class) return (T) bs;
if (cryptor == null || (type instanceof Class && (((Class) type).isPrimitive() || Number.class.isAssignableFrom((Class) type)))) {
return (T) (c == null ? this.convert : c).convertFrom(type, bs);
}
String deval = cryptor.decrypt(key, new String(bs, StandardCharsets.UTF_8));
return deval == null ? null : (T) (c == null ? this.convert : c).convertFrom(type, deval.getBytes(StandardCharsets.UTF_8));
}
protected String encryptValue(String key, RedisCryptor cryptor, String value) {
return cryptor != null ? cryptor.encrypt(key, value) : value;
}
protected <T> byte[] encryptValue(String key, RedisCryptor cryptor, Convert c, T value) {
return encryptValue(key, cryptor, null, c, value);
}
protected <T> byte[] encryptValue(String key, RedisCryptor cryptor, Type type, Convert c, T value) {
if (value == null) return null;
Type t = type == null ? value.getClass() : type;
if (cryptor == null && type == String.class) {
return value.toString().getBytes(StandardCharsets.UTF_8);
}
return encryptValue(key, cryptor, t, (c == null ? this.convert : c).convertToBytes(t, value));
}
protected byte[] encryptValue(String key, RedisCryptor cryptor, Type type, byte[] bs) {
if (bs == null) return null;
if (cryptor == null || (type instanceof Class && (((Class) type).isPrimitive() || Number.class.isAssignableFrom((Class) type)))) {
return bs;
}
String enval = cryptor.encrypt(key, new String(bs, StandardCharsets.UTF_8));
return enval == null ? null : enval.getBytes(StandardCharsets.UTF_8);
}
}

View File

@ -1,343 +0,0 @@
package org.redkalex.cache.redis;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceType;
import org.redkale.service.Local;
import org.redkale.source.CacheSource;
import org.redkale.util.AnyValue;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@Local
@AutoLoad(false)
@ResourceType(CacheSource.class)
public class MyRedisCacheSource extends RedisCacheSource {
@Override
public void init(AnyValue conf) {
super.init(conf);
}
/*
//--------------------- zset ------------------------------
public int getZrank(String key, V v) {
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Long t = (Long) send("ZRANK", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
return t == null ? -1 : (int) (long) t;
}
public int getZrevrank(String key, V v) {
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Long t = (Long) send("ZREVRANK", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
return t == null ? -1 : (int) (long) t;
}
//ZRANGE/ZREVRANGE key start stop
public List<V> getZset(String key) {
byte[][] bytes = Stream.of(key, 0, -1).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List<V> vs = (List<V>) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
List<V> vs2 = new ArrayList(vs.size());
for (int i = 0; i < vs.size(); ++i) {
if (i % 2 == 1) {
vs2.add(this.convert.convertFrom(this.objValueType, String.valueOf(vs.get(i))));
} else {
vs2.add(vs.get(i));
}
}
return vs2;
}
public List<V> getZset(String key, int offset, int limit) {
byte[][] bytes = Stream.of(key, offset, offset + limit - 1).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List<V> vs = (List<V>) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
List<V> vs2 = new ArrayList(vs.size());
for (int i = 0; i < vs.size(); ++i) {
if (i % 2 == 1) {
vs2.add(this.convert.convertFrom(this.objValueType, String.valueOf(vs.get(i))));
} else {
vs2.add(vs.get(i));
}
}
return vs2;
}
public LinkedHashMap<V, Long> getZsetLongScore(String key) {
LinkedHashMap<V, Double> map = getZsetDoubleScore(key);
if (map.isEmpty()) {
return new LinkedHashMap<>();
}
LinkedHashMap<V, Long> map2 = new LinkedHashMap<>(map.size());
map.forEach((k, v) -> map2.put(k, (long) (double) v));
return map2;
}
public LinkedHashMap<V, Long> getZsetItemsLongScore(String key) {
byte[][] bytes = Stream.of(key, 0, -1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List vs = (List) send("ZRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
LinkedHashMap<V, Long> map = new LinkedHashMap<>();
for (int i = 0; i < vs.size(); i += 2) {
map.put((V) vs.get(i), (long) Double.parseDouble((String) vs.get(i + 1)));
}
return map;
}
public Long getZsetLongScore(String key, V v) {
Double score = getZsetDoubleScore(key, v);
if (score == null) {
return null;
}
return (long) (double) score;
}
public LinkedHashMap<V, Double> getZsetDoubleScore(String key) {
byte[][] bytes = Stream.of(key, 0, -1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
LinkedHashMap<V, Double> map = new LinkedHashMap<>();
for (int i = 0; i < vs.size(); i += 2) {
map.put((V) vs.get(i), Double.parseDouble((String) vs.get(i + 1)));
}
return map;
}
public Double getZsetDoubleScore(String key, V v) {
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Serializable zscore = send("ZSCORE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
if (zscore == null) {
return null;
}
return Double.parseDouble(String.valueOf(zscore));
}
public LinkedHashMap<V, Long> getZsetLongScore(String key, int offset, int limit) {
byte[][] bytes = Stream.of(key, offset, offset + limit - 1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
LinkedHashMap<V, Long> map = new LinkedHashMap<>();
for (int i = 0; i < vs.size(); i += 2) {
map.put((V) vs.get(i), (long) Double.parseDouble((String) vs.get(i + 1)));
}
return map;
}
public LinkedHashMap<V, Double> getZsetDoubleScore(String key, int offset, int limit) {
byte[][] bytes = Stream.of(key, offset, offset + limit - 1, "WITHSCORES").map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
List vs = (List) send("ZREVRANGE", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
LinkedHashMap<V, Double> map = new LinkedHashMap<>();
for (int i = 0; i < vs.size(); i += 2) {
map.put((V) vs.get(i), Double.parseDouble(vs.get(i + 1) + ""));
}
return map;
}
* */
// --------------------
public <N extends Number> void zadd(String key, Map<Serializable, N> kv) {
if (kv == null || kv.isEmpty()) {
return;
}
List<Serializable> args = new ArrayList();
kv.forEach((k, v) -> {
args.add(k);
args.add(v);
});
sendAsync("ZADD", key, args.toArray(Serializable[]::new)).join();
}
public <N extends Number> double zincr(String key, Serializable number, N n) {
return sendAsync("ZINCRBY", key, number, n).thenApply(x -> x.getDoubleValue(0d)).join();
}
public void zrem(String key, Serializable... vs) {
sendAsync("ZREM", key, vs).join();
}
/*public <T> List<T> zexists(String key, T... fields) {
if (fields == null || fields.length == 0) {
return new ArrayList<>();
}
List<String> para = new ArrayList<>();
para.add("" +
" local key = KEYS[1];" +
" local args = ARGV;" +
" local result = {};" +
" for i,v in ipairs(args) do" +
" local inx = redis.call('ZREVRANK', key, v);" +
" if(inx) then" +
" table.insert(result,1,v);" +
" end" +
" end" +
" return result;");
para.add("1");
para.add(key);
for (Object field : fields) {
para.add(String.valueOf(field));
}
// todo:
//sendAsync("EVAL", null, para.toArray(Serializable[]::new)).thenApply(x -> x.).join();
return null;
}*/
//--------------------- bit ------------------------------
public boolean getBit(String key, int offset) {
return sendAsync("GETBIT", key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(offset).getBytes(StandardCharsets.UTF_8)).thenApply(v -> v.getIntValue(0) > 0).join();
}
public void setBit(String key, int offset, boolean bool) {
sendAsync("SETBIT", key, offset, bool ? 1 : 0).join();
}
//--------------------- bit ------------------------------
//--------------------- lock ------------------------------
// 尝试加锁成功返回0否则返回上一锁剩余毫秒值
public long tryLock(String key, int millis) {
Serializable[] obj = {"" +
"if (redis.call('EXISTS',KEYS[1]) == 0) then " +
"redis.call('PSETEX',KEYS[1],ARGV[1],1); " +
"return 0; " +
"else " +
"return redis.call('PTTL',KEYS[1]); " +
"end;", 1, key, millis
};
return sendAsync("EVAL", null, obj).thenApply(v -> v.getIntValue(1)).join();
}
// 加锁
public void lock(String key, int millis) {
long i;
do {
i = tryLock(key, millis);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (i > 0);
}
// 解锁
public void unlock(String key) {
remove(key);
}
//--------------------- key ------------------------------
public String get(String key) {
return get(key, String.class);
}
public void set(String key, Serializable value) {
sendAsync("SET", key, value).join();
}
//--------------------- set ------------------------------
public <T> void sadd(String key, Collection<T> args) {
saddAsync(key, args.toArray(Serializable[]::new)).join();
}
public void sadd(String key, Serializable... args) {
saddAsync(key, Arrays.stream(args).toArray(Serializable[]::new)).join();
}
public void srem(String key, Serializable... args) {
sremAsync(key, args).join();
}
public CompletableFuture<RedisCacheResult> saddAsync(String key, Serializable... args) {
return sendAsync("SADD", key, args);
}
public CompletableFuture<RedisCacheResult> sremAsync(String key, Serializable... args) {
return sendAsync("SREM", key, args);
}
//--------------------- hm ------------------------------
/*public Long incrHm(String key, String field, int value) {
return sendAsync("HINCRBY", key, field, value).thenApply(x -> x.getLongValue(0l)).join();
}
public Double incrHm(String key, String field, double value) {
return sendAsync("HINCRBYFLOAT", key, field, value).thenApply(x -> x.getDoubleValue(0d)).join();
}*/
public void setHm(String key, String field, Serializable value) {
setHmsAsync(key, Map.of(field, value)).join();
}
public void setHms(String key, Map kv) {
setHmsAsync(key, kv).join();
}
public CompletableFuture<RedisCacheResult> setHmsAsync(String key, Map<Serializable, Serializable> kv) {
List<Serializable> args = new ArrayList();
kv.forEach((k, v) -> {
args.add(k);
args.add(v);
});
return sendAsync("HMSET", key, args.toArray(Serializable[]::new));
}
public String getHm(String key, String field) {
return getHm(key, String.class, field);
}
public <T extends Serializable> T getHm(String key, Class<T> type, String field) {
List<Serializable> list = super.hmget(key, type, field);
if (list == null && list.isEmpty()) {
return null;
}
return (T) list.get(0);
}
public Map<String, String> getHms(String key, String... field) {
return getHms(key, String.class, field);
}
public <T extends Serializable> Map<String, T> getHms(String key, Class<T> type, String... field) {
List<Serializable> list = super.hmget(key, type, field);
if (list == null && list.isEmpty()) {
return null;
}
Map<String, T> map = new HashMap<>(field.length);
for (int i = 0; i < field.length; i++) {
if (list.get(i) == null) {
continue;
}
map.put(field[i], (T) list.get(i));
}
return map;
}
/*public Map<String, Object> getHmall(String key) {
List<String> list = null; // TODO:
Map<String, Object> map = new HashMap<>();
if (list.isEmpty()) {
return map;
}
for (int i = 0; i + 1 < list.size(); i += 2) {
map.put((String) list.get(i), list.get(i + 1));
}
return map;
}*/
}

View File

@ -1,78 +0,0 @@
/*
*
*/
package org.redkalex.cache.redis;
import org.redkale.util.Utility;
import java.util.Arrays;
/**
* @author zhangjx
*/
public class RedisCRC16 {
private static final int[] LOOKUP_TABLE = {0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6,
0x70E7, 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF, 0x1231, 0x0210, 0x3273,
0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6, 0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF,
0xE3DE, 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485, 0xA56A, 0xB54B, 0x8528,
0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D, 0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695,
0x46B4, 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC, 0x48C4, 0x58E5, 0x6886,
0x78A7, 0x0840, 0x1861, 0x2802, 0x3823, 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A,
0xB92B, 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12, 0xDBFD, 0xCBDC, 0xFBBF,
0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A, 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60,
0x1C41, 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49, 0x7E97, 0x6EB6, 0x5ED5,
0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70, 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59,
0x8F78, 0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F, 0x1080, 0x00A1, 0x30C2,
0x20E3, 0x5004, 0x4025, 0x7046, 0x6067, 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F,
0xF35E, 0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256, 0xB5EA, 0xA5CB, 0x95A8,
0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D, 0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424,
0x4405, 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C, 0x26D3, 0x36F2, 0x0691,
0x16B0, 0x6657, 0x7676, 0x4615, 0x5634, 0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A,
0xA9AB, 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3, 0xCB7D, 0xDB5C, 0xEB3F,
0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A, 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3,
0x3A92, 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9, 0x7C26, 0x6C07, 0x5C64,
0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1, 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9,
0x9FF8, 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0};
private RedisCRC16() {
}
public static int crc16(byte[] bytes) {
int crc = 0x0000;
for (byte b : bytes) {
crc = (crc << 8) ^ LOOKUP_TABLE[((crc >>> 8) ^ (b & 0xFF)) & 0xFF];
}
return crc & 0xFFFF;
}
public static int calcSlot(int maxSlot, byte[] key) {
if (key == null) {
return 0;
}
int start = Utility.indexOf(key, (byte) '{');
if (start != -1) {
int end = Utility.indexOf(key, start + 1, (byte) '}');
if (end != -1) {
key = Arrays.copyOfRange(key, start + 1, end);
}
}
int result = crc16(key) % maxSlot;
return result;
}
public static int calcSlot(int maxSlot, String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
if (end != -1 && start + 1 < end) {
key = key.substring(start + 1, end);
}
}
int result = crc16(key.getBytes()) % maxSlot;
return result;
}
}

View File

@ -1,36 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.AsyncConnection;
import org.redkale.net.AsyncGroup;
import org.redkale.net.client.Client;
import org.redkale.net.client.ClientAddress;
/**
* @author zhangjx
*/
public class RedisCacheClient extends Client<RedisCacheConnection, RedisCacheRequest, RedisCacheResult> {
public RedisCacheClient(String name, AsyncGroup group, String key, ClientAddress address, int maxConns, int maxPipelines, RedisCacheReqAuth authReq, RedisCacheReqDB dbReq) {
super(name, group, true, address, maxConns, maxPipelines, () -> new RedisCacheReqPing(), () -> new RedisCacheReqClose(), null); //maxConns
if (authReq != null || dbReq != null) {
if (authReq != null && dbReq != null) {
this.authenticate = conn -> writeChannel(conn, authReq).thenCompose(v -> writeChannel(conn, dbReq)).thenApply(v -> conn);
} else if (authReq != null) {
this.authenticate = conn -> writeChannel(conn, authReq).thenApply(v -> conn);
} else {
this.authenticate = conn -> writeChannel(conn, dbReq).thenApply(v -> conn);
}
}
}
@Override
protected RedisCacheConnection createClientConnection(final int index, AsyncConnection channel) {
return new RedisCacheConnection(this, index, channel);
}
}

View File

@ -1,249 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientCodec;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
/**
* @author zhangjx
*/
public class RedisCacheCodec extends ClientCodec<RedisCacheRequest, RedisCacheResult> {
protected static final byte TYPE_STRING = '+'; //简单字符串(不包含CRLF)类型
protected static final byte TYPE_ERROR = '-'; //错误(不包含CRLF)类型
protected static final byte TYPE_NUMBER = ':'; //整型
protected static final byte TYPE_BULK = '$'; //块字符串
protected static final byte TYPE_ARRAY = '*'; //数组
protected static final Logger logger = Logger.getLogger(RedisCacheCodec.class.getSimpleName());
protected byte halfFrameCmd;
protected int halfFrameBulkLength = -10;
protected int halfFrameArraySize = -10;
protected int halfFrameArrayIndex; //从0开始
protected int halfFrameArrayItemLength = -10;
protected ByteArray halfFrameBytes;
protected byte frameType;
protected byte[] frameValue; //(不包含CRLF)
protected List<byte[]> frameList; //(不包含CRLF)
private ByteArray recyclableArray;
public RedisCacheCodec(ClientConnection connection) {
super(connection);
}
protected ByteArray pollArray(ByteArray array) {
if (recyclableArray == null) {
recyclableArray = new ByteArray();
} else {
recyclableArray.clear();
}
recyclableArray.clear();
if (array != null) {
recyclableArray.put(array, 0, array.length());
}
return recyclableArray;
}
private boolean checkBytesFrame(RedisCacheConnection conn, ByteBuffer buffer, ByteArray array) {
// byte[] dbs = new byte[buffer.remaining()];
// for (int i = 0; i < dbs.length; i++) {
// dbs[i] = buffer.get(buffer.position() + i);
// }
// ArrayDeque<ClientFuture> deque = (ArrayDeque) responseQueue(conn);
// logger.log(Level.FINEST, "[" + Utility.nowMillis() + "] [" + Thread.currentThread().getName() + "]: " + conn + ", 原始数据: " + new String(dbs).replace("\r\n", " ") + ", req=" + deque.getFirst().getRequest());
array.clear();
byte type = halfFrameCmd == 0 ? buffer.get() : halfFrameCmd;
if (halfFrameBytes != null) {
array.put(halfFrameBytes, 0, halfFrameBytes.length());
}
frameType = type;
if (type == TYPE_STRING || type == TYPE_ERROR || type == TYPE_NUMBER) {
if (readComplete(buffer, array)) {
frameValue = array.getBytes();
} else {
halfFrameCmd = type;
halfFrameBytes = pollArray(array);
return false;
}
} else if (type == TYPE_BULK) {
int bulkLength = halfFrameBulkLength;
if (bulkLength < -2) {
if (!readComplete(buffer, array)) { //没有读到bulkLength
halfFrameCmd = type;
halfFrameBulkLength = -10;
halfFrameBytes = pollArray(array);
return false;
}
bulkLength = Integer.parseInt(array.toString(StandardCharsets.UTF_8));
array.clear();
}
if (bulkLength == -1) {
frameValue = null;
} else if (readComplete(buffer, array)) {
frameValue = array.getBytes();
} else {
halfFrameCmd = type;
halfFrameBulkLength = bulkLength;
halfFrameBytes = pollArray(array);
return false;
}
} else if (type == TYPE_ARRAY) {
int arraySize = halfFrameArraySize;
if (arraySize < -2) {
if (!readComplete(buffer, array)) { //没有读到arraySize
halfFrameCmd = type;
halfFrameArraySize = -10;
halfFrameArrayIndex = 0;
halfFrameArrayItemLength = -10;
halfFrameBytes = pollArray(array);
return false;
}
arraySize = Integer.parseInt(array.toString(StandardCharsets.UTF_8));
array.clear();
}
int arrayIndex = halfFrameArrayIndex;
for (int i = arrayIndex; i < arraySize; i++) {
int itemLength = halfFrameArrayItemLength;
halfFrameArrayItemLength = -10;
if (itemLength < -2) {
if (!readComplete(buffer, array)) { //没有读到bulkLength
halfFrameCmd = type;
halfFrameArraySize = arraySize;
halfFrameArrayIndex = i;
halfFrameArrayItemLength = -10;
halfFrameBytes = pollArray(array);
return false;
}
byte sign = array.get(0);
itemLength = Integer.parseInt(array.toString(1, StandardCharsets.UTF_8));
array.clear();
if (sign == TYPE_ARRAY) { //数组中嵌套数组目前有 HSCAN
frameValue = null;
if (frameList != null) {
frameList.clear();
}
clearHalfFrame();
if (itemLength == 0) {
return true;
}
halfFrameCmd = sign;
halfFrameArraySize = itemLength;
if (!buffer.hasRemaining()) {
return false;
}
return checkBytesFrame(conn, buffer, array);
}
}
int cha = itemLength - array.length();
if (itemLength == -1) {
if (frameList == null) {
frameList = new ArrayList<>();
}
frameList.add(null);
array.clear();
} else if (buffer.remaining() >= cha + 2) {
for (int j = 0; j < cha; j++) array.put(buffer.get());
buffer.get(); //\r
buffer.get(); //\n
if (frameList == null) {
frameList = new ArrayList<>();
}
frameList.add(array.getBytes());
array.clear();
} else {
while (buffer.hasRemaining()) array.put(buffer.get());
halfFrameCmd = type;
halfFrameArraySize = arraySize;
halfFrameArrayIndex = i;
halfFrameArrayItemLength = itemLength;
halfFrameBytes = pollArray(array);
return false;
}
}
}
clearHalfFrame();
return true;
}
protected void clearHalfFrame() {
halfFrameCmd = 0;
halfFrameBulkLength = -10;
halfFrameArraySize = -10;
halfFrameArrayIndex = 0;
halfFrameArrayItemLength = -10;
halfFrameBytes = null;
}
@Override
public void decodeMessages(ByteBuffer realbuf, ByteArray array) {
RedisCacheConnection conn = (RedisCacheConnection) connection;
if (!realbuf.hasRemaining()) {
return;
}
ByteBuffer buffer = realbuf;
if (!checkBytesFrame(conn, buffer, array)) {
return;
}
//buffer必然包含一个完整的frame数据
boolean first = true;
RedisCacheRequest request = null;
while (first || buffer.hasRemaining()) {
if (request == null) {
request = nextRequest();
}
if (!first && !checkBytesFrame(conn, buffer, array)) {
break;
}
if (frameType == TYPE_ERROR) {
addMessage(request, new RuntimeException(new String(frameValue, StandardCharsets.UTF_8)));
} else {
addMessage(request, conn.pollResultSet(request).prepare(frameType, frameValue, frameList));
}
frameType = 0;
frameValue = null;
frameList = null;
halfFrameCmd = 0;
halfFrameBytes = null;
first = false;
buffer = realbuf;
}
}
protected boolean readComplete(ByteBuffer buffer, ByteArray array) {
while (buffer.hasRemaining()) {
byte b = buffer.get();
if (b == '\n') {
array.removeLastByte(); //移除 \r
return true;
}
array.put(b);
}
return false;
}
}

View File

@ -1,49 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.AsyncConnection;
import org.redkale.net.WorkThread;
import org.redkale.net.client.Client;
import org.redkale.net.client.ClientCodec;
import org.redkale.net.client.ClientConnection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
/**
* @author zhangjx
*/
public class RedisCacheConnection extends ClientConnection<RedisCacheRequest, RedisCacheResult> {
public RedisCacheConnection(Client client, int index, AsyncConnection channel) {
super(client, index, channel);
}
@Override
protected ClientCodec createCodec() {
return new RedisCacheCodec(this);
}
protected CompletableFuture<RedisCacheResult> writeRequest(RedisCacheRequest request) {
return super.writeChannel(request);
}
protected <T> CompletableFuture<T> writeRequest(RedisCacheRequest request, Function<RedisCacheResult, T> respTransfer) {
return super.writeChannel(request, respTransfer);
}
public RedisCacheResult pollResultSet(RedisCacheRequest request) {
RedisCacheResult rs = new RedisCacheResult();
return rs;
}
public RedisCacheRequest pollRequest(WorkThread workThread) {
RedisCacheRequest rs = new RedisCacheRequest().currThread(workThread);
return rs;
}
}

View File

@ -1,45 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
* @author zhangjx
*/
public class RedisCacheReqAuth extends RedisCacheRequest {
private static final byte[] PS = "AUTH".getBytes(StandardCharsets.UTF_8);
protected String password;
public RedisCacheReqAuth(String password) {
this.password = password;
}
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
byte[] pwd = password.getBytes();
writer.put((byte) '*');
writer.put((byte) '2');
writer.put((byte) '\r', (byte) '\n');
writer.put((byte) '$');
writer.put((byte) '4');
writer.put((byte) '\r', (byte) '\n');
writer.put(PS);
writer.put((byte) '\r', (byte) '\n');
writer.put((byte) '$');
writer.put(String.valueOf(pwd.length).getBytes(StandardCharsets.UTF_8));
writer.put((byte) '\r', (byte) '\n');
writer.put(pwd);
writer.put((byte) '\r', (byte) '\n');
}
}

View File

@ -1,36 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
* @author zhangjx
*/
public class RedisCacheReqClose extends RedisCacheRequest {
private static final byte[] PS = "QUIT".getBytes(StandardCharsets.UTF_8);
@Override
public final boolean isCloseType() {
return true;
}
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
writer.put((byte) '*');
writer.put((byte) '1');
writer.put((byte) '\r', (byte) '\n');
writer.put((byte) '$');
writer.put((byte) '4');
writer.put((byte) '\r', (byte) '\n');
writer.put(PS);
writer.put((byte) '\r', (byte) '\n');
}
}

View File

@ -1,43 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
* @author zhangjx
*/
public class RedisCacheReqDB extends RedisCacheRequest {
protected int db;
public RedisCacheReqDB(int db) {
this.db = db;
}
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
writer.put((byte) '*');
writer.put((byte) '2');
writer.put((byte) '\r', (byte) '\n');
writer.put((byte) '$');
writer.put((byte) '6');
writer.put((byte) '\r', (byte) '\n');
writer.put("SELECT".getBytes(StandardCharsets.UTF_8));
writer.put((byte) '\r', (byte) '\n');
byte[] dbs = String.valueOf(db).getBytes(StandardCharsets.UTF_8);
writer.put((byte) '$');
writer.put(String.valueOf(dbs.length).getBytes(StandardCharsets.UTF_8));
writer.put((byte) '\r', (byte) '\n');
writer.put(dbs);
writer.put((byte) '\r', (byte) '\n');
}
}

View File

@ -1,31 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
* @author zhangjx
*/
public class RedisCacheReqPing extends RedisCacheRequest {
private static final byte[] PS = "PING".getBytes(StandardCharsets.UTF_8);
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
writer.put((byte) '*');
writer.put((byte) '1');
writer.put((byte) '\r', (byte) '\n');
writer.put((byte) '$');
writer.put((byte) '4');
writer.put((byte) '\r', (byte) '\n');
writer.put(PS);
writer.put((byte) '\r', (byte) '\n');
}
}

View File

@ -1,61 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.net.client.ClientRequest;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
* @author zhangjx
*/
public class RedisCacheRequest extends ClientRequest {
static final byte[] TRUE = new byte[]{'t'};
static final byte[] FALSE = new byte[]{'f'};
protected String key;
protected String command;
protected byte[][] args;
public <T> RedisCacheRequest prepare(String command, String key, byte[]... args) {
super.prepare();
this.command = command;
this.key = key;
this.args = args;
return this;
}
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
writer.put((byte) '*');
writer.put(String.valueOf(args.length + 1).getBytes(StandardCharsets.UTF_8));
writer.put((byte) '\r', (byte) '\n');
writer.put((byte) '$');
writer.put(String.valueOf(command.length()).getBytes(StandardCharsets.UTF_8));
writer.put((byte) '\r', (byte) '\n');
writer.put(command.getBytes(StandardCharsets.UTF_8));
writer.put((byte) '\r', (byte) '\n');
for (final byte[] arg : args) {
writer.put((byte) '$');
writer.put(String.valueOf(arg.length).getBytes(StandardCharsets.UTF_8));
writer.put((byte) '\r', (byte) '\n');
writer.put(arg);
writer.put((byte) '\r', (byte) '\n');
}
}
@Override
public String toString() {
return getClass().getSimpleName() + "{command=" + command + ", key=" + key + "}";
}
}

View File

@ -1,151 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.convert.json.JsonConvert;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
* @author zhangjx
*/
public class RedisCacheResult {
//+ 简单字符串类型 (不包含CRLF)
//- 错误类型 (不包含CRLF)
//': 整型
//$ 块字符串
//* 数组
protected byte frameType;
protected byte[] frameValue; //(不包含CRLF)
protected List<byte[]> frameList; //(不包含CRLF)
public RedisCacheResult prepare(byte byteType, byte[] val, List<byte[]> bytesList) {
this.frameType = byteType;
this.frameValue = val;
this.frameList = bytesList;
return this;
}
public Void getVoidValue() {
return null;
}
public byte[] getFrameValue() {
return frameValue;
}
public Boolean getBoolValue() {
if (frameValue == null) {
return false;
}
String val = new String(frameValue, StandardCharsets.UTF_8);
if ("OK".equals(val)) {
return true;
}
return Integer.parseInt(val) > 0;
}
public String getStringValue(String key, RedisCryptor cryptor) {
if (frameValue == null) {
return null;
}
String val = new String(frameValue, StandardCharsets.UTF_8);
if (cryptor != null) {
val = cryptor.decrypt(key, val);
}
return val;
}
public Double getDoubleValue(Double defvalue) {
return frameValue == null ? defvalue : Double.parseDouble(new String(frameValue, StandardCharsets.UTF_8));
}
public Long getLongValue(Long defvalue) {
return frameValue == null ? defvalue : Long.parseLong(new String(frameValue, StandardCharsets.UTF_8));
}
public Integer getIntValue(Integer defvalue) {
return frameValue == null ? defvalue : Integer.parseInt(new String(frameValue, StandardCharsets.UTF_8));
}
public <T> T getObjectValue(String key, RedisCryptor cryptor, Type type) {
return formatValue(key, cryptor, frameValue, type);
}
protected <T> Set<T> getSetValue(String key, RedisCryptor cryptor, Type type) {
if (frameList == null || frameList.isEmpty()) {
return new LinkedHashSet<>();
}
Set<T> set = new LinkedHashSet<>();
for (byte[] bs : frameList) {
set.add(formatValue(key, cryptor, bs, type));
}
return set;
}
protected <T> List<T> getListValue(String key, RedisCryptor cryptor, Type type) {
if (frameList == null || frameList.isEmpty()) {
return new ArrayList<>();
}
List<T> list = new ArrayList<>();
for (byte[] bs : frameList) {
list.add(formatValue(key, cryptor, bs, type));
}
return list;
}
protected <T> Map<String, T> getMapValue(String key, RedisCryptor cryptor, Type type) {
if (frameList == null || frameList.isEmpty()) {
return new LinkedHashMap<>();
}
Map<String, T> map = new LinkedHashMap<>();
for (int i = 0; i < frameList.size(); i += 2) {
byte[] bs1 = frameList.get(i);
byte[] bs2 = frameList.get(i + 1);
T val = formatValue(key, cryptor, bs2, type);
if (val != null) {
map.put(formatValue(key, cryptor, bs1, String.class).toString(), val);
}
}
return map;
}
protected static <T> T formatValue(String key, RedisCryptor cryptor, byte[] frames, Type type) {
if (frames == null) {
return null;
}
if (type == byte[].class) {
return (T) frames;
}
if (type == String.class) {
String val = new String(frames, StandardCharsets.UTF_8);
if (cryptor != null) {
val = cryptor.decrypt(key, val);
}
return (T) val;
}
if (type == boolean.class || type == Boolean.class) {
return (T) (Boolean) "t".equalsIgnoreCase(new String(frames, StandardCharsets.UTF_8));
}
if (type == long.class || type == Long.class) {
return (T) (Long) Long.parseLong(new String(frames, StandardCharsets.UTF_8));
}
if (type == double.class || type == Double.class) {
return (T) (Double) Double.parseDouble(new String(frames, StandardCharsets.UTF_8));
}
if (cryptor != null) {
String val = cryptor.decrypt(key, new String(frames, StandardCharsets.UTF_8));
return (T) JsonConvert.root().convertFrom(type, val);
}
return (T) JsonConvert.root().convertFrom(type, frames);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,29 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.annotation.Priority;
import org.redkale.source.CacheSource;
import org.redkale.source.CacheSourceProvider;
import org.redkale.util.AnyValue;
/**
* @author zhangjx
*/
@Priority(1)
public class RedisCacheSourceProvider implements CacheSourceProvider {
@Override
public boolean acceptsConf(AnyValue config) {
return new MyRedisCacheSource().acceptsConf(config);
}
@Override
public CacheSource createInstance() {
return new MyRedisCacheSource();
}
}

View File

@ -1,46 +0,0 @@
/*
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template
*/
package org.redkalex.cache.redis;
import org.redkale.util.AnyValue;
/**
* @author zhangjx
*/
public interface RedisCryptor {
/**
* 初始化
*
* @param conf 配置
*/
public void init(AnyValue conf);
/**
* 加密, 无需加密的key对应的值需要直接返回value
*
* @param key key
* @param value 明文
* @return 密文
*/
public String encrypt(String key, String value);
/**
* 解密, 无需解密的key对应的值需要直接返回value
*
* @param key key
* @param value 密文
* @return 明文
*/
public String decrypt(String key, String value);
/**
* 销毁
*
* @param conf 配置
*/
public void destroy(AnyValue conf);
}

View File

@ -1 +1 @@
net.tccn.ZhubProvider
dev.zhub.ZhubProvider

View File

@ -1,7 +1,7 @@
package net.tccn.mq;
import net.tccn.Event;
import net.tccn.timer.Timers;
import dev.zhub.timer.Timers;
import org.junit.Test;
import org.redkale.convert.json.JsonConvert;

View File

@ -27,15 +27,15 @@ public class HelloService implements Service {
@Resource(name = "vvvvhub2")
private ZHubClient zhub2;*/
//private net.tccn.zhub.ZHubClient zhubx = null;
//private dev.zhub.client.ZHubClient zhubx = null;
@Override
public void init(AnyValue config) {
/*CompletableFuture.runAsync(() -> {
zhubx = new net.tccn.zhub.ZHubClient("127.0.0.1", 1216, "g-dev", "DEV-LOCAL");
//zhubx = new net.tccn.zhub.ZHubClient("47.111.150.118", 6066, "g-dev", "DEV-LOCAL");
zhubx = new dev.zhub.client.ZHubClient("127.0.0.1", 1216, "g-dev", "DEV-LOCAL");
//zhubx = new dev.zhub.client.ZHubClient("47.111.150.118", 6066, "g-dev", "DEV-LOCAL");
});*/
// Function<Rpc<T>, RpcResult<R>> fun