新增:redkale-2.2 版本支持

This commit is contained in:
绝尘 2024-01-25 02:03:46 +08:00
parent 11ed5d2018
commit 0ab33845f0
26 changed files with 4268 additions and 3253 deletions

View File

@ -6,7 +6,7 @@
<groupId>net.tccn</groupId>
<artifactId>zhub-client-redkale</artifactId>
<version>0.1.1.dev</version>
<version>x.22.0</version> <!-- 支持 redkale-2.2 版本 -->
<properties>
<maven.compiler.source>17</maven.compiler.source>
@ -18,7 +18,7 @@
<dependency>
<groupId>org.redkale</groupId>
<artifactId>redkale</artifactId>
<version>2.8.0.dev</version>
<version>2.2.0</version>
<scope>compile</scope>
</dependency>
<dependency>

View File

@ -1,9 +1,9 @@
package net.tccn;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Resourcable;
import org.redkale.util.TypeToken;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -13,11 +13,12 @@ import java.util.function.Consumer;
* @author Liang
* @data 2020-09-05 23:18
*/
public abstract class AbstractConsumer extends ZhubAgentProvider implements IConsumer, Resourcable {
public abstract class AbstractConsumer implements IConsumer {
protected JsonConvert convert = JsonConvert.root();
protected static String APP_NAME = "";
@Resource(name = "APP_NAME")
protected String APP_NAME = "";
private Map<String, EventType> eventMap = new ConcurrentHashMap<>();
@ -72,9 +73,4 @@ public abstract class AbstractConsumer extends ZhubAgentProvider implements ICon
}
// --------------
@Override
public String resourceName() {
return super.getName();
}
}

View File

@ -1,64 +0,0 @@
package net.tccn;
import org.redkale.boot.Application;
import org.redkale.boot.NodeServer;
import org.redkale.cluster.CacheClusterAgent;
import org.redkale.cluster.ClusterAgent;
import org.redkale.service.Service;
import org.redkale.util.ResourceEvent;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public abstract class ZhubAgentProvider extends ClusterAgent {
@Override
public void onResourceChange(ResourceEvent[] events) {
}
@Override
public void register(Application application) {
}
@Override
public void deregister(Application application) {
}
@Override
public CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname) {
return null;
}
@Override
public CompletableFuture<Set<InetSocketAddress>> querySncpAddress(String protocol, String restype, String resname) {
return null;
}
@Override
protected CompletableFuture<Set<InetSocketAddress>> queryAddress(ClusterEntry entry) {
return null;
}
@Override
protected ClusterEntry register(NodeServer ns, String protocol, Service service) {
deregister(ns, protocol, service);
ClusterEntry clusterEntry = new ClusterEntry(ns, protocol, service);
CacheClusterAgent.AddressEntry entry = new CacheClusterAgent.AddressEntry();
entry.addr = clusterEntry.address;
entry.resname = clusterEntry.resourceName;
entry.nodeid = this.nodeid;
entry.time = System.currentTimeMillis();
//source.hset(clusterEntry.serviceName, clusterEntry.serviceid, CacheClusterAgent.AddressEntry.class, entry);
return clusterEntry;
}
@Override
protected void deregister(NodeServer ns, String protocol, Service service) {
}
}

View File

@ -0,0 +1,50 @@
package net.tccn;
import net.tccn.zhub.ZHubClient;
import org.redkale.boot.Application;
import org.redkale.boot.ApplicationListener;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.ResourceFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.CompletableFuture;
/**
* 服务监听
*
* @author: liangxy.
*/
public class ZhubListener implements ApplicationListener {
@Override
public void preStart(Application application) {
CompletableFuture.runAsync(() -> {
ResourceFactory resourceFactory = application.getResourceFactory();
RedkaleClassLoader classLoader = application.getClassLoader();
AnyValue appConfig = application.getAppConfig();
AnyValue zhubs = appConfig.getAnyValue("zhubs");
AnyValue[] values = zhubs.getAnyValues("zhub");
for (AnyValue zhub : values) {
String className = zhub.getValue("value", ZHubClient.class.getCanonicalName());
try {
Class<?> clazz = classLoader.loadClass(className);
Service obj = (Service) clazz.getDeclaredConstructor().newInstance();
application.getResourceFactory().inject(obj);
obj.init(zhub);
resourceFactory.register(zhub.get("name"), clazz, obj);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
e.printStackTrace();
}
}
});
}
@Override
public void preShutdown(Application application) {
}
}

View File

@ -1,21 +0,0 @@
package net.tccn;
import net.tccn.zhub.ZHubClient;
import org.redkale.annotation.Priority;
import org.redkale.cluster.ClusterAgent;
import org.redkale.cluster.ClusterAgentProvider;
import org.redkale.util.AnyValue;
@Priority(1)
public class ZhubProvider implements ClusterAgentProvider {
@Override
public boolean acceptsConf(AnyValue config) {
return new ZHubClient().acceptsConf(config);
}
@Override
public ClusterAgent createInstance() {
return new ZHubClient();
}
}

View File

@ -2,14 +2,9 @@ package net.tccn.zhub;
import net.tccn.*;
import net.tccn.timer.Timers;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceType;
import org.redkale.service.Local;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import org.redkale.util.Comment;
import org.redkale.util.TypeToken;
import org.redkale.util.Utility;
import org.redkale.util.*;
import java.io.BufferedReader;
import java.io.IOException;
@ -56,38 +51,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
private static Map<String, ZHubClient> mainHub = new HashMap<>(); // 127.0.0.1:1216 - ZHubClient
public ZHubClient() {
}
public ZHubClient(String name, Map<String, String> attr) {
this.APP_NAME = name;
this.addr = attr.get("addr");
this.groupid = attr.get("groupid");
this.auth = attr.get("auth");
this.initClient(null);
}
@Override
public void init(AnyValue config) {
APP_NAME = application.getName();
/*if (!preInit()) {
return;
}*/
if (config == null) {
initClient(null);
} else {
Map<String, AnyValue> nodes = getNodes(config);
for (String rsName : nodes.keySet()) {
ZHubClient client = new ZHubClient().initClient(nodes.get(rsName));
application.getResourceFactory().register(rsName, client);
}
}
}
private ZHubClient initClient(AnyValue config) {
// 自动注入
if (config != null) {
addr = config.getValue("addr", addr);
@ -103,8 +68,10 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
// 设置第一个启动的 实例为主实例
if (!mainHub.containsKey(addr)) { // 确保同步执行此 init 逻辑
mainHub.put(addr, this);
synchronized (ZHubClient.class) {
if (!mainHub.containsKey(addr)) { // 确保同步执行此 init 逻辑
mainHub.put(addr, this);
}
}
CompletableFuture.runAsync(() -> {
@ -272,18 +239,17 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
while (true) {
Event<String> event = null;
try {
event = rpcCallQueue.take();
logger.info(String.format("rpc-call:[%s] %s", event.topic, event.value));
event = rpcBackQueue.take();
logger.info(String.format("rpc-back:[%s]", event.value));
String topic = event.topic;
String value = event.value;
executor.submit(() -> accept(topic, value)).get(5, TimeUnit.SECONDS);
executor.submit(() -> rpcAccept(value)).get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
if (e instanceof TimeoutException) {
executor = Executors.newSingleThreadExecutor();
logger.log(Level.WARNING, "rpc-call TimeoutException, topic[" + event.topic + "], value[" + event.value + "]", e);
logger.log(Level.WARNING, "rpc-back TimeoutException, topic[" + event.topic + "], value[" + event.value + "]", e);
} else if (event != null) {
logger.log(Level.WARNING, "rpc-call[" + event.topic + "] event accept error :" + event.value, e);
logger.log(Level.WARNING, "rpc-back[" + event.value + "] event accept error :" + event.value, e);
}
}
}
@ -335,8 +301,6 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
}
}).start();
});
return this;
}
public boolean acceptsConf(AnyValue config) {
@ -621,8 +585,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
private static Map<String, TypeToken> rpcRetType = new ConcurrentHashMap<>();
@Comment("rpc call")
public RpcResult<Void> rpc(String topic, Object v) {
return rpc(topic, v, null);
public RpcResult<String> rpc(String topic, Object v) {
return rpc(topic, v, IType.STRING);
}
@Comment("rpc call")
@ -671,8 +635,8 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
return rpc.getRpcResult();
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, null));
public <T> CompletableFuture<RpcResult<String>> rpcAsync(String topic, T v) {
return CompletableFuture.supplyAsync(() -> rpc(topic, v, IType.STRING));
}
public <T, R> CompletableFuture<RpcResult<R>> rpcAsync(String topic, T v, TypeToken<R> typeToken) {

View File

@ -1,234 +0,0 @@
/*
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template
*/
package org.redkalex.cache.redis;
import org.redkale.annotation.Resource;
import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert;
import org.redkale.source.AbstractCacheSource;
import org.redkale.util.*;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import static org.redkale.boot.Application.RESNAME_APP_EXECUTOR;
import static org.redkale.boot.Application.RESNAME_APP_NAME;
/**
*
* @author zhangjx
*
* @since 2.8.0
*/
public abstract class AbstractRedisSource extends AbstractCacheSource {
public static final String CACHE_SOURCE_CRYPTOR = "cryptor";
protected String name;
@Resource(name = RESNAME_APP_NAME, required = false)
protected String appName = "";
@Resource(required = false)
protected ResourceFactory resourceFactory;
@Resource(required = false)
protected JsonConvert defaultConvert;
@Resource(name = Resource.PARENT_NAME + "_convert", required = false)
protected JsonConvert convert;
protected int db;
protected RedisCryptor cryptor;
protected AnyValue conf;
private ExecutorService subExecutor;
private final ReentrantLock subExecutorLock = new ReentrantLock();
@Resource(name = RESNAME_APP_EXECUTOR, required = false)
protected ExecutorService workExecutor;
@Override
public void init(AnyValue conf) {
this.conf = conf;
super.init(conf);
this.name = conf.getValue("name", "");
if (this.convert == null) {
this.convert = this.defaultConvert;
}
if (conf != null) {
String cryptStr = conf.getValue(CACHE_SOURCE_CRYPTOR, "").trim();
if (!cryptStr.isEmpty()) {
try {
Class<RedisCryptor> cryptClass = (Class) getClass().getClassLoader().loadClass(cryptStr);
RedkaleClassLoader.putReflectionPublicConstructors(cryptClass, cryptClass.getName());
this.cryptor = cryptClass.getConstructor().newInstance();
} catch (ReflectiveOperationException e) {
throw new RedkaleException(e);
}
}
}
if (cryptor != null) {
if (resourceFactory != null) {
resourceFactory.inject(cryptor);
}
cryptor.init(conf);
}
}
@Override
public void destroy(AnyValue conf) {
super.destroy(conf);
if (cryptor != null) {
cryptor.destroy(conf);
}
}
public boolean acceptsConf(AnyValue config) {
if (config == null) {
return false;
}
return "redis".equalsIgnoreCase(config.getValue(CACHE_SOURCE_TYPE))
|| getClass().getName().equalsIgnoreCase(config.getValue(CACHE_SOURCE_TYPE))
|| config.getValue(CACHE_SOURCE_NODES, config.getValue("url", "")).startsWith("redis://")
|| config.getValue(CACHE_SOURCE_NODES, config.getValue("url", "")).startsWith("rediss://");
}
protected ExecutorService subExecutor() {
ExecutorService executor = subExecutor;
if (executor != null) {
return executor;
}
subExecutorLock.lock();
try {
if (subExecutor == null) {
String threadNameFormat = "CacheSource-" + resourceName() + "-SubThread-%s";
Function<String, ExecutorService> func = Utility.virtualExecutorFunction();
final AtomicInteger counter = new AtomicInteger();
subExecutor = func == null ? Executors.newFixedThreadPool(Utility.cpus(), r -> {
Thread t = new Thread(r);
t.setDaemon(true);
int c = counter.incrementAndGet();
t.setName(String.format(threadNameFormat, "Virtual-" + (c < 10 ? ("00" + c) : (c < 100 ? ("0" + c) : c))));
return t;
}) : func.apply(threadNameFormat);
}
executor = subExecutor;
} finally {
subExecutorLock.unlock();
}
return executor;
}
protected String getNodes(AnyValue config) {
return config.getValue(CACHE_SOURCE_NODES, config.getValue("url", ""));
}
@Override
public void close() throws Exception { // Application 关闭时调用
destroy(null);
}
@Override
public String resourceName() {
return name;
}
protected String decryptValue(String key, RedisCryptor cryptor, String value) {
return cryptor != null ? cryptor.decrypt(key, value) : value;
}
protected <T> T decryptValue(String key, RedisCryptor cryptor, Type type, byte[] bs) {
return decryptValue(key, cryptor, convert, type, bs);
}
protected <T> T decryptValue(String key, RedisCryptor cryptor, Convert c, Type type, byte[] bs) {
if (bs == null) {
return null;
}
if (type == byte[].class) {
return (T) bs;
}
if (cryptor == null && type == String.class) {
return (T) new String(bs, StandardCharsets.UTF_8);
}
if (cryptor == null || (type instanceof Class && (((Class) type).isPrimitive() || Number.class.isAssignableFrom((Class) type)))) {
return (T) (c == null ? this.convert : c).convertFrom(type, bs);
}
String deval = cryptor.decrypt(key, new String(bs, StandardCharsets.UTF_8));
if (type == String.class) {
return (T) deval;
}
return deval == null ? null : (T) (c == null ? this.convert : c).convertFrom(type, deval.getBytes(StandardCharsets.UTF_8));
}
protected String encryptValue(String key, RedisCryptor cryptor, String value) {
return cryptor != null ? cryptor.encrypt(key, value) : value;
}
protected <T> byte[] encryptValue(String key, RedisCryptor cryptor, Convert c, T value) {
return encryptValue(key, cryptor, null, c, value);
}
protected <T> byte[] encryptValue(String key, RedisCryptor cryptor, Type type, Convert c, T value) {
if (value == null) {
return null;
}
Type t = type == null ? value.getClass() : type;
if (cryptor == null && t == String.class) {
return value.toString().getBytes(StandardCharsets.UTF_8);
}
byte[] bs = (c == null ? this.convert : c).convertToBytes(t, value);
if (bs.length > 1 && t instanceof Class && !CharSequence.class.isAssignableFrom((Class) t)) {
if (bs[0] == '"' && bs[bs.length - 1] == '"') {
bs = Arrays.copyOfRange(bs, 1, bs.length - 1);
}
}
return encryptValue(key, cryptor, t, bs);
}
protected byte[] encryptValue(String key, RedisCryptor cryptor, Type type, byte[] bs) {
if (bs == null) {
return null;
}
if (cryptor == null || (type instanceof Class && (((Class) type).isPrimitive() || Number.class.isAssignableFrom((Class) type)))) {
return bs;
}
String enval = cryptor.encrypt(key, new String(bs, StandardCharsets.UTF_8));
return enval == null ? null : enval.getBytes(StandardCharsets.UTF_8);
}
protected <T extends Number> T decryptScore(Class<T> scoreType, Double score) {
if (score == null) {
return null;
}
if (scoreType == int.class || scoreType == Integer.class) {
return (T) (Number) score.intValue();
} else if (scoreType == long.class || scoreType == Long.class) {
return (T) (Number) score.longValue();
} else if (scoreType == float.class || scoreType == Float.class) {
return (T) (Number) score.floatValue();
} else if (scoreType == double.class || scoreType == Double.class) {
return (T) (Number) score;
} else {
return JsonConvert.root().convertFrom(scoreType, score.toString());
}
}
protected CompletableFuture<Integer> returnFutureSize(List<CompletableFuture<Void>> futures) {
return futures == null || futures.isEmpty() ? CompletableFuture.completedFuture(0) : Utility.allOfFutures(futures).thenApply(v -> futures.size());
}
}

View File

@ -1,27 +1,299 @@
package org.redkalex.cache.redis;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceType;
import org.redkale.convert.Convert;
import org.redkale.service.Local;
import org.redkale.source.CacheSource;
import org.redkale.util.AnyValue;
import org.redkale.util.AutoLoad;
import org.redkale.util.ResourceType;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
@Local
@AutoLoad(false)
@ResourceType(CacheSource.class)
public class MyRedisCacheSource extends RedisCacheSource {
@Override
public void init(AnyValue conf) {
super.init(conf);
public class MyRedisCacheSource<V extends Object> extends RedisCacheSource<V> {
//--------------------- oth ------------------------------
public boolean setnx(String key, Object v) {
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Serializable rs = send("SETNX", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
return rs == null ? false : (long) rs == 1;
}
//--------------------- oth ------------------------------
//--------------------- bit ------------------------------
public boolean getBit(String key, int offset) {
byte[][] bytes = Stream.of(key, offset).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Serializable v = send("GETBIT", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
return v == null ? false : (long) v == 1;
}
public void setBit(String key, int offset, boolean bool) {
byte[][] bytes = Stream.of(key, offset, bool ? 1 : 0).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
send("SETBIT", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
}
//--------------------- bit ------------------------------
//--------------------- lock ------------------------------
// 尝试加锁成功返回0否则返回上一锁剩余毫秒值
public int tryLock(String key, int millis) {
byte[][] bytes = Stream.of("" +
"if (redis.call('exists',KEYS[1]) == 0) then " +
"redis.call('psetex', KEYS[1], ARGV[1], 1) " +
"return 0; " +
"else " +
"return redis.call('PTTL', KEYS[1]); " +
"end; ", 1, key, millis).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
int n = (int) send("EVAL", CacheEntryType.OBJECT, (Type) null, null, bytes).join();
return n;
}
// 加锁
public void lock(String key, int millis) {
int i;
do {
i = tryLock(key, millis);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (i > 0);
}
// 解锁
public void unlock(String key) {
remove(key);
}
//--------------------- key ------------------------------
public long getTtl(String key) {
return (long) send("TTL", CacheEntryType.OBJECT, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).join();
}
public long getPttl(String key) {
return (long) send("PTTL", CacheEntryType.OBJECT, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).join();
}
public int remove(String... keys) {
if (keys == null || keys.length == 0) {
return 0;
}
List<String> para = new ArrayList<>();
para.add("" +
" local args = ARGV;" +
" local x = 0;" +
" for i,v in ipairs(args) do" +
" local inx = redis.call('del', v);" +
" if(inx > 0) then" +
" x = x + 1;" +
" end" +
" end" +
" return x;");
para.add("0");
for (Object field : keys) {
para.add(String.valueOf(field));
}
byte[][] bytes = para.stream().map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
return (int) send("EVAL", CacheEntryType.OBJECT, (Type) null, null, bytes).join();
}
//--------------------- hmget ------------------------------
public <T extends Object> V getHm(String key, T field) {
// return (V) send("HMGET", CacheEntryType.OBJECT, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8)).join();
Map<Object, V> map = getHms(key, field);
return map.get(field);
}
public <T extends Object> Map<T, V> getHms(String key, T... field) {
if (field == null || field.length == 0) {
return new HashMap<>();
}
byte[][] bytes = Stream.concat(Stream.of(key), Stream.of(field)).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Map<T, V> result = new HashMap<>();
List<V> vs = (List) send("HMGET", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
for (int i = 0; i < field.length; i++) { // /*vs != null && vs.size() > i &&*/
if (vs.get(i) == null) {
continue;
}
result.put(field[i], vs.get(i));
}
return result;
}
public Map<String, V> getHmall(String key) {
List<V> vs = (List) send("HGETALL", CacheEntryType.OBJECT, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).join();
Map<String, V> result = new HashMap<>(vs.size() / 2);
for (int i = 0; i < vs.size(); i += 2) {
result.put(String.valueOf(vs.get(i)), vs.get(i + 1));
}
return result;
}
//--------------------- hmsethmdelincr ------------------------------
public <T> void setHm(String key, T field, V value) {
byte[][] bytes = Stream.of(key, field, value).map(x -> x.toString().getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
send("HMSET", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
}
public <T> void setHms(String key, Map<T, V> kv) {
List<String> args = new ArrayList();
args.add(key);
kv.forEach((k, v) -> {
args.add(String.valueOf(k));
args.add(String.valueOf(v));
});
byte[][] bytes = args.stream().map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
send("HMSET", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
}
public <T> Long incrHm(String key, T field, long n) {
byte[][] bytes = Stream.of(key, String.valueOf(field), String.valueOf(n)).map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
return (Long) send("HINCRBY", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
}
public <T> Double incrHm(String key, T field, double n) {
byte[][] bytes = Stream.of(key, String.valueOf(field), String.valueOf(n)).map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Serializable v = send("HINCRBYFLOAT", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
if (v == null) {
return null;
}
return Double.parseDouble(String.valueOf(v));
}
public <T> void hdel(String key, T... field) {
byte[][] bytes = Stream.concat(Stream.of(key), Stream.of(field)).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
send("HDEL", null, (Type) null, key, bytes).join();
}
public <T> List<T> zexists(String key, T... fields) {
if (fields == null || fields.length == 0) {
return new ArrayList<>();
}
List<String> para = new ArrayList<>();
para.add("" +
" local key = KEYS[1];" +
" local args = ARGV;" +
" local result = {};" +
" for i,v in ipairs(args) do" +
" local inx = redis.call('ZREVRANK', key, v);" +
" if(inx) then" +
" table.insert(result,1,v);" +
" end" +
" end" +
" return result;");
para.add("1");
para.add(key);
for (Object field : fields) {
para.add(String.valueOf(field));
}
byte[][] bytes = para.stream().map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
return (List<T>) send("EVAL", CacheEntryType.OBJECT, (Type) null, null, bytes).join();
}
//--------------------- set ------------------------------
public <T> T srandomItem(String key) {
byte[][] bytes = Stream.of(key, 1).map(x -> formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, x)).toArray(byte[][]::new);
List<T> list = (List) send("SRANDMEMBER", null, (Type) null, key, bytes).join();
return list != null && !list.isEmpty() ? list.get(0) : null;
}
public <T> List<T> srandomItems(String key, int n) {
byte[][] bytes = Stream.of(key, n).map(x -> formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, x)).toArray(byte[][]::new);
return (List) send("SRANDMEMBER", null, (Type) null, key, bytes).join();
}
//--------------------- list ------------------------------
public CompletableFuture<Void> appendListItemsAsync(String key, V... values) {
byte[][] bytes = Stream.concat(Stream.of(key), Stream.of(values)).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
return (CompletableFuture) send("RPUSH", null, (Type) null, key, bytes);
}
public CompletableFuture<Void> lpushListItemAsync(String key, V value) {
return (CompletableFuture) send("LPUSH", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, value));
}
public void lpushListItem(String key, V value) {
lpushListItemAsync(key, value).join();
}
public void appendListItems(String key, V... values) {
appendListItemsAsync(key, values).join();
}
public void appendSetItems(String key, V... values) {
// todo:
for (V v : values) {
appendSetItem(key, v);
}
}
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
public CompletableFuture<Collection<V>> getCollectionAsync(String key, int offset, int limit) {
return (CompletableFuture) send("OBJECT", null, (Type) null, key, "ENCODING".getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8)).thenCompose(t -> {
if (t == null) return CompletableFuture.completedFuture(null);
if (new String((byte[]) t).contains("list")) { //list
return send("LRANGE", CacheEntryType.OBJECT, (Type) null, false, key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(offset).getBytes(StandardCharsets.UTF_8), String.valueOf(offset + limit - 1).getBytes(StandardCharsets.UTF_8));
} else {
return send("SMEMBERS", CacheEntryType.OBJECT, (Type) null, true, key, key.getBytes(StandardCharsets.UTF_8));
}
});
}
public Collection<V> getCollection(String key, int offset, int limit) {
return getCollectionAsync(key, offset, limit).join();
}
public V brpop(String key, int seconds) {
byte[][] bytes = Stream.concat(Stream.of(key), Stream.of(seconds)).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
return (V) send("BRPOP", null, (Type) null, key, bytes).join();
}
//--------------------- zset ------------------------------
public <N extends Number> void zadd(String key, Map<V, N> kv) {
if (kv == null || kv.isEmpty()) {
return;
}
List<String> args = new ArrayList();
args.add(key);
kv.forEach((k, v) -> {
args.add(String.valueOf(v));
args.add(String.valueOf(k));
});
byte[][] bytes = args.stream().map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
send("ZADD", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
}
public <N extends Number> double zincr(String key, Object number, N n) {
byte[][] bytes = Stream.of(key, n, number).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
Serializable v = send("ZINCRBY", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
return Double.parseDouble(String.valueOf(v));
}
public void zrem(String key, V... vs) {
List<String> args = new ArrayList();
args.add(key);
for (V v : vs) {
args.add(String.valueOf(v));
}
byte[][] bytes = args.stream().map(x -> x.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
send("ZREM", CacheEntryType.OBJECT, (Type) null, key, bytes).join();
}
/*
//--------------------- zset ------------------------------
public int getZrank(String key, V v) {
byte[][] bytes = Stream.of(key, v).map(x -> String.valueOf(x).getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
@ -142,209 +414,17 @@ public class MyRedisCacheSource extends RedisCacheSource {
}
return map;
}
* */
// --------------------
/*
supper had support
public <N extends Number> void zadd(String key, Map<Serializable, N> kv) {
if (kv == null || kv.isEmpty()) {
return;
}
List<Serializable> args = new ArrayList();
kv.forEach((k, v) -> {
args.add(k);
args.add(v);
});
// ----------
protected byte[] formatValue(CacheEntryType cacheType, Convert convert0, Type resultType, Object value) {
if (value == null) return "null".getBytes(StandardCharsets.UTF_8);
if (convert0 == null) convert0 = convert;
if (cacheType == CacheEntryType.LONG || cacheType == CacheEntryType.ATOMIC)
return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
if (cacheType == CacheEntryType.STRING) return convert0.convertToBytes(String.class, value);
sendAsync(RedisCommand.ZADD, key, args.toArray(Serializable[]::new)).join();
if (value instanceof String) return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
if (value instanceof Number) return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
return convert0.convertToBytes(resultType == null ? objValueType : resultType, value);
}
public <N extends Number> double zincr(String key, Serializable number, N n) {
return sendAsync(RedisCommand.ZINCRBY, key, number, n).thenApply(x -> x.getDoubleValue(0d)).join();
}
@Override
public long zrem(String key, String... vs) {
return sendAsync(RedisCommand.ZREM, key, keysArgs(key, vs)).thenApply(x -> x.getLongValue(0L)).join();
}*/
/*public <T> List<T> zexists(String key, T... fields) {
if (fields == null || fields.length == 0) {
return new ArrayList<>();
}
List<String> para = new ArrayList<>();
para.add("" +
" local key = KEYS[1];" +
" local args = ARGV;" +
" local result = {};" +
" for i,v in ipairs(args) do" +
" local inx = redis.call('ZREVRANK', key, v);" +
" if(inx) then" +
" table.insert(result,1,v);" +
" end" +
" end" +
" return result;");
para.add("1");
para.add(key);
for (Object field : fields) {
para.add(String.valueOf(field));
}
// todo:
//sendAsync("EVAL", null, para.toArray(Serializable[]::new)).thenApply(x -> x.).join();
return null;
}*/
//--------------------- bit ------------------------------
public boolean getBit(String key, int offset) {
return sendAsync(RedisCommand.GETBIT, key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(offset).getBytes(StandardCharsets.UTF_8)).thenApply(v -> v.getIntValue(0) > 0).join();
}
public void setBit(String key, int offset, boolean bool) {
sendAsync(RedisCommand.SETBIT, key, keysArgs(key, offset + "", bool ? "1" : "0")).join();
}
//--------------------- bit ------------------------------
//--------------------- lock ------------------------------
// 尝试加锁成功返回0否则返回上一锁剩余毫秒值
public long tryLock(String key, int millis) {
String[] obj = {"" +
"if (redis.call('EXISTS',KEYS[1]) == 0) then " +
"redis.call('PSETEX',KEYS[1],ARGV[1],1); " +
"return 0; " +
"else " +
"return redis.call('PTTL',KEYS[1]); " +
"end;", 1 + "", key, millis + ""
};
return sendAsync(RedisCommand.EVAL, null, keysArgs(null, obj)).thenApply(v -> v.getIntValue(1)).join();
}
// 加锁
public void lock(String key, int millis) {
long i;
do {
i = tryLock(key, millis);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (i > 0);
}
// 解锁
public void unlock(String key) {
remove(key);
}
//--------------------- key ------------------------------
public String get(String key) {
return get(key, String.class);
}
public void set(String key, Serializable value) {
sendAsync(RedisCommand.SET, key, keysArgs(key, value + "")).join();
}
//--------------------- set ------------------------------
/*public <T> void sadd(String key, Collection<T> args) {
saddAsync(key, args.toArray(T[]::new)).join();
}*/
/*public void sadd(String key, Serializable... args) {
String[] arr = new String[args.length];
for (int i = 0; i < args.length; i++) {
arr[i] = args[i] + "";
}
saddAsync(key, arr).join();
}
public void srem(String key, String... args) {
sremAsync(key, args).join();
}
public CompletableFuture<RedisCacheResult> saddAsync(String key, Serializable... args) {
return sendAsync(RedisCommand.SADD, key, keysArgs(key, args));
}
public CompletableFuture<RedisCacheResult> sremAsync(String key, String... args) {
return sendAsync(RedisCommand.SREM, key, keysArgs(key, args));
}*/
//--------------------- hm ------------------------------
/*public Long incrHm(String key, String field, int value) {
return sendAsync("HINCRBY", key, field, value).thenApply(x -> x.getLongValue(0l)).join();
}
public Double incrHm(String key, String field, double value) {
return sendAsync("HINCRBYFLOAT", key, field, value).thenApply(x -> x.getDoubleValue(0d)).join();
}*/
public void setHm(String key, String field, Serializable value) {
setHmsAsync(key, Map.of(field, value)).join();
}
public void setHms(String key, Map kv) {
setHmsAsync(key, kv).join();
}
public CompletableFuture<RedisCacheResult> setHmsAsync(String key, Map<String, Serializable> kv) {
List<String> args = new ArrayList();
kv.forEach((k, v) -> {
args.add(k);
args.add(v + "");
});
return sendAsync(RedisCommand.HMSET, key, keysArgs(key, args.toArray(String[]::new)));
}
public String getHm(String key, String field) {
return getHm(key, String.class, field);
}
public <T extends Serializable> T getHm(String key, Class<T> type, String field) {
List<Serializable> list = super.hmget(key, type, field);
if (list == null && list.isEmpty()) {
return null;
}
return (T) list.get(0);
}
public Map<String, String> getHms(String key, String... field) {
return getHms(key, String.class, field);
}
public <T extends Serializable> Map<String, T> getHms(String key, Class<T> type, String... field) {
List<Serializable> list = super.hmget(key, type, field);
if (list == null && list.isEmpty()) {
return null;
}
Map<String, T> map = new HashMap<>(field.length);
for (int i = 0; i < field.length; i++) {
if (list.get(i) == null) {
continue;
}
map.put(field[i], (T) list.get(i));
}
return map;
}
/*public Map<String, Object> getHmall(String key) {
List<String> list = null; // TODO:
Map<String, Object> map = new HashMap<>();
if (list.isEmpty()) {
return map;
}
for (int i = 0; i + 1 < list.size(); i += 2) {
map.put((String) list.get(i), list.get(i + 1));
}
return map;
}*/
}

View File

@ -1,79 +0,0 @@
/*
*
*/
package org.redkalex.cache.redis;
import org.redkale.util.Utility;
import java.util.Arrays;
/**
*
* @author zhangjx
*/
public class RedisCRC16 {
private static final int MAX_SLOT = 16384;
private static final int[] LOOKUP_TABLE = {0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6,
0x70E7, 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF, 0x1231, 0x0210, 0x3273,
0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6, 0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF,
0xE3DE, 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485, 0xA56A, 0xB54B, 0x8528,
0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D, 0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695,
0x46B4, 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC, 0x48C4, 0x58E5, 0x6886,
0x78A7, 0x0840, 0x1861, 0x2802, 0x3823, 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A,
0xB92B, 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12, 0xDBFD, 0xCBDC, 0xFBBF,
0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A, 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60,
0x1C41, 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49, 0x7E97, 0x6EB6, 0x5ED5,
0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70, 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59,
0x8F78, 0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F, 0x1080, 0x00A1, 0x30C2,
0x20E3, 0x5004, 0x4025, 0x7046, 0x6067, 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F,
0xF35E, 0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256, 0xB5EA, 0xA5CB, 0x95A8,
0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D, 0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424,
0x4405, 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C, 0x26D3, 0x36F2, 0x0691,
0x16B0, 0x6657, 0x7676, 0x4615, 0x5634, 0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A,
0xA9AB, 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3, 0xCB7D, 0xDB5C, 0xEB3F,
0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A, 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3,
0x3A92, 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9, 0x7C26, 0x6C07, 0x5C64,
0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1, 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9,
0x9FF8, 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0};
private RedisCRC16() {
}
public static int crc16(byte[] bytes) {
int crc = 0x0000;
for (byte b : bytes) {
crc = (crc << 8) ^ LOOKUP_TABLE[((crc >>> 8) ^ (b & 0xFF)) & 0xFF];
}
return crc & 0xFFFF;
}
public static int calcSlot(byte[] key) {
if (key == null) {
return 0;
}
int start = Utility.indexOf(key, (byte) '{');
if (start != -1) {
int end = Utility.indexOf(key, start + 1, (byte) '}');
if (end != -1) {
key = Arrays.copyOfRange(key, start + 1, end);
}
}
return crc16(key) % MAX_SLOT;
}
public static int calcSlot(String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
if (end != -1 && start + 1 < end) {
key = key.substring(start + 1, end);
}
}
return crc16(key.getBytes()) % MAX_SLOT;
}
}

View File

@ -1,58 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.AsyncConnection;
import org.redkale.net.AsyncGroup;
import org.redkale.net.client.Client;
import org.redkale.net.client.ClientAddress;
import org.redkale.util.Traces;
/**
*
* @author zhangjx
*/
public class RedisCacheClient extends Client<RedisCacheConnection, RedisCacheRequest, RedisCacheResult> {
public RedisCacheClient(String appName, String name, AsyncGroup group, String key,
ClientAddress address, int maxConns, int maxPipelines, RedisCacheReqAuth authReq, RedisCacheReqDB dbReq) {
super(name, group, true, address, maxConns, maxPipelines, () -> new RedisCacheReqPing(), () -> new RedisCacheReqClose(), null); //maxConns
RedisCacheReqClientName clientNameReq = new RedisCacheReqClientName(appName, name);
if (authReq != null && dbReq != null) {
this.authenticate = traceid -> {
Traces.currentTraceid(traceid);
return conn -> writeChannelBatch(conn, authReq.createTime(), dbReq.createTime(), clientNameReq.createTime())
.thenApply(v -> conn);
};
} else if (authReq != null) {
this.authenticate = traceid -> {
Traces.currentTraceid(traceid);
return conn -> writeChannelBatch(conn, authReq.createTime(), clientNameReq.createTime())
.thenApply(v -> conn);
};
} else if (dbReq != null) {
this.authenticate = traceid -> {
Traces.currentTraceid(traceid);
return conn -> writeChannelBatch(conn, dbReq.createTime(), clientNameReq.createTime())
.thenApply(v -> conn);
};
} else {
this.authenticate = traceid -> {
Traces.currentTraceid(traceid);
return conn -> writeChannel(conn, clientNameReq.createTime())
.thenApply(v -> conn);
};
}
this.readTimeoutSeconds = 3;
this.writeTimeoutSeconds = 3;
}
@Override
protected RedisCacheConnection createClientConnection(final int index, AsyncConnection channel) {
return new RedisCacheConnection(this, index, channel);
}
}

View File

@ -1,255 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientCodec;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import org.redkale.util.RedkaleException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
/**
*
* @author zhangjx
*/
public class RedisCacheCodec extends ClientCodec<RedisCacheRequest, RedisCacheResult> {
protected static final byte TYPE_BULK = '$'; //字符串块类型, 例如$6\r\n\abcdef\r\nNULL字符串$-1\r\n
protected static final byte TYPE_MULTI = '*'; //数组紧接的数字为数组长度
protected static final byte TYPE_STRING = '+'; //字符串值类型字符串以\r\n结尾, 例如+OK\r\n
protected static final byte TYPE_ERROR = '-'; //错误字符串类型字符串以\r\n结尾, 例如-ERR unknown command 'red'\r\n
protected static final byte TYPE_NUMBER = ':'; //整型 例如:2\r\n
private static final Logger logger = Logger.getLogger(RedisCacheCodec.class.getSimpleName());
private ByteArray halfFrameBytes;
private int halfFrameBulkLength = Integer.MIN_VALUE;
private int halfFrameMultiSize = Integer.MIN_VALUE;
private int halfFrameMultiItemIndex; //从0开始
private byte halfFrameMultiItemType;
private int halfFrameMultiItemLength = Integer.MIN_VALUE;
private byte frameType;
private byte[] frameCursor;
private byte[] frameValue;
private List<byte[]> frameList;
private ByteArray recyclableArray;
public RedisCacheCodec(ClientConnection connection) {
super(connection);
}
private ByteArray pollArray(ByteArray array) {
if (recyclableArray == null) {
recyclableArray = new ByteArray();
}
recyclableArray.clear();
if (array != null) {
recyclableArray.put(array);
}
return recyclableArray;
}
private boolean readFrames(RedisCacheConnection conn, ByteBuffer buffer, ByteArray array) {
// byte[] dbs = new byte[buffer.remaining()];
// for (int i = 0; i < dbs.length; i++) {
// dbs[i] = buffer.get(buffer.position() + i);
// }
// (System. out).println("[" + Utility.nowMillis() + "] [" + Thread.currentThread().getName() + "]: " + conn + ", 原始数据: " + new String(dbs).replace("\r\n", " "));
array.clear();
if (this.frameType == 0) {
this.frameType = buffer.get();
} else if (halfFrameBytes != null) {
array.put(halfFrameBytes);
halfFrameBytes = null;
}
if (frameType == TYPE_STRING || frameType == TYPE_ERROR || frameType == TYPE_NUMBER) {
if (!readComplete(buffer, array)) {
halfFrameBytes = pollArray(array);
return false;
}
frameValue = array.getBytes();
} else if (frameType == TYPE_BULK) {
if (halfFrameBulkLength == Integer.MIN_VALUE) {
if (!readComplete(buffer, array)) { //没有读到bulkLength
halfFrameBytes = pollArray(array);
return false;
}
halfFrameBulkLength = readInt(array);
array.clear();
}
if (halfFrameBulkLength == -1) {
frameValue = null;
} else {
int expect = halfFrameBulkLength + 2 - array.length();
if (buffer.remaining() < expect) {
array.put(buffer);
halfFrameBytes = pollArray(array);
return false;
}
array.put(buffer, expect);
array.removeLastByte(); //移除\n
array.removeLastByte(); //移除\r
frameValue = array.getBytes();
}
} else if (frameType == TYPE_MULTI) {
int size = halfFrameMultiSize;
if (size == Integer.MIN_VALUE) {
if (!readComplete(buffer, array)) { //没有读到bulkLength
halfFrameBytes = pollArray(array);
return false;
}
size = readInt(array);
halfFrameMultiSize = size;
array.clear();
frameValue = null;
}
if (frameList == null) {
frameList = new ArrayList<>();
}
if (size > 0) {
int index = halfFrameMultiItemIndex;
for (int i = index; i < size; i++) {
if (!buffer.hasRemaining()) {
return false;
}
if (halfFrameMultiItemType == 0) {
halfFrameMultiItemType = buffer.get();
}
halfFrameMultiItemIndex = i;
final byte itemType = halfFrameMultiItemType;
if (itemType == TYPE_STRING || itemType == TYPE_ERROR || itemType == TYPE_NUMBER) {
if (!readComplete(buffer, array)) {
halfFrameBytes = pollArray(array);
return false;
}
frameList.add(array.getBytes());
} else if (itemType == TYPE_BULK) {
if (halfFrameMultiItemLength == Integer.MIN_VALUE) {
if (!readComplete(buffer, array)) { //没有读到bulkLength
halfFrameBytes = pollArray(array);
return false;
}
halfFrameMultiItemLength = readInt(array);
array.clear();
}
if (halfFrameMultiItemLength == -1) {
frameList.add(null);
} else {
int expect = halfFrameMultiItemLength + 2 - array.length();
if (buffer.remaining() < expect) {
array.put(buffer);
halfFrameBytes = pollArray(array);
return false;
}
array.put(buffer, expect);
array.removeLastByte(); //移除\n
array.removeLastByte(); //移除\r
frameList.add(array.getBytes());
}
} else if (itemType == TYPE_MULTI) { //数组中嵌套数组例如: SCANHSCAN
if (size == 2 && frameList != null && frameList.size() == 1) {
//读游标 数据例如: *2 $1 0 *4 $4 key1 $2 10 $4 key2 $2 30
frameCursor = frameList.get(0);
frameList.clear();
clearHalfFrame();
return readFrames(conn, buffer, array);
} else {
throw new RedkaleException("Not support multi type in array data");
}
}
halfFrameMultiItemType = 0;
halfFrameMultiItemLength = Integer.MIN_VALUE;
array.clear();
}
}
}
return true;
}
private void clearHalfFrame() {
halfFrameBytes = null;
halfFrameBulkLength = Integer.MIN_VALUE;
halfFrameMultiSize = Integer.MIN_VALUE;
halfFrameMultiItemLength = Integer.MIN_VALUE;
halfFrameMultiItemIndex = 0; //从0开始
halfFrameMultiItemType = 0;
}
@Override
public void decodeMessages(ByteBuffer realbuf, ByteArray array) {
RedisCacheConnection conn = (RedisCacheConnection) connection;
if (!realbuf.hasRemaining()) {
return;
}
ByteBuffer buffer = realbuf;
while (buffer.hasRemaining()) {
if (!readFrames(conn, buffer, array)) {
break;
}
RedisCacheRequest request = nextRequest();
if (frameType == TYPE_ERROR) {
addMessage(request, new RedkaleException(new String(frameValue, StandardCharsets.UTF_8)));
} else {
addMessage(request, conn.pollResultSet(request).prepare(frameType, frameCursor, frameValue, frameList));
}
frameType = 0;
frameCursor = null;
frameValue = null;
frameList = null;
clearHalfFrame();
buffer = realbuf;
}
}
protected RedisCacheRequest nextRequest() {
return super.nextRequest();
}
private boolean readComplete(ByteBuffer buffer, ByteArray array) {
while (buffer.hasRemaining()) {
byte b = buffer.get();
if (b == '\n') {
array.removeLastByte(); //移除 \r
return true;
}
array.put(b);
}
return false;
}
private int readInt(ByteArray array) {
String val = array.toString(StandardCharsets.ISO_8859_1);
if (val.length() == 1 && val.charAt(0) == '0') {
return 0;
}
if (val.length() == 2 && val.charAt(0) == '-' && val.charAt(1) == '1') {
return -1;
}
return Integer.parseInt(val);
}
}

View File

@ -1,64 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.AsyncConnection;
import org.redkale.net.WorkThread;
import org.redkale.net.client.Client;
import org.redkale.net.client.ClientCodec;
import org.redkale.net.client.ClientConnection;
import org.redkale.net.client.ClientFuture;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
/**
*
* @author zhangjx
*/
public class RedisCacheConnection extends ClientConnection<RedisCacheRequest, RedisCacheResult> {
public RedisCacheConnection(Client client, int index, AsyncConnection channel) {
super(client, index, channel);
}
@Override
protected ClientCodec createCodec() {
return new RedisCacheCodec(this);
}
protected CompletableFuture<RedisCacheResult> writeRequest(RedisCacheRequest request) {
return super.writeChannel(request);
}
protected CompletableFuture<List<RedisCacheResult>> writeRequest(RedisCacheRequest[] requests) {
return super.writeChannel(requests);
}
protected <T> CompletableFuture<T> writeRequest(RedisCacheRequest request, Function<RedisCacheResult, T> respTransfer) {
return super.writeChannel(request, respTransfer);
}
protected <T> CompletableFuture<List<T>> writeRequest(RedisCacheRequest[] requests, Function<RedisCacheResult, T> respTransfer) {
return super.writeChannel(requests, respTransfer);
}
public RedisCacheResult pollResultSet(RedisCacheRequest request) {
RedisCacheResult rs = new RedisCacheResult();
return rs;
}
public RedisCacheRequest pollRequest(WorkThread workThread, String traceid) {
RedisCacheRequest rs = new RedisCacheRequest().workThread(workThread).traceid(traceid);
return rs;
}
protected ClientFuture<RedisCacheRequest, RedisCacheResult> pollRespFuture(Serializable requestid) {
return super.pollRespFuture(requestid);
}
}

View File

@ -1,44 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
*
* @author zhangjx
*/
public class RedisCacheReqAuth extends RedisCacheRequest {
private static final byte[] PS = "AUTH\r\n".getBytes(StandardCharsets.UTF_8);
protected String password;
public RedisCacheReqAuth(String password) {
this.password = password;
}
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
byte[] pwd = password.getBytes();
writer.put(mutliLengthBytes(2));
writer.put(bulkLengthBytes(4));
writer.put(PS);
writer.put(bulkLengthBytes(pwd.length));
writer.put(pwd);
writer.put(CRLF);
}
@Override
public String toString() {
return getClass().getSimpleName() + "{AUTH " + password + "}";
}
}

View File

@ -1,45 +0,0 @@
/*
*
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import org.redkale.util.Utility;
import java.nio.charset.StandardCharsets;
/**
*
* @author zhangjx
*/
public class RedisCacheReqClientName extends RedisCacheRequest {
private final String clientName;
public RedisCacheReqClientName(String appName, String resourceName) {
this.clientName = "redkalex" + (Utility.isEmpty(appName) ? "" : ("-" + appName))
+ (Utility.isEmpty(resourceName) ? "" : (":" + resourceName));
}
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
writer.put(mutliLengthBytes(3));
writer.put(bulkLengthBytes(6));
writer.put("CLIENT\r\n".getBytes(StandardCharsets.UTF_8));
writer.put(bulkLengthBytes(7));
writer.put("SETNAME\r\n".getBytes(StandardCharsets.UTF_8));
byte[] ns = clientName.getBytes(StandardCharsets.UTF_8);
writer.put(bulkLengthBytes(ns.length));
writer.put(ns);
writer.put(CRLF);
}
@Override
public String toString() {
return getClass().getSimpleName() + "{CLIENT SETNAME " + clientName + "}";
}
}

View File

@ -1,43 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
*
* @author zhangjx
*/
public class RedisCacheReqClose extends RedisCacheRequest {
private static final byte[] BYTES = new ByteArray()
.put((byte) '*')
.put((byte) '1')
.put((byte) '\r', (byte) '\n')
.put((byte) '$')
.put((byte) '4')
.put((byte) '\r', (byte) '\n')
.put("QUIT".getBytes(StandardCharsets.UTF_8))
.put((byte) '\r', (byte) '\n').getBytes();
@Override
public final boolean isCloseType() {
return true;
}
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
writer.put(BYTES);
}
@Override
public String toString() {
return getClass().getSimpleName() + "{QUIT}";
}
}

View File

@ -1,42 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
*
* @author zhangjx
*/
public class RedisCacheReqDB extends RedisCacheRequest {
protected int db;
public RedisCacheReqDB(int db) {
this.db = db;
}
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
writer.put(mutliLengthBytes(2));
writer.put(bulkLengthBytes(6));
writer.put("SELECT\r\n".getBytes(StandardCharsets.UTF_8));
byte[] dbs = String.valueOf(db).getBytes(StandardCharsets.UTF_8);
writer.put(bulkLengthBytes(dbs.length));
writer.put(dbs);
writer.put(CRLF);
}
@Override
public String toString() {
return getClass().getSimpleName() + "{SELECT " + db + "}";
}
}

View File

@ -1,38 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
*
* @author zhangjx
*/
public class RedisCacheReqPing extends RedisCacheRequest {
private static final byte[] BYTES = new ByteArray()
.put((byte) '*')
.put((byte) '1')
.put((byte) '\r', (byte) '\n')
.put((byte) '$')
.put((byte) '4')
.put((byte) '\r', (byte) '\n')
.put("PING".getBytes(StandardCharsets.UTF_8))
.put((byte) '\r', (byte) '\n').getBytes();
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
writer.put(BYTES);
}
@Override
public String toString() {
return getClass().getSimpleName() + "{PING}";
}
}

View File

@ -1,116 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.net.client.ClientConnection;
import org.redkale.net.client.ClientRequest;
import org.redkale.util.ByteArray;
import java.nio.charset.StandardCharsets;
/**
*
* @author zhangjx
*/
public class RedisCacheRequest extends ClientRequest {
static final byte[] BYTES_TRUE = new byte[]{'t'};
static final byte[] BYTES_FALSE = new byte[]{'f'};
static final byte[] BYTES_MATCH = "MATCH".getBytes(StandardCharsets.UTF_8);
static final byte[] BYTES_COUNT = "COUNT".getBytes(StandardCharsets.UTF_8);
protected static final byte[] CRLF = new byte[]{'\r', '\n'};
private static final byte[][] starLengthBytes;
private static final byte[][] dollarLengthBytes;
static {
starLengthBytes = new byte[1024][];
dollarLengthBytes = new byte[1024][];
for (int i = 0; i < dollarLengthBytes.length; i++) {
starLengthBytes[i] = ("*" + i + "\r\n").getBytes(StandardCharsets.ISO_8859_1);
dollarLengthBytes[i] = ("$" + i + "\r\n").getBytes(StandardCharsets.ISO_8859_1);
}
}
protected RedisCommand command;
protected String key;
protected byte[][] args;
public static RedisCacheRequest create(RedisCommand command, String key, String... args) {
return new RedisCacheRequest().prepare(command, key, RedisCacheSource.keysArgs(key, args));
}
public static RedisCacheRequest create(RedisCommand command, String key, byte[]... args) {
return new RedisCacheRequest().prepare(command, key, args);
}
public RedisCacheRequest prepare(RedisCommand command, String key, byte[]... args) {
super.prepare();
this.command = command;
this.key = key;
this.args = args;
return this;
}
public RedisCacheRequest createTime() {
this.createTime = System.currentTimeMillis();
return this;
}
@Override
public void writeTo(ClientConnection conn, ByteArray writer) {
writer.put(mutliLengthBytes(args.length + 1));
writer.put(command.getBytes());
for (final byte[] arg : args) {
putArgBytes(writer, arg);
}
}
protected void putArgBytes(ByteArray writer, byte[] arg) {
writer.put(bulkLengthBytes(arg.length));
writer.put(arg);
writer.put(CRLF);
}
protected static byte[] mutliLengthBytes(int length) {
if (length >= 0 && length < starLengthBytes.length) {
return starLengthBytes[length];
} else {
return ("*" + length + "\r\n").getBytes(StandardCharsets.ISO_8859_1);
}
}
protected static byte[] bulkLengthBytes(int length) {
if (length >= 0 && length < dollarLengthBytes.length) {
return dollarLengthBytes[length];
} else {
return ("$" + length + "\r\n").getBytes(StandardCharsets.ISO_8859_1);
}
}
@Override
public String toString() {
if (args == null || args.length == 0) {
return getClass().getSimpleName() + "{" + command + " " + key + "}";
} else {
StringBuilder sb = new StringBuilder();
sb.append(command);
sb.append(" ").append(key);
for (final byte[] arg : args) {
sb.append(" ").append(arg == null ? null : new String(arg, StandardCharsets.UTF_8));
}
return sb.toString();
}
}
}

View File

@ -1,262 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.client.ClientResult;
import org.redkale.source.CacheScoredValue;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
*
* @author zhangjx
*/
public class RedisCacheResult implements ClientResult {
//$ 块字符串类型
//* 数组
//+ 简单字符串类型
//- 错误类型
//: 整型
protected byte frameType;
protected byte[] frameCursor;
protected byte[] frameValue;
protected List<byte[]> frameList;
public RedisCacheResult prepare(byte byteType, byte[] frameCursor, byte[] frameValue, List<byte[]> frameList) {
this.frameType = byteType;
this.frameCursor = frameCursor;
this.frameValue = frameValue;
this.frameList = frameList;
return this;
}
@Override
public boolean isKeepAlive() {
return true;
}
public Void getVoidValue() {
return null;
}
public byte[] getFrameValue() {
return frameValue;
}
public int getCursor() {
if (frameCursor == null || frameCursor.length < 1) {
return -1;
} else {
return Integer.parseInt(new String(frameCursor));
}
}
public Boolean getBoolValue() {
if (frameValue == null) {
return false;
}
String val = new String(frameValue, StandardCharsets.UTF_8);
if ("OK".equals(val)) {
return true;
}
if (val.isEmpty()) {
return false;
}
for (char ch : val.toCharArray()) {
if (!Character.isDigit(ch)) {
return false;
}
}
return Integer.parseInt(val) > 0;
}
public Double getDoubleValue(Double defValue) {
if (frameValue == null) {
return defValue;
}
String val = new String(frameValue, StandardCharsets.UTF_8);
if ("nan".equalsIgnoreCase(val) || "-nan".equalsIgnoreCase(val)) {
return Double.NaN;
} else if ("inf".equalsIgnoreCase(val)) {
return Double.POSITIVE_INFINITY;
} else if ("-inf".equalsIgnoreCase(val)) {
return Double.NEGATIVE_INFINITY;
} else if ("-1".equalsIgnoreCase(val)) {
return -1.0;
} else if ("0".equalsIgnoreCase(val)) {
return 0.0;
} else if ("1".equalsIgnoreCase(val)) {
return 1.0;
} else {
return Double.parseDouble(val);
}
}
public Long getLongValue(Long defValue) {
if (frameValue == null) {
return defValue;
}
String val = new String(frameValue, StandardCharsets.UTF_8);
if ("-1".equalsIgnoreCase(val)) {
return -1L;
} else if ("0".equalsIgnoreCase(val)) {
return 0L;
} else if ("1".equalsIgnoreCase(val)) {
return 1L;
} else {
return Long.parseLong(val);
}
}
public Integer getIntValue(Integer defValue) {
if (frameValue == null) {
return defValue;
}
String val = new String(frameValue, StandardCharsets.UTF_8);
if ("-1".equalsIgnoreCase(val)) {
return -1;
} else if ("0".equalsIgnoreCase(val)) {
return 0;
} else if ("1".equalsIgnoreCase(val)) {
return 1;
} else {
return Integer.parseInt(val);
}
}
public <T> T getObjectValue(String key, RedisCryptor cryptor, Type type) {
return decodeValue(key, cryptor, frameValue, type);
}
protected <T> Set<T> getSetValue(String key, RedisCryptor cryptor, Type type) {
if (frameList == null || frameList.isEmpty()) {
return new LinkedHashSet<>();
}
Set<T> set = new LinkedHashSet<>();
for (byte[] bs : frameList) {
set.add(decodeValue(key, cryptor, bs, type));
}
return set;
}
protected List<CacheScoredValue> getScoreListValue(String key, RedisCryptor cryptor, Type scoreType) {
if (frameList == null || frameList.isEmpty()) {
return new ArrayList<>();
}
List<CacheScoredValue> set = new ArrayList<>();
for (int i = 0; i < frameList.size(); i += 2) {
byte[] bs1 = frameList.get(i);
byte[] bs2 = frameList.get(i + 1);
Number val = decodeValue(key, cryptor, bs2, scoreType);
if (val != null) {
set.add(CacheScoredValue.create(val, new String(bs1, StandardCharsets.UTF_8)));
}
}
return set;
}
protected <T> List<T> getListValue(String key, RedisCryptor cryptor, Type type) {
if (frameList == null || frameList.isEmpty()) {
return new ArrayList<>();
}
List<T> list = new ArrayList<>();
for (byte[] bs : frameList) {
list.add(decodeValue(key, cryptor, bs, type));
}
return list;
}
protected <T> Map<String, T> getMapValue(String key, RedisCryptor cryptor, Type type) {
if (frameList == null || frameList.isEmpty()) {
return new LinkedHashMap<>();
}
Map<String, T> map = new LinkedHashMap<>();
for (int i = 0; i < frameList.size(); i += 2) {
byte[] bs1 = frameList.get(i);
byte[] bs2 = frameList.get(i + 1);
T val = decodeValue(key, cryptor, bs2, type);
if (val != null) {
map.put(decodeValue(key, cryptor, bs1, String.class).toString(), val);
}
}
return map;
}
protected static <T> T decodeValue(String key, RedisCryptor cryptor, byte[] frames, Type type) {
if (frames == null) {
return null;
}
if (type == byte[].class) {
return (T) frames;
}
if (type == String.class) {
String val = new String(frames, StandardCharsets.UTF_8);
if (cryptor != null) {
val = cryptor.decrypt(key, val);
}
return (T) val;
}
if (type == int.class || type == Integer.class) {
return (T) (Integer) Integer.parseInt(new String(frames, StandardCharsets.UTF_8));
}
if (type == long.class || type == Long.class) {
return (T) (Long) Long.parseLong(new String(frames, StandardCharsets.UTF_8));
}
if (type == float.class || type == Float.class) {
return (T) (Float) Float.parseFloat(new String(frames, StandardCharsets.UTF_8));
}
if (type == BigInteger.class) {
return (T) new BigInteger(new String(frames, StandardCharsets.UTF_8));
}
if (type == BigDecimal.class) {
return (T) new BigDecimal(new String(frames, StandardCharsets.UTF_8));
}
if (type == boolean.class || type == Boolean.class) {
String v = new String(frames, StandardCharsets.UTF_8);
return (T) (Boolean) ("t".equalsIgnoreCase(v) || "1".equals(v));
}
if (type == double.class || type == Double.class) {
return (T) (Double) Double.parseDouble(new String(frames, StandardCharsets.UTF_8));
}
if (cryptor != null) {
String val = cryptor.decrypt(key, new String(frames, StandardCharsets.UTF_8));
return (T) JsonConvert.root().convertFrom(type, val);
}
return (T) JsonConvert.root().convertFrom(type, frames);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{type: ").append(frameType);
if (frameValue != null) {
sb.append(", value: ").append(new String(frameValue, StandardCharsets.UTF_8));
}
if (frameList != null) {
sb.append(", list: [");
boolean first = true;
for (byte[] bs : frameList) {
if (!first) {
sb.append(", ");
}
sb.append(bs == null ? null : new String(bs, StandardCharsets.UTF_8));
first = false;
}
sb.append("]");
}
return sb.append("}").toString();
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,30 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkalex.cache.redis;
import org.redkale.annotation.Priority;
import org.redkale.source.CacheSource;
import org.redkale.source.CacheSourceProvider;
import org.redkale.util.AnyValue;
/**
*
* @author zhangjx
*/
@Priority(-900)
public class RedisCacheSourceProvider implements CacheSourceProvider {
@Override
public boolean acceptsConf(AnyValue config) {
return new RedisCacheSource().acceptsConf(config);
}
@Override
public CacheSource createInstance() {
return new RedisCacheSource();
}
}

View File

@ -1,124 +0,0 @@
/*
*
*/
package org.redkalex.cache.redis;
import java.nio.charset.StandardCharsets;
/**
* @author zhangjx
*/
public enum RedisCommand {
EXISTS("EXISTS", true),
GET("GET", true),
GETEX("GETEX", true),
MSET("MSET", false),
SET("SET", false),
SETNX("SETNX", false),
GETSET("GETSET", false),
GETDEL("GETDEL", false),
SETEX("SETEX", false),
EXPIRE("EXPIRE", false),
PERSIST("PERSIST", false),
RENAME("RENAME", false),
RENAMENX("RENAMENX", false),
DEL("DEL", false),
INCR("INCR", false),
INCRBY("INCRBY", false),
INCRBYFLOAT("INCRBYFLOAT", false),
DECR("DECR", false),
DECRBY("DECRBY", false),
HDEL("HDEL", false),
HINCRBY("HINCRBY", false),
HINCRBYFLOAT("HINCRBYFLOAT", false),
LINSERT("LINSERT", false),
LTRIM("LTRIM", false),
LPOP("LPOP", false),
LPUSH("LPUSH", false),
LPUSHX("LPUSHX", false),
RPOP("RPOP", false),
RPOPLPUSH("RPOPLPUSH", false),
SMOVE("SMOVE", false),
RPUSH("RPUSH", false),
RPUSHX("RPUSHX", false),
LREM("LREM", false),
SADD("SADD", false),
SPOP("SPOP", false),
SSCAN("SSCAN", true),
SREM("SREM", false),
ZADD("ZADD", false),
ZINCRBY("ZINCRBY", false),
ZREM("ZREM", false),
ZMSCORE("ZMSCORE", true),
ZSCORE("ZSCORE", true),
ZCARD("ZCARD", true),
ZRANK("ZRANK", true),
ZREVRANK("ZREVRANK", true),
ZRANGE("ZRANGE", true),
ZSCAN("ZSCAN", true),
FLUSHDB("FLUSHDB", false),
FLUSHALL("FLUSHALL", false),
HMGET("HMGET", true),
HLEN("HLEN", true),
HKEYS("HKEYS", true),
HEXISTS("HEXISTS", true),
HSET("HSET", true),
HSETNX("HSETNX", true),
HMSET("HMSET", true),
HSCAN("HSCAN", true),
HGETALL("HGETALL", true),
HVALS("HVALS", true),
HGET("HGET", true),
HSTRLEN("HSTRLEN", true),
SMEMBERS("SMEMBERS", true),
SISMEMBER("SISMEMBER", true),
LRANGE("LRANGE", true),
LINDEX("LINDEX", true),
LLEN("LLEN", true),
SCARD("SCARD", true),
SMISMEMBER("SMISMEMBER", true),
SRANDMEMBER("SRANDMEMBER", true),
SDIFF("SDIFF", true),
SDIFFSTORE("SDIFFSTORE", false),
SINTER("SINTER", true),
SINTERSTORE("SINTERSTORE", false),
SUNION("SUNION", true),
SUNIONSTORE("SUNIONSTORE", false),
MGET("MGET", true),
KEYS("KEYS", true),
SCAN("SCAN", true),
DBSIZE("DBSIZE", true),
TYPE("TYPE", true),
SUBSCRIBE("SUBSCRIBE", false),
UNSUBSCRIBE("UNSUBSCRIBE", false),
PUBLISH("PUBLISH", false),
PUBSUB("PUBSUB", true),
GETBIT("GETBIT", true),
SETBIT("SETBIT", false),
EVAL("EVAL", false);
private final String command;
private final byte[] bytes;
private final boolean readOnly;
private RedisCommand(String command, boolean readOnly) {
this.command = command;
this.readOnly = readOnly;
this.bytes = ("$" + command.length() + "\r\n" + command + "\r\n").getBytes(StandardCharsets.ISO_8859_1);
}
public String getCommand() {
return command;
}
public byte[] getBytes() {
return bytes;
}
public boolean isReadOnly() {
return readOnly;
}
}

View File

@ -1,190 +0,0 @@
/*
*
*/
package org.redkalex.cache.redis;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleException;
import org.redkale.util.Utility;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import static org.redkale.source.AbstractCacheSource.*;
import static org.redkale.util.Utility.isEmpty;
import static org.redkale.util.Utility.isNotEmpty;
/**
*
* @author zhangjx
*/
public class RedisConfig {
private boolean ssl;
private int db;
private String username;
private String password;
private int maxconns;
private int pipelines;
private List<String> addresses;
public static RedisConfig create(AnyValue conf) {
RedisConfig result = new RedisConfig();
int gdb = conf.getIntValue(CACHE_SOURCE_DB, 0);
String gusername = null;
String gpassword = conf.getValue(CACHE_SOURCE_PASSWORD);
int gmaxconns = conf.getIntValue(CACHE_SOURCE_MAXCONNS, Utility.cpus());
int gpipelines = conf.getIntValue(CACHE_SOURCE_PIPELINES, org.redkale.net.client.Client.DEFAULT_MAX_PIPELINES);
String nodes = conf.getValue(CACHE_SOURCE_NODES, conf.getValue("url", ""));
if (Utility.isEmpty(nodes)) {
throw new RedkaleException("Not found nodes config for redis");
}
List<String> addrs = new ArrayList<>();
for (String url : nodes.replace(',', ';').split(";")) {
String urluser = null;
String urlpwd = null;
String urldb = null;
String urlmaxconns = null;
String urlpipelines = null;
addrs.add(url);
if (url.startsWith("redis://")) { //兼容 redis://:1234@127.0.0.1:6379?db=2
URI uri = URI.create(url);
String userInfo = uri.getUserInfo();
if (isEmpty(userInfo)) {
String authority = uri.getAuthority();
if (authority != null && authority.indexOf('@') > 0) {
userInfo = authority.substring(0, authority.indexOf('@'));
}
}
if (isNotEmpty(userInfo)) {
urlpwd = userInfo;
if (urlpwd.startsWith(":")) {
urlpwd = urlpwd.substring(1);
} else {
int index = urlpwd.indexOf(':');
if (index > 0) {
urluser = urlpwd.substring(0, index);
urlpwd = urlpwd.substring(index + 1);
}
}
}
if (isNotEmpty(uri.getQuery())) {
String[] qrys = uri.getQuery().split("&|=");
for (int i = 0; i < qrys.length; i += 2) {
if (CACHE_SOURCE_USER.equals(qrys[i])) {
urluser = i == qrys.length - 1 ? "" : qrys[i + 1];
} else if (CACHE_SOURCE_PASSWORD.equals(qrys[i])) {
urlpwd = i == qrys.length - 1 ? "" : qrys[i + 1];
} else if (CACHE_SOURCE_DB.equals(qrys[i])) {
urldb = i == qrys.length - 1 ? "" : qrys[i + 1];
} else if (CACHE_SOURCE_MAXCONNS.equals(qrys[i])) {
urlmaxconns = i == qrys.length - 1 ? "" : qrys[i + 1];
} else if (CACHE_SOURCE_PIPELINES.equals(qrys[i])) {
urlpipelines = i == qrys.length - 1 ? "" : qrys[i + 1];
}
}
}
if (isNotEmpty(urluser)) {
gusername = urluser;
}
if (isNotEmpty(urlpwd)) {
gpassword = urlpwd;
}
if (isNotEmpty(urlmaxconns)) {
gmaxconns = Integer.parseInt(urlmaxconns);
}
if (isNotEmpty(urlpipelines)) {
gpipelines = Integer.parseInt(urlpipelines);
}
if (isNotEmpty(urldb)) {
gdb = Integer.parseInt(urldb);
}
}
}
result.ssl = nodes.startsWith("rediss://");
result.db = gdb;
if (isNotEmpty(gusername)) {
result.username = gusername;
}
if (isNotEmpty(gpassword)) {
result.password = gpassword;
}
result.maxconns = gmaxconns;
result.pipelines = gpipelines;
result.addresses = addrs;
return result;
}
public boolean isSsl() {
return ssl;
}
public void setSsl(boolean ssl) {
this.ssl = ssl;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getDb() {
return db;
}
public void setDb(int db) {
this.db = db;
}
public int getMaxconns() {
return maxconns;
}
public int getMaxconns(int min) {
return Math.max(maxconns, min);
}
public void setMaxconns(int maxconns) {
this.maxconns = maxconns;
}
public int getPipelines() {
return pipelines;
}
public void setPipelines(int pipelines) {
this.pipelines = pipelines;
}
public List<String> getAddresses() {
return addresses;
}
public void setAddresses(List<String> addresses) {
this.addresses = addresses;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}

View File

@ -1,49 +0,0 @@
/*
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template
*/
package org.redkalex.cache.redis;
import org.redkale.util.AnyValue;
/**
*
* @author zhangjx
*/
public interface RedisCryptor {
/**
* 初始化
*
* @param conf 配置
*/
public void init(AnyValue conf);
/**
* 加密, 无需加密的key对应的值需要直接返回value
*
* @param key key
* @param value 明文
*
* @return 密文
*/
public String encrypt(String key, String value);
/**
* 解密, 无需解密的key对应的值需要直接返回value
*
* @param key key
* @param value 密文
*
* @return 明文
*/
public String decrypt(String key, String value);
/**
* 销毁
*
* @param conf 配置
*/
public void destroy(AnyValue conf);
}

View File

@ -1,173 +1,25 @@
package net.tccn;
package org.redkalex.cache.redis;
import org.junit.Test;
import org.redkale.net.AsyncIOGroup;
import org.redkale.convert.json.JsonFactory;
import org.redkale.util.AnyValue;
import org.redkale.util.ResourceFactory;
import org.redkalex.cache.redis.MyRedisCacheSource;
import static org.redkale.boot.Application.RESNAME_APP_CLIENT_ASYNCGROUP;
import static org.redkale.source.AbstractCacheSource.*;
import java.util.Map;
public class RedisTest {
static MyRedisCacheSource source = new MyRedisCacheSource();
static MyRedisCacheSource<String> source = new MyRedisCacheSource();
/**
* 3
*/
@Test
public void keyTest() {
source.set("a", 3);
System.out.println(source.get("a"));
source.del("a");
}
/**
* ax:false
* ax:true
* ax:false
*/
@Test
public void bitTest() {
boolean ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax); // false
source.setBit("ax", 6, true);
ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax); // true
source.setBit("ax", 6, false);
ax = source.getBit("ax", 6);
System.out.println("ax:"+ ax); // false
source.del("ax");
}
@Test
public void setTest() {
source.del("setx");
source.sadd("setx", int.class, 1, 2, 3, 5, 6);
int setx = source.spop("setx", int.class);
System.out.println(setx);
setx = source.spop("setx", int.class);
System.out.println(setx);
source.del("setx");
source.srem("setx", int.class,213, 2312);
/*//source.sadd("setx", list.toArray(Integer[]::new));
List<Integer> list = List.of(2, 3, 5);
// source.sadd("setx", list.toArray(Integer[]::new));
source.sadd("setx", list.toArray(Integer[]::new));
source.sadd("setx", 12, 2312, 213);
source.sadd("setx", List.of(1011, 10222));*/
}
static { // redis://:*Zhong9307!@47.111.150.118:6064?db=2
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue().addValue(CACHE_SOURCE_MAXCONNS, "1");
conf.addValue(CACHE_SOURCE_NODES, "redis://:123456@127.0.0.1:6379?db=0");
final ResourceFactory factory = ResourceFactory.create();
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
asyncGroup.start();
factory.register(RESNAME_APP_CLIENT_ASYNCGROUP, asyncGroup);
factory.inject(source);
//source.defaultConvert = JsonFactory.root().getConvert();
source.init(conf);
/*
source.lock("lockx", 5000);
*/
source.keysStartsWith("more-hot").forEach(x -> {
System.out.println(x);
source.del(x);
int i = (short) 3;
});
//--------------------- set ------------------------------
/*
*/
/*
Collection<String> setx1 = source.getCollection("setx", String.class);
System.out.println(setx1);
//source.getexLong()
source.setHms("hmx", Map.of("a", "5", "b", "51", "c", "ads"));
List<Serializable> hmget = source.hmget("hmx", int.class, "a");
System.out.println(hmget);
Integer hm = source.getHm("hmx", int.class, "ads");
System.out.println(hm);
Map<String, String> hms = source.getHms("hmx", "a", "b");
System.out.println("hmx:" + hms);
*//*System.out.println("======================================================");
System.out.println(source.incrHm("hmx", "a", -6.0));
hms = source.getHms("hmx", "a", "b");
System.out.println("hmxa+1后的结果 " + hms);*//*
System.out.println("======================================================");
source.setHm("hmx", "c", 12);
hms = source.getHms("hmx", "a", "b", "c", "d", "a");
System.out.println("hmx设置 c=12 后的结果 " + hms);
System.out.println("======================================================");
Double c = source.getHm("hmx", double.class, "c");
System.out.println("hmx 中 c 值:" + c);*/
/*Map<String, Object> hmx = source.getHmall("hmx");
System.out.println("Hmall" + hmx);*/
/*AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
static {
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "47.111.150.118").addValue("port", "6064").addValue("password", "*Zhong9307!").addValue("db", 2));
source.defaultConvert = JsonFactory.root().getConvert();
source.initValueType(String.class); //value用String类型
source.init(conf);*/
source.init(conf);
}
public static void main(String[] args) {
//source.setLong("a", 125);
/*long a = source.getLong("a", 0);
System.out.println(a);
List<String> keys = source.keys("farm*");
keys.forEach(x -> System.out.println(x));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
// ===========================================
//System.out.println(source.remove("a", "b"));
// bit
@ -263,10 +115,10 @@ public class RedisTest {
System.out.println(source.getCollectionSize("sk")); // 2*/
/*Map<String, String> hms = source.getHms("supportusers", "5-kfeu0f", "xxxx", "3-0kbt7u8t", "95q- ");
Map<String, String> hms = source.getHms("supportusers", "5-kfeu0f", "xxxx", "3-0kbt7u8t", "95q- ");
hms.forEach((k, v) -> {
System.out.println(k + " : " + v);
});*/
});
/*MyRedisCacheSource<String> source2 = new MyRedisCacheSource();

File diff suppressed because it is too large Load Diff