Compare commits
8 Commits
master
...
dev/redkal
Author | SHA1 | Date | |
---|---|---|---|
79402950be | |||
0ab33845f0 | |||
11ed5d2018 | |||
71ccfc919b | |||
385bffac8f | |||
b9eb54a405 | |||
2e0d9af014 | |||
747b165e77 |
14
pom.xml
14
pom.xml
@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>net.tccn</groupId>
|
||||
<artifactId>zhub-client-redkale</artifactId>
|
||||
<version>0.1.2-dev</version>
|
||||
<version>x.22.0</version> <!-- 支持 redkale-2.2 版本 -->
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
@ -18,7 +18,13 @@
|
||||
<dependency>
|
||||
<groupId>org.redkale</groupId>
|
||||
<artifactId>redkale</artifactId>
|
||||
<version>2.8.0-dev</version>
|
||||
<version>2.2.0</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.13.1</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
@ -28,7 +34,7 @@
|
||||
<repository>
|
||||
<id>maven-nexus</id>
|
||||
<name>maven-nexus</name>
|
||||
<url>http://47.106.237.198:8081/repository/maven-public/</url>
|
||||
<url>https://nexus.1216.top/repository/maven-public/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
@ -37,7 +43,7 @@
|
||||
<repository>
|
||||
<id>mvn-release</id>
|
||||
<name>mvn-release</name>
|
||||
<url>http://47.106.237.198:8081/repository/maven-releases/</url>
|
||||
<url>https://nexus.1216.top/repository/maven-releases/</url>
|
||||
</repository>
|
||||
</distributionManagement>
|
||||
</project>
|
@ -1,9 +1,9 @@
|
||||
package net.tccn;
|
||||
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import org.redkale.util.Resourcable;
|
||||
import org.redkale.util.TypeToken;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -13,11 +13,12 @@ import java.util.function.Consumer;
|
||||
* @author Liang
|
||||
* @data 2020-09-05 23:18
|
||||
*/
|
||||
public abstract class AbstractConsumer extends ZhubAgentProvider implements IConsumer, Resourcable {
|
||||
public abstract class AbstractConsumer implements IConsumer {
|
||||
|
||||
protected JsonConvert convert = JsonConvert.root();
|
||||
|
||||
protected static String APP_NAME = "";
|
||||
@Resource(name = "APP_NAME")
|
||||
protected String APP_NAME = "";
|
||||
|
||||
private Map<String, EventType> eventMap = new ConcurrentHashMap<>();
|
||||
|
||||
@ -72,9 +73,4 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
|
||||
}
|
||||
|
||||
// --------------
|
||||
|
||||
@Override
|
||||
public String resourceName() {
|
||||
return super.getName();
|
||||
}
|
||||
}
|
||||
|
@ -15,8 +15,8 @@ public class Event<V> {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public static <V> Event of(String topic, V value) {
|
||||
return new Event<V>(topic, value);
|
||||
public static <V> Event<V> of(String topic, V value) {
|
||||
return new Event<>(topic, value);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1,69 +0,0 @@
|
||||
package net.tccn;
|
||||
|
||||
import org.redkale.boot.Application;
|
||||
import org.redkale.boot.NodeServer;
|
||||
import org.redkale.cluster.CacheClusterAgent;
|
||||
import org.redkale.cluster.ClusterAgent;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.ResourceEvent;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public abstract class ZhubAgentProvider extends ClusterAgent {
|
||||
|
||||
@Override
|
||||
public void onResourceChange(ResourceEvent[] events) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(Application application) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deregister(Application application) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Map<String, Set<InetSocketAddress>>> queryMqtpAddress(String protocol, String module, String resname) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Set<InetSocketAddress>> querySncpAddress(String protocol, String restype, String resname) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CompletableFuture<Set<InetSocketAddress>> queryAddress(ClusterEntry entry) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterEntry register(NodeServer ns, String protocol, Service service) {
|
||||
deregister(ns, protocol, service);
|
||||
ClusterEntry clusterEntry = new ClusterEntry(ns, protocol, service);
|
||||
CacheClusterAgent.AddressEntry entry = new CacheClusterAgent.AddressEntry();
|
||||
entry.addr = clusterEntry.address;
|
||||
entry.resname = clusterEntry.resourceName;
|
||||
entry.nodeid = this.nodeid;
|
||||
entry.time = System.currentTimeMillis();
|
||||
//source.hset(clusterEntry.serviceName, clusterEntry.serviceid, CacheClusterAgent.AddressEntry.class, entry);
|
||||
return clusterEntry;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deregister(NodeServer ns, String protocol, Service service) {
|
||||
|
||||
}
|
||||
}
|
50
src/main/java/net/tccn/ZhubListener.java
Normal file
50
src/main/java/net/tccn/ZhubListener.java
Normal file
@ -0,0 +1,50 @@
|
||||
package net.tccn;
|
||||
|
||||
import net.tccn.zhub.ZHubClient;
|
||||
import org.redkale.boot.Application;
|
||||
import org.redkale.boot.ApplicationListener;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.RedkaleClassLoader;
|
||||
import org.redkale.util.ResourceFactory;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* 服务监听
|
||||
*
|
||||
* @author: liangxy.
|
||||
*/
|
||||
public class ZhubListener implements ApplicationListener {
|
||||
|
||||
@Override
|
||||
public void preStart(Application application) {
|
||||
|
||||
CompletableFuture.runAsync(() -> {
|
||||
ResourceFactory resourceFactory = application.getResourceFactory();
|
||||
RedkaleClassLoader classLoader = application.getClassLoader();
|
||||
|
||||
AnyValue appConfig = application.getAppConfig();
|
||||
AnyValue zhubs = appConfig.getAnyValue("zhubs");
|
||||
AnyValue[] values = zhubs.getAnyValues("zhub");
|
||||
for (AnyValue zhub : values) {
|
||||
String className = zhub.getValue("value", ZHubClient.class.getCanonicalName());
|
||||
try {
|
||||
Class<?> clazz = classLoader.loadClass(className);
|
||||
Service obj = (Service) clazz.getDeclaredConstructor().newInstance();
|
||||
application.getResourceFactory().inject(obj);
|
||||
obj.init(zhub);
|
||||
resourceFactory.register(zhub.get("name"), clazz, obj);
|
||||
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preShutdown(Application application) {
|
||||
|
||||
}
|
||||
}
|
@ -1,21 +0,0 @@
|
||||
package net.tccn;
|
||||
|
||||
import net.tccn.zhub.ZHubClient;
|
||||
import org.redkale.annotation.Priority;
|
||||
import org.redkale.cluster.ClusterAgent;
|
||||
import org.redkale.cluster.ClusterAgentProvider;
|
||||
import org.redkale.util.AnyValue;
|
||||
|
||||
@Priority(1)
|
||||
public class ZhubProvider implements ClusterAgentProvider {
|
||||
|
||||
@Override
|
||||
public boolean acceptsConf(AnyValue config) {
|
||||
return new ZHubClient().acceptsConf(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterAgent createInstance() {
|
||||
return new ZHubClient();
|
||||
}
|
||||
}
|
@ -2,10 +2,11 @@ package net.tccn.zhub;
|
||||
|
||||
// ================================================== lock ==================================================
|
||||
public class Lock {
|
||||
private String name;
|
||||
private String uuid;
|
||||
private int duration;
|
||||
private ZHubClient hubClient;
|
||||
protected String name;
|
||||
protected String uuid;
|
||||
protected int duration;
|
||||
protected boolean success;
|
||||
protected ZHubClient hubClient;
|
||||
|
||||
protected Lock(String name, String uuid, int duration, ZHubClient hubClient) {
|
||||
this.name = name;
|
||||
@ -17,4 +18,8 @@ public class Lock {
|
||||
public void unLock() {
|
||||
hubClient.send("unlock", name, uuid);
|
||||
}
|
||||
|
||||
public boolean success() {
|
||||
return success;
|
||||
}
|
||||
}
|
||||
|
@ -2,14 +2,9 @@ package net.tccn.zhub;
|
||||
|
||||
import net.tccn.*;
|
||||
import net.tccn.timer.Timers;
|
||||
import org.redkale.annotation.AutoLoad;
|
||||
import org.redkale.annotation.ResourceType;
|
||||
import org.redkale.service.Local;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.Comment;
|
||||
import org.redkale.util.TypeToken;
|
||||
import org.redkale.util.Utility;
|
||||
import org.redkale.util.*;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
@ -18,16 +13,15 @@ import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
|
||||
@Local
|
||||
@AutoLoad(false)
|
||||
@ResourceType(ZHubClient.class)
|
||||
@ -57,38 +51,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
|
||||
private static Map<String, ZHubClient> mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient
|
||||
|
||||
public ZHubClient() {
|
||||
|
||||
}
|
||||
|
||||
public ZHubClient(String name, Map<String, String> attr) {
|
||||
this.APP_NAME = name;
|
||||
this.addr = attr.get("addr");
|
||||
this.groupid = attr.get("groupid");
|
||||
this.auth = attr.get("auth");
|
||||
|
||||
this.initClient(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(AnyValue config) {
|
||||
APP_NAME = application.getName();
|
||||
/*if (!preInit()) {
|
||||
return;
|
||||
}*/
|
||||
|
||||
if (config == null) {
|
||||
initClient(null);
|
||||
} else {
|
||||
Map<String, AnyValue> nodes = getNodes(config);
|
||||
for (String rsName : nodes.keySet()) {
|
||||
ZHubClient client = new ZHubClient().initClient(nodes.get(rsName));
|
||||
application.getResourceFactory().register(rsName, client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ZHubClient initClient(AnyValue config) {
|
||||
// 自动注入
|
||||
if (config != null) {
|
||||
addr = config.getValue("addr", addr);
|
||||
@ -104,17 +68,18 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
}
|
||||
|
||||
// 设置第一个启动的 实例为主实例
|
||||
if (!mainHub.containsKey(addr)) { // 确保同步执行此 init 逻辑
|
||||
mainHub.put(addr, this);
|
||||
synchronized (ZHubClient.class) {
|
||||
if (!mainHub.containsKey(addr)) { // 确保同步执行此 init 逻辑
|
||||
mainHub.put(addr, this);
|
||||
}
|
||||
}
|
||||
|
||||
CompletableFuture.runAsync(() -> {
|
||||
if (!initSocket(0)) {
|
||||
return;
|
||||
}
|
||||
// 消息 事件接收
|
||||
new Thread(() -> {
|
||||
if (!initSocket(0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
String readLine = reader.readLine();
|
||||
@ -162,11 +127,24 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
Lock lock = lockTag.get(value);
|
||||
if (lock != null) {
|
||||
synchronized (lock) {
|
||||
lock.success = true;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
// trylock msg
|
||||
if ("trylock".equals(topic)) {
|
||||
Lock lock = lockTag.get(value);
|
||||
if (lock != null) {
|
||||
synchronized (lock) {
|
||||
lock.success = false;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// rpc back msg
|
||||
if (APP_NAME.equals(topic)) {
|
||||
rpcBackQueue.add(Event.of(topic, value));
|
||||
@ -211,116 +189,117 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
}).thenAcceptAsync(x -> {
|
||||
// 定时调度事件,已加入耗时监控
|
||||
new Thread(() -> {
|
||||
ExecutorService pool = Executors.newFixedThreadPool(1);
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
while (true) {
|
||||
Timer timer = null;
|
||||
try {
|
||||
if ((timer = timerQueue.take()) == null) {
|
||||
return;
|
||||
}
|
||||
timer = timerQueue.take();
|
||||
long start = System.currentTimeMillis();
|
||||
pool.submit(timer.runnable).get(5, TimeUnit.SECONDS);
|
||||
executor.submit(timer.runnable).get(5, TimeUnit.SECONDS);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start));
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (TimeoutException e) {
|
||||
logger.log(Level.SEVERE, "timer [" + timer.name + "] time out: " + 5 + " S", e);
|
||||
pool = Executors.newFixedThreadPool(1);
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, "timer [" + timer.name + "]", e);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e instanceof TimeoutException) {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
logger.log(Level.WARNING, "TimeoutException [" + timer.name + "]", e);
|
||||
} else {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
|
||||
// topic msg,已加入耗时监控
|
||||
new Thread(() -> {
|
||||
ExecutorService pool = Executors.newFixedThreadPool(1);
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
while (true) {
|
||||
Event<String> event = null;
|
||||
try {
|
||||
if ((event = topicQueue.take()) == null) {
|
||||
continue;
|
||||
}
|
||||
event = topicQueue.take();
|
||||
logger.log(Level.FINE, "topic[" + event.topic + "] :" + event.value);
|
||||
|
||||
String topic = event.topic;
|
||||
String value = event.value;
|
||||
pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (TimeoutException e) {
|
||||
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e);
|
||||
pool = Executors.newFixedThreadPool(1);
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
|
||||
executor.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e instanceof TimeoutException) {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
logger.log(Level.WARNING, "TimeoutException, topic[" + event.topic + "], value[" + event.value + "]", e);
|
||||
} else if (event != null) {
|
||||
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}, "ZHub-topic-accept").start();
|
||||
|
||||
// rpc back ,仅做数据解析,暂无耗时监控
|
||||
new Thread(() -> {
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
while (true) {
|
||||
Event<String> event = null;
|
||||
try {
|
||||
if ((event = rpcBackQueue.take()) == null) {
|
||||
continue;
|
||||
event = rpcBackQueue.take();
|
||||
logger.info(String.format("rpc-back:[%s]", event.value));
|
||||
|
||||
String value = event.value;
|
||||
executor.submit(() -> rpcAccept(value)).get(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e instanceof TimeoutException) {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
logger.log(Level.WARNING, "rpc-back TimeoutException, topic[" + event.topic + "], value[" + event.value + "]", e);
|
||||
} else if (event != null) {
|
||||
logger.log(Level.WARNING, "rpc-back[" + event.value + "] event accept error :" + event.value, e);
|
||||
}
|
||||
//if (event)
|
||||
logger.finest(String.format("rpc-back:[%s]: %s", event.topic, event.value));
|
||||
rpcAccept(event.value);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, "rpc-back[" + event.topic + "] event accept error :" + event.value, e);
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}, "ZHub-rpc-call").start();
|
||||
|
||||
// rpc call,已加入耗时监控
|
||||
new Thread(() -> {
|
||||
ExecutorService pool = Executors.newFixedThreadPool(1);
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
while (true) {
|
||||
Event<String> event = null;
|
||||
try {
|
||||
if ((event = rpcCallQueue.take()) == null) {
|
||||
continue;
|
||||
}
|
||||
logger.finest(String.format("rpc-call:[%s] %s", event.topic, event.value));
|
||||
event = rpcCallQueue.take();
|
||||
logger.info(String.format("rpc-call:[%s] %s", event.topic, event.value));
|
||||
|
||||
String topic = event.topic;
|
||||
String value = event.value;
|
||||
pool.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (TimeoutException e) {
|
||||
logger.log(Level.SEVERE, "topic[" + event.topic + "] event deal time out: " + 5 + " S, value: " + event.value, e);
|
||||
pool = Executors.newFixedThreadPool(1);
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
|
||||
executor.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e instanceof TimeoutException) {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
logger.log(Level.WARNING, "rpc-call TimeoutException, topic[" + event.topic + "], value[" + event.value + "]", e);
|
||||
} else if (event != null) {
|
||||
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}, "ZHub-rpc-call").start();
|
||||
|
||||
// send msg
|
||||
new Thread(() -> {
|
||||
while (true) {
|
||||
String msg = null;
|
||||
try {
|
||||
if ((msg = sendMsgQueue.take()) == null) {
|
||||
continue;
|
||||
}
|
||||
// logger.log(Level.FINEST, "send-msg: [" + msg + "]");
|
||||
writer.write(msg.getBytes());
|
||||
msg = sendMsgQueue.take();
|
||||
writer.write(msg.getBytes(UTF_8));
|
||||
writer.flush();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
} catch (InterruptedException | IOException e) {
|
||||
logger.log(Level.WARNING, "send-msg[" + msg + "] event accept error :", e);
|
||||
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
assert msg != null;
|
||||
writer.write(msg.getBytes(UTF_8));
|
||||
writer.flush();
|
||||
} catch (IOException | InterruptedException | NullPointerException ex) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
});
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean acceptsConf(AnyValue config) {
|
||||
@ -413,7 +392,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
client.setKeepAlive(true);
|
||||
|
||||
writer = client.getOutputStream();
|
||||
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
|
||||
reader = new BufferedReader(new InputStreamReader(client.getInputStream(), UTF_8));
|
||||
|
||||
String groupid = getGroupid();
|
||||
if (groupid == null || groupid.isEmpty()) {
|
||||
@ -422,7 +401,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
send("auth", auth);
|
||||
send("groupid " + groupid);
|
||||
|
||||
StringBuffer buf = new StringBuffer("subscribe lock");
|
||||
StringBuffer buf = new StringBuffer("subscribe lock trylock");
|
||||
if (mainHub.containsValue(this)) {
|
||||
buf.append(" " + APP_NAME);
|
||||
}
|
||||
@ -527,32 +506,35 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
// ================================================== lock ==================================================
|
||||
private Map<String, Lock> lockTag = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 尝试加锁,立即返回,
|
||||
*
|
||||
* @param key
|
||||
* @param duration
|
||||
* @return Lock: lock.success 锁定是否成功标识
|
||||
*/
|
||||
public Lock tryLock(String key, int duration) {
|
||||
String uuid = Utility.uuid();
|
||||
Lock lock = new Lock(key, uuid, duration, this);
|
||||
lockTag.put(uuid, lock);
|
||||
|
||||
try {
|
||||
// c.send("lock", key, uuid, strconv.Itoa(duration))
|
||||
send("lock", key, uuid, String.valueOf(duration));
|
||||
synchronized (lock) {
|
||||
lock.wait();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return lock;
|
||||
return lock("trylock", key, duration);
|
||||
}
|
||||
|
||||
// 为替换 tryLock 方法做过度准确
|
||||
public Lock lock(String key, int duration) {
|
||||
String uuid = Utility.uuid();
|
||||
return lock("lock", key, duration);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param cmd lock|trylock
|
||||
* @param key 加锁 key
|
||||
* @param duration 锁定时长
|
||||
* @return
|
||||
*/
|
||||
private Lock lock(String cmd, String key, int duration) {
|
||||
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
|
||||
Lock lock = new Lock(key, uuid, duration, this);
|
||||
lockTag.put(uuid, lock);
|
||||
|
||||
try {
|
||||
// c.send("lock", key, uuid, strconv.Itoa(duration))
|
||||
send("lock", key, uuid, String.valueOf(duration));
|
||||
send(cmd, key, uuid, String.valueOf(duration));
|
||||
synchronized (lock) {
|
||||
lock.wait();
|
||||
}
|
||||
@ -602,8 +584,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
private static Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
|
||||
|
||||
@Comment("rpc call")
|
||||
public RpcResult<Void> rpc(String topic, Object v) {
|
||||
return rpc(topic, v, null);
|
||||
public RpcResult<String> rpc(String topic, Object v) {
|
||||
return rpc(topic, v, IType.STRING);
|
||||
}
|
||||
|
||||
@Comment("rpc call")
|
||||
@ -652,8 +634,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
|
||||
return rpc.getRpcResult();
|
||||
}
|
||||
|
||||
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v) {
|
||||
return CompletableFuture.supplyAsync(() -> rpc(topic, v, null));
|
||||
public <T> CompletableFuture<RpcResult<String>> rpcAsync(String topic, T v) {
|
||||
return CompletableFuture.supplyAsync(() -> rpc(topic, v, IType.STRING));
|
||||
}
|
||||
|
||||
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, TypeToken<R> typeToken) {
|
||||
|
@ -1,131 +0,0 @@
|
||||
/*
|
||||
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
|
||||
* Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.annotation.Resource;
|
||||
import org.redkale.convert.Convert;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import org.redkale.source.AbstractCacheSource;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.RedkaleClassLoader;
|
||||
import org.redkale.util.RedkaleException;
|
||||
import org.redkale.util.ResourceFactory;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
* @since 2.8.0
|
||||
*/
|
||||
public abstract class AbstractRedisSource extends AbstractCacheSource {
|
||||
|
||||
public static final String CACHE_SOURCE_CRYPTOR = "cryptor";
|
||||
|
||||
protected String name;
|
||||
|
||||
@Resource(required = false)
|
||||
protected ResourceFactory resourceFactory;
|
||||
|
||||
@Resource(required = false)
|
||||
protected JsonConvert defaultConvert;
|
||||
|
||||
@Resource(name = "$_convert", required = false)
|
||||
protected JsonConvert convert;
|
||||
|
||||
protected int db;
|
||||
|
||||
protected RedisCryptor cryptor;
|
||||
|
||||
protected AnyValue config;
|
||||
|
||||
@Override
|
||||
public void init(AnyValue conf) {
|
||||
this.config = conf;
|
||||
super.init(conf);
|
||||
this.name = conf.getValue("name", "");
|
||||
if (this.convert == null) this.convert = this.defaultConvert;
|
||||
if (conf != null) {
|
||||
String cryptStr = conf.getValue(CACHE_SOURCE_CRYPTOR, "").trim();
|
||||
if (!cryptStr.isEmpty()) {
|
||||
try {
|
||||
Class<RedisCryptor> cryptClass = (Class) getClass().getClassLoader().loadClass(cryptStr);
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(cryptClass, cryptClass.getName());
|
||||
this.cryptor = cryptClass.getConstructor().newInstance();
|
||||
} catch (ReflectiveOperationException e) {
|
||||
throw new RedkaleException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (cryptor != null) {
|
||||
if (resourceFactory != null) {
|
||||
resourceFactory.inject(cryptor);
|
||||
}
|
||||
cryptor.init(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy(AnyValue conf) {
|
||||
super.destroy(conf);
|
||||
if (cryptor != null) {
|
||||
cryptor.destroy(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception { //在 Application 关闭时调用
|
||||
destroy(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String resourceName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
protected String decryptValue(String key, RedisCryptor cryptor, String value) {
|
||||
return cryptor != null ? cryptor.decrypt(key, value) : value;
|
||||
}
|
||||
|
||||
protected <T> T decryptValue(String key, RedisCryptor cryptor, Type type, byte[] bs) {
|
||||
return decryptValue(key, cryptor, convert, type, bs);
|
||||
}
|
||||
|
||||
protected <T> T decryptValue(String key, RedisCryptor cryptor, Convert c, Type type, byte[] bs) {
|
||||
if (bs == null) return null;
|
||||
if (type == byte[].class) return (T) bs;
|
||||
if (cryptor == null || (type instanceof Class && (((Class) type).isPrimitive() || Number.class.isAssignableFrom((Class) type)))) {
|
||||
return (T) (c == null ? this.convert : c).convertFrom(type, bs);
|
||||
}
|
||||
String deval = cryptor.decrypt(key, new String(bs, StandardCharsets.UTF_8));
|
||||
return deval == null ? null : (T) (c == null ? this.convert : c).convertFrom(type, deval.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
protected String encryptValue(String key, RedisCryptor cryptor, String value) {
|
||||
return cryptor != null ? cryptor.encrypt(key, value) : value;
|
||||
}
|
||||
|
||||
protected <T> byte[] encryptValue(String key, RedisCryptor cryptor, Convert c, T value) {
|
||||
return encryptValue(key, cryptor, null, c, value);
|
||||
}
|
||||
|
||||
protected <T> byte[] encryptValue(String key, RedisCryptor cryptor, Type type, Convert c, T value) {
|
||||
if (value == null) return null;
|
||||
Type t = type == null ? value.getClass() : type;
|
||||
if (cryptor == null && type == String.class) {
|
||||
return value.toString().getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
return encryptValue(key, cryptor, t, (c == null ? this.convert : c).convertToBytes(t, value));
|
||||
}
|
||||
|
||||
protected byte[] encryptValue(String key, RedisCryptor cryptor, Type type, byte[] bs) {
|
||||
if (bs == null) return null;
|
||||
if (cryptor == null || (type instanceof Class && (((Class) type).isPrimitive() || Number.class.isAssignableFrom((Class) type)))) {
|
||||
return bs;
|
||||
}
|
||||
String enval = cryptor.encrypt(key, new String(bs, StandardCharsets.UTF_8));
|
||||
return enval == null ? null : enval.getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
@ -1,27 +1,299 @@
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.annotation.AutoLoad;
|
||||
import org.redkale.annotation.ResourceType;
|
||||
|
||||
import org.redkale.convert.Convert;
|
||||
import org.redkale.service.Local;
|
||||
import org.redkale.source.CacheSource;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.AutoLoad;
|
||||
import org.redkale.util.ResourceType;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Type;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Local
|
||||
@AutoLoad(false)
|
||||
@ResourceType(CacheSource.class)
|
||||
public class MyRedisCacheSource extends RedisCacheSource {
|
||||
|
||||
@Override
|
||||
public void init(AnyValue conf) {
|
||||
super.init(conf);
|
||||
public class MyRedisCacheSource<V extends Object> extends RedisCacheSource<V> {
|
||||
//--------------------- oth ------------------------------
|
||||
public boolean setnx(String key, Object v) {
|
||||
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
Serializable rs = send("SETNX", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
return rs == null ? false : (long) rs == 1;
|
||||
}
|
||||
//--------------------- oth ------------------------------
|
||||
|
||||
//--------------------- bit ------------------------------
|
||||
public boolean getBit(String key, int offset) {
|
||||
byte[][] bytes = Stream.of(key, offset).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
Serializable v = send("GETBIT", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
return v == null ? false : (long) v == 1;
|
||||
}
|
||||
|
||||
public void setBit(String key, int offset, boolean bool) {
|
||||
byte[][] bytes = Stream.of(key, offset, bool ? 1 : 0).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
send("SETBIT", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
}
|
||||
|
||||
//--------------------- bit ------------------------------
|
||||
//--------------------- lock ------------------------------
|
||||
// 尝试加锁,成功返回0,否则返回上一锁剩余毫秒值
|
||||
public int tryLock(String key, int millis) {
|
||||
byte[][] bytes = Stream.of("" +
|
||||
"if (redis.call('exists',KEYS[1]) == 0) then " +
|
||||
"redis.call('psetex', KEYS[1], ARGV[1], 1) " +
|
||||
"return 0; " +
|
||||
"else " +
|
||||
"return redis.call('PTTL', KEYS[1]); " +
|
||||
"end; ", 1, key, millis).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
int n = (int) send("EVAL", CacheEntryType.OBJECT, (Type) null, null, bytes).join();
|
||||
return n;
|
||||
}
|
||||
|
||||
// 加锁
|
||||
public void lock(String key, int millis) {
|
||||
int i;
|
||||
do {
|
||||
i = tryLock(key, millis);
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} while (i > 0);
|
||||
}
|
||||
|
||||
// 解锁
|
||||
public void unlock(String key) {
|
||||
remove(key);
|
||||
}
|
||||
|
||||
|
||||
//--------------------- key ------------------------------
|
||||
|
||||
public long getTtl(String key) {
|
||||
return (long) send("TTL", CacheEntryType.OBJECT, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).join();
|
||||
}
|
||||
|
||||
public long getPttl(String key) {
|
||||
return (long) send("PTTL", CacheEntryType.OBJECT, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).join();
|
||||
}
|
||||
|
||||
public int remove(String... keys) {
|
||||
if (keys == null || keys.length == 0) {
|
||||
return 0;
|
||||
}
|
||||
List<String> para = new ArrayList<>();
|
||||
para.add("" +
|
||||
" local args = ARGV;" +
|
||||
" local x = 0;" +
|
||||
" for i,v in ipairs(args) do" +
|
||||
" local inx = redis.call('del', v);" +
|
||||
" if(inx > 0) then" +
|
||||
" x = x + 1;" +
|
||||
" end" +
|
||||
" end" +
|
||||
" return x;");
|
||||
|
||||
para.add("0");
|
||||
for (Object field : keys) {
|
||||
para.add(String.valueOf(field));
|
||||
}
|
||||
byte[][] bytes = para.stream().map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
return (int) send("EVAL", CacheEntryType.OBJECT, (Type) null, null, bytes).join();
|
||||
}
|
||||
|
||||
//--------------------- hmget ------------------------------
|
||||
public <T extends Object> V getHm(String key, T field) {
|
||||
// return (V) send("HMGET", CacheEntryType.OBJECT, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8)).join();
|
||||
Map<Object, V> map = getHms(key, field);
|
||||
return map.get(field);
|
||||
}
|
||||
|
||||
public <T extends Object> Map<T, V> getHms(String key, T... field) {
|
||||
if (field == null || field.length == 0) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
byte[][] bytes = Stream.concat(Stream.of(key), Stream.of(field)).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
Map<T, V> result = new HashMap<>();
|
||||
|
||||
List<V> vs = (List) send("HMGET", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
for (int i = 0; i < field.length; i++) { // /*vs != null && vs.size() > i &&*/
|
||||
if (vs.get(i) == null) {
|
||||
continue;
|
||||
}
|
||||
result.put(field[i], vs.get(i));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public Map<String, V> getHmall(String key) {
|
||||
List<V> vs = (List) send("HGETALL", CacheEntryType.OBJECT, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).join();
|
||||
Map<String, V> result = new HashMap<>(vs.size() / 2);
|
||||
for (int i = 0; i < vs.size(); i += 2) {
|
||||
result.put(String.valueOf(vs.get(i)), vs.get(i + 1));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
//--------------------- hmset、hmdel、incr ------------------------------
|
||||
public <T> void setHm(String key, T field, V value) {
|
||||
byte[][] bytes = Stream.of(key, field, value).map(x -> x.toString().getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
send("HMSET", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
}
|
||||
|
||||
public <T> void setHms(String key, Map<T, V> kv) {
|
||||
List<String> args = new ArrayList();
|
||||
args.add(key);
|
||||
|
||||
kv.forEach((k, v) -> {
|
||||
args.add(String.valueOf(k));
|
||||
args.add(String.valueOf(v));
|
||||
});
|
||||
|
||||
byte[][] bytes = args.stream().map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
send("HMSET", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
}
|
||||
|
||||
public <T> Long incrHm(String key, T field, long n) {
|
||||
byte[][] bytes = Stream.of(key, String.valueOf(field), String.valueOf(n)).map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
return (Long) send("HINCRBY", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
}
|
||||
|
||||
public <T> Double incrHm(String key, T field, double n) {
|
||||
byte[][] bytes = Stream.of(key, String.valueOf(field), String.valueOf(n)).map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
Serializable v = send("HINCRBYFLOAT", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
if (v == null) {
|
||||
return null;
|
||||
}
|
||||
return Double.parseDouble(String.valueOf(v));
|
||||
}
|
||||
|
||||
public <T> void hdel(String key, T... field) {
|
||||
byte[][] bytes = Stream.concat(Stream.of(key), Stream.of(field)).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
send("HDEL", null, (Type) null, key, bytes).join();
|
||||
}
|
||||
|
||||
public <T> List<T> zexists(String key, T... fields) {
|
||||
if (fields == null || fields.length == 0) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
List<String> para = new ArrayList<>();
|
||||
para.add("" +
|
||||
" local key = KEYS[1];" +
|
||||
" local args = ARGV;" +
|
||||
" local result = {};" +
|
||||
" for i,v in ipairs(args) do" +
|
||||
" local inx = redis.call('ZREVRANK', key, v);" +
|
||||
" if(inx) then" +
|
||||
" table.insert(result,1,v);" +
|
||||
" end" +
|
||||
" end" +
|
||||
" return result;");
|
||||
para.add("1");
|
||||
para.add(key);
|
||||
for (Object field : fields) {
|
||||
para.add(String.valueOf(field));
|
||||
}
|
||||
byte[][] bytes = para.stream().map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
return (List<T>) send("EVAL", CacheEntryType.OBJECT, (Type) null, null, bytes).join();
|
||||
}
|
||||
|
||||
//--------------------- set ------------------------------
|
||||
public <T> T srandomItem(String key) {
|
||||
byte[][] bytes = Stream.of(key, 1).map(x -> formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, x)).toArray(byte[][]::new);
|
||||
List<T> list = (List) send("SRANDMEMBER", null, (Type) null, key, bytes).join();
|
||||
return list != null && !list.isEmpty() ? list.get(0) : null;
|
||||
}
|
||||
|
||||
public <T> List<T> srandomItems(String key, int n) {
|
||||
byte[][] bytes = Stream.of(key, n).map(x -> formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, x)).toArray(byte[][]::new);
|
||||
return (List) send("SRANDMEMBER", null, (Type) null, key, bytes).join();
|
||||
}
|
||||
|
||||
//--------------------- list ------------------------------
|
||||
public CompletableFuture<Void> appendListItemsAsync(String key, V... values) {
|
||||
byte[][] bytes = Stream.concat(Stream.of(key), Stream.of(values)).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
return (CompletableFuture) send("RPUSH", null, (Type) null, key, bytes);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> lpushListItemAsync(String key, V value) {
|
||||
return (CompletableFuture) send("LPUSH", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, value));
|
||||
}
|
||||
|
||||
public void lpushListItem(String key, V value) {
|
||||
lpushListItemAsync(key, value).join();
|
||||
}
|
||||
|
||||
public void appendListItems(String key, V... values) {
|
||||
appendListItemsAsync(key, values).join();
|
||||
}
|
||||
|
||||
public void appendSetItems(String key, V... values) {
|
||||
// todo:
|
||||
for (V v : values) {
|
||||
appendSetItem(key, v);
|
||||
}
|
||||
}
|
||||
|
||||
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
||||
public CompletableFuture<Collection<V>> getCollectionAsync(String key, int offset, int limit) {
|
||||
return (CompletableFuture) send("OBJECT", null, (Type) null, key, "ENCODING".getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8)).thenCompose(t -> {
|
||||
if (t == null) return CompletableFuture.completedFuture(null);
|
||||
if (new String((byte[]) t).contains("list")) { //list
|
||||
return send("LRANGE", CacheEntryType.OBJECT, (Type) null, false, key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(offset).getBytes(StandardCharsets.UTF_8), String.valueOf(offset + limit - 1).getBytes(StandardCharsets.UTF_8));
|
||||
} else {
|
||||
return send("SMEMBERS", CacheEntryType.OBJECT, (Type) null, true, key, key.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public Collection<V> getCollection(String key, int offset, int limit) {
|
||||
return getCollectionAsync(key, offset, limit).join();
|
||||
}
|
||||
|
||||
public V brpop(String key, int seconds) {
|
||||
byte[][] bytes = Stream.concat(Stream.of(key), Stream.of(seconds)).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
return (V) send("BRPOP", null, (Type) null, key, bytes).join();
|
||||
}
|
||||
|
||||
//--------------------- zset ------------------------------
|
||||
public <N extends Number> void zadd(String key, Map<V, N> kv) {
|
||||
if (kv == null || kv.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<String> args = new ArrayList();
|
||||
args.add(key);
|
||||
|
||||
kv.forEach((k, v) -> {
|
||||
args.add(String.valueOf(v));
|
||||
args.add(String.valueOf(k));
|
||||
});
|
||||
|
||||
byte[][] bytes = args.stream().map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
send("ZADD", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
}
|
||||
|
||||
public <N extends Number> double zincr(String key, Object number, N n) {
|
||||
byte[][] bytes = Stream.of(key, n, number).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
Serializable v = send("ZINCRBY", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
return Double.parseDouble(String.valueOf(v));
|
||||
}
|
||||
|
||||
public void zrem(String key, V... vs) {
|
||||
List<String> args = new ArrayList();
|
||||
args.add(key);
|
||||
for (V v : vs) {
|
||||
args.add(String.valueOf(v));
|
||||
}
|
||||
byte[][] bytes = args.stream().map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
send("ZREM", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
|
||||
}
|
||||
/*
|
||||
//--------------------- zset ------------------------------
|
||||
|
||||
public int getZrank(String key, V v) {
|
||||
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
|
||||
@ -142,202 +414,17 @@ public class MyRedisCacheSource extends RedisCacheSource {
|
||||
}
|
||||
return map;
|
||||
}
|
||||
* */
|
||||
|
||||
// --------------------
|
||||
public <N extends Number> void zadd(String key, Map<Serializable, N> kv) {
|
||||
if (kv == null || kv.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<Serializable> args = new ArrayList();
|
||||
kv.forEach((k, v) -> {
|
||||
args.add(k);
|
||||
args.add(v);
|
||||
});
|
||||
// ----------
|
||||
protected byte[] formatValue(CacheEntryType cacheType, Convert convert0, Type resultType, Object value) {
|
||||
if (value == null) return "null".getBytes(StandardCharsets.UTF_8);
|
||||
if (convert0 == null) convert0 = convert;
|
||||
if (cacheType == CacheEntryType.LONG || cacheType == CacheEntryType.ATOMIC)
|
||||
return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
|
||||
if (cacheType == CacheEntryType.STRING) return convert0.convertToBytes(String.class, value);
|
||||
|
||||
sendAsync("ZADD", key, args.toArray(Serializable[]::new)).join();
|
||||
if (value instanceof String) return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
|
||||
if (value instanceof Number) return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
|
||||
return convert0.convertToBytes(resultType == null ? objValueType : resultType, value);
|
||||
}
|
||||
|
||||
public <N extends Number> double zincr(String key, Serializable number, N n) {
|
||||
return sendAsync("ZINCRBY", key, number, n).thenApply(x -> x.getDoubleValue(0d)).join();
|
||||
}
|
||||
|
||||
public void zrem(String key, Serializable... vs) {
|
||||
sendAsync("ZREM", key, vs).join();
|
||||
}
|
||||
|
||||
/*public <T> List<T> zexists(String key, T... fields) {
|
||||
if (fields == null || fields.length == 0) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
List<String> para = new ArrayList<>();
|
||||
para.add("" +
|
||||
" local key = KEYS[1];" +
|
||||
" local args = ARGV;" +
|
||||
" local result = {};" +
|
||||
" for i,v in ipairs(args) do" +
|
||||
" local inx = redis.call('ZREVRANK', key, v);" +
|
||||
" if(inx) then" +
|
||||
" table.insert(result,1,v);" +
|
||||
" end" +
|
||||
" end" +
|
||||
" return result;");
|
||||
para.add("1");
|
||||
para.add(key);
|
||||
for (Object field : fields) {
|
||||
para.add(String.valueOf(field));
|
||||
}
|
||||
|
||||
// todo:
|
||||
//sendAsync("EVAL", null, para.toArray(Serializable[]::new)).thenApply(x -> x.).join();
|
||||
|
||||
return null;
|
||||
}*/
|
||||
|
||||
//--------------------- bit ------------------------------
|
||||
public boolean getBit(String key, int offset) {
|
||||
return sendAsync("GETBIT", key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(offset).getBytes(StandardCharsets.UTF_8)).thenApply(v -> v.getIntValue(0) > 0).join();
|
||||
}
|
||||
|
||||
public void setBit(String key, int offset, boolean bool) {
|
||||
sendAsync("SETBIT", key, offset, bool ? 1 : 0).join();
|
||||
}
|
||||
//--------------------- bit ------------------------------
|
||||
|
||||
//--------------------- lock ------------------------------
|
||||
// 尝试加锁,成功返回0,否则返回上一锁剩余毫秒值
|
||||
public long tryLock(String key, int millis) {
|
||||
Serializable[] obj = {"" +
|
||||
"if (redis.call('EXISTS',KEYS[1]) == 0) then " +
|
||||
"redis.call('PSETEX',KEYS[1],ARGV[1],1); " +
|
||||
"return 0; " +
|
||||
"else " +
|
||||
"return redis.call('PTTL',KEYS[1]); " +
|
||||
"end;", 1, key, millis
|
||||
};
|
||||
|
||||
return sendAsync("EVAL", null, obj).thenApply(v -> v.getIntValue(1)).join();
|
||||
}
|
||||
|
||||
// 加锁
|
||||
public void lock(String key, int millis) {
|
||||
long i;
|
||||
do {
|
||||
i = tryLock(key, millis);
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} while (i > 0);
|
||||
}
|
||||
|
||||
// 解锁
|
||||
public void unlock(String key) {
|
||||
remove(key);
|
||||
}
|
||||
|
||||
//--------------------- key ------------------------------
|
||||
|
||||
public String get(String key) {
|
||||
return get(key, String.class);
|
||||
}
|
||||
|
||||
public void set(String key, Serializable value) {
|
||||
sendAsync("SET", key, value).join();
|
||||
}
|
||||
|
||||
//--------------------- set ------------------------------
|
||||
public <T> void sadd(String key, Collection<T> args) {
|
||||
saddAsync(key, args.toArray(Serializable[]::new)).join();
|
||||
}
|
||||
|
||||
public void sadd(String key, Serializable... args) {
|
||||
saddAsync(key, Arrays.stream(args).toArray(Serializable[]::new)).join();
|
||||
}
|
||||
|
||||
public void srem(String key, Serializable... args) {
|
||||
sremAsync(key, args).join();
|
||||
}
|
||||
|
||||
public CompletableFuture<RedisCacheResult> saddAsync(String key, Serializable... args) {
|
||||
return sendAsync("SADD", key, args);
|
||||
}
|
||||
|
||||
public CompletableFuture<RedisCacheResult> sremAsync(String key, Serializable... args) {
|
||||
return sendAsync("SREM", key, args);
|
||||
}
|
||||
|
||||
//--------------------- hm ------------------------------
|
||||
|
||||
/*public Long incrHm(String key, String field, int value) {
|
||||
return sendAsync("HINCRBY", key, field, value).thenApply(x -> x.getLongValue(0l)).join();
|
||||
}
|
||||
|
||||
public Double incrHm(String key, String field, double value) {
|
||||
return sendAsync("HINCRBYFLOAT", key, field, value).thenApply(x -> x.getDoubleValue(0d)).join();
|
||||
}*/
|
||||
|
||||
public void setHm(String key, String field, Serializable value) {
|
||||
setHmsAsync(key, Map.of(field, value)).join();
|
||||
}
|
||||
|
||||
public void setHms(String key, Map kv) {
|
||||
setHmsAsync(key, kv).join();
|
||||
}
|
||||
|
||||
public CompletableFuture<RedisCacheResult> setHmsAsync(String key, Map<Serializable, Serializable> kv) {
|
||||
List<Serializable> args = new ArrayList();
|
||||
kv.forEach((k, v) -> {
|
||||
args.add(k);
|
||||
args.add(v);
|
||||
});
|
||||
|
||||
return sendAsync("HMSET", key, args.toArray(Serializable[]::new));
|
||||
}
|
||||
|
||||
public String getHm(String key, String field) {
|
||||
return getHm(key, String.class, field);
|
||||
}
|
||||
|
||||
public <T extends Serializable> T getHm(String key, Class<T> type, String field) {
|
||||
List<Serializable> list = super.hmget(key, type, field);
|
||||
if (list == null && list.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return (T) list.get(0);
|
||||
}
|
||||
|
||||
public Map<String, String> getHms(String key, String... field) {
|
||||
return getHms(key, String.class, field);
|
||||
}
|
||||
|
||||
public <T extends Serializable> Map<String, T> getHms(String key, Class<T> type, String... field) {
|
||||
List<Serializable> list = super.hmget(key, type, field);
|
||||
if (list == null && list.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
Map<String, T> map = new HashMap<>(field.length);
|
||||
|
||||
for (int i = 0; i < field.length; i++) {
|
||||
if (list.get(i) == null) {
|
||||
continue;
|
||||
}
|
||||
map.put(field[i], (T) list.get(i));
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
/*public Map<String, Object> getHmall(String key) {
|
||||
List<String> list = null; // TODO:
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
if (list.isEmpty()) {
|
||||
return map;
|
||||
}
|
||||
|
||||
for (int i = 0; i + 1 < list.size(); i += 2) {
|
||||
map.put((String) list.get(i), list.get(i + 1));
|
||||
}
|
||||
return map;
|
||||
}*/
|
||||
}
|
||||
|
@ -1,78 +0,0 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.util.Utility;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCRC16 {
|
||||
|
||||
private static final int[] LOOKUP_TABLE = {0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6,
|
||||
0x70E7, 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF, 0x1231, 0x0210, 0x3273,
|
||||
0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6, 0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF,
|
||||
0xE3DE, 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485, 0xA56A, 0xB54B, 0x8528,
|
||||
0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D, 0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695,
|
||||
0x46B4, 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC, 0x48C4, 0x58E5, 0x6886,
|
||||
0x78A7, 0x0840, 0x1861, 0x2802, 0x3823, 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A,
|
||||
0xB92B, 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12, 0xDBFD, 0xCBDC, 0xFBBF,
|
||||
0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A, 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60,
|
||||
0x1C41, 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49, 0x7E97, 0x6EB6, 0x5ED5,
|
||||
0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70, 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59,
|
||||
0x8F78, 0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F, 0x1080, 0x00A1, 0x30C2,
|
||||
0x20E3, 0x5004, 0x4025, 0x7046, 0x6067, 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F,
|
||||
0xF35E, 0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256, 0xB5EA, 0xA5CB, 0x95A8,
|
||||
0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D, 0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424,
|
||||
0x4405, 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C, 0x26D3, 0x36F2, 0x0691,
|
||||
0x16B0, 0x6657, 0x7676, 0x4615, 0x5634, 0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A,
|
||||
0xA9AB, 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3, 0xCB7D, 0xDB5C, 0xEB3F,
|
||||
0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A, 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3,
|
||||
0x3A92, 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9, 0x7C26, 0x6C07, 0x5C64,
|
||||
0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1, 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9,
|
||||
0x9FF8, 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0};
|
||||
|
||||
private RedisCRC16() {
|
||||
}
|
||||
|
||||
public static int crc16(byte[] bytes) {
|
||||
int crc = 0x0000;
|
||||
for (byte b : bytes) {
|
||||
crc = (crc << 8) ^ LOOKUP_TABLE[((crc >>> 8) ^ (b & 0xFF)) & 0xFF];
|
||||
}
|
||||
return crc & 0xFFFF;
|
||||
}
|
||||
|
||||
public static int calcSlot(int maxSlot, byte[] key) {
|
||||
if (key == null) {
|
||||
return 0;
|
||||
}
|
||||
int start = Utility.indexOf(key, (byte) '{');
|
||||
if (start != -1) {
|
||||
int end = Utility.indexOf(key, start + 1, (byte) '}');
|
||||
if (end != -1) {
|
||||
key = Arrays.copyOfRange(key, start + 1, end);
|
||||
}
|
||||
}
|
||||
int result = crc16(key) % maxSlot;
|
||||
return result;
|
||||
}
|
||||
|
||||
public static int calcSlot(int maxSlot, String key) {
|
||||
if (key == null) {
|
||||
return 0;
|
||||
}
|
||||
int start = key.indexOf('{');
|
||||
if (start != -1) {
|
||||
int end = key.indexOf('}');
|
||||
if (end != -1 && start + 1 < end) {
|
||||
key = key.substring(start + 1, end);
|
||||
}
|
||||
}
|
||||
int result = crc16(key.getBytes()) % maxSlot;
|
||||
return result;
|
||||
}
|
||||
}
|
@ -1,36 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.net.AsyncConnection;
|
||||
import org.redkale.net.AsyncGroup;
|
||||
import org.redkale.net.client.Client;
|
||||
import org.redkale.net.client.ClientAddress;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCacheClient extends Client<RedisCacheConnection, RedisCacheRequest, RedisCacheResult> {
|
||||
|
||||
public RedisCacheClient(String name, AsyncGroup group, String key, ClientAddress address, int maxConns, int maxPipelines, RedisCacheReqAuth authReq, RedisCacheReqDB dbReq) {
|
||||
super(name, group, true, address, maxConns, maxPipelines, () -> new RedisCacheReqPing(), () -> new RedisCacheReqClose(), null); //maxConns
|
||||
if (authReq != null || dbReq != null) {
|
||||
if (authReq != null && dbReq != null) {
|
||||
this.authenticate = conn -> writeChannel(conn, authReq).thenCompose(v -> writeChannel(conn, dbReq)).thenApply(v -> conn);
|
||||
} else if (authReq != null) {
|
||||
this.authenticate = conn -> writeChannel(conn, authReq).thenApply(v -> conn);
|
||||
} else {
|
||||
this.authenticate = conn -> writeChannel(conn, dbReq).thenApply(v -> conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RedisCacheConnection createClientConnection(final int index, AsyncConnection channel) {
|
||||
return new RedisCacheConnection(this, index, channel);
|
||||
}
|
||||
|
||||
}
|
@ -1,249 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.net.client.ClientCodec;
|
||||
import org.redkale.net.client.ClientConnection;
|
||||
import org.redkale.util.ByteArray;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCacheCodec extends ClientCodec<RedisCacheRequest, RedisCacheResult> {
|
||||
|
||||
protected static final byte TYPE_STRING = '+'; //简单字符串(不包含CRLF)类型
|
||||
|
||||
protected static final byte TYPE_ERROR = '-'; //错误(不包含CRLF)类型
|
||||
|
||||
protected static final byte TYPE_NUMBER = ':'; //整型
|
||||
|
||||
protected static final byte TYPE_BULK = '$'; //块字符串
|
||||
|
||||
protected static final byte TYPE_ARRAY = '*'; //数组
|
||||
|
||||
protected static final Logger logger = Logger.getLogger(RedisCacheCodec.class.getSimpleName());
|
||||
|
||||
protected byte halfFrameCmd;
|
||||
|
||||
protected int halfFrameBulkLength = -10;
|
||||
|
||||
protected int halfFrameArraySize = -10;
|
||||
|
||||
protected int halfFrameArrayIndex; //从0开始
|
||||
|
||||
protected int halfFrameArrayItemLength = -10;
|
||||
|
||||
protected ByteArray halfFrameBytes;
|
||||
|
||||
protected byte frameType;
|
||||
|
||||
protected byte[] frameValue; //(不包含CRLF)
|
||||
|
||||
protected List<byte[]> frameList; //(不包含CRLF)
|
||||
|
||||
private ByteArray recyclableArray;
|
||||
|
||||
public RedisCacheCodec(ClientConnection connection) {
|
||||
super(connection);
|
||||
}
|
||||
|
||||
protected ByteArray pollArray(ByteArray array) {
|
||||
if (recyclableArray == null) {
|
||||
recyclableArray = new ByteArray();
|
||||
} else {
|
||||
recyclableArray.clear();
|
||||
}
|
||||
recyclableArray.clear();
|
||||
if (array != null) {
|
||||
recyclableArray.put(array, 0, array.length());
|
||||
}
|
||||
return recyclableArray;
|
||||
}
|
||||
|
||||
private boolean checkBytesFrame(RedisCacheConnection conn, ByteBuffer buffer, ByteArray array) {
|
||||
// byte[] dbs = new byte[buffer.remaining()];
|
||||
// for (int i = 0; i < dbs.length; i++) {
|
||||
// dbs[i] = buffer.get(buffer.position() + i);
|
||||
// }
|
||||
// ArrayDeque<ClientFuture> deque = (ArrayDeque) responseQueue(conn);
|
||||
// logger.log(Level.FINEST, "[" + Utility.nowMillis() + "] [" + Thread.currentThread().getName() + "]: " + conn + ", 原始数据: " + new String(dbs).replace("\r\n", " ") + ", req=" + deque.getFirst().getRequest());
|
||||
|
||||
array.clear();
|
||||
byte type = halfFrameCmd == 0 ? buffer.get() : halfFrameCmd;
|
||||
if (halfFrameBytes != null) {
|
||||
array.put(halfFrameBytes, 0, halfFrameBytes.length());
|
||||
}
|
||||
frameType = type;
|
||||
if (type == TYPE_STRING || type == TYPE_ERROR || type == TYPE_NUMBER) {
|
||||
if (readComplete(buffer, array)) {
|
||||
frameValue = array.getBytes();
|
||||
} else {
|
||||
halfFrameCmd = type;
|
||||
halfFrameBytes = pollArray(array);
|
||||
return false;
|
||||
}
|
||||
} else if (type == TYPE_BULK) {
|
||||
int bulkLength = halfFrameBulkLength;
|
||||
if (bulkLength < -2) {
|
||||
if (!readComplete(buffer, array)) { //没有读到bulkLength
|
||||
halfFrameCmd = type;
|
||||
halfFrameBulkLength = -10;
|
||||
halfFrameBytes = pollArray(array);
|
||||
return false;
|
||||
}
|
||||
bulkLength = Integer.parseInt(array.toString(StandardCharsets.UTF_8));
|
||||
array.clear();
|
||||
}
|
||||
if (bulkLength == -1) {
|
||||
frameValue = null;
|
||||
} else if (readComplete(buffer, array)) {
|
||||
frameValue = array.getBytes();
|
||||
} else {
|
||||
halfFrameCmd = type;
|
||||
halfFrameBulkLength = bulkLength;
|
||||
halfFrameBytes = pollArray(array);
|
||||
return false;
|
||||
}
|
||||
} else if (type == TYPE_ARRAY) {
|
||||
int arraySize = halfFrameArraySize;
|
||||
if (arraySize < -2) {
|
||||
if (!readComplete(buffer, array)) { //没有读到arraySize
|
||||
halfFrameCmd = type;
|
||||
halfFrameArraySize = -10;
|
||||
halfFrameArrayIndex = 0;
|
||||
halfFrameArrayItemLength = -10;
|
||||
halfFrameBytes = pollArray(array);
|
||||
return false;
|
||||
}
|
||||
arraySize = Integer.parseInt(array.toString(StandardCharsets.UTF_8));
|
||||
array.clear();
|
||||
}
|
||||
int arrayIndex = halfFrameArrayIndex;
|
||||
for (int i = arrayIndex; i < arraySize; i++) {
|
||||
int itemLength = halfFrameArrayItemLength;
|
||||
halfFrameArrayItemLength = -10;
|
||||
if (itemLength < -2) {
|
||||
if (!readComplete(buffer, array)) { //没有读到bulkLength
|
||||
halfFrameCmd = type;
|
||||
halfFrameArraySize = arraySize;
|
||||
halfFrameArrayIndex = i;
|
||||
halfFrameArrayItemLength = -10;
|
||||
halfFrameBytes = pollArray(array);
|
||||
return false;
|
||||
}
|
||||
byte sign = array.get(0);
|
||||
itemLength = Integer.parseInt(array.toString(1, StandardCharsets.UTF_8));
|
||||
array.clear();
|
||||
if (sign == TYPE_ARRAY) { //数组中嵌套数组,目前有 HSCAN
|
||||
frameValue = null;
|
||||
if (frameList != null) {
|
||||
frameList.clear();
|
||||
}
|
||||
clearHalfFrame();
|
||||
if (itemLength == 0) {
|
||||
return true;
|
||||
}
|
||||
halfFrameCmd = sign;
|
||||
halfFrameArraySize = itemLength;
|
||||
if (!buffer.hasRemaining()) {
|
||||
return false;
|
||||
}
|
||||
return checkBytesFrame(conn, buffer, array);
|
||||
}
|
||||
}
|
||||
int cha = itemLength - array.length();
|
||||
if (itemLength == -1) {
|
||||
if (frameList == null) {
|
||||
frameList = new ArrayList<>();
|
||||
}
|
||||
frameList.add(null);
|
||||
array.clear();
|
||||
} else if (buffer.remaining() >= cha + 2) {
|
||||
for (int j = 0; j < cha; j++) array.put(buffer.get());
|
||||
buffer.get(); //\r
|
||||
buffer.get(); //\n
|
||||
if (frameList == null) {
|
||||
frameList = new ArrayList<>();
|
||||
}
|
||||
frameList.add(array.getBytes());
|
||||
array.clear();
|
||||
} else {
|
||||
while (buffer.hasRemaining()) array.put(buffer.get());
|
||||
halfFrameCmd = type;
|
||||
halfFrameArraySize = arraySize;
|
||||
halfFrameArrayIndex = i;
|
||||
halfFrameArrayItemLength = itemLength;
|
||||
halfFrameBytes = pollArray(array);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
clearHalfFrame();
|
||||
return true;
|
||||
}
|
||||
|
||||
protected void clearHalfFrame() {
|
||||
halfFrameCmd = 0;
|
||||
halfFrameBulkLength = -10;
|
||||
halfFrameArraySize = -10;
|
||||
halfFrameArrayIndex = 0;
|
||||
halfFrameArrayItemLength = -10;
|
||||
halfFrameBytes = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeMessages(ByteBuffer realbuf, ByteArray array) {
|
||||
RedisCacheConnection conn = (RedisCacheConnection) connection;
|
||||
if (!realbuf.hasRemaining()) {
|
||||
return;
|
||||
}
|
||||
ByteBuffer buffer = realbuf;
|
||||
if (!checkBytesFrame(conn, buffer, array)) {
|
||||
return;
|
||||
}
|
||||
//buffer必然包含一个完整的frame数据
|
||||
boolean first = true;
|
||||
RedisCacheRequest request = null;
|
||||
while (first || buffer.hasRemaining()) {
|
||||
if (request == null) {
|
||||
request = nextRequest();
|
||||
}
|
||||
if (!first && !checkBytesFrame(conn, buffer, array)) {
|
||||
break;
|
||||
}
|
||||
if (frameType == TYPE_ERROR) {
|
||||
addMessage(request, new RuntimeException(new String(frameValue, StandardCharsets.UTF_8)));
|
||||
} else {
|
||||
addMessage(request, conn.pollResultSet(request).prepare(frameType, frameValue, frameList));
|
||||
}
|
||||
frameType = 0;
|
||||
frameValue = null;
|
||||
frameList = null;
|
||||
halfFrameCmd = 0;
|
||||
halfFrameBytes = null;
|
||||
first = false;
|
||||
buffer = realbuf;
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean readComplete(ByteBuffer buffer, ByteArray array) {
|
||||
while (buffer.hasRemaining()) {
|
||||
byte b = buffer.get();
|
||||
if (b == '\n') {
|
||||
array.removeLastByte(); //移除 \r
|
||||
return true;
|
||||
}
|
||||
array.put(b);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
@ -1,49 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.net.AsyncConnection;
|
||||
import org.redkale.net.WorkThread;
|
||||
import org.redkale.net.client.Client;
|
||||
import org.redkale.net.client.ClientCodec;
|
||||
import org.redkale.net.client.ClientConnection;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCacheConnection extends ClientConnection<RedisCacheRequest, RedisCacheResult> {
|
||||
|
||||
public RedisCacheConnection(Client client, int index, AsyncConnection channel) {
|
||||
super(client, index, channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClientCodec createCodec() {
|
||||
return new RedisCacheCodec(this);
|
||||
}
|
||||
|
||||
protected CompletableFuture<RedisCacheResult> writeRequest(RedisCacheRequest request) {
|
||||
return super.writeChannel(request);
|
||||
}
|
||||
|
||||
protected <T> CompletableFuture<T> writeRequest(RedisCacheRequest request, Function<RedisCacheResult, T> respTransfer) {
|
||||
return super.writeChannel(request, respTransfer);
|
||||
}
|
||||
|
||||
public RedisCacheResult pollResultSet(RedisCacheRequest request) {
|
||||
RedisCacheResult rs = new RedisCacheResult();
|
||||
return rs;
|
||||
}
|
||||
|
||||
public RedisCacheRequest pollRequest(WorkThread workThread) {
|
||||
RedisCacheRequest rs = new RedisCacheRequest().currThread(workThread);
|
||||
return rs;
|
||||
}
|
||||
|
||||
}
|
@ -1,45 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.net.client.ClientConnection;
|
||||
import org.redkale.util.ByteArray;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCacheReqAuth extends RedisCacheRequest {
|
||||
|
||||
private static final byte[] PS = "AUTH".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
protected String password;
|
||||
|
||||
public RedisCacheReqAuth(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(ClientConnection conn, ByteArray writer) {
|
||||
byte[] pwd = password.getBytes();
|
||||
writer.put((byte) '*');
|
||||
writer.put((byte) '2');
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put((byte) '$');
|
||||
writer.put((byte) '4');
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put(PS);
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
|
||||
writer.put((byte) '$');
|
||||
writer.put(String.valueOf(pwd.length).getBytes(StandardCharsets.UTF_8));
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put(pwd);
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
|
||||
}
|
||||
}
|
@ -1,36 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.net.client.ClientConnection;
|
||||
import org.redkale.util.ByteArray;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCacheReqClose extends RedisCacheRequest {
|
||||
|
||||
private static final byte[] PS = "QUIT".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
@Override
|
||||
public final boolean isCloseType() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(ClientConnection conn, ByteArray writer) {
|
||||
writer.put((byte) '*');
|
||||
writer.put((byte) '1');
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put((byte) '$');
|
||||
writer.put((byte) '4');
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put(PS);
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
}
|
||||
}
|
@ -1,43 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.net.client.ClientConnection;
|
||||
import org.redkale.util.ByteArray;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCacheReqDB extends RedisCacheRequest {
|
||||
|
||||
protected int db;
|
||||
|
||||
public RedisCacheReqDB(int db) {
|
||||
this.db = db;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(ClientConnection conn, ByteArray writer) {
|
||||
writer.put((byte) '*');
|
||||
writer.put((byte) '2');
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put((byte) '$');
|
||||
writer.put((byte) '6');
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put("SELECT".getBytes(StandardCharsets.UTF_8));
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
|
||||
byte[] dbs = String.valueOf(db).getBytes(StandardCharsets.UTF_8);
|
||||
writer.put((byte) '$');
|
||||
writer.put(String.valueOf(dbs.length).getBytes(StandardCharsets.UTF_8));
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put(dbs);
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
|
||||
}
|
||||
}
|
@ -1,31 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.net.client.ClientConnection;
|
||||
import org.redkale.util.ByteArray;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCacheReqPing extends RedisCacheRequest {
|
||||
|
||||
private static final byte[] PS = "PING".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
@Override
|
||||
public void writeTo(ClientConnection conn, ByteArray writer) {
|
||||
writer.put((byte) '*');
|
||||
writer.put((byte) '1');
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put((byte) '$');
|
||||
writer.put((byte) '4');
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put(PS);
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
}
|
||||
}
|
@ -1,61 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.net.client.ClientConnection;
|
||||
import org.redkale.net.client.ClientRequest;
|
||||
import org.redkale.util.ByteArray;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCacheRequest extends ClientRequest {
|
||||
|
||||
static final byte[] TRUE = new byte[]{'t'};
|
||||
|
||||
static final byte[] FALSE = new byte[]{'f'};
|
||||
|
||||
protected String key;
|
||||
|
||||
protected String command;
|
||||
|
||||
protected byte[][] args;
|
||||
|
||||
public <T> RedisCacheRequest prepare(String command, String key, byte[]... args) {
|
||||
super.prepare();
|
||||
this.command = command;
|
||||
this.key = key;
|
||||
this.args = args;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(ClientConnection conn, ByteArray writer) {
|
||||
writer.put((byte) '*');
|
||||
writer.put(String.valueOf(args.length + 1).getBytes(StandardCharsets.UTF_8));
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put((byte) '$');
|
||||
writer.put(String.valueOf(command.length()).getBytes(StandardCharsets.UTF_8));
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put(command.getBytes(StandardCharsets.UTF_8));
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
|
||||
for (final byte[] arg : args) {
|
||||
writer.put((byte) '$');
|
||||
writer.put(String.valueOf(arg.length).getBytes(StandardCharsets.UTF_8));
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
writer.put(arg);
|
||||
writer.put((byte) '\r', (byte) '\n');
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "{command=" + command + ", key=" + key + "}";
|
||||
}
|
||||
}
|
@ -1,151 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class RedisCacheResult {
|
||||
|
||||
//+ 简单字符串类型 (不包含CRLF)
|
||||
//- 错误类型 (不包含CRLF)
|
||||
//': 整型
|
||||
//$ 块字符串
|
||||
//* 数组
|
||||
protected byte frameType;
|
||||
|
||||
protected byte[] frameValue; //(不包含CRLF)
|
||||
|
||||
protected List<byte[]> frameList; //(不包含CRLF)
|
||||
|
||||
public RedisCacheResult prepare(byte byteType, byte[] val, List<byte[]> bytesList) {
|
||||
this.frameType = byteType;
|
||||
this.frameValue = val;
|
||||
this.frameList = bytesList;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Void getVoidValue() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public byte[] getFrameValue() {
|
||||
return frameValue;
|
||||
}
|
||||
|
||||
public Boolean getBoolValue() {
|
||||
if (frameValue == null) {
|
||||
return false;
|
||||
}
|
||||
String val = new String(frameValue, StandardCharsets.UTF_8);
|
||||
if ("OK".equals(val)) {
|
||||
return true;
|
||||
}
|
||||
return Integer.parseInt(val) > 0;
|
||||
}
|
||||
|
||||
public String getStringValue(String key, RedisCryptor cryptor) {
|
||||
if (frameValue == null) {
|
||||
return null;
|
||||
}
|
||||
String val = new String(frameValue, StandardCharsets.UTF_8);
|
||||
if (cryptor != null) {
|
||||
val = cryptor.decrypt(key, val);
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
public Double getDoubleValue(Double defvalue) {
|
||||
return frameValue == null ? defvalue : Double.parseDouble(new String(frameValue, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public Long getLongValue(Long defvalue) {
|
||||
return frameValue == null ? defvalue : Long.parseLong(new String(frameValue, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public Integer getIntValue(Integer defvalue) {
|
||||
return frameValue == null ? defvalue : Integer.parseInt(new String(frameValue, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public <T> T getObjectValue(String key, RedisCryptor cryptor, Type type) {
|
||||
return formatValue(key, cryptor, frameValue, type);
|
||||
}
|
||||
|
||||
protected <T> Set<T> getSetValue(String key, RedisCryptor cryptor, Type type) {
|
||||
if (frameList == null || frameList.isEmpty()) {
|
||||
return new LinkedHashSet<>();
|
||||
}
|
||||
Set<T> set = new LinkedHashSet<>();
|
||||
for (byte[] bs : frameList) {
|
||||
set.add(formatValue(key, cryptor, bs, type));
|
||||
}
|
||||
return set;
|
||||
}
|
||||
|
||||
protected <T> List<T> getListValue(String key, RedisCryptor cryptor, Type type) {
|
||||
if (frameList == null || frameList.isEmpty()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
List<T> list = new ArrayList<>();
|
||||
for (byte[] bs : frameList) {
|
||||
list.add(formatValue(key, cryptor, bs, type));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
protected <T> Map<String, T> getMapValue(String key, RedisCryptor cryptor, Type type) {
|
||||
if (frameList == null || frameList.isEmpty()) {
|
||||
return new LinkedHashMap<>();
|
||||
}
|
||||
Map<String, T> map = new LinkedHashMap<>();
|
||||
for (int i = 0; i < frameList.size(); i += 2) {
|
||||
byte[] bs1 = frameList.get(i);
|
||||
byte[] bs2 = frameList.get(i + 1);
|
||||
T val = formatValue(key, cryptor, bs2, type);
|
||||
if (val != null) {
|
||||
map.put(formatValue(key, cryptor, bs1, String.class).toString(), val);
|
||||
}
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
protected static <T> T formatValue(String key, RedisCryptor cryptor, byte[] frames, Type type) {
|
||||
if (frames == null) {
|
||||
return null;
|
||||
}
|
||||
if (type == byte[].class) {
|
||||
return (T) frames;
|
||||
}
|
||||
if (type == String.class) {
|
||||
String val = new String(frames, StandardCharsets.UTF_8);
|
||||
if (cryptor != null) {
|
||||
val = cryptor.decrypt(key, val);
|
||||
}
|
||||
return (T) val;
|
||||
}
|
||||
if (type == boolean.class || type == Boolean.class) {
|
||||
return (T) (Boolean) "t".equalsIgnoreCase(new String(frames, StandardCharsets.UTF_8));
|
||||
}
|
||||
if (type == long.class || type == Long.class) {
|
||||
return (T) (Long) Long.parseLong(new String(frames, StandardCharsets.UTF_8));
|
||||
}
|
||||
if (type == double.class || type == Double.class) {
|
||||
return (T) (Double) Double.parseDouble(new String(frames, StandardCharsets.UTF_8));
|
||||
}
|
||||
if (cryptor != null) {
|
||||
String val = cryptor.decrypt(key, new String(frames, StandardCharsets.UTF_8));
|
||||
return (T) JsonConvert.root().convertFrom(type, val);
|
||||
}
|
||||
return (T) JsonConvert.root().convertFrom(type, frames);
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -1,29 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.annotation.Priority;
|
||||
import org.redkale.source.CacheSource;
|
||||
import org.redkale.source.CacheSourceProvider;
|
||||
import org.redkale.util.AnyValue;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
@Priority(1)
|
||||
public class RedisCacheSourceProvider implements CacheSourceProvider {
|
||||
|
||||
@Override
|
||||
public boolean acceptsConf(AnyValue config) {
|
||||
return new MyRedisCacheSource().acceptsConf(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheSource createInstance() {
|
||||
return new MyRedisCacheSource();
|
||||
}
|
||||
|
||||
}
|
@ -1,46 +0,0 @@
|
||||
/*
|
||||
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
|
||||
* Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template
|
||||
*/
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.util.AnyValue;
|
||||
|
||||
/**
|
||||
* @author zhangjx
|
||||
*/
|
||||
public interface RedisCryptor {
|
||||
|
||||
/**
|
||||
* 初始化
|
||||
*
|
||||
* @param conf 配置
|
||||
*/
|
||||
public void init(AnyValue conf);
|
||||
|
||||
/**
|
||||
* 加密, 无需加密的key对应的值需要直接返回value
|
||||
*
|
||||
* @param key key
|
||||
* @param value 明文
|
||||
* @return 密文
|
||||
*/
|
||||
public String encrypt(String key, String value);
|
||||
|
||||
/**
|
||||
* 解密, 无需解密的key对应的值需要直接返回value
|
||||
*
|
||||
* @param key key
|
||||
* @param value 密文
|
||||
* @return 明文
|
||||
*/
|
||||
public String decrypt(String key, String value);
|
||||
|
||||
/**
|
||||
* 销毁
|
||||
*
|
||||
* @param conf 配置
|
||||
*/
|
||||
public void destroy(AnyValue conf);
|
||||
|
||||
}
|
@ -1,184 +1,25 @@
|
||||
package net.tccn;
|
||||
package org.redkalex.cache.redis;
|
||||
|
||||
import org.redkale.net.AsyncIOGroup;
|
||||
import org.redkale.convert.json.JsonFactory;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.ResourceFactory;
|
||||
import org.redkalex.cache.redis.MyRedisCacheSource;
|
||||
|
||||
import static org.redkale.boot.Application.RESNAME_APP_CLIENT_ASYNCGROUP;
|
||||
import static org.redkale.source.AbstractCacheSource.*;
|
||||
import java.util.Map;
|
||||
|
||||
public class RedisTest {
|
||||
|
||||
static MyRedisCacheSource source = new MyRedisCacheSource();
|
||||
static MyRedisCacheSource<String> source = new MyRedisCacheSource();
|
||||
|
||||
static { // redis://:*Zhong9307!@47.111.150.118:6064?db=2
|
||||
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue().addValue(CACHE_SOURCE_MAXCONNS, "1");
|
||||
conf.addValue(CACHE_SOURCE_NODE, new AnyValue.DefaultAnyValue().addValue(CACHE_SOURCE_URL, "redis://:123456@127.0.0.1:6379?db=0"));
|
||||
final ResourceFactory factory = ResourceFactory.create();
|
||||
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
|
||||
asyncGroup.start();
|
||||
factory.register(RESNAME_APP_CLIENT_ASYNCGROUP, asyncGroup);
|
||||
factory.inject(source);
|
||||
//source.defaultConvert = JsonFactory.root().getConvert();
|
||||
source.init(conf);
|
||||
|
||||
System.out.println(source.get("a"));
|
||||
|
||||
//--------------------- bit ------------------------------
|
||||
/*boolean ax = source.getBit("ax", 6);
|
||||
System.out.println("ax:"+ ax);
|
||||
source.setBit("ax", 6, true);
|
||||
|
||||
ax = source.getBit("ax", 6);
|
||||
System.out.println("ax:"+ ax);
|
||||
|
||||
source.setBit("ax", 6, false);
|
||||
ax = source.getBit("ax", 6);
|
||||
System.out.println("ax:"+ ax);*/
|
||||
//--------------------- bit ------------------------------
|
||||
|
||||
//--------------------- bit ------------------------------
|
||||
|
||||
/*
|
||||
source.lock("lockx", 5000);
|
||||
*/
|
||||
|
||||
|
||||
source.keysStartsWith("more-hot").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
|
||||
int i = (short) 3;
|
||||
});
|
||||
|
||||
source.keysStartsWith("districtbeans").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
source.keysStartsWith("oss-blind-users").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
source.keysStartsWith("venue:drama-product-schedule-fee:rt-lbeuai84:20221217").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
source.keysStartsWith("venue:site-hour-fee:54f6e9b74cd7416db8d605366c1f49c2:20220729").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
source.keysStartsWith("rainbow").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
source.keysStartsWith("run").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
source.keysStartsWith("today").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
source.keysStartsWith("user").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
source.keysStartsWith("im:").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
source.keysStartsWith("ios-").forEach(x -> {
|
||||
System.out.println(x);
|
||||
source.del(x);
|
||||
});
|
||||
|
||||
|
||||
//--------------------- set ------------------------------
|
||||
/*source.del("setx");
|
||||
*//*
|
||||
int[] ints = {1, 2, 3};
|
||||
source.sadd("setx", ints);
|
||||
*//*
|
||||
|
||||
//source.sadd("setx", list.toArray(Integer[]::new));
|
||||
List<Integer> list = List.of(2, 3, 5);
|
||||
// source.sadd("setx", list.toArray(Integer[]::new));
|
||||
source.sadd("setx", list.toArray(Integer[]::new));
|
||||
source.sadd("setx", 12, 2312, 213);
|
||||
source.sadd("setx", List.of(1011, 10222));
|
||||
|
||||
source.keys("setx*").forEach(x -> {
|
||||
System.out.println(x);
|
||||
});
|
||||
|
||||
source.srem("setx", 213, 2312);
|
||||
|
||||
|
||||
Collection<String> setx1 = source.getCollection("setx", String.class);
|
||||
|
||||
System.out.println(setx1);
|
||||
|
||||
|
||||
//source.getexLong()
|
||||
|
||||
source.setHms("hmx", Map.of("a", "5", "b", "51", "c", "ads"));
|
||||
|
||||
List<Serializable> hmget = source.hmget("hmx", int.class, "a");
|
||||
|
||||
System.out.println(hmget);
|
||||
|
||||
Integer hm = source.getHm("hmx", int.class, "ads");
|
||||
System.out.println(hm);
|
||||
|
||||
Map<String, String> hms = source.getHms("hmx", "a", "b");
|
||||
System.out.println("hmx:" + hms);
|
||||
|
||||
*//*System.out.println("======================================================");
|
||||
System.out.println(source.incrHm("hmx", "a", -6.0));
|
||||
hms = source.getHms("hmx", "a", "b");
|
||||
System.out.println("hmx:a+1后的结果 " + hms);*//*
|
||||
System.out.println("======================================================");
|
||||
source.setHm("hmx", "c", 12);
|
||||
hms = source.getHms("hmx", "a", "b", "c", "d", "a");
|
||||
System.out.println("hmx:设置 c=12 后的结果 " + hms);
|
||||
System.out.println("======================================================");
|
||||
Double c = source.getHm("hmx", double.class, "c");
|
||||
System.out.println("hmx 中 c 值:" + c);*/
|
||||
|
||||
/*Map<String, Object> hmx = source.getHmall("hmx");
|
||||
System.out.println("Hmall:" + hmx);*/
|
||||
|
||||
|
||||
|
||||
|
||||
/*AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
|
||||
static {
|
||||
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
|
||||
conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "47.111.150.118").addValue("port", "6064").addValue("password", "*Zhong9307!").addValue("db", 2));
|
||||
|
||||
source.defaultConvert = JsonFactory.root().getConvert();
|
||||
source.initValueType(String.class); //value用String类型
|
||||
source.init(conf);*/
|
||||
source.init(conf);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
|
||||
//source.setLong("a", 125);
|
||||
|
||||
/*long a = source.getLong("a", 0);
|
||||
System.out.println(a);
|
||||
|
||||
List<String> keys = source.keys("farm*");
|
||||
keys.forEach(x -> System.out.println(x));
|
||||
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}*/
|
||||
|
||||
|
||||
// ===========================================
|
||||
//System.out.println(source.remove("a", "b"));
|
||||
|
||||
// bit
|
||||
@ -274,10 +115,10 @@ public class RedisTest {
|
||||
System.out.println(source.getCollectionSize("sk")); // 2*/
|
||||
|
||||
|
||||
/*Map<String, String> hms = source.getHms("supportusers", "5-kfeu0f", "xxxx", "3-0kbt7u8t", "95q- ");
|
||||
Map<String, String> hms = source.getHms("supportusers", "5-kfeu0f", "xxxx", "3-0kbt7u8t", "95q- ");
|
||||
hms.forEach((k, v) -> {
|
||||
System.out.println(k + " : " + v);
|
||||
});*/
|
||||
});
|
||||
|
||||
|
||||
/*MyRedisCacheSource<String> source2 = new MyRedisCacheSource();
|
1944
src/main/java/org/redkalex/cache/redis/RedissionCacheSource.java
vendored
Normal file
1944
src/main/java/org/redkalex/cache/redis/RedissionCacheSource.java
vendored
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user