From c6a24098ff4b023ba18ef8f4f4b2839319015a0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=81=E6=98=BE=E4=BC=98?= <237809796@qq.com> Date: Wed, 3 Nov 2021 19:56:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A1.redtimer=20?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=8C=85=E6=9A=82=E6=97=B6=E5=90=88=E5=B9=B6?= =?UTF-8?q?=E5=88=B0=E6=AD=A4=E5=B7=A5=E7=A8=8B=202.=E6=96=B0=E5=A2=9ETime?= =?UTF-8?q?rs.tryDelay=E3=80=81Timers.delay=20=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/application.xml | 7 +- conf/config.properties | 4 +- src/com/zdemo/cache_/RedisCacheSource.java | 2173 +++++++++++++++++ src/com/zdemo/cache_/RedisTest.java | 262 ++ src/com/zdemo/zhub/ZHubClient.java | 16 +- src/net/tccn/timer/TimerExecutor.java | 66 + src/net/tccn/timer/TimerTask.java | 108 + src/net/tccn/timer/Timers.java | 45 + src/net/tccn/timer/queue/TimerQueue.java | 105 + src/net/tccn/timer/scheduled/Scheduled.java | 23 + .../tccn/timer/scheduled/ScheduledCycle.java | 79 + .../tccn/timer/scheduled/ScheduledExpres.java | 499 ++++ src/net/tccn/timer/task/Job.java | 14 + src/net/tccn/timer/task/Task.java | 69 + test/com/zdemo/test/AppTest.java | 115 +- .../zhub => test/com/zdemo/test}/Delays.java | 0 test/com/zdemo/test/HelloService.java | 205 +- 17 files changed, 3774 insertions(+), 16 deletions(-) create mode 100644 src/com/zdemo/cache_/RedisCacheSource.java create mode 100644 src/com/zdemo/cache_/RedisTest.java create mode 100644 src/net/tccn/timer/TimerExecutor.java create mode 100644 src/net/tccn/timer/TimerTask.java create mode 100644 src/net/tccn/timer/Timers.java create mode 100644 src/net/tccn/timer/queue/TimerQueue.java create mode 100644 src/net/tccn/timer/scheduled/Scheduled.java create mode 100644 src/net/tccn/timer/scheduled/ScheduledCycle.java create mode 100644 src/net/tccn/timer/scheduled/ScheduledExpres.java create mode 100644 src/net/tccn/timer/task/Job.java create mode 100644 src/net/tccn/timer/task/Task.java rename {src/com/zdemo/zhub => test/com/zdemo/test}/Delays.java (100%) diff --git a/conf/application.xml b/conf/application.xml index c11fbfe..68f0ce3 100644 --- a/conf/application.xml +++ b/conf/application.xml @@ -1,8 +1,9 @@ - + + @@ -16,10 +17,10 @@ - + - + diff --git a/conf/config.properties b/conf/config.properties index ff1a7b4..da55390 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -10,5 +10,5 @@ pulsar.serviceurl=pulsar://47.113.228.247:6650 # zdb.host = 127.0.0.1 # zdb.port = 1216 -zhub.host = 47.111.150.118 -zhub.port = 6066 \ No newline at end of file +zhub.host = 127.0.0.1 +zhub.port = 1216 \ No newline at end of file diff --git a/src/com/zdemo/cache_/RedisCacheSource.java b/src/com/zdemo/cache_/RedisCacheSource.java new file mode 100644 index 0000000..4bf837e --- /dev/null +++ b/src/com/zdemo/cache_/RedisCacheSource.java @@ -0,0 +1,2173 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.zdemo.cache_; + +import org.redkale.convert.Convert; +import org.redkale.convert.json.JsonConvert; +import org.redkale.convert.json.JsonFactory; +import org.redkale.net.AsyncConnection; +import org.redkale.net.Transport; +import org.redkale.net.TransportFactory; +import org.redkale.service.AbstractService; +import org.redkale.service.Local; +import org.redkale.service.Service; +import org.redkale.source.CacheSource; +import org.redkale.source.Flipper; +import org.redkale.util.*; +import org.redkale.util.AnyValue.DefaultAnyValue; + +import javax.annotation.Resource; +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Type; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static org.redkale.boot.Application.RESNAME_APP_GROUP; + +/** + * 详情见: https://redkale.org + * + * @param Value + * @author zhangjx + */ +@Local +@AutoLoad(false) +@ResourceType(CacheSource.class) +public class RedisCacheSource extends AbstractService implements CacheSource, Service, AutoCloseable, Resourcable { + + protected static final byte DOLLAR_BYTE = '$'; + + protected static final byte ASTERISK_BYTE = '*'; + + protected static final byte PLUS_BYTE = '+'; + + protected static final byte MINUS_BYTE = '-'; + + protected static final byte COLON_BYTE = ':'; + + private final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + @Resource(name = RESNAME_APP_GROUP) + protected AsyncGroup asyncGroup; + + @Resource + public JsonConvert defaultConvert; + + @Resource(name = "$_convert") + public JsonConvert convert; + + protected Type objValueType = String.class; + + protected Map passwords; + + protected List nodeAddrs; + + protected int db; + + protected Transport transport; + + @Override + public void init(AnyValue conf) { + if (this.convert == null) this.convert = this.defaultConvert; + if (conf == null) conf = new DefaultAnyValue(); + final int readTimeoutSeconds = conf.getIntValue("readTimeoutSeconds", TransportFactory.DEFAULT_READTIMEOUTSECONDS); + final int writeTimeoutSeconds = conf.getIntValue("writeTimeoutSeconds", TransportFactory.DEFAULT_WRITETIMEOUTSECONDS); + final List addresses = new ArrayList<>(); + Map passwords0 = new HashMap<>(); + for (AnyValue node : conf.getAnyValues("node")) { + String addrstr = node.getValue("addr"); + InetSocketAddress addr = null; + if (addrstr.startsWith("redis://")) { + addrstr = addrstr.substring("redis://".length()); + int pos = addrstr.indexOf(':'); + addr = new InetSocketAddress(addrstr.substring(0, pos), Integer.parseInt(addrstr.substring(pos + 1))); + addresses.add(addr); + } else { + addr = new InetSocketAddress(addrstr, node.getIntValue("port")); + addresses.add(addr); + } + String password = node.getValue("password", "").trim(); + if (!password.isEmpty()) passwords0.put(addr, password.getBytes(StandardCharsets.UTF_8)); + String db0 = node.getValue("db", "").trim(); + if (!db0.isEmpty()) this.db = Integer.valueOf(db0); + } + if (!passwords0.isEmpty()) this.passwords = passwords0; + this.nodeAddrs = addresses; + TransportFactory transportFactory = TransportFactory.create(asyncGroup, readTimeoutSeconds, writeTimeoutSeconds); + this.transport = transportFactory.createTransportTCP("Redis-Transport", null, addresses); + this.transport.setSemaphore(new Semaphore(conf.getIntValue("maxconns", 1000))); + if (logger.isLoggable(Level.FINE)) + logger.log(Level.FINE, RedisCacheSource.class.getSimpleName() + ": addrs=" + addresses + ", db=" + db); + + } + + @Override //ServiceLoader时判断配置是否符合当前实现类 + public boolean match(AnyValue config) { + if (config == null) return false; + AnyValue[] nodes = config.getAnyValues("node"); + if (nodes == null || nodes.length == 0) return false; + for (AnyValue node : nodes) { + if (node.getValue("addr") != null && node.getValue("port") != null) return true; + if (node.getValue("addr") != null && node.getValue("addr").startsWith("redis://")) return true; + } + return false; + } + + public void updateRemoteAddresses(final Collection addresses) { + this.transport.updateRemoteAddresses(addresses); + } + + @Override + @Deprecated + public final void initValueType(Type valueType) { + this.objValueType = valueType == null ? String.class : valueType; + } + + @Override + @Deprecated + public final void initTransient(boolean flag) { + } + + @Override + public final String getType() { + return "redis"; + } + + public static void main(String[] args) throws Exception { + DefaultAnyValue conf = new DefaultAnyValue().addValue("maxconns", "1"); + conf.addValue("node", new DefaultAnyValue().addValue("addr", "127.0.0.1").addValue("port", "6363")); + + final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + asyncGroup.start(); + ResourceFactory.root().register(RESNAME_APP_GROUP, asyncGroup); + + RedisCacheSource source = new RedisCacheSource(); + ResourceFactory.root().inject(source); + source.init(null); + source.defaultConvert = JsonFactory.root().getConvert(); + source.init(conf); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 7788); + try { + System.out.println("------------------------------------"); + source.removeAsync("stritem1"); + source.removeAsync("stritem2"); + source.setStringAsync("stritem1", "value1"); + source.setStringAsync("stritem2", "value2"); + System.out.println("stritem开头的key有两个: " + source.queryKeysStartsWith("stritem")); + System.out.println("[有值] MGET : " + source.getStringMap("stritem1", "stritem2")); + System.out.println("[有值] MGET : " + Arrays.toString(source.getStringArray("stritem1", "stritem2"))); + + source.remove("intitem1"); + source.remove("intitem2"); + source.setLong("intitem1", 333); + source.setLong("intitem2", 444); + System.out.println("[有值] MGET : " + source.getStringMap("intitem1", "intitem22", "intitem2")); + System.out.println("[有值] MGET : " + Arrays.toString(source.getStringArray("intitem1", "intitem22", "intitem2"))); + source.remove("objitem1"); + source.remove("objitem2"); + source.set("objitem1", Flipper.class, new Flipper(10)); + source.set("objitem2", Flipper.class, new Flipper(20)); + System.out.println("[有值] MGET : " + source.getMap(Flipper.class, "objitem1", "objitem2")); + + source.remove("key1"); + source.remove("key2"); + source.remove("300"); + source.set(1000, "key1", String.class, "value1"); + source.set("key1", String.class, "value1"); + source.setString("keystr1", "strvalue1"); + source.setLong("keylong1", 333L); + source.set("300", String.class, "4000"); + source.getAndRefresh("key1", 3500, String.class); + System.out.println("[有值] 300 GET : " + source.get("300", String.class)); + System.out.println("[有值] key1 GET : " + source.get("key1", String.class)); + System.out.println("[无值] key2 GET : " + source.get("key2", String.class)); + System.out.println("[有值] keystr1 GET : " + source.getString("keystr1")); + System.out.println("[有值] keylong1 GET : " + source.getLong("keylong1", 0L)); + System.out.println("[有值] key1 EXISTS : " + source.exists("key1")); + System.out.println("[无值] key2 EXISTS : " + source.exists("key2")); + + source.remove("keys3"); + source.appendListItem("keys3", String.class, "vals1"); + source.appendListItem("keys3", String.class, "vals2"); + System.out.println("-------- keys3 追加了两个值 --------"); + System.out.println("[两值] keys3 VALUES : " + source.getCollection("keys3", String.class)); + System.out.println("[有值] keys3 EXISTS : " + source.exists("keys3")); + source.removeListItem("keys3", String.class, "vals1"); + System.out.println("[一值] keys3 VALUES : " + source.getCollection("keys3", String.class)); + source.getCollectionAndRefresh("keys3", 3000, String.class); + + source.remove("stringmap"); + source.appendSetItem("stringmap", JsonConvert.TYPE_MAP_STRING_STRING, Utility.ofMap("a", "aa", "b", "bb")); + source.appendSetItem("stringmap", JsonConvert.TYPE_MAP_STRING_STRING, Utility.ofMap("c", "cc", "d", "dd")); + System.out.println("[两值] stringmap VALUES : " + source.getCollectionAsync("stringmap", JsonConvert.TYPE_MAP_STRING_STRING).join()); + + source.remove("sets3"); + source.remove("sets4"); + source.appendSetItem("sets3", String.class, "setvals1"); + source.appendSetItem("sets3", String.class, "setvals2"); + source.appendSetItem("sets3", String.class, "setvals1"); + source.appendSetItem("sets4", String.class, "setvals2"); + source.appendSetItem("sets4", String.class, "setvals1"); + System.out.println("[两值] sets3 VALUES : " + source.getCollection("sets3", String.class)); + System.out.println("[有值] sets3 EXISTS : " + source.exists("sets3")); + System.out.println("[有值] sets3-setvals2 EXISTSITEM : " + source.existsSetItem("sets3", String.class, "setvals2")); + System.out.println("[有值] sets3-setvals3 EXISTSITEM : " + source.existsSetItem("sets3", String.class, "setvals3")); + source.removeSetItem("sets3", String.class, "setvals1"); + System.out.println("[一值] sets3 VALUES : " + source.getCollection("sets3", String.class)); + System.out.println("sets3 大小 : " + source.getCollectionSize("sets3")); + System.out.println("all keys: " + source.queryKeys()); + System.out.println("key startkeys: " + source.queryKeysStartsWith("key")); + System.out.println("newnum 值 : " + source.incr("newnum")); + System.out.println("newnum 值 : " + source.decr("newnum")); + System.out.println("sets3&sets4: " + source.getStringCollectionMap(true, "sets3", "sets4")); + System.out.println("------------------------------------"); + source.set("myaddr", InetSocketAddress.class, addr); + System.out.println("myaddrstr: " + source.getString("myaddr")); + System.out.println("myaddr: " + source.get("myaddr", InetSocketAddress.class)); + source.remove("myaddrs"); + source.remove("myaddrs2"); + source.appendSetItem("myaddrs", InetSocketAddress.class, new InetSocketAddress("127.0.0.1", 7788)); + source.appendSetItem("myaddrs", InetSocketAddress.class, new InetSocketAddress("127.0.0.1", 7799)); + System.out.println("myaddrs: " + source.getCollection("myaddrs", InetSocketAddress.class)); + source.removeSetItem("myaddrs", InetSocketAddress.class, new InetSocketAddress("127.0.0.1", 7788)); + System.out.println("myaddrs: " + source.getCollection("myaddrs", InetSocketAddress.class)); + source.appendSetItem("myaddrs2", InetSocketAddress.class, new InetSocketAddress("127.0.0.1", 7788)); + source.appendSetItem("myaddrs2", InetSocketAddress.class, new InetSocketAddress("127.0.0.1", 7799)); + System.out.println("myaddrs&myaddrs2: " + source.getCollectionMap(true, InetSocketAddress.class, "myaddrs", "myaddrs2")); + System.out.println("------------------------------------"); + source.remove("myaddrs"); + Type mapType = new TypeToken>() { + }.getType(); + Map map = new HashMap<>(); + map.put("a", 1); + map.put("b", 2); + source.set("mapvals", mapType, map); + System.out.println("mapvals: " + source.get("mapvals", mapType)); + + source.remove("byteskey"); + source.setBytes("byteskey", new byte[]{1, 2, 3}); + System.out.println("byteskey 值 : " + Arrays.toString(source.getBytes("byteskey"))); + //h + source.remove("hmap"); + source.hincr("hmap", "key1"); + System.out.println("hmap.key1 值 : " + source.hgetLong("hmap", "key1", -1)); + source.hmset("hmap", "key2", "haha", "key3", 333); + source.hmset("hmap", "sm", (HashMap) Utility.ofMap("a", "aa", "b", "bb")); + System.out.println("hmap.sm 值 : " + source.hget("hmap", "sm", JsonConvert.TYPE_MAP_STRING_STRING)); + System.out.println("hmap.[key1,key2,key3] 值 : " + source.hmget("hmap", String.class, "key1", "key2", "key3")); + System.out.println("hmap.keys 四值 : " + source.hkeys("hmap")); + source.hremove("hmap", "key1", "key3"); + System.out.println("hmap.keys 两值 : " + source.hkeys("hmap")); + System.out.println("hmap.key2 值 : " + source.hgetString("hmap", "key2")); + System.out.println("hmap列表(2)大小 : " + source.hsize("hmap")); + + source.remove("hmaplong"); + source.hincr("hmaplong", "key1", 10); + source.hsetLong("hmaplong", "key2", 30); + System.out.println("hmaplong.所有两值 : " + source.hmap("hmaplong", long.class, 0, 10)); + + source.remove("hmapstr"); + source.hsetString("hmapstr", "key1", "str10"); + source.hsetString("hmapstr", "key2", null); + System.out.println("hmapstr.所有一值 : " + source.hmap("hmapstr", String.class, 0, 10)); + + source.remove("hmapstrmap"); + source.hset("hmapstrmap", "key1", JsonConvert.TYPE_MAP_STRING_STRING, (HashMap) Utility.ofMap("ks11", "vv11")); + source.hset("hmapstrmap", "key2", JsonConvert.TYPE_MAP_STRING_STRING, null); + System.out.println("hmapstrmap.无值 : " + source.hmap("hmapstrmap", JsonConvert.TYPE_MAP_STRING_STRING, 0, 10, "key2*")); + + source.remove("popset"); + source.appendStringSetItem("popset", "111"); + source.appendStringSetItem("popset", "222"); + source.appendStringSetItem("popset", "333"); + source.appendStringSetItem("popset", "444"); + source.appendStringSetItem("popset", "555"); + System.out.println("SPOP一个元素:" + source.spopStringSetItem("popset")); + System.out.println("SPOP两个元素:" + source.spopStringSetItem("popset", 2)); + System.out.println("SPOP五个元素:" + source.spopStringSetItem("popset", 5)); + source.appendLongSetItem("popset", 111); + source.appendLongSetItem("popset", 222); + source.appendLongSetItem("popset", 333); + source.appendLongSetItem("popset", 444); + source.appendLongSetItem("popset", 555); + System.out.println("SPOP一个元素:" + source.spopLongSetItem("popset")); + System.out.println("SPOP两个元素:" + source.spopLongSetItem("popset", 2)); + System.out.println("SPOP五个元素:" + source.spopLongSetItem("popset", 5)); + System.out.println("SPOP一个元素:" + source.spopLongSetItem("popset")); + + //清除 + int rs = source.remove("stritem1"); + System.out.println("删除stritem1个数: " + rs); + source.remove("popset"); + source.remove("stritem2"); + source.remove("intitem1"); + source.remove("intitem2"); + source.remove("keylong1"); + source.remove("keystr1"); + source.remove("mapvals"); + source.remove("myaddr"); + source.remove("myaddrs2"); + source.remove("newnum"); + source.remove("objitem1"); + source.remove("objitem2"); + source.remove("key1"); + source.remove("key2"); + source.remove("keys3"); + source.remove("sets3"); + source.remove("sets4"); + source.remove("myaddrs"); + source.remove("300"); + source.remove("stringmap"); + source.remove("hmap"); + source.remove("hmaplong"); + source.remove("hmapstr"); + source.remove("hmapstrmap"); + source.remove("byteskey"); + System.out.println("------------------------------------"); +// System.out.println("--------------测试大文本---------------"); +// HashMap bigmap = new HashMap<>(); +// StringBuilder sb = new StringBuilder(); +// sb.append("起始"); +// for (int i = 0; i < 1024 * 1024; i++) { +// sb.append("abcde"); +// } +// sb.append("结束"); +// bigmap.put("val", sb.toString()); +// System.out.println("文本长度: " + sb.length()); +// source.set("bigmap", JsonConvert.TYPE_MAP_STRING_STRING, bigmap); +// System.out.println("写入完成"); +// for (int i = 0; i < 1; i++) { +// HashMap fs = (HashMap) source.get("bigmap", JsonConvert.TYPE_MAP_STRING_STRING); +// System.out.println("内容长度: " + fs.get("val").length()); +// } + source.remove("bigmap"); + + } finally { + source.close(); + } + } + + @Override + public void close() throws Exception { //在 Application 关闭时调用 + destroy(null); + } + + @Override + public String resourceName() { + Resource res = this.getClass().getAnnotation(Resource.class); + return res == null ? "" : res.name(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{addrs = " + this.nodeAddrs + ", db=" + this.db + "}"; + } + + @Override + public void destroy(AnyValue conf) { + if (transport != null) transport.close(); + } + + //--------------------- exists ------------------------------ + @Override + public CompletableFuture existsAsync(String key) { + return (CompletableFuture) send("EXISTS", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public boolean exists(String key) { + return existsAsync(key).join(); + } + + //--------------------- get ------------------------------ + @Override + @Deprecated + public CompletableFuture getAsync(String key) { + return (CompletableFuture) send("GET", CacheEntryType.OBJECT, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture getAsync(String key, Type type) { + return (CompletableFuture) send("GET", CacheEntryType.OBJECT, type, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture getStringAsync(String key) { + return (CompletableFuture) send("GET", CacheEntryType.STRING, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture getLongAsync(String key, long defValue) { + return ((CompletableFuture) send("GET", CacheEntryType.LONG, (Type) null, key, key.getBytes(StandardCharsets.UTF_8))).thenApplyAsync(v -> v == null ? defValue : v); + } + + @Override + @Deprecated + public V get(String key) { + return getAsync(key).join(); + } + + @Override + public T get(String key, final Type type) { + return (T) getAsync(key, type).join(); + } + + @Override + public String getString(String key) { + return getStringAsync(key).join(); + } + + @Override + public long getLong(String key, long defValue) { + return getLongAsync(key, defValue).join(); + } + + //--------------------- getAndRefresh ------------------------------ + @Override + @Deprecated + public CompletableFuture getAndRefreshAsync(String key, int expireSeconds) { + return (CompletableFuture) refreshAsync(key, expireSeconds).thenCompose(v -> getAsync(key)); + } + + @Override + public CompletableFuture getAndRefreshAsync(String key, int expireSeconds, final Type type) { + return (CompletableFuture) refreshAsync(key, expireSeconds).thenCompose(v -> getAsync(key, type)); + } + + @Override + @Deprecated + public V getAndRefresh(String key, final int expireSeconds) { + return getAndRefreshAsync(key, expireSeconds).join(); + } + + @Override + public T getAndRefresh(String key, final int expireSeconds, final Type type) { + return (T) getAndRefreshAsync(key, expireSeconds, type).join(); + } + + @Override + public CompletableFuture getStringAndRefreshAsync(String key, int expireSeconds) { + return (CompletableFuture) refreshAsync(key, expireSeconds).thenCompose(v -> getStringAsync(key)); + } + + @Override + public String getStringAndRefresh(String key, final int expireSeconds) { + return getStringAndRefreshAsync(key, expireSeconds).join(); + } + + @Override + public CompletableFuture getLongAndRefreshAsync(String key, int expireSeconds, long defValue) { + return (CompletableFuture) refreshAsync(key, expireSeconds).thenCompose(v -> getLongAsync(key, defValue)); + } + + @Override + public long getLongAndRefresh(String key, final int expireSeconds, long defValue) { + return getLongAndRefreshAsync(key, expireSeconds, defValue).join(); + } + + //--------------------- refresh ------------------------------ + @Override + public CompletableFuture refreshAsync(String key, int expireSeconds) { + return setExpireSecondsAsync(key, expireSeconds); + } + + @Override + public void refresh(String key, final int expireSeconds) { + setExpireSeconds(key, expireSeconds); + } + + //--------------------- set ------------------------------ + @Override + @Deprecated + public CompletableFuture setAsync(String key, V value) { + CacheEntryType cet = this.objValueType == String.class ? CacheEntryType.STRING : CacheEntryType.OBJECT; + return (CompletableFuture) send("SET", cet, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(cet, (Convert) null, (Type) null, value)); + } + + @Override + public CompletableFuture setAsync(String key, Convert convert, T value) { + CacheEntryType cet = value instanceof CharSequence ? CacheEntryType.STRING : CacheEntryType.OBJECT; + return (CompletableFuture) send("SET", cet, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(cet, convert, (Type) null, value)); + } + + @Override + public CompletableFuture setAsync(String key, final Type type, T value) { + CacheEntryType cet = type == String.class ? CacheEntryType.STRING : CacheEntryType.OBJECT; + return (CompletableFuture) send("SET", cet, type, key, key.getBytes(StandardCharsets.UTF_8), formatValue(cet, (Convert) null, type, value)); + } + + @Override + public CompletableFuture setAsync(String key, Convert convert, final Type type, T value) { + CacheEntryType cet = type == String.class ? CacheEntryType.STRING : CacheEntryType.OBJECT; + return (CompletableFuture) send("SET", cet, type, key, key.getBytes(StandardCharsets.UTF_8), formatValue(cet, convert, type, value)); + } + + @Override + @Deprecated + public void set(final String key, V value) { + setAsync(key, value).join(); + } + + @Override + public void set(final String key, final Convert convert, T value) { + setAsync(key, convert, value).join(); + } + + @Override + public void set(final String key, final Type type, T value) { + setAsync(key, type, value).join(); + } + + @Override + public void set(String key, final Convert convert, final Type type, T value) { + setAsync(key, convert, type, value).join(); + } + + @Override + public CompletableFuture setStringAsync(String key, String value) { + return (CompletableFuture) send("SET", CacheEntryType.STRING, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.STRING, (Convert) null, (Type) null, value)); + } + + @Override + public void setString(String key, String value) { + setStringAsync(key, value).join(); + } + + @Override + public CompletableFuture setLongAsync(String key, long value) { + return (CompletableFuture) send("SET", CacheEntryType.LONG, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.LONG, (Convert) null, (Type) null, value)); + } + + @Override + public void setLong(String key, long value) { + setLongAsync(key, value).join(); + } + + //--------------------- set ------------------------------ + @Override + @Deprecated + public CompletableFuture setAsync(int expireSeconds, String key, V value) { + return (CompletableFuture) setAsync(key, value).thenCompose(v -> setExpireSecondsAsync(key, expireSeconds)); + } + + @Override + public CompletableFuture setAsync(int expireSeconds, String key, Convert convert, T value) { + return (CompletableFuture) setAsync(key, convert, value).thenCompose(v -> setExpireSecondsAsync(key, expireSeconds)); + } + + @Override + public CompletableFuture setAsync(int expireSeconds, String key, final Type type, T value) { + return (CompletableFuture) setAsync(key, type, value).thenCompose(v -> setExpireSecondsAsync(key, expireSeconds)); + } + + @Override + public CompletableFuture setAsync(int expireSeconds, String key, Convert convert, final Type type, T value) { + return (CompletableFuture) setAsync(key, convert, type, value).thenCompose(v -> setExpireSecondsAsync(key, expireSeconds)); + } + + @Override + @Deprecated + public void set(int expireSeconds, String key, V value) { + setAsync(expireSeconds, key, value).join(); + } + + @Override + public void set(int expireSeconds, String key, Convert convert, T value) { + setAsync(expireSeconds, key, convert, value).join(); + } + + @Override + public void set(int expireSeconds, String key, final Type type, T value) { + setAsync(expireSeconds, key, type, value).join(); + } + + @Override + public void set(int expireSeconds, String key, Convert convert, final Type type, T value) { + setAsync(expireSeconds, key, convert, type, value).join(); + } + + @Override + public CompletableFuture setStringAsync(int expireSeconds, String key, String value) { + return (CompletableFuture) setStringAsync(key, value).thenCompose(v -> setExpireSecondsAsync(key, expireSeconds)); + } + + @Override + public void setString(int expireSeconds, String key, String value) { + setStringAsync(expireSeconds, key, value).join(); + } + + @Override + public CompletableFuture setLongAsync(int expireSeconds, String key, long value) { + return (CompletableFuture) setLongAsync(key, value).thenCompose(v -> setExpireSecondsAsync(key, expireSeconds)); + } + + @Override + public void setLong(int expireSeconds, String key, long value) { + setLongAsync(expireSeconds, key, value).join(); + } + + //--------------------- setExpireSeconds ------------------------------ + @Override + public CompletableFuture setExpireSecondsAsync(String key, int expireSeconds) { + return (CompletableFuture) send("EXPIRE", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(expireSeconds).getBytes(StandardCharsets.UTF_8)); + } + + @Override + public void setExpireSeconds(String key, int expireSeconds) { + setExpireSecondsAsync(key, expireSeconds).join(); + } + + //--------------------- remove ------------------------------ + @Override + public CompletableFuture removeAsync(String key) { + return (CompletableFuture) send("DEL", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public int remove(String key) { + return removeAsync(key).join(); + } + + //--------------------- incr ------------------------------ + @Override + public long incr(final String key) { + return incrAsync(key).join(); + } + + @Override + public CompletableFuture incrAsync(final String key) { + return (CompletableFuture) send("INCR", CacheEntryType.ATOMIC, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public long incr(final String key, long num) { + return incrAsync(key, num).join(); + } + + @Override + public CompletableFuture incrAsync(final String key, long num) { + return (CompletableFuture) send("INCRBY", CacheEntryType.ATOMIC, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(num).getBytes(StandardCharsets.UTF_8)); + } + + //--------------------- decr ------------------------------ + @Override + public long decr(final String key) { + return decrAsync(key).join(); + } + + @Override + public CompletableFuture decrAsync(final String key) { + return (CompletableFuture) send("DECR", CacheEntryType.ATOMIC, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public long decr(final String key, long num) { + return decrAsync(key, num).join(); + } + + @Override + public CompletableFuture decrAsync(final String key, long num) { + return (CompletableFuture) send("DECRBY", CacheEntryType.ATOMIC, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(num).getBytes(StandardCharsets.UTF_8)); + } + + @Override + public int hremove(final String key, String... fields) { + return hremoveAsync(key, fields).join(); + } + + @Override + public int hsize(final String key) { + return hsizeAsync(key).join(); + } + + @Override + public List hkeys(final String key) { + return hkeysAsync(key).join(); + } + + @Override + public long hincr(final String key, String field) { + return hincrAsync(key, field).join(); + } + + @Override + public long hincr(final String key, String field, long num) { + return hincrAsync(key, field, num).join(); + } + + @Override + public long hdecr(final String key, String field) { + return hdecrAsync(key, field).join(); + } + + @Override + public long hdecr(final String key, String field, long num) { + return hdecrAsync(key, field, num).join(); + } + + @Override + public boolean hexists(final String key, String field) { + return hexistsAsync(key, field).join(); + } + + @Override + public void hset(final String key, final String field, final Convert convert, final T value) { + hsetAsync(key, field, convert, value).join(); + } + + @Override + public void hset(final String key, final String field, final Type type, final T value) { + hsetAsync(key, field, type, value).join(); + } + + @Override + public void hset(final String key, final String field, final Convert convert, final Type type, final T value) { + hsetAsync(key, field, convert, type, value).join(); + } + + @Override + public void hsetString(final String key, final String field, final String value) { + hsetStringAsync(key, field, value).join(); + } + + @Override + public void hsetLong(final String key, final String field, final long value) { + hsetLongAsync(key, field, value).join(); + } + + @Override + public void hmset(final String key, final Serializable... values) { + hmsetAsync(key, values).join(); + } + + @Override + public List hmget(final String key, final Type type, final String... fields) { + return hmgetAsync(key, type, fields).join(); + } + + @Override + public Map hmap(final String key, final Type type, int offset, int limit, String pattern) { + return (Map) hmapAsync(key, type, offset, limit, pattern).join(); + } + + @Override + public Map hmap(final String key, final Type type, int offset, int limit) { + return (Map) hmapAsync(key, type, offset, limit).join(); + } + + @Override + public T hget(final String key, final String field, final Type type) { + return (T) hgetAsync(key, field, type).join(); + } + + @Override + public String hgetString(final String key, final String field) { + return hgetStringAsync(key, field).join(); + } + + @Override + public long hgetLong(final String key, final String field, long defValue) { + return hgetLongAsync(key, field, defValue).join(); + } + + @Override + public CompletableFuture hremoveAsync(final String key, String... fields) { + byte[][] bs = new byte[fields.length + 1][]; + bs[0] = key.getBytes(StandardCharsets.UTF_8); + for (int i = 0; i < fields.length; i++) { + bs[i + 1] = fields[i].getBytes(StandardCharsets.UTF_8); + } + return (CompletableFuture) send("HDEL", CacheEntryType.MAP, (Type) null, key, bs); + } + + @Override + public CompletableFuture hsizeAsync(final String key) { + return (CompletableFuture) send("HLEN", CacheEntryType.LONG, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture> hkeysAsync(final String key) { + return (CompletableFuture) send("HKEYS", CacheEntryType.MAP, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture hincrAsync(final String key, String field) { + return hincrAsync(key, field, 1); + } + + @Override + public CompletableFuture hincrAsync(final String key, String field, long num) { + return (CompletableFuture) send("HINCRBY", CacheEntryType.MAP, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8), String.valueOf(num).getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture hdecrAsync(final String key, String field) { + return hincrAsync(key, field, -1); + } + + @Override + public CompletableFuture hdecrAsync(final String key, String field, long num) { + return hincrAsync(key, field, -num); + } + + @Override + public CompletableFuture hexistsAsync(final String key, String field) { + return (CompletableFuture) send("HEXISTS", CacheEntryType.MAP, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture hsetAsync(final String key, final String field, final Convert convert, final T value) { + return (CompletableFuture) send("HSET", CacheEntryType.MAP, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.MAP, convert, null, value)); + } + + @Override + public CompletableFuture hsetAsync(final String key, final String field, final Type type, final T value) { + return (CompletableFuture) send("HSET", CacheEntryType.MAP, type, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.MAP, null, type, value)); + } + + @Override + public CompletableFuture hsetAsync(final String key, final String field, final Convert convert, final Type type, final T value) { + return (CompletableFuture) send("HSET", CacheEntryType.MAP, type, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.MAP, convert, type, value)); + } + + @Override + public CompletableFuture hsetStringAsync(final String key, final String field, final String value) { + return (CompletableFuture) send("HSET", CacheEntryType.MAP, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.STRING, null, null, value)); + } + + @Override + public CompletableFuture hsetLongAsync(final String key, final String field, final long value) { + return (CompletableFuture) send("HSET", CacheEntryType.MAP, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.LONG, null, null, value)); + } + + @Override + public CompletableFuture hmsetAsync(final String key, final Serializable... values) { + byte[][] bs = new byte[values.length + 1][]; + bs[0] = key.getBytes(StandardCharsets.UTF_8); + for (int i = 0; i < values.length; i += 2) { + bs[i + 1] = String.valueOf(values[i]).getBytes(StandardCharsets.UTF_8); + bs[i + 2] = formatValue(CacheEntryType.MAP, null, null, values[i + 1]); + } + return (CompletableFuture) send("HMSET", CacheEntryType.MAP, (Type) null, key, bs); + } + + @Override + public CompletableFuture> hmgetAsync(final String key, final Type type, final String... fields) { + byte[][] bs = new byte[fields.length + 1][]; + bs[0] = key.getBytes(StandardCharsets.UTF_8); + for (int i = 0; i < fields.length; i++) { + bs[i + 1] = fields[i].getBytes(StandardCharsets.UTF_8); + } + return (CompletableFuture) send("HMGET", CacheEntryType.MAP, type, key, bs); + } + + @Override + public CompletableFuture> hmapAsync(final String key, final Type type, int offset, int limit) { + return hmapAsync(key, type, offset, limit, null); + } + + @Override + public CompletableFuture> hmapAsync(final String key, final Type type, int offset, int limit, String pattern) { + byte[][] bs = new byte[pattern == null || pattern.isEmpty() ? 4 : 6][limit]; + int index = -1; + bs[++index] = key.getBytes(StandardCharsets.UTF_8); + bs[++index] = String.valueOf(offset).getBytes(StandardCharsets.UTF_8); + if (pattern != null && !pattern.isEmpty()) { + bs[++index] = "MATCH".getBytes(StandardCharsets.UTF_8); + bs[++index] = pattern.getBytes(StandardCharsets.UTF_8); + } + bs[++index] = "COUNT".getBytes(StandardCharsets.UTF_8); + bs[++index] = String.valueOf(limit).getBytes(StandardCharsets.UTF_8); + return (CompletableFuture) send("HSCAN", CacheEntryType.MAP, type, key, bs); + } + + @Override + public CompletableFuture hgetAsync(final String key, final String field, final Type type) { + return (CompletableFuture) send("HGET", CacheEntryType.OBJECT, type, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture hgetStringAsync(final String key, final String field) { + return (CompletableFuture) send("HGET", CacheEntryType.STRING, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture hgetLongAsync(final String key, final String field, long defValue) { + return (CompletableFuture) send("HGET", CacheEntryType.LONG, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), field.getBytes(StandardCharsets.UTF_8)).thenApplyAsync(v -> v == null ? defValue : v); + } + + //--------------------- collection ------------------------------ + @Override + public CompletableFuture getCollectionSizeAsync(String key) { + return (CompletableFuture) send("TYPE", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).thenCompose(t -> { + if (t == null) return CompletableFuture.completedFuture(0); + String rs = new String((byte[]) t); + if (rs.contains("zset")) { // ziplist + return send("ZCARD", null, int.class, key, key.getBytes(StandardCharsets.UTF_8)); + } else if (rs.contains("list")) { //list + return send("LLEN", null, int.class, key, key.getBytes(StandardCharsets.UTF_8)); + } else if (rs.contains("hash")) { + return send("HLEN", null, int.class, key, key.getBytes(StandardCharsets.UTF_8)); + } else { + return send("SCARD", null, int.class, key, key.getBytes(StandardCharsets.UTF_8)); + } + }); + } + + @Override + public int getCollectionSize(String key) { + return getCollectionSizeAsync(key).join(); + } + + @Override + @Deprecated + public CompletableFuture> getCollectionAsync(String key) { + return (CompletableFuture) send("TYPE", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).thenCompose(t -> { + if (t == null) return CompletableFuture.completedFuture(null); + String str = new String((byte[]) t); + if (str.contains("list")) { //list + return send("LRANGE", CacheEntryType.OBJECT, (Type) null, false, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, new byte[]{'-', '1'}); + } else if (str.contains("none")) { + return CompletableFuture.completedFuture(new ArrayList<>()); + } else { + return send("SMEMBERS", CacheEntryType.OBJECT, (Type) null, true, key, key.getBytes(StandardCharsets.UTF_8)); + } + }); + } + + @Override + public CompletableFuture> getCollectionAsync(String key, final Type componentType) { + return (CompletableFuture) send("TYPE", null, componentType, key, key.getBytes(StandardCharsets.UTF_8)).thenCompose(t -> { + if (t == null) return CompletableFuture.completedFuture(null); + if (new String((byte[]) t).contains("list")) { //list + return send("LRANGE", CacheEntryType.OBJECT, componentType, false, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, new byte[]{'-', '1'}); + } else { + return send("SMEMBERS", CacheEntryType.OBJECT, componentType, true, key, key.getBytes(StandardCharsets.UTF_8)); + } + }); + } + + @Override + public CompletableFuture> getLongMapAsync(String... keys) { + byte[][] bs = new byte[keys.length][]; + for (int i = 0; i < bs.length; i++) { + bs[i] = keys[i].getBytes(StandardCharsets.UTF_8); + } + return (CompletableFuture) send("MGET", CacheEntryType.LONG, null, false, keys[0], bs).thenApply(r -> { + List list = (List) r; + Map map = new LinkedHashMap<>(); + for (int i = 0; i < keys.length; i++) { + Object obj = list.get(i); + if (obj != null) map.put(keys[i], list.get(i)); + } + return map; + }); + } + + @Override + public CompletableFuture getLongArrayAsync(String... keys) { + byte[][] bs = new byte[keys.length][]; + for (int i = 0; i < bs.length; i++) { + bs[i] = keys[i].getBytes(StandardCharsets.UTF_8); + } + return (CompletableFuture) send("MGET", CacheEntryType.LONG, null, false, keys[0], bs).thenApply(r -> { + List list = (List) r; + Long[] rs = new Long[keys.length]; + for (int i = 0; i < keys.length; i++) { + Number obj = (Number) list.get(i); + rs[i] = obj == null ? null : obj.longValue(); + } + return rs; + }); + } + + @Override + public CompletableFuture getStringArrayAsync(String... keys) { + byte[][] bs = new byte[keys.length][]; + for (int i = 0; i < bs.length; i++) { + bs[i] = keys[i].getBytes(StandardCharsets.UTF_8); + } + return (CompletableFuture) send("MGET", CacheEntryType.STRING, null, false, keys[0], bs).thenApply(r -> { + List list = (List) r; + String[] rs = new String[keys.length]; + for (int i = 0; i < keys.length; i++) { + Object obj = list.get(i); + rs[i] = obj == null ? null : obj.toString(); + } + return rs; + }); + } + + @Override + public CompletableFuture> getStringMapAsync(String... keys) { + byte[][] bs = new byte[keys.length][]; + for (int i = 0; i < bs.length; i++) { + bs[i] = keys[i].getBytes(StandardCharsets.UTF_8); + } + return (CompletableFuture) send("MGET", CacheEntryType.STRING, null, false, keys[0], bs).thenApply(r -> { + List list = (List) r; + Map map = new LinkedHashMap<>(); + for (int i = 0; i < keys.length; i++) { + Object obj = list.get(i); + if (obj != null) map.put(keys[i], list.get(i)); + } + return map; + }); + } + + @Override + public CompletableFuture> getMapAsync(final Type componentType, String... keys) { + byte[][] bs = new byte[keys.length][]; + for (int i = 0; i < bs.length; i++) { + bs[i] = keys[i].getBytes(StandardCharsets.UTF_8); + } + return (CompletableFuture) send("MGET", CacheEntryType.OBJECT, componentType, false, keys[0], bs).thenApply(r -> { + List list = (List) r; + Map map = new LinkedHashMap<>(); + for (int i = 0; i < keys.length; i++) { + Object obj = list.get(i); + if (obj != null) map.put(keys[i], list.get(i)); + } + return map; + }); + } + + @Override + public CompletableFuture>> getCollectionMapAsync(final boolean set, final Type componentType, final String... keys) { + final CompletableFuture>> rsFuture = new CompletableFuture<>(); + final Map> map = new HashMap<>(); + final CompletableFuture[] futures = new CompletableFuture[keys.length]; + if (!set) { //list + for (int i = 0; i < keys.length; i++) { + final String key = keys[i]; + futures[i] = send("LRANGE", CacheEntryType.OBJECT, componentType, false, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, new byte[]{'-', '1'}).thenAccept(c -> { + if (c != null) { + synchronized (map) { + map.put(key, (Collection) c); + } + } + }); + } + } else { + for (int i = 0; i < keys.length; i++) { + final String key = keys[i]; + futures[i] = send("SMEMBERS", CacheEntryType.OBJECT, componentType, true, key, key.getBytes(StandardCharsets.UTF_8)).thenAccept(c -> { + if (c != null) { + synchronized (map) { + map.put(key, (Collection) c); + } + } + }); + } + } + CompletableFuture.allOf(futures).whenComplete((w, e) -> { + if (e != null) { + rsFuture.completeExceptionally(e); + } else { + rsFuture.complete(map); + } + }); + return rsFuture; + } + + @Override + @Deprecated + public Collection getCollection(String key) { + return getCollectionAsync(key).join(); + } + + @Override + public Collection getCollection(String key, final Type componentType) { + return (Collection) getCollectionAsync(key, componentType).join(); + } + + @Override + public Map getLongMap(final String... keys) { + return getLongMapAsync(keys).join(); + } + + @Override + public Long[] getLongArray(final String... keys) { + return getLongArrayAsync(keys).join(); + } + + @Override + public Map getStringMap(final String... keys) { + return getStringMapAsync(keys).join(); + } + + @Override + public String[] getStringArray(final String... keys) { + return getStringArrayAsync(keys).join(); + } + + @Override + public Map getMap(final Type componentType, final String... keys) { + return (Map) getMapAsync(componentType, keys).join(); + } + + @Override + public Map> getCollectionMap(final boolean set, final Type componentType, String... keys) { + return (Map) getCollectionMapAsync(set, componentType, keys).join(); + } + + @Override + public CompletableFuture> getStringCollectionAsync(String key) { + return (CompletableFuture) send("TYPE", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).thenCompose(t -> { + if (t == null) return CompletableFuture.completedFuture(null); + if (new String((byte[]) t).contains("list")) { //list + return send("LRANGE", CacheEntryType.STRING, (Type) null, false, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, new byte[]{'-', '1'}); + } else { + return send("SMEMBERS", CacheEntryType.STRING, (Type) null, true, key, key.getBytes(StandardCharsets.UTF_8)); + } + }); + } + + @Override + public CompletableFuture>> getStringCollectionMapAsync(final boolean set, String... keys) { + final CompletableFuture>> rsFuture = new CompletableFuture<>(); + final Map> map = new HashMap<>(); + final CompletableFuture[] futures = new CompletableFuture[keys.length]; + if (!set) { //list + for (int i = 0; i < keys.length; i++) { + final String key = keys[i]; + futures[i] = send("LRANGE", CacheEntryType.STRING, (Type) null, false, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, new byte[]{'-', '1'}).thenAccept(c -> { + if (c != null) { + synchronized (map) { + map.put(key, (Collection) c); + } + } + }); + } + } else { + for (int i = 0; i < keys.length; i++) { + final String key = keys[i]; + futures[i] = send("SMEMBERS", CacheEntryType.STRING, (Type) null, true, key, key.getBytes(StandardCharsets.UTF_8)).thenAccept(c -> { + if (c != null) { + synchronized (map) { + map.put(key, (Collection) c); + } + } + }); + } + } + CompletableFuture.allOf(futures).whenComplete((w, e) -> { + if (e != null) { + rsFuture.completeExceptionally(e); + } else { + rsFuture.complete(map); + } + }); + return rsFuture; + } + + @Override + public Collection getStringCollection(String key) { + return getStringCollectionAsync(key).join(); + } + + @Override + public Map> getStringCollectionMap(final boolean set, String... keys) { + return getStringCollectionMapAsync(set, keys).join(); + } + + @Override + public CompletableFuture> getLongCollectionAsync(String key) { + return (CompletableFuture) send("TYPE", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)).thenCompose(t -> { + if (t == null) return CompletableFuture.completedFuture(null); + if (new String((byte[]) t).contains("list")) { //list + return send("LRANGE", CacheEntryType.LONG, (Type) null, false, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, new byte[]{'-', '1'}); + } else { + return send("SMEMBERS", CacheEntryType.LONG, (Type) null, true, key, key.getBytes(StandardCharsets.UTF_8)); + } + }); + } + + @Override + public CompletableFuture>> getLongCollectionMapAsync(final boolean set, String... keys) { + final CompletableFuture>> rsFuture = new CompletableFuture<>(); + final Map> map = new HashMap<>(); + final CompletableFuture[] futures = new CompletableFuture[keys.length]; + if (!set) { //list + for (int i = 0; i < keys.length; i++) { + final String key = keys[i]; + futures[i] = send("LRANGE", CacheEntryType.LONG, (Type) null, false, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, new byte[]{'-', '1'}).thenAccept(c -> { + if (c != null) { + synchronized (map) { + map.put(key, (Collection) c); + } + } + }); + } + } else { + for (int i = 0; i < keys.length; i++) { + final String key = keys[i]; + futures[i] = send("SMEMBERS", CacheEntryType.LONG, (Type) null, true, key, key.getBytes(StandardCharsets.UTF_8)).thenAccept(c -> { + if (c != null) { + synchronized (map) { + map.put(key, (Collection) c); + } + } + }); + } + } + CompletableFuture.allOf(futures).whenComplete((w, e) -> { + if (e != null) { + rsFuture.completeExceptionally(e); + } else { + rsFuture.complete(map); + } + }); + return rsFuture; + } + + @Override + public Collection getLongCollection(String key) { + return getLongCollectionAsync(key).join(); + } + + @Override + public Map> getLongCollectionMap(final boolean set, String... keys) { + return getLongCollectionMapAsync(set, keys).join(); + } + + //--------------------- getCollectionAndRefresh ------------------------------ + @Override + @Deprecated + public CompletableFuture> getCollectionAndRefreshAsync(String key, int expireSeconds) { + return (CompletableFuture) refreshAsync(key, expireSeconds).thenCompose(v -> getCollectionAsync(key)); + } + + @Override + public CompletableFuture> getCollectionAndRefreshAsync(String key, int expireSeconds, final Type componentType) { + return (CompletableFuture) refreshAsync(key, expireSeconds).thenCompose(v -> getCollectionAsync(key, componentType)); + } + + @Override + @Deprecated + public Collection getCollectionAndRefresh(String key, final int expireSeconds) { + return getCollectionAndRefreshAsync(key, expireSeconds).join(); + } + + @Override + public Collection getCollectionAndRefresh(String key, final int expireSeconds, final Type componentType) { + return (Collection) getCollectionAndRefreshAsync(key, expireSeconds, componentType).join(); + } + + @Override + public CompletableFuture> getStringCollectionAndRefreshAsync(String key, int expireSeconds) { + return (CompletableFuture) refreshAsync(key, expireSeconds).thenCompose(v -> getStringCollectionAsync(key)); + } + + @Override + public Collection getStringCollectionAndRefresh(String key, final int expireSeconds) { + return getStringCollectionAndRefreshAsync(key, expireSeconds).join(); + } + + @Override + public CompletableFuture> getLongCollectionAndRefreshAsync(String key, int expireSeconds) { + return (CompletableFuture) refreshAsync(key, expireSeconds).thenCompose(v -> getLongCollectionAsync(key)); + } + + @Override + public Collection getLongCollectionAndRefresh(String key, final int expireSeconds) { + return getLongCollectionAndRefreshAsync(key, expireSeconds).join(); + } + + //--------------------- existsItem ------------------------------ + @Override + @Deprecated + public boolean existsSetItem(String key, V value) { + return existsSetItemAsync(key, value).join(); + } + + @Override + public boolean existsSetItem(String key, final Type componentType, T value) { + return existsSetItemAsync(key, componentType, value).join(); + } + + @Override + @Deprecated + public CompletableFuture existsSetItemAsync(String key, V value) { + return (CompletableFuture) send("SISMEMBER", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, value)); + } + + @Override + public CompletableFuture existsSetItemAsync(String key, final Type componentType, T value) { + return (CompletableFuture) send("SISMEMBER", null, componentType, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, componentType, value)); + } + + @Override + public boolean existsStringSetItem(String key, String value) { + return existsStringSetItemAsync(key, value).join(); + } + + @Override + public CompletableFuture existsStringSetItemAsync(String key, String value) { + return (CompletableFuture) send("SISMEMBER", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.STRING, (Convert) null, (Type) null, value)); + } + + @Override + public boolean existsLongSetItem(String key, long value) { + return existsLongSetItemAsync(key, value).join(); + } + + @Override + public CompletableFuture existsLongSetItemAsync(String key, long value) { + return (CompletableFuture) send("SISMEMBER", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.LONG, (Convert) null, (Type) null, value)); + } + + //--------------------- appendListItem ------------------------------ + @Override + @Deprecated + public CompletableFuture appendListItemAsync(String key, V value) { + return (CompletableFuture) send("RPUSH", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, value)); + } + + @Override + public CompletableFuture appendListItemAsync(String key, final Type componentType, T value) { + return (CompletableFuture) send("RPUSH", null, componentType, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, componentType, value)); + } + + @Override + @Deprecated + public void appendListItem(String key, V value) { + appendListItemAsync(key, value).join(); + } + + @Override + public void appendListItem(String key, final Type componentType, T value) { + appendListItemAsync(key, componentType, value).join(); + } + + @Override + public CompletableFuture appendStringListItemAsync(String key, String value) { + return (CompletableFuture) send("RPUSH", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.STRING, (Convert) null, (Type) null, value)); + } + + @Override + public void appendStringListItem(String key, String value) { + appendStringListItemAsync(key, value).join(); + } + + @Override + public CompletableFuture appendLongListItemAsync(String key, long value) { + return (CompletableFuture) send("RPUSH", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.LONG, (Convert) null, (Type) null, value)); + } + + @Override + public void appendLongListItem(String key, long value) { + appendLongListItemAsync(key, value).join(); + } + + //--------------------- removeListItem ------------------------------ + @Override + @Deprecated + public CompletableFuture removeListItemAsync(String key, V value) { + return (CompletableFuture) send("LREM", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, value)); + } + + @Override + public CompletableFuture removeListItemAsync(String key, final Type componentType, T value) { + return (CompletableFuture) send("LREM", null, componentType, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, formatValue(CacheEntryType.OBJECT, (Convert) null, componentType, value)); + } + + @Override + @Deprecated + public int removeListItem(String key, V value) { + return removeListItemAsync(key, value).join(); + } + + @Override + public int removeListItem(String key, final Type componentType, T value) { + return removeListItemAsync(key, componentType, value).join(); + } + + @Override + public CompletableFuture removeStringListItemAsync(String key, String value) { + return (CompletableFuture) send("LREM", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, formatValue(CacheEntryType.STRING, (Convert) null, (Type) null, value)); + } + + @Override + public int removeStringListItem(String key, String value) { + return removeStringListItemAsync(key, value).join(); + } + + @Override + public CompletableFuture removeLongListItemAsync(String key, long value) { + return (CompletableFuture) send("LREM", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), new byte[]{'0'}, formatValue(CacheEntryType.LONG, (Convert) null, (Type) null, value)); + } + + @Override + public int removeLongListItem(String key, long value) { + return removeLongListItemAsync(key, value).join(); + } + + //--------------------- appendSetItem ------------------------------ + @Override + @Deprecated + public CompletableFuture appendSetItemAsync(String key, V value) { + return (CompletableFuture) send("SADD", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, value)); + } + + @Override + public CompletableFuture appendSetItemAsync(String key, Type componentType, T value) { + return (CompletableFuture) send("SADD", null, componentType, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, componentType, value)); + } + + @Override + public CompletableFuture spopSetItemAsync(String key, Type componentType) { + return (CompletableFuture) send("SPOP", CacheEntryType.OBJECT, componentType, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture> spopSetItemAsync(String key, int count, Type componentType) { + return (CompletableFuture) send("SPOP", CacheEntryType.OBJECT, componentType, key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(count).getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture spopStringSetItemAsync(String key) { + return (CompletableFuture) send("SPOP", CacheEntryType.STRING, String.class, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture> spopStringSetItemAsync(String key, int count) { + return (CompletableFuture) send("SPOP", CacheEntryType.STRING, String.class, key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(count).getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture spopLongSetItemAsync(String key) { + return (CompletableFuture) send("SPOP", CacheEntryType.LONG, long.class, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture> spopLongSetItemAsync(String key, int count) { + return (CompletableFuture) send("SPOP", CacheEntryType.LONG, long.class, key, key.getBytes(StandardCharsets.UTF_8), String.valueOf(count).getBytes(StandardCharsets.UTF_8)); + } + + @Override + @Deprecated + public void appendSetItem(String key, V value) { + appendSetItemAsync(key, value).join(); + } + + @Override + public void appendSetItem(String key, final Type componentType, T value) { + appendSetItemAsync(key, componentType, value).join(); + } + + @Override + public T spopSetItem(String key, final Type componentType) { + return (T) spopSetItemAsync(key, componentType).join(); + } + + @Override + public List spopSetItem(String key, int count, final Type componentType) { + return (List) spopSetItemAsync(key, count, componentType).join(); + } + + @Override + public String spopStringSetItem(String key) { + return spopStringSetItemAsync(key).join(); + } + + @Override + public List spopStringSetItem(String key, int count) { + return spopStringSetItemAsync(key, count).join(); + } + + @Override + public Long spopLongSetItem(String key) { + return spopLongSetItemAsync(key).join(); + } + + @Override + public List spopLongSetItem(String key, int count) { + return spopLongSetItemAsync(key, count).join(); + } + + @Override + public CompletableFuture appendStringSetItemAsync(String key, String value) { + return (CompletableFuture) send("SADD", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.STRING, (Convert) null, (Type) null, value)); + } + + @Override + public void appendStringSetItem(String key, String value) { + appendStringSetItemAsync(key, value).join(); + } + + @Override + public CompletableFuture appendLongSetItemAsync(String key, long value) { + return (CompletableFuture) send("SADD", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.LONG, (Convert) null, (Type) null, value)); + } + + @Override + public void appendLongSetItem(String key, long value) { + appendLongSetItemAsync(key, value).join(); + } + + //--------------------- removeSetItem ------------------------------ + @Override + @Deprecated + public CompletableFuture removeSetItemAsync(String key, V value) { + return (CompletableFuture) send("SREM", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, (Type) null, value)); + } + + @Override + public CompletableFuture removeSetItemAsync(String key, final Type componentType, T value) { + return (CompletableFuture) send("SREM", null, componentType, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.OBJECT, (Convert) null, componentType, value)); + } + + @Override + @Deprecated + public int removeSetItem(String key, V value) { + return removeSetItemAsync(key, value).join(); + } + + @Override + public int removeSetItem(String key, final Type componentType, T value) { + return removeSetItemAsync(key, componentType, value).join(); + } + + @Override + public CompletableFuture removeStringSetItemAsync(String key, String value) { + return (CompletableFuture) send("SREM", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.STRING, (Convert) null, (Type) null, value)); + } + + @Override + public int removeStringSetItem(String key, String value) { + return removeStringSetItemAsync(key, value).join(); + } + + @Override + public CompletableFuture removeLongSetItemAsync(String key, long value) { + return (CompletableFuture) send("SREM", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), formatValue(CacheEntryType.LONG, (Convert) null, (Type) null, value)); + } + + @Override + public int removeLongSetItem(String key, long value) { + return removeLongSetItemAsync(key, value).join(); + } + + //--------------------- queryKeys ------------------------------ + @Override + public List queryKeys() { + return queryKeysAsync().join(); + } + + @Override + public List queryKeysStartsWith(String startsWith) { + return queryKeysStartsWithAsync(startsWith).join(); + } + + @Override + public List queryKeysEndsWith(String endsWith) { + return queryKeysEndsWithAsync(endsWith).join(); + } + + @Override + public byte[] getBytes(final String key) { + return getBytesAsync(key).join(); + } + + @Override + public byte[] getBytesAndRefresh(final String key, final int expireSeconds) { + return getBytesAndRefreshAsync(key, expireSeconds).join(); + } + + @Override + public void setBytes(final String key, final byte[] value) { + setBytesAsync(key, value).join(); + } + + @Override + public void setBytes(final int expireSeconds, final String key, final byte[] value) { + setBytesAsync(expireSeconds, key, value).join(); + } + + @Override + public void setBytes(final String key, final Convert convert, final Type type, final T value) { + setBytesAsync(key, convert, type, value).join(); + } + + @Override + public void setBytes(final int expireSeconds, final String key, final Convert convert, final Type type, final T value) { + setBytesAsync(expireSeconds, key, convert, type, value).join(); + } + + @Override + public CompletableFuture getBytesAsync(final String key) { + return (CompletableFuture) send("GET", CacheEntryType.BYTES, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture getBytesAndRefreshAsync(final String key, final int expireSeconds) { + return (CompletableFuture) refreshAsync(key, expireSeconds).thenCompose(v -> getBytesAsync(key)); + } + + @Override + public CompletableFuture setBytesAsync(final String key, final byte[] value) { + return (CompletableFuture) send("SET", CacheEntryType.BYTES, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), value); + + } + + @Override + public CompletableFuture setBytesAsync(final int expireSeconds, final String key, final byte[] value) { + return (CompletableFuture) setBytesAsync(key, value).thenCompose(v -> setExpireSecondsAsync(key, expireSeconds)); + } + + @Override + public CompletableFuture setBytesAsync(final String key, final Convert convert, final Type type, final T value) { + return (CompletableFuture) send("SET", CacheEntryType.BYTES, (Type) null, key, key.getBytes(StandardCharsets.UTF_8), convert.convertToBytes(type, value)); + } + + @Override + public CompletableFuture setBytesAsync(final int expireSeconds, final String key, final Convert convert, final Type type, final T value) { + return (CompletableFuture) setBytesAsync(key, convert.convertToBytes(type, value)).thenCompose(v -> setExpireSecondsAsync(key, expireSeconds)); + } + + @Override + public CompletableFuture> queryKeysAsync() { + return (CompletableFuture) send("KEYS", null, (Type) null, "*", new byte[]{(byte) '*'}); + } + + @Override + public CompletableFuture> queryKeysStartsWithAsync(String startsWith) { + if (startsWith == null) return queryKeysAsync(); + String key = startsWith + "*"; + return (CompletableFuture) send("KEYS", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public CompletableFuture> queryKeysEndsWithAsync(String endsWith) { + if (endsWith == null) return queryKeysAsync(); + String key = "*" + endsWith; + return (CompletableFuture) send("KEYS", null, (Type) null, key, key.getBytes(StandardCharsets.UTF_8)); + } + + //--------------------- getKeySize ------------------------------ + @Override + public int getKeySize() { + return getKeySizeAsync().join(); + } + + @Override + public CompletableFuture getKeySizeAsync() { + return (CompletableFuture) send("DBSIZE", null, (Type) null, null); + } + + //--------------------- queryList ------------------------------ + @Override + public List> queryList() { + return queryListAsync().join(); + } + + @Override + public CompletableFuture>> queryListAsync() { + return CompletableFuture.completedFuture(new ArrayList<>()); //不返回数据 + } + + //--------------------- send ------------------------------ + private byte[] formatValue(CacheEntryType cacheType, Convert convert0, Type resultType, Object value) { + if (value == null) return "null".getBytes(StandardCharsets.UTF_8); + if (value instanceof byte[]) return (byte[]) value; + if (convert0 == null) convert0 = convert; + if (cacheType == CacheEntryType.MAP) { + if ((value instanceof CharSequence) || (value instanceof Number)) { + return String.valueOf(value).getBytes(StandardCharsets.UTF_8); + } + if (objValueType == String.class && !(value instanceof CharSequence)) resultType = value.getClass(); + return convert0.convertToBytes(resultType == null ? objValueType : resultType, value); + } + if (value instanceof Number || cacheType == CacheEntryType.LONG || cacheType == CacheEntryType.ATOMIC || value.getClass() == String.class) + return String.valueOf(value).getBytes(StandardCharsets.UTF_8); + if (cacheType == CacheEntryType.STRING) return convert0.convertToBytes(String.class, value); + return convert0.convertToBytes(resultType == null ? objValueType : resultType, value); + } + + protected CompletableFuture send(final String command, final CacheEntryType cacheType, final Type resultType, final String key, final byte[]... args) { + return send(command, cacheType, resultType, false, key, args); + } + + protected CompletableFuture send(final String command, final CacheEntryType cacheType, final Type resultType, final boolean set, final String key, final byte[]... args) { + return send(null, command, cacheType, resultType, set, key, args); + } + + private CompletableFuture send(final CompletionHandler callback, final String command, final CacheEntryType cacheType, final Type resultType, final boolean set, final String key, final byte[]... args) { + final ByteArray writer = new ByteArray(); + writer.put(ASTERISK_BYTE); + writer.put(String.valueOf(args.length + 1).getBytes(StandardCharsets.UTF_8)); + writer.put((byte) '\r', (byte) '\n'); + writer.put(DOLLAR_BYTE); + writer.put(String.valueOf(command.length()).getBytes(StandardCharsets.UTF_8)); + writer.put((byte) '\r', (byte) '\n'); + writer.put(command.getBytes(StandardCharsets.UTF_8)); + writer.put((byte) '\r', (byte) '\n'); + + for (final byte[] arg : args) { + writer.put(DOLLAR_BYTE); + writer.put(String.valueOf(arg.length).getBytes(StandardCharsets.UTF_8)); + writer.put((byte) '\r', (byte) '\n'); + writer.put(arg); + writer.put((byte) '\r', (byte) '\n'); + } + + final CompletableFuture future = callback == null ? new CompletableFuture<>() : null; + CompletableFuture connFuture = this.transport.pollConnection(null); + if (passwords != null) { + connFuture = connFuture.thenCompose(conn -> { + if (conn.getSubobject() != null) return CompletableFuture.completedFuture(conn); + byte[] password = passwords.get(conn.getRemoteAddress()); + if (password == null) return CompletableFuture.completedFuture(conn); + CompletableFuture rs = auth(conn, password, command); + if (db > 0) { + rs = rs.thenCompose(conn2 -> { + if (conn2.getSubobject() != null) return CompletableFuture.completedFuture(conn2); + return selectdb(conn2, db, command); + }); + } + return rs; + }); + } else if (db > 0) { + connFuture = connFuture.thenCompose(conn2 -> { + if (conn2.getSubobject() != null) return CompletableFuture.completedFuture(conn2); + return selectdb(conn2, db, command); + }); + } + connFuture.whenComplete((conn, ex) -> { + if (ex != null) { + if (future == null) { + callback.failed(ex, null); + } else { + future.completeExceptionally(ex); + } + return; + } + conn.write(writer, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment0) { + try { + //----------------------- 读取返回结果 ------------------------------------- + conn.read(new ReplyCompletionHandler(conn) { + @Override + public void completed(Integer result, ByteBuffer buffer) { + buffer.flip(); + try { + final byte sign = buffer.get(); + if (sign == PLUS_BYTE) { // + + byte[] bs = readBytes(buffer); + if (future == null) { + transport.offerConnection(false, conn); //必须在complete之前,防止并发是conn还没回收完毕 + callback.completed(null, key); + } else { + transport.offerConnection(false, conn); + future.complete(("SET".equals(command) || "HSET".equals(command)) ? null : bs); + } + } else if (sign == MINUS_BYTE) { // - + String bs = readString(buffer); + if (future == null) { + transport.offerConnection(false, conn); + callback.failed(new RuntimeException(bs), key); + } else { + transport.offerConnection(false, conn); + future.completeExceptionally(new RuntimeException("command : " + command + ", error: " + bs)); + } + } else if (sign == COLON_BYTE) { // : + long rs = readLong(buffer); + if (future == null) { + if (command.startsWith("INCR") || command.startsWith("DECR") || command.startsWith("HINCR")) { + transport.offerConnection(false, conn); + callback.completed(rs, key); + } else { + transport.offerConnection(false, conn); + callback.completed((command.endsWith("EXISTS") || "SISMEMBER".equals(command)) ? (rs > 0) : (("LLEN".equals(command) || "SCARD".equals(command) || "SREM".equals(command) || "LREM".equals(command) || "DEL".equals(command) || "HDEL".equals(command) || "HLEN".equals(command) || "DBSIZE".equals(command)) ? (int) rs : null), key); + } + } else { + if (command.startsWith("INCR") || command.startsWith("DECR") || command.startsWith("HINCR") || command.startsWith("HGET")) { + transport.offerConnection(false, conn); + future.complete(rs); + } else if (command.equals("ZRANK") || command.equals("ZREVRANK")) { + transport.offerConnection(false, conn); + future.complete(rs); + } else if (command.equals("GETBIT")) { + transport.offerConnection(false, conn); + future.complete(rs); + } else if (command.equals("TTL") || command.equals("PTTL")) { + transport.offerConnection(false, conn); + future.complete(rs); + } else if (command.equals("SETNX")) { + transport.offerConnection(false, conn); + future.complete(rs); + } else { + transport.offerConnection(false, conn); + future.complete((command.endsWith("EXISTS") || "SISMEMBER".equals(command)) ? (rs > 0) : (("LLEN".equals(command) || "SCARD".equals(command) || "SREM".equals(command) || "LREM".equals(command) || "DEL".equals(command) || "HDEL".equals(command) || "HLEN".equals(command) || "DBSIZE".equals(command) || "ZCARD".equals(command) || "EVAL".equals(command)) ? (int) rs : null)); + } + } + } else if (sign == DOLLAR_BYTE) { // $ + long val = readLong(buffer); + byte[] rs = val <= 0 ? null : readBytes(buffer); + Type ct = cacheType == CacheEntryType.LONG ? long.class : (cacheType == CacheEntryType.STRING ? String.class : (resultType == null ? objValueType : resultType)); + if (future == null) { + transport.offerConnection(false, conn); + callback.completed((("SPOP".equals(command) || command.endsWith("GET") || rs == null) ? (ct == String.class && rs != null ? new String(rs, StandardCharsets.UTF_8) : convert.convertFrom(ct, new String(rs, StandardCharsets.UTF_8))) : null), key); + } else { + transport.offerConnection(false, conn); + if ("ZSCORE".equals(command)) { + future.complete(ct == String.class && rs != null ? new String(rs, StandardCharsets.UTF_8) : convert.convertFrom(ct, rs == null ? null : new String(rs, StandardCharsets.UTF_8))); + } else if (command.equals("HINCRBYFLOAT")) { + future.complete(new String(rs, StandardCharsets.UTF_8)); + } else { + future.complete("SPOP".equals(command) || command.endsWith("GET") || command.endsWith("ZINCRBY") ? (ct == String.class && rs != null ? new String(rs, StandardCharsets.UTF_8) : convert.convertFrom(ct, rs == null ? null : new String(rs, StandardCharsets.UTF_8))) : rs); + } + } + } else if (sign == ASTERISK_BYTE) { // * + final int len = readInt(buffer); + if (len < 0) { + if (future == null) { + transport.offerConnection(false, conn); + callback.completed(null, key); + } else { + transport.offerConnection(false, conn); + future.complete((byte[]) null); + } + } else { + Object rsobj; + if (command.endsWith("SCAN")) { + LinkedHashMap rs = new LinkedHashMap(); + rsobj = rs; + for (int i = 0; i < len; i++) { + int l = readInt(buffer); + if (l > 0) { + readBytes(buffer); + } else { + while (buffer.hasRemaining()) { + readBytes(buffer); + String field = new String(readBytes(buffer), StandardCharsets.UTF_8); + String value = null; + if (readInt(buffer) > 0) { + value = new String(readBytes(buffer), StandardCharsets.UTF_8); + } + Type ct = cacheType == CacheEntryType.LONG ? long.class : (cacheType == CacheEntryType.STRING ? String.class : (resultType == null ? objValueType : resultType)); + try { + rs.put(field, value == null ? null : (ct == String.class ? value : convert.convertFrom(ct, value))); + } catch (RuntimeException e) { + buffer.flip(); + byte[] bsss = new byte[buffer.remaining()]; + buffer.get(bsss); + logger.log(Level.SEVERE, "异常: " + new String(bsss)); + throw e; + } + } + } + } + } else { + Collection rs = set ? new HashSet() : new ArrayList(); + rsobj = rs; + boolean keys = "KEYS".equals(command) || "HKEYS".equals(command); + boolean mget = !keys && ("MGET".equals(command) || "HMGET".equals(command)); + boolean zsetscore = !keys && ("ZREVRANGE".equals(command) || "ZRANGE".equals(command)); + boolean hgetall = !keys && "HGETALL".equals(command); + boolean lpop = !keys && "BRPOP".equals(command); + Type ct = cacheType == CacheEntryType.LONG ? long.class : (cacheType == CacheEntryType.STRING ? String.class : (resultType == null ? objValueType : resultType)); + for (int i = 0; i < len; i++) { + int l = readInt(buffer); + if (l > 0) { + // 1、zsetscore 查询 偶数元素统一返回 string 方法内自定义转换 + // 2、hgetall 基数元素返回 string + if ((zsetscore && rs.size() % 2 == 1) || (hgetall && rs.size() % 2 == 0)) { + rs.add(readString(buffer)); + } else if (lpop && i == 1) { + rsobj = convert.convertFrom(ct, new String(readBytes(buffer), StandardCharsets.UTF_8)); + } else { + rs.add(keys ? new String(readBytes(buffer), StandardCharsets.UTF_8) : (ct == String.class ? new String(readBytes(buffer), StandardCharsets.UTF_8) : convert.convertFrom(ct, new String(readBytes(buffer), StandardCharsets.UTF_8)))); + } + } else if (mget) { + rs.add(null); + } + } + } + if (future == null) { + transport.offerConnection(false, conn); + callback.completed(rsobj, key); + } else { + transport.offerConnection(false, conn); + future.complete((Serializable) rsobj); + } + } + } else { + String exstr = "Unknown reply: " + (char) sign; + if (future == null) { + transport.offerConnection(false, conn); + callback.failed(new RuntimeException(exstr), key); + } else { + transport.offerConnection(false, conn); + future.completeExceptionally(new RuntimeException(exstr)); + } + } + } catch (Exception e) { + failed(e, buffer); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + conn.offerBuffer(attachment); + transport.offerConnection(true, conn); + if (future == null) { + callback.failed(exc, attachment0); + } else { + future.completeExceptionally(exc); + } + } + + }); + } catch (Exception e) { + failed(e, attachment0); + } + } + + @Override + public void failed(Throwable exc, Void attachment0) { + transport.offerConnection(true, conn); + if (future == null) { + callback.failed(exc, attachment0); + } else { + future.completeExceptionally(exc); + } + } + }); + }); + return future; //.orTimeout(3, TimeUnit.SECONDS) JDK9以上才支持 + } + + private CompletableFuture selectdb(final AsyncConnection conn, final int db, final String command) { + final CompletableFuture rsfuture = new CompletableFuture(); + try { + final ByteArray dbwriter = new ByteArray(); + dbwriter.put(ASTERISK_BYTE); + dbwriter.put((byte) '2'); + dbwriter.put((byte) '\r', (byte) '\n'); + dbwriter.put(DOLLAR_BYTE); + dbwriter.put((byte) '6'); + dbwriter.put((byte) '\r', (byte) '\n'); + dbwriter.put("SELECT".getBytes(StandardCharsets.UTF_8)); + dbwriter.put((byte) '\r', (byte) '\n'); + + dbwriter.put(DOLLAR_BYTE); + dbwriter.put(String.valueOf(String.valueOf(db).length()).getBytes(StandardCharsets.UTF_8)); + dbwriter.put((byte) '\r', (byte) '\n'); + dbwriter.put(String.valueOf(db).getBytes(StandardCharsets.UTF_8)); + dbwriter.put((byte) '\r', (byte) '\n'); + + conn.write(dbwriter, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachments) { + try { + //----------------------- 读取返回结果 ------------------------------------- + conn.read(new ReplyCompletionHandler(conn) { + @Override + public void completed(Integer result, ByteBuffer buffer) { + buffer.flip(); + try { + final byte sign = buffer.get(); + if (sign == PLUS_BYTE) { // + + byte[] bs = readBytes(buffer); + if ("OK".equalsIgnoreCase(new String(bs))) { + conn.setSubobject("authed+db"); + rsfuture.complete(conn); + } else { + transport.offerConnection(false, conn); + rsfuture.completeExceptionally(new RuntimeException("command : " + command + ", error: " + bs)); + } + } else if (sign == MINUS_BYTE) { // - 异常 + String bs = readString(buffer); + transport.offerConnection(false, conn); + rsfuture.completeExceptionally(new RuntimeException("command : " + command + ", error: " + bs)); + } else { + String exstr = "Unknown reply: " + (char) sign; + transport.offerConnection(false, conn); + rsfuture.completeExceptionally(new RuntimeException(exstr)); + } + } catch (Exception e) { + failed(e, buffer); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer buffer) { + conn.offerBuffer(buffer); + transport.offerConnection(true, conn); + rsfuture.completeExceptionally(exc); + } + + }); + } catch (Exception e) { + failed(e, attachments); + } + } + + @Override + public void failed(Throwable exc, Void attachments) { + transport.offerConnection(true, conn); + rsfuture.completeExceptionally(exc); + } + }); + } catch (Exception e) { + rsfuture.completeExceptionally(e); + } + return rsfuture; + } + + private CompletableFuture auth(final AsyncConnection conn, final byte[] password, final String command) { + final CompletableFuture rsfuture = new CompletableFuture(); + try { + final ByteArray authwriter = new ByteArray(); + authwriter.put(ASTERISK_BYTE); + authwriter.put((byte) '2'); + authwriter.put((byte) '\r', (byte) '\n'); + authwriter.put(DOLLAR_BYTE); + authwriter.put((byte) '4'); + authwriter.put((byte) '\r', (byte) '\n'); + authwriter.put("AUTH".getBytes(StandardCharsets.UTF_8)); + authwriter.put((byte) '\r', (byte) '\n'); + + authwriter.put(DOLLAR_BYTE); + authwriter.put(String.valueOf(password.length).getBytes(StandardCharsets.UTF_8)); + authwriter.put((byte) '\r', (byte) '\n'); + authwriter.put(password); + authwriter.put((byte) '\r', (byte) '\n'); + + conn.write(authwriter, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachments) { + try { + //----------------------- 读取返回结果 ------------------------------------- + conn.read(new ReplyCompletionHandler(conn) { + @Override + public void completed(Integer result, ByteBuffer buffer) { + buffer.flip(); + try { + final byte sign = buffer.get(); + if (sign == PLUS_BYTE) { // + + byte[] bs = readBytes(buffer); + if ("OK".equalsIgnoreCase(new String(bs))) { + conn.setSubobject("authed"); + rsfuture.complete(conn); + } else { + transport.offerConnection(false, conn); + rsfuture.completeExceptionally(new RuntimeException("command : " + command + ", error: " + bs)); + } + } else if (sign == MINUS_BYTE) { // - 异常 + String bs = readString(buffer); + transport.offerConnection(false, conn); + rsfuture.completeExceptionally(new RuntimeException("command : " + command + ", error: " + bs)); + } else { + String exstr = "Unknown reply: " + (char) sign; + transport.offerConnection(false, conn); + rsfuture.completeExceptionally(new RuntimeException(exstr)); + } + } catch (Exception e) { + failed(e, buffer); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer buffer) { + conn.offerBuffer(buffer); + transport.offerConnection(true, conn); + rsfuture.completeExceptionally(exc); + } + + }); + } catch (Exception e) { + failed(e, attachments); + } + } + + @Override + public void failed(Throwable exc, Void attachments) { + transport.offerConnection(true, conn); + rsfuture.completeExceptionally(exc); + } + }); + } catch (Exception e) { + rsfuture.completeExceptionally(e); + } + return rsfuture; + } +} + +abstract class ReplyCompletionHandler implements CompletionHandler { + + protected final ByteArray out = new ByteArray(); + + protected final AsyncConnection conn; + + public ReplyCompletionHandler(AsyncConnection conn) { + this.conn = conn; + } + + protected byte[] readBytes(ByteBuffer buffer) throws IOException { + readLine(buffer); + return out.getBytesAndClear(); + } + + protected String readString(ByteBuffer buffer) throws IOException { + readLine(buffer); + return out.toStringAndClear(null);//传null则表示使用StandardCharsets.UTF_8 + } + + protected int readInt(ByteBuffer buffer) throws IOException { + return (int) readLong(buffer); + } + + protected long readLong(ByteBuffer buffer) throws IOException { + readLine(buffer); + int start = 0; + if (out.get(0) == '$') start = 1; + boolean negative = out.get(start) == '-'; + long value = negative ? 0 : (out.get(start) - '0'); + for (int i = 1 + start; i < out.length(); i++) { + value = value * 10 + (out.get(i) - '0'); + } + out.clear(); + return negative ? -value : value; + } + + private void readLine(ByteBuffer buffer) throws IOException { + boolean has = buffer.hasRemaining(); + byte lasted = has ? buffer.get() : 0; + if (lasted == '\n' && !out.isEmpty() && out.getLastByte() == '\r') { + out.removeLastByte();//读掉 \r + buffer.get();//读掉 \n + return;//传null则表示使用StandardCharsets.UTF_8 + } + if (has) out.put(lasted); + while (buffer.hasRemaining()) { + byte b = buffer.get(); + if (b == '\n' && lasted == '\r') { + out.removeLastByte(); + return; + } + out.put(lasted = b); + } + //说明数据还没读取完 + buffer.clear(); + conn.readableByteChannel().read(buffer); + buffer.flip(); + readLine(buffer); + } + +} diff --git a/src/com/zdemo/cache_/RedisTest.java b/src/com/zdemo/cache_/RedisTest.java new file mode 100644 index 0000000..bf441a7 --- /dev/null +++ b/src/com/zdemo/cache_/RedisTest.java @@ -0,0 +1,262 @@ +package com.zdemo.cache_; + +import com.zdemo.cachex.MyRedisCacheSource; +import org.redkale.convert.json.JsonFactory; +import org.redkale.net.AsyncIOGroup; +import org.redkale.util.AnyValue; +import org.redkale.util.ResourceFactory; + +import java.util.Map; + +import static org.redkale.boot.Application.RESNAME_APP_GROUP; + +public class RedisTest { + + static MyRedisCacheSource source; + static MyRedisCacheSource sourceInt; + + static { + AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue().addValue("maxconns", "10"); + conf.addValue("node", new AnyValue.DefaultAnyValue().addValue("addr", "47.111.150.118").addValue("port", "6064").addValue("password", "*Zhong9307!").addValue("db", 1)); + + final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + asyncGroup.start(); + ResourceFactory.root().register(RESNAME_APP_GROUP, asyncGroup); + + source = new MyRedisCacheSource(); + ResourceFactory.root().inject(source); + source.init(null); + source.defaultConvert = JsonFactory.root().getConvert(); + source.init(conf); + + // int + sourceInt = new MyRedisCacheSource(); + ResourceFactory.root().inject(sourceInt); + sourceInt.init(null); + sourceInt.defaultConvert = JsonFactory.root().getConvert(); + sourceInt.init(conf); + sourceInt.initValueType(Integer.class); + } + + public static void main(String[] args) { + + //System.out.println(source.remove("a", "b")); + + // bit + /*source.initValueType(Integer.class); + source.remove("a"); + boolean a = source.getBit("a", 1); + System.out.println(a); + + source.setBit("a", 1, true); + a = source.getBit("a", 1); + System.out.println("bit-a-1: " + a); + + source.setBit("a", 1, false); + a = source.getBit("a", 1); + System.out.println("bit-a-1: " + a);*/ + + /*source.remove("a"); + + // setnx + System.out.println(source.setnx("a", 1)); + source.remove("a"); + System.out.println(source.setnx("a", 1)); + + // set + source.remove("abx1"); + source.appendSetItems("abx1", "a", "b", "c"); + List list = source.srandomItems("abx1", 2); + String str = source.srandomItem("abx1"); //r + System.out.println(list);//[r1, r2] */ + + /*int[] arr = {0}; + ExecutorService executor = Executors.newFixedThreadPool(10); + CountDownLatch latch = new CountDownLatch(1000); + for (int i = 0; i < 1000; i++) { + executor.submit(() -> { + try { + source.lock("c", 1000); + arr[0]++; + // System.out.println("Thread: " + Thread.currentThread().getName()); + // Thread.sleep(10); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + source.unlock("c"); + latch.countDown(); + } + }); + } + + try { + latch.await(); + System.out.println("n=" + arr[0]); + executor.shutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + }*/ + + + /*List list = (List) source.getCollection("gamerank-comment-stat"); + System.out.println(list);*/ + + /*for (int i = 0; i < 10; i++) { + String brpop = source.brpop("z", 2); + System.out.println(brpop); + }*/ + + // key 测试 + /*source.set("a", "123321"); + System.out.println(source.get("a")); // 123321 + System.out.println(source.getTtl("a")); // -1 + System.out.println(source.getPttl("a")); // -1 + System.out.println(source.getPttl("x")); // -2*/ + + // hashmap 测试 + /*source.remove("sk"); + source.setHm("sk", "a", "1"); + source.setHm("sk", "b", "2"); + System.out.println(source.getHm("sk", "a")); // 1 + source.remove("sk"); + + source.setHms("sk", Map.of("b", "5", "c", "3", "a", "1")); + source.hdel("sk", "a"); + + Map map = source.getHms("sk", "a", "x", "b", "c", "f"); // {b=5, c=3} + System.out.println(map); + System.out.println(source.getHmall("sk")); //{b=5, c=3} + System.out.println(source.incrHm("sk", "b", 1.1d)); // b = 6.1 + System.out.println(source.incrHm("sk", "c", 1)); // c = 4 + System.out.println(source.getHmall("sk")); //{b=6.1, c=4} + + System.out.println("--------------"); + System.out.println(source.hexists("sk", "b")); // true + System.out.println(source.getCollectionSize("sk")); // 2*/ + + + Map hms = source.getHms("supportusers", "5-kfeu0f", "xxxx", "3-0kbt7u8t", "95q- "); + hms.forEach((k, v) -> { + System.out.println(k + " : " + v); + }); + + + /*MyRedisCacheSource source2 = new MyRedisCacheSource(); + source2.defaultConvert = JsonFactory.root().getConvert(); + source2.initValueType(String.class); //value用String类型 + source2.init(conf);*/ + + /*Map gcMap = source.getHmall("hot-gamecomment"); + gcMap.forEach((k,v) -> { + System.out.println(k + " : " + v); + });*/ + + + //Map gameinfo = source.getHms("gameinfo", "22759", "22838", "10097", "22751", "22632", "22711", "22195", "15821", "10099", "16313", "11345", "10534", "22768", "22647", "22924", "18461", "15871", "17099", "22640", "22644", "10744", "10264", "18032", "22815", "13584", "10031", "22818", "22452", "22810", "10513", "10557", "15848", "11923", "15920", "22808", "20073", "22809", "15840", "12332", "15803", "10597", "22624", "17113", "19578", "22664", "22621", "20722", "16226", "10523", "12304", "10597","11923","10031"); + //Map gameinfo = source.getHms("gameinfo", "22759","22838","10097","22751","22632","22711","22195","15821","10099","16313","11345","10534","22768","22647","22924","18461","15871","17099","22363","22640","22644","10744","10264","18032","22815","13584","22818","22452","22810","10513","10557","15848","15920","22808","20073","22809","15840","12332","15803","10597","22624","17113","19578","22627","22664","22621","20722","16226","10523","12304"); + + /*gameinfo.forEach((k,v ) -> { + System.out.println(v); + });*/ + + + /*source.queryKeysStartsWith("articlebean:").forEach(x -> { + System.out.println(x); + //source.remove(x); + //System.out.println(source.getHmall(x)); + });*/ + + // list 测试 + /*sourceInt.remove("list"); + Collection list = sourceInt.getCollection("list"); + System.out.println(list); + for (int i = 1; i <= 10; i++) { + sourceInt.appendListItem("list", i); + } + System.out.println(sourceInt.getCollection("list")); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + sourceInt.appendListItems("list", 11, 12, 13); + System.out.println(sourceInt.getCollection("list")); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13] + System.out.println(sourceInt.getCollection("list", 0, 5)); // [1, 2, 3, 4, 5] + System.out.println(sourceInt.getCollectionSize("list")); // 13 + + List ids = new ArrayList<>(100); + for (int i = 0; i < 2000; i++) { + ids.add(i); + } + sourceInt.remove("abx"); + sourceInt.appendListItems("abx", ids.toArray(Integer[]::new)); + System.out.println(sourceInt.getCollection("abx")); + */ + + /*System.out.println(sourceInt.getCollectionSize("recommend-user-quality")); + Collection uid = sourceInt.getCollection("recommend-user-quality"); + System.out.println(uid);*/ + + // zset 测试 + /*source.initValueType(String.class); //value用Integer类型 + source.remove("zx"); + source.zadd("zx", Map.of("a", 1, "b", 2)); + + source.zadd("zx", Map.of("a", 1, "c", 5L)); + source.zadd("zx", Map.of("x", 20, "j", 3.5)); + source.zadd("zx", Map.of("f", System.currentTimeMillis(), "c", 5L)); + source.zadd("zx", Map.of("a", 1, "c", 5L)); + + System.out.println(source.zincr("zx", "a", 1.34)); // 2.34 + System.out.println(source.getZsetDoubleScore("zx")); // {f=1592924555704, x=20, c=5, j=3, b=2.34, a=1} + source.zrem("zx", "b", "c", "e", "x"); + System.out.println(source.getZsetLongScore("zx")); // {f=1592924555704, j=3, a=2} + + System.out.println("--------------"); + System.out.println(source.getZsetLongScore("zx", "f")); + + System.out.println(source.getZrevrank("zx", "f")); // 0 + System.out.println(source.getZrank("zx", "f")); // 2 + System.out.println(source.getZrank("zx", "Y")); // -1 + System.out.println(source.getCollectionSize("zx")); // 3 + + System.out.println(source.getZset("zx")); + System.out.println(source.zexists("zx", "f", "x", "a"));*/ + + /*LocalDate date = LocalDate.of(2019, 12, 31); + for (int i = 0; i < 60; i++) { + LocalDate localDate = date.plusDays(-i); + String day = localDate.format(DateTimeFormatter.ISO_LOCAL_DATE); + System.out.println(String.format("mkdir %s; mv *%s*.zip %s", day, day, day)); + }*/ + + + /*MyRedisCacheSource source = new MyRedisCacheSource(); + source.defaultConvert = JsonFactory.root().getConvert(); + source.initValueType(UserDetail.class); + source.init(conf); + + + Map map = source.getHmall("user-detail"); + + Integer[] array = map.values().stream().map(x -> x.getUserid()).toArray(Integer[]::new); + + System.out.println(JsonConvert.root().convertTo(array)); + + Map hms = source.getHms("user-detail", 11746, 11988, 11504, 11987, 11745, 11503, 11748, 11506, 11747, 11989, 11505, 11508, 11507, 11509, 11980, 11740, 11982, 11981, 11984, 11742, 11500, 11983, 11741, 11502, 11744, 11986, 11985, 11501, 11743, 11999, 11757, 11515, 1, 11514, 11998, 11756, 2, 11517, 11516, 11758, 3, 11519, 4, 5, 11518, 6, 7, 11991, 8, 11990, 9, 11993, 11751, 11750, 11992, 11753, 11511, 11995, 11994, 11510, 11752, 11755, 11513, 11997, 11512, 11996, 11754, 11724, 11966, 11965, 11723, 11968, 11726, 11967, 11725, 11728, 11969, 11727, 11729, 11960, 11720, 11962, 11961, 11722, 11964, 11721); + + System.out.println(hms.size());*/ + + /*source.getCollection("article-comment-list", 19, 1).forEach(x -> System.out.println(x)); + + + while (true) { + System.out.println("---" + Utility.now() + "---"); + source.getHmall("ck").forEach((k, v) -> { + System.out.println(k + ":" + v); + }); + try { + Thread.sleep(60 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }*/ + } +} diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index e7cb105..2700c8e 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -4,6 +4,7 @@ import com.zdemo.AbstractConsumer; import com.zdemo.Event; import com.zdemo.IConsumer; import com.zdemo.IProducer; +import net.tccn.timer.Timers; import org.redkale.service.Service; import org.redkale.util.*; @@ -524,14 +525,19 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer timeout = 1000 * 15; } // call timeout default: 15s - Delays.addDelay(timeout, () -> { - RpcResult rpcResult = rpc.buildResp(505, "请求超时"); - rpc.setRpcResult(rpcResult); + Timers.delay(() -> { synchronized (rpc) { + Rpc rpc1 = rpcMap.get(ruk); + if (rpc1 == null) { + return; + } + + RpcResult rpcResult = rpc.buildResp(505, "请求超时"); + rpc.setRpcResult(rpcResult); logger.warning("rpc timeout: " + convert.convertTo(rpc)); rpc.notify(); } - }); + }, timeout); rpc.wait(); rpcMap.remove(ruk); @@ -568,7 +574,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer }.getType(), value); String ruk = resp.getRuk(); - Rpc rpc = rpcMap.get(ruk); + Rpc rpc = rpcMap.remove(ruk); if (rpc == null) { return; } diff --git a/src/net/tccn/timer/TimerExecutor.java b/src/net/tccn/timer/TimerExecutor.java new file mode 100644 index 0000000..720ddf5 --- /dev/null +++ b/src/net/tccn/timer/TimerExecutor.java @@ -0,0 +1,66 @@ +package net.tccn.timer; + +import net.tccn.timer.queue.TimerQueue; +import net.tccn.timer.task.Task; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +/** + * @author: liangxianyou + */ +public class TimerExecutor { + private Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + private TimerQueue queue = new TimerQueue(); + private ExecutorService executor; + + public TimerExecutor(int n) { + executor = Executors.newFixedThreadPool(n); + start(); + } + + public void add(Task... task) { + for (Task t : task) { + t.setTimerExecutor(this); + queue.push(t); + logger.finest("add new task : " + t.getName()); + } + } + + protected void add(Task task, boolean upTime) { + task.setTimerExecutor(this); + if (upTime) task.nextTime(); + queue.push(task); + } + + public Task remove(String name) { + return queue.remove(name); + } + + public Task get(String name) { + return queue.get(name); + } + + + public void start() { + new Thread(() -> { + while (true) { + try { + Task take = null; + try { + take = queue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + //执行调度 + executor.execute(take); + //add(take, true); //继续添加任务到 队列 + } catch (Exception e) { + e.printStackTrace(); + } + } + }, "Thread-Redtimer-0").start(); + } +} \ No newline at end of file diff --git a/src/net/tccn/timer/TimerTask.java b/src/net/tccn/timer/TimerTask.java new file mode 100644 index 0000000..e1993b4 --- /dev/null +++ b/src/net/tccn/timer/TimerTask.java @@ -0,0 +1,108 @@ +package net.tccn.timer; + +import net.tccn.timer.scheduled.Scheduled; +import net.tccn.timer.task.Job; +import net.tccn.timer.task.Task; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +/** + * Created by liangxianyou at 2018/7/23 14:33. + */ +public class TimerTask implements Task { + private Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + private long startTime = System.currentTimeMillis(); + private AtomicInteger execCount = new AtomicInteger(); + protected String name; + private long theTime; + private Scheduled scheduled; + private boolean isComplete; + + private TimerExecutor timerExecutor; + private Job job; + + public static Task by(String name, Scheduled scheduled, Job job) { + TimerTask task = new TimerTask(); + task.name = name; + task.scheduled = scheduled; + task.job = job; + return task; + } + + @Override + public String getName() { + return name; + } + + @Override + public void setScheduled(Scheduled scheduled) { + this.scheduled = scheduled; + //this.theTime = Date.from(scheduled.theTime().atZone(ZoneId.systemDefault()).toInstant()).getTime(); + } + + @Override + public long nextTime() { + LocalDateTime next = scheduled.nextTime(); + this.theTime = Date.from(next.atZone(ZoneId.systemDefault()).toInstant()).getTime(); + + /*SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + System.out.println("下次执行:"+ sdf.format(next.toInstant(ZoneOffset.of("+8")).toEpochMilli()));*/ + return theTime; + } + + @Override + public long theTime() { + LocalDateTime next = scheduled.theTime(); + this.theTime = Date.from(next.atZone(ZoneId.systemDefault()).toInstant()).getTime(); + return theTime; + } + + @Override + public boolean isComplete() { + return isComplete; + } + + public void setComplete(boolean complete) { + if (isComplete = complete) + timerExecutor.remove(name); + } + + public int getExecCount() { + return execCount.get(); + } + + public TimerExecutor getTimerExecutor() { + return timerExecutor; + } + + public void setTimerExecutor(TimerExecutor timerExecutor) { + this.timerExecutor = timerExecutor; + } + + public long startTime() { + return startTime; + } + + @Override + public void run() { + //没有完成任务,继续执行 + if (!isComplete) { + int count = execCount.incrementAndGet(); // 执行次数+1 + + long start = System.currentTimeMillis(); + job.execute(this); + long end = System.currentTimeMillis(); + logger.finest(String.format("task [%s] : not complete -> %s, time: %s ms, exec count: %s.", getName(), isComplete ? "had complete" : "not complete", end - start, count)); + + if (!isComplete) { + timerExecutor.add(this, true); + } + } + + } +} + diff --git a/src/net/tccn/timer/Timers.java b/src/net/tccn/timer/Timers.java new file mode 100644 index 0000000..c84480a --- /dev/null +++ b/src/net/tccn/timer/Timers.java @@ -0,0 +1,45 @@ +package net.tccn.timer; + +import net.tccn.timer.scheduled.ScheduledCycle; +import org.redkale.util.Utility; + +import java.util.function.Supplier; + +public class Timers { + + private static TimerExecutor timerExecutor = new TimerExecutor(1); + + /** + * 本地延时重试 + * @param supplier + * @param millis + * @param maxCount + */ + public static void tryDelay(Supplier supplier, long millis, int maxCount) { + timerExecutor.add(TimerTask.by("try-delay-task-" + Utility.uuid(), ScheduledCycle.of(0), task -> { + if (supplier.get() || task.getExecCount() == maxCount) { + task.setComplete(true); + } + + if (task.getExecCount() == 1) { + task.setScheduled(ScheduledCycle.of(millis)); + } + })); + + + } + + /** + * 本地延时:延时时间极短的场景下使用 (如:1分钟内) + * @param runnable + * @param millis + */ + public static void delay(Runnable runnable, long millis) { + timerExecutor.add(TimerTask.by("delay-task-" + Utility.uuid(), ScheduledCycle.of(millis), task -> { + runnable.run(); + task.setComplete(true); + })); + } + + +} diff --git a/src/net/tccn/timer/queue/TimerQueue.java b/src/net/tccn/timer/queue/TimerQueue.java new file mode 100644 index 0000000..5a0e8c8 --- /dev/null +++ b/src/net/tccn/timer/queue/TimerQueue.java @@ -0,0 +1,105 @@ +package net.tccn.timer.queue; + +import net.tccn.timer.task.Task; + +import java.util.LinkedList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Created by liangxianyou at 2018/7/23 14:07. + */ +public class TimerQueue { + private ReentrantLock lock = new ReentrantLock(); + private Condition isEmpty = lock.newCondition(); + private LinkedList queue = new LinkedList(); + + /** + * 新加调度任务 + * + * @param task + */ + public void push(Task task) { + try { + lock.lock(); + remove(task.getName()); + int inx = queue.size();//目标坐标 + while (inx > 0 && queue.get(inx - 1).theTime() > task.theTime()) { + inx--; + } + + queue.add(inx, task); + isEmpty.signal(); + } finally { + lock.unlock(); + } + } + + /** + * 调度等待执行的任务 + * + * @return + * @throws InterruptedException + */ + public Task take() throws InterruptedException { + try { + lock.lock(); + while (queue.size() == 0) { + isEmpty.await(); + } + + long currentTime = System.currentTimeMillis(); + long nextTime = queue.getFirst().theTime(); + + if (currentTime >= nextTime) { + return queue.removeFirst(); + } else { + isEmpty.await(nextTime - currentTime, TimeUnit.MILLISECONDS); + return take(); + } + } finally { + lock.unlock(); + } + } + + /** + * 删除指定名称的任务 + * + * @param name + * @return + */ + public Task remove(String name) { + return get(name, true); + } + + /** + * 返回指定名称的任务 + * + * @param name + * @return + */ + public Task get(String name) { + return get(name, false); + } + + private Task get(String name, boolean remove) { + try { + lock.lock(); + Task take = null; + for (int i = 0; i < queue.size(); i++) { + if (name.equals(queue.get(i).getName())) { + take = queue.get(i); + } + } + if (remove && take != null) { + queue.remove(take); + } + + isEmpty.signal(); + return take; + } finally { + lock.unlock(); + } + } +} diff --git a/src/net/tccn/timer/scheduled/Scheduled.java b/src/net/tccn/timer/scheduled/Scheduled.java new file mode 100644 index 0000000..31eb3ba --- /dev/null +++ b/src/net/tccn/timer/scheduled/Scheduled.java @@ -0,0 +1,23 @@ +package net.tccn.timer.scheduled; + +import java.time.LocalDateTime; + +/** + * @author: liangxianyou at 2018/8/5 17:35. + */ +public interface Scheduled { + + /** + * 下次执行时间 + * + * @return + */ + LocalDateTime nextTime(); + + /** + * 当前执行时间 + * + * @return + */ + LocalDateTime theTime(); +} diff --git a/src/net/tccn/timer/scheduled/ScheduledCycle.java b/src/net/tccn/timer/scheduled/ScheduledCycle.java new file mode 100644 index 0000000..c43796e --- /dev/null +++ b/src/net/tccn/timer/scheduled/ScheduledCycle.java @@ -0,0 +1,79 @@ +package net.tccn.timer.scheduled; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; + +/** + * @author: liangxianyou at 2018/8/5 18:05. + */ +public class ScheduledCycle implements Scheduled { + + private LocalDateTime theTime; + private long period; + private TemporalUnit unit = ChronoUnit.MILLIS; + + private ScheduledCycle() { + } + + public static Scheduled of(String periodCfg) { + TemporalUnit unit = ChronoUnit.MILLIS; + String endchar = ""; + long period; + + if (periodCfg.matches("^\\d+[y,M,d,H,m,s]$")) { + endchar = periodCfg.substring(periodCfg.length() - 1); + period = Long.parseLong(periodCfg.substring(0, periodCfg.length() - 1)); + } else if (periodCfg.matches("^\\d+$")) { + period = Long.parseLong(periodCfg); + if (period <= 0) { + throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", periodCfg)); + } + } else { + throw new IllegalArgumentException(String.format("ScheduledCycle period config error: [%s]", periodCfg)); + } + + if ("y".equals(endchar)) unit = ChronoUnit.YEARS; + else if ("M".equals(endchar)) unit = ChronoUnit.MONTHS; + else if ("d".equals(endchar)) unit = ChronoUnit.DAYS; + else if ("H".equals(endchar)) unit = ChronoUnit.HOURS; + else if ("m".equals(endchar)) unit = ChronoUnit.MINUTES; + else if ("s".equals(endchar)) unit = ChronoUnit.SECONDS; + + return of(period, unit); + } + + public static Scheduled of(long period) { + return of(period, ChronoUnit.MILLIS); + } + + public static Scheduled of(long period, TemporalUnit unit) { + LocalDateTime theTime = LocalDateTime.now().plus(period, unit); + return of(theTime, period, unit); + } + + public static Scheduled of(LocalDateTime startTime, long period) { + return of(startTime, period, ChronoUnit.MILLIS); + } + + public static Scheduled of(LocalDateTime startTime, long period, TemporalUnit unit) { + ScheduledCycle scheduled = new ScheduledCycle(); + scheduled.theTime = startTime; + scheduled.period = period; + scheduled.unit = unit; + return scheduled; + } + + @Override + public LocalDateTime nextTime() { + if (theTime.isAfter(LocalDateTime.now())) { + return theTime; + } + return theTime = theTime.plus(period, unit); + } + + @Override + public LocalDateTime theTime() { + return theTime; + } +} diff --git a/src/net/tccn/timer/scheduled/ScheduledExpres.java b/src/net/tccn/timer/scheduled/ScheduledExpres.java new file mode 100644 index 0000000..509b342 --- /dev/null +++ b/src/net/tccn/timer/scheduled/ScheduledExpres.java @@ -0,0 +1,499 @@ +package net.tccn.timer.scheduled; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * 时间解析器 + * + * @author: liangxianyou + */ +@SuppressWarnings("Duplicates") +public class ScheduledExpres implements Scheduled { + private int year; + private int month; + private int[] minutes; + private int[] hours; + private int[] days; + private int[] monthes; + private int[] weeks; + + private String cfg; + private String[] cfgArr; + private LocalDateTime theTime; + private int _y, _M, _d, _H, _m; + + @Deprecated + private ScheduledExpres(String cfg) { + this.cfg = cfg; + this.theTime = LocalDateTime.now(); + initTheTime(); + } + + @Deprecated + private ScheduledExpres(final LocalDateTime startTime, String cfg) { + LocalDateTime now = LocalDateTime.now(); + this.theTime = now.isAfter(startTime) ? now : startTime; + this.cfg = cfg; + initTheTime(); + } + + public static Scheduled of(String cfg) { + return new ScheduledExpres(cfg); + } + + public static Scheduled of(final LocalDateTime startTime, String cfg) { + return new ScheduledExpres(startTime, cfg); + } + + //寻找初始合法时间 + public void initTheTime() { + year = theTime.getYear(); + month = theTime.getMonthValue(); + cfgArr = cfg.split(" "); + + setWeeks(); + setMonthes(); + setDays(); + setHours(); + setMinutes(); + + _y = theTime.getYear(); + _M = theTime.getMonthValue(); + _d = theTime.getDayOfMonth(); + _H = theTime.getHour(); + _m = theTime.getMinute(); + + String cmd = "";//y M d H m + if (days.length == 0) cmd = "M"; + do { + carry(cmd); + int inx; + if ((inx = nowOk(monthes, _M)) < 0) { + cmd = "y"; + continue; + } + _M = monthes[inx]; + + if ((inx = nowOk(days, _d)) < 0) { + cmd = "M"; + continue; + } + _d = days[inx]; + + if ((inx = nowOk(hours, _H)) < 0) { + cmd = "d"; + continue; + } + _H = hours[inx]; + + if ((inx = nowOk(minutes, _m)) < 0) { + cmd = "H"; + continue; + } + _m = minutes[inx]; + break; + } while (true); + + theTime = LocalDateTime.of(_y, _M, _d, _H, _m); + LocalDateTime now = LocalDateTime.now(); + while (theTime.isBefore(now)) { + theTime = carry("m"); + } + } + + /** + * 下一次执行的时间 + * + * @return + */ + @Override + public LocalDateTime nextTime() { + if (theTime.isAfter(LocalDateTime.now())) { + return theTime; + } + return theTime = carry("m"); + } + + @Override + public LocalDateTime theTime() { + return theTime; + } + + /** + * 通过发送指令进行进位 + * + * @param cmd 进位指令 + */ + private LocalDateTime carry(String cmd) { + int inx; + while (!"".equals(cmd)) { + switch (cmd) { + case "y": + _y = this.year = ++_y; + _M = this.month = monthes[0]; + setDays(); + if (days.length == 0) { + cmd = "M"; + continue; + } + _d = days[0]; + _H = hours[0]; + _m = minutes[0]; + break; + case "M": + if (_M < monthes[0]) { + _M = monthes[0]; + break; + } + inx = Arrays.binarySearch(monthes, _M); + if (inx < 0 || inx >= monthes.length - 1) { + cmd = "y"; + continue; + } + _M = this.month = monthes[inx + 1]; + setDays(); + if (days.length == 0) { + cmd = "M"; + continue; + } + _d = days[0]; + _H = hours[0]; + _m = minutes[0]; + break; + case "d": + if (_d < days[0]) { + _d = days[0]; + break; + } + inx = Arrays.binarySearch(days, _d); + if (inx < 0 || inx >= days.length - 1) { + cmd = "M"; + continue; + } + _d = days[inx + 1]; + _H = hours[0]; + _m = minutes[0]; + break; + case "H": + if (_H < hours[0]) { + _H = hours[0]; + break; + } + inx = Arrays.binarySearch(hours, _H); + if (inx < 0 || inx >= hours.length - 1) { + cmd = "d"; + continue; + } + _H = hours[inx + 1]; + _m = minutes[0]; + break; + case "m": + if (_m < minutes[0]) { + _m = minutes[0]; + break; + } + inx = Arrays.binarySearch(minutes, _m); + if (inx < 0 || inx >= minutes.length - 1) { + cmd = "H"; + continue; + } + _m = minutes[inx + 1]; + break; + } + cmd = ""; + } + return LocalDateTime.of(_y, _M, _d, _H, _m); + } + + /** + * 得到初始合法时间的索引 + * + * @param arr 合法时间序列 + * @param n 初始选中值 + * @return 合法时间的索引 + */ + private int nowOk(int[] arr, int n) { + if (arr == null || arr.length == 0) return -1; + if (arr[0] > n) + return 0; + if (arr[arr.length - 1] < n) + return 0; + if (arr[arr.length - 1] == n) + return arr.length - 1; + + for (int i = 0; i < arr.length - 1; i++) { + if ((arr[i] < n && arr[i + 1] > n) || arr[i] == n) { + return i; + } + } + + return -1; + } + + /** + * 以下为 初始化合法时间 weeks,monthes,days,hour,minutes 序列 + */ + private void setMinutes() { + String cfg = cfgArr[0]; + if ("*".equals(cfg)) {//* + minutes = new int[60]; + for (int i = 0; i < 60; i++) { + minutes[i] = i; + } + } else if (cfg.matches("^[0-5]??[0-9]??$")) {//n + minutes = new int[1]; + minutes[0] = Integer.parseInt(cfg); + } else if (cfg.matches("^[*]/[0-9]+$")) {// */5 + String[] strArr = cfg.split("/"); + int p = Integer.parseInt(strArr[1]); + minutes = new int[60 / p]; + for (int i = 0; i < minutes.length; i++) { + minutes[i] = i * p; + } + } else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3 + String[] strings = cfg.split(","); + minutes = new int[strings.length]; + for (int i = 0; i < strings.length; i++) { + minutes[i] = Integer.parseInt(strings[i]); + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3 + String[] split = cfg.split("-"); + int s = Integer.parseInt(split[0]); + int e = Integer.parseInt(split[1]); + + minutes = new int[e - s + 1]; + for (int i = 0; i < minutes.length; i++) { + minutes[i] = s + i; + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5 + String[] strArr = cfg.split("/"); + String[] str2Arr = strArr[0].split("-"); + int s = Integer.parseInt(str2Arr[0]); + int e = Integer.parseInt(str2Arr[1]); + int p = Integer.parseInt(strArr[1]); + + minutes = new int[(e - s) / p]; + for (int i = 0; i < minutes.length; i++) { + minutes[i] = s + i * p; + } + } + } + + private void setHours() { + String cfg = cfgArr[1]; + if ("*".equals(cfg)) {//* + hours = new int[24]; + for (int i = 0; i < hours.length; i++) { + hours[i] = i; + } + } else if (cfg.matches("^[0-5]??[0-9]??$")) {//n + hours = new int[1]; + hours[0] = Integer.parseInt(cfg); + } else if (cfg.matches("^[*]/[0-9]+$")) {// */5 + String[] strArr = cfg.split("/"); + int p = Integer.parseInt(strArr[1]); + hours = new int[24 / p]; + for (int i = 0; i < hours.length; i++) { + hours[i] = i * p; + } + } else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3 + String[] strArr = cfg.split(","); + hours = new int[strArr.length]; + for (int i = 0; i < strArr.length; i++) { + hours[i] = Integer.parseInt(strArr[i]); + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3 + String[] split = cfg.split("-"); + int s = Integer.parseInt(split[0]); + int e = Integer.parseInt(split[1]); + + hours = new int[e - s + 1]; + for (int i = 0; i < hours.length; i++) { + hours[i] = s + i; + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5 + String[] strArr = cfg.split("/"); + String[] str2Arr = strArr[0].split("-"); + int s = Integer.parseInt(str2Arr[0]); + int e = Integer.parseInt(str2Arr[1]); + int p = Integer.parseInt(strArr[1]); + + hours = new int[(e - s) / p]; + for (int i = 0; i < hours.length; i++) { + hours[i] = s + i * p; + } + } + } + + private void setWeeks() { + String cfg = cfgArr[4]; + if ("*".equals(cfg)) {//* + weeks = new int[7]; + for (int i = 0; i < weeks.length; i++) { + weeks[i] = i + 1; + } + } else if (cfg.matches("^[0-5]??[0-9]??$")) {//n + weeks = new int[1]; + weeks[0] = Integer.parseInt(cfg); + } else if (cfg.matches("^[*]/[0-9]+$")) {// */5 + String[] strArr = cfg.split("/"); + int p = Integer.parseInt(strArr[1]); + weeks = new int[7 / p]; + for (int i = 0; i < weeks.length; i++) { + weeks[i] = i * p; + } + } else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3 + String[] strArr = cfg.split(","); + weeks = new int[strArr.length]; + for (int i = 0; i < strArr.length; i++) { + weeks[i] = Integer.parseInt(strArr[i]); + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3 + String[] split = cfg.split("-"); + int s = Integer.parseInt(split[0]); + int e = Integer.parseInt(split[1]); + + weeks = new int[e - s + 1]; + for (int i = 0; i < weeks.length; i++) { + weeks[i] = s + i; + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5 + String[] strArr = cfg.split("/"); + String[] str2Arr = strArr[0].split("-"); + int s = Integer.parseInt(str2Arr[0]); + int e = Integer.parseInt(str2Arr[1]); + int p = Integer.parseInt(strArr[1]); + + weeks = new int[(e - s) / p]; + for (int i = 0; i < weeks.length; i++) { + weeks[i] = s + i * p; + } + } + } + + private void setMonthes() { + String cfg = cfgArr[3]; + if ("*".equals(cfg)) {//* + monthes = new int[12]; + for (int i = 0; i < monthes.length; i++) { + monthes[i] = i + 1; + } + } else if (cfg.matches("^[0-5]??[0-9]??$")) {//n + monthes = new int[1]; + monthes[0] = Integer.parseInt(cfg); + } else if (cfg.matches("^[*]/[0-9]+$")) {// */5 + String[] strArr = cfg.split("/"); + int p = Integer.parseInt(strArr[1]); + monthes = new int[12 / p]; + for (int i = 0; i < monthes.length; i++) { + monthes[i] = i * p; + } + } else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3 + String[] strArr = cfg.split(","); + monthes = new int[strArr.length]; + for (int i = 0; i < strArr.length; i++) { + monthes[i] = Integer.parseInt(strArr[i]); + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3 + String[] split = cfg.split("-"); + int s = Integer.parseInt(split[0]); + int e = Integer.parseInt(split[1]); + + monthes = new int[e - s + 1]; + for (int i = 0; i < monthes.length; i++) { + monthes[i] = s + i; + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5 + String[] strArr = cfg.split("/"); + String[] str2Arr = strArr[0].split("-"); + int s = Integer.parseInt(str2Arr[0]); + int e = Integer.parseInt(str2Arr[1]); + int p = Integer.parseInt(strArr[1]); + + monthes = new int[(e - s) / p]; + for (int i = 0; i < monthes.length; i++) { + monthes[i] = s + i * p; + } + } + } + + private void setDays() { + String cfg = cfgArr[2]; + //当前月份总天数, + LocalDate firstDay = LocalDate.of(year, month, 1); + int lengthOfMonth = firstDay.lengthOfMonth(); + + if ("*".equals(cfg)) {//* + days = new int[lengthOfMonth]; + for (int i = 0; i < days.length; i++) { + days[i] = i + 1; + } + } else if (cfg.matches("^[0-5]??[0-9]??$")) {//n + days = new int[1]; + days[0] = Integer.parseInt(cfg); + } else if (cfg.matches("^[*]/[0-9]+$")) {// */5 + String[] strArr = cfg.split("/"); + int p = Integer.parseInt(strArr[1]); + days = new int[lengthOfMonth / p]; + for (int i = 0; i < days.length; i++) { + days[i] = i * p; + } + } else if (cfg.matches("^([0-5]??[0-9]??,)+([0-5]??[0-9]??)?$")) {//1,3 + String[] strArr = cfg.split(","); + days = new int[strArr.length]; + for (int i = 0; i < strArr.length; i++) { + days[i] = Integer.parseInt(strArr[i]); + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??$")) {//1-3 + String[] split = cfg.split("-"); + int s = Integer.parseInt(split[0]); + int e = Integer.parseInt(split[1]); + + days = new int[e - s + 1]; + for (int i = 0; i < days.length; i++) { + days[i] = s + i; + } + } else if (cfg.matches("^[0-5]??[0-9]??\\-[0-5]??[0-9]??/[0-5]??[0-9]??$")) {//3-18/5 + String[] strArr = cfg.split("/"); + String[] str2Arr = strArr[0].split("-"); + int s = Integer.parseInt(str2Arr[0]); + int e = Integer.parseInt(str2Arr[1]); + int p = Integer.parseInt(strArr[1]); + + days = new int[(e - s) / p]; + for (int i = 0; i < days.length; i++) { + days[i] = s + i * p; + } + } + + int firstWeek = firstDay.getDayOfWeek().getValue(); + List allDay = new ArrayList<>(); + for (int i = 0; i < days.length; i++) { + //int week = 7 - Math.abs(i - firstWeek) % 7;//当前星期X + int week; + int d = days[i]; + if (d + firstWeek <= 8) { + week = firstWeek + d - 1; + } else { + week = (d - (8 - firstWeek)) % 7; + if (week == 0) week = 7; + } + + //System.out.printf("M:%s,d:%s,w:%s%n", month, d, week); + + if (Arrays.binarySearch(weeks, week) > -1) { + allDay.add(d);//加入日期 + } + } + + days = new int[allDay.size()]; + for (int i = 0; i < allDay.size(); i++) { + days[i] = allDay.get(i); + } + } + +} diff --git a/src/net/tccn/timer/task/Job.java b/src/net/tccn/timer/task/Job.java new file mode 100644 index 0000000..ddfd6f7 --- /dev/null +++ b/src/net/tccn/timer/task/Job.java @@ -0,0 +1,14 @@ +package net.tccn.timer.task; + +/** + * @author: liangxianyou at 2018/12/8 17:24. + */ +@FunctionalInterface +public interface Job { + + /** + * 任务执行的内容 + */ + void execute(Task task); + +} diff --git a/src/net/tccn/timer/task/Task.java b/src/net/tccn/timer/task/Task.java new file mode 100644 index 0000000..c488dab --- /dev/null +++ b/src/net/tccn/timer/task/Task.java @@ -0,0 +1,69 @@ +package net.tccn.timer.task; + +import net.tccn.timer.TimerExecutor; +import net.tccn.timer.scheduled.Scheduled; + +/** + * @author: liangxianyou at 2018/8/5 19:32. + */ +public interface Task extends Runnable { + + /** + * 得到任务名称 + * + * @return + */ + String getName(); + + /** + * 设置任务执行计划 + * + * @param scheduled + */ + void setScheduled(Scheduled scheduled); + + /** + * 得到下一次执行计划的时间,并设置thenTime + * + * @return + */ + long nextTime(); + + /** + * 任务即将执行的时间点 + * + * @return + */ + long theTime(); + + /** + * 是否完成 + * + * @return + */ + boolean isComplete(); + + /** + * 完成任务(结束标记) + * + * @param complete + */ + void setComplete(boolean complete); + + /** + * 开始时间(创建时间) + * + * @return + */ + long startTime(); + + TimerExecutor getTimerExecutor(); + + void setTimerExecutor(TimerExecutor timerExecutor); + + /** + * 得到总执行次数 + * @return + */ + int getExecCount(); +} diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index f8667a7..428bb92 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -2,6 +2,8 @@ package com.zdemo.test; import com.zdemo.Event; import com.zdemo.IProducer; +import com.zdemo.zhub.Delays; +import net.tccn.timer.Timers; import org.junit.Test; import org.redkale.boot.Application; import org.redkale.convert.json.JsonConvert; @@ -9,10 +11,13 @@ import org.redkale.convert.json.JsonConvert; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.DelayQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * 消息发布订阅测试 @@ -23,10 +28,15 @@ public class AppTest { @Test public void runConsumer() { try { + // String str = ", response = {\"success\":true,\"retcode\":0,\"result\":{\"age\":0,\"explevel\":1,\"face\":\"https://aimg.woaihaoyouxi.com/haogame/202106/pic/20210629095545FmGt-v9NYqyNZ_Q6_y3zM_RMrDgd.jpg\",\"followed\":0,\"gender\":0,\"idenstatus\":0,\"matchcatelist\":[{\"catename\":\"足球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103556FoG5ICf_7BFx6Idyo3TYpJQ7tmfG.png\",\"matchcateid\":1},{\"catename\":\"篮球\",\"catepic\":\"https://aimg.woaihaoyouxi.com/haogame/202107/pic/20210714103636FklsXTn1f6Jlsam8Jk-yFB7Upo3C.png\",\"matchcateid\":2}],\"matchcates\":\"2,1\",\"mobile\":\"18515190967\",\"regtime\":1624931714781,\"sessionid\":\"d1fc447753bd4700ad29674a753030fa\",\"status\":10,\"userid\":100463,\"username\":\"绝尘\",\"userno\":100463}}"; + String str = "hello你好"; + + System.out.println(str.length()); + //启动并开启消费监听 MyConsumer consumer = Application.singleton(MyConsumer.class); - consumer.subscribe("a", str -> { + consumer.subscribe("a", strx -> { logger.info("我收到了消息 a 事件:" + str); }); @@ -333,5 +343,108 @@ public class AppTest { Event of = Event.of("A", Map.of("b", 1)); System.out.println(JsonConvert.root().convertTo(of)); + + String str = "❦别人家的女娃子🤞🏻ꚢ"; + + /* + System.out.println("别人家的女娃子🤞🏻".length());*/ + System.out.println(strLength(str)); + System.out.println(getWordCount(str)); + /*try { + System.out.println("别人家的女娃子🤞🏻".getBytes("UTF-8").length); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + System.out.println("系统默认编码方式:" + System.getProperty("file.encoding"));*/ + } + + + public static int strLength(String value) { + int valueLength = 0; + String chinese = "[\u4e00-\u9fa5]"; + for (int i = 0; i < value.length(); i++) { + String temp = value.substring(i, i + 1); + if (temp.matches(chinese)) { + valueLength += 2; + } else { + valueLength += 1; + } + } + return valueLength; + } + + public int getWordCount(String str) { + str = str.replaceAll("[^\\x00-\\xff]", "*"); + return str.length(); + } + + @Test + public void delay() { + DelayQueue delayQueue = new DelayQueue<>(); + + logger.info("加入延时任务1"); + delayQueue.add(new Delays(5000, () -> { + logger.info("任务1 延时任务执行了!"); + })); + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + logger.info("加入延时任务2"); + delayQueue.add(new Delays(5000, () -> { + logger.info("任务2 延时任务执行了!"); + })); + + try { + while (true) { + Delays delay = delayQueue.take(); + + delay.run(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void regTest() { + // 按指定模式在字符串查找 + String line = "This order was placed for QT3000! OK?"; + String pattern = "(\\D*)(\\d+)(.*)"; + + // 创建 Pattern 对象 + Pattern r = Pattern.compile(pattern); + + // 现在创建 matcher 对象 + Matcher m = r.matcher(line); + if (m.find()) { + System.out.println("Found value: " + m.group(0)); + System.out.println("Found value: " + m.group(1)); + System.out.println("Found value: " + m.group(2)); + System.out.println("Found value: " + m.group(3)); + } else { + System.out.println("NO MATCH"); + } + } + + @Test + public void timersTest() { + Timers.tryDelay(() -> { + logger.info("xx:" + System.currentTimeMillis()); + return true; + }, 1000, 5); + + Timers.delay(() -> { + System.out.println("11"); + }, 3000); + + try { + Thread.sleep(100000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } diff --git a/src/com/zdemo/zhub/Delays.java b/test/com/zdemo/test/Delays.java similarity index 100% rename from src/com/zdemo/zhub/Delays.java rename to test/com/zdemo/test/Delays.java diff --git a/test/com/zdemo/test/HelloService.java b/test/com/zdemo/test/HelloService.java index 072f305..2091cc9 100644 --- a/test/com/zdemo/test/HelloService.java +++ b/test/com/zdemo/test/HelloService.java @@ -3,12 +3,17 @@ package com.zdemo.test; import com.zdemo.IConsumer; import com.zdemo.zhub.RpcResult; import com.zdemo.zhub.ZHubClient; +import org.redkale.net.http.RestMapping; import org.redkale.net.http.RestService; import org.redkale.service.Service; import org.redkale.util.AnyValue; import org.redkale.util.TypeToken; import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; @RestService(automapping = true) public class HelloService implements Service { @@ -16,22 +21,212 @@ public class HelloService implements Service { @Resource(name = "zhub") private ZHubClient zhub; + private net.tccn.zhub.ZHubClient zhubx = null; + + @Override public void init(AnyValue config) { + + CompletableFuture.runAsync(() -> { + zhubx = new net.tccn.zhub.ZHubClient("127.0.0.1", 1216, "g-dev", "DEV-LOCAL"); + //zhubx = new net.tccn.zhub.ZHubClient("47.111.150.118", 6066, "g-dev", "DEV-LOCAL"); + }); + // Function, RpcResult> fun - zhub.rpcSubscribe("x", new TypeToken() { + /*zhub.rpcSubscribe("x", new TypeToken() { }, r -> { - return r.buildResp(r.getValue().toUpperCase() + ": Ok"); + return r.buildResp(Map.of("v", r.getValue().toUpperCase() + ": Ok")); + });*/ + zhub.rpcSubscribe("y", new TypeToken() { + }, r -> { + + return r.buildResp(Map.of("v", r.getValue().toUpperCase() + ": Ok")); }); + + zhub.subscribe("sport:reqtime", x -> { + System.out.println(x); + }); + zhub.subscribe("abx", x -> { + System.out.println(x); + }); + + try { + Thread.sleep(010); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + /*zhub.delay("sport:reqtime", "别✈人家的✦女娃子❤🤞🏻", 0); + zhub.delay("sport:reqtime", "别人家的女娃子➾🤞🏻", 0); + zhub.delay("sport:reqtime", "❤别人家✉�的女娃子❤🤞🏻", 0); + zhub.delay("sport:reqtime", "中文特殊符号:『』 $ £ ♀ ‖ 「」\n" + + "英文:# + = & ﹉ .. ^ \"\" ·{ } % – ' €\n" + + "数学:+× = - ° ± < > ℃ ㎡ ∑ ≥ ∫ ㏄ ⊥ ≯ ∠ ∴ ∈ ∧ ∵ ≮ ∪ ㎝ ㏑ ≌ ㎞ № § ℉ ÷ % ‰ ㎎ ㎏ ㎜ ㏒ ⊙ ∮ ∝ ∞ º ¹ ² ³ ½ ¾ ¼ ≈ ≡ ≠ ≤ ≦ ≧ ∽ ∷ / ∨ ∏ ∩ ⌒ √Ψ ¤ ‖ ¶\n" + + "特殊:♤ ♧ ♡ ♢ ♪ ♬ ♭ ✔ ✘ ♞ ♟ ↪ ↣ ♚ ♛ ♝ ☞ ☜ ⇔ ☆ ★ □ ■ ○ ● △ ▲ ▽ ▼ ◇ ◆ ♀ ♂ ※ ↓ ↑ ↔ ↖ ↙ ↗ ↘ ← → ♣ ♠ ♥ ◎ ◣ ◢ ◤ ◥ 卍 ℡ ⊙ ㊣ ® © ™ ㈱ 囍\n" + + "序号:①②③④⑤⑥⑦⑧⑨⑩㈠㈡㈢㈣㈤㈥㈦㈧㈨㈩⑴ ⑵ ⑶ ⑷ ⑸ ⑹ ⑺ ⑻ ⑼ ⑽ ⒈ ⒉ ⒊ ⒋ ⒌ ⒍ ⒎ ⒏ ⒐ ⒑ Ⅰ Ⅱ Ⅲ Ⅳ Ⅴ Ⅵ Ⅶ Ⅷ ⅨⅩ\n" + + "日文:アイウエオァィゥェォカキクケコガギグゲゴサシスセソザジズゼゾタチツテトダヂヅデドッナニヌネノハヒフヘホバビブベボパピプペポマミムメモャヤュユョラリヨルレロワヰヱヲンヴヵヶヽヾ゛゜ー、。「「あいうえおぁぃぅぇぉかきくけこがぎぐげごさしすせそざじずぜぞたちつてでどっなにぬねのはひふへ」」ほばびぶべぼぱぴぷぺぽまみむめもやゆよゃゅょらりるれろわをんゎ゛゜ー、。「」\n" + + "部首:犭 凵 巛 冖 氵 廴 讠 亻 钅 宀 亠 忄 辶 弋 饣 刂 阝 冫 卩 疒 艹 疋 豸 冂 匸 扌 丬 屮衤 礻 勹 彳 彡", 0); +*/ } - public RpcResult x(String v) { + @RestMapping + public RpcResult x(String v) { if (v == null) { v = ""; } - RpcResult x = zhub.rpc("x", v, IConsumer.TYPE_TOKEN_STRING).join(); - return x; + List list = new ArrayList(); + + for (int i = 0; i < 100; i++) { + long start = System.currentTimeMillis(); + /*RpcResult x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() { + });*/ + + /*list.add(zhub.rpcAsync("x", v + i, new TypeToken<>() { + }));*/ + zhub.publish("x", v + i); + + System.out.println("time: " + (System.currentTimeMillis() - start) + " ms"); + //System.out.println(x.getResult().get("v")); + } + + return zhub.rpc("x", v, IConsumer.TYPE_TOKEN_STRING); + } + + @RestMapping + public RpcResult d(String v) { + RpcResult rpc = zhub.rpc("x", v, IConsumer.TYPE_TOKEN_STRING); + return rpc; + } + + @RestMapping + public String y(String v) { + if (v == null) { + v = ""; + } + + for (int i = 0; i < 100; i++) { + long start = System.currentTimeMillis(); + /*RpcResult x = zhub.rpc("rpc:file:up-token", Map.of(), new TypeToken<>() { + });*/ + + net.tccn.zhub.RpcResult x = zhubx.rpc("y", v + i, new com.google.gson.reflect.TypeToken<>() { + }); + + System.out.println("time: " + (System.currentTimeMillis() - start) + " ms"); + + //System.out.println(x.getResult()); + } + + return "ok"; + } + + + public static void main(String[] args) { + // "\"别人家的女娃子\uD83E\uDD1E\uD83C\uDFFB\"" + /*String s = "别人家的女娃子\uD83E\uDD1E\uD83C\uDFFB"; + System.out.println("别人家的女娃子🤞🏻".length()); + + byte[] bytes = "别人家的女娃子🤞🏻".getBytes(); + System.out.println(bytes.length); + + System.out.println(unicodeToUtf8(s)); + System.out.println(utf8ToUnicode("别人家的女娃子🤞🏻")); + */ + + //ExecutorService pool = Executors.newFixedThreadPool(5); + + + } + + + public static String utf8ToUnicode(String inStr) { + char[] myBuffer = inStr.toCharArray(); + + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < inStr.length(); i++) { + Character.UnicodeBlock ub = Character.UnicodeBlock.of(myBuffer[i]); + if (ub == Character.UnicodeBlock.BASIC_LATIN) { + //英文及数字等 + sb.append(myBuffer[i]); + } else if (ub == Character.UnicodeBlock.HALFWIDTH_AND_FULLWIDTH_FORMS) { + //全角半角字符 + int j = (int) myBuffer[i] - 65248; + sb.append((char) j); + } else { + //汉字 + short s = (short) myBuffer[i]; + String hexS = Integer.toHexString(s); + String unicode = "\\u" + hexS; + sb.append(unicode.toLowerCase()); + } + } + return sb.toString(); + } + + public static String unicodeToUtf8(String theString) { + char aChar; + int len = theString.length(); + StringBuffer outBuffer = new StringBuffer(len); + for (int x = 0; x < len; ) { + aChar = theString.charAt(x++); + if (aChar == '\\') { + aChar = theString.charAt(x++); + if (aChar == 'u') { + // Read the xxxx + int value = 0; + for (int i = 0; i < 4; i++) { + aChar = theString.charAt(x++); + switch (aChar) { + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + value = (value << 4) + aChar - '0'; + break; + case 'a': + case 'b': + case 'c': + case 'd': + case 'e': + case 'f': + value = (value << 4) + 10 + aChar - 'a'; + break; + case 'A': + case 'B': + case 'C': + case 'D': + case 'E': + case 'F': + value = (value << 4) + 10 + aChar - 'A'; + break; + default: + throw new IllegalArgumentException( + "Malformed \\uxxxx encoding."); + } + } + outBuffer.append((char) value); + } else { + if (aChar == 't') + aChar = '\t'; + else if (aChar == 'r') + aChar = '\r'; + else if (aChar == 'n') + aChar = '\n'; + else if (aChar == 'f') + aChar = '\f'; + outBuffer.append(aChar); + } + } else + outBuffer.append(aChar); + } + return outBuffer.toString(); } }