From 644186a398438103e98a91f2f115ca600fc31da7 Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 14 Jun 2023 22:43:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96CacheMemorySource?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../convert/ext/InetAddressSimpledCoder.java | 10 +- .../convert/json/JsonByteBufferReader.java | 27 +- .../org/redkale/convert/json/JsonReader.java | 14 +- .../org/redkale/source/CacheMemorySource.java | 1951 ++++++++--------- .../java/org/redkale/source/CacheSource.java | 426 ++-- 5 files changed, 1109 insertions(+), 1319 deletions(-) diff --git a/src/main/java/org/redkale/convert/ext/InetAddressSimpledCoder.java b/src/main/java/org/redkale/convert/ext/InetAddressSimpledCoder.java index a85531045..e4a616f1c 100644 --- a/src/main/java/org/redkale/convert/ext/InetAddressSimpledCoder.java +++ b/src/main/java/org/redkale/convert/ext/InetAddressSimpledCoder.java @@ -139,16 +139,12 @@ public final class InetAddressSimpledCoder e @Override public InetSocketAddress convertFrom(R in) { - String str = StringSimpledCoder.instance.convertFrom(in); + String str = in.readStringValue(); if (str == null) { return null; } - try { - int pos = str.indexOf(':'); - return new InetSocketAddress(str.substring(0, pos), Integer.parseInt(str.substring(pos + 1))); - } catch (Exception ex) { - return null; - } + int pos = str.indexOf(':'); + return new InetSocketAddress(str.substring(0, pos), Integer.parseInt(str.substring(pos + 1))); } } diff --git a/src/main/java/org/redkale/convert/json/JsonByteBufferReader.java b/src/main/java/org/redkale/convert/json/JsonByteBufferReader.java index 420a5f53b..b25930e4d 100644 --- a/src/main/java/org/redkale/convert/json/JsonByteBufferReader.java +++ b/src/main/java/org/redkale/convert/json/JsonByteBufferReader.java @@ -202,6 +202,21 @@ public class JsonByteBufferReader extends JsonReader { */ @Override public final String readSmallString() { + return readString(true); + } + + /** + * 读取字符串, 必须是"或者'包围的字符串值 + * + * @return String值 + */ + @Override + public final String readString() { + return readString(true); + } + + @Override + protected String readString(boolean flag) { char ch = nextGoodChar(true); if (ch == 0) { return null; @@ -282,7 +297,7 @@ public class JsonByteBufferReader extends JsonReader { default: throw new ConvertException("illegal escape(" + c + ") (position = " + this.position + ")"); } - } else if (ch == ',' || ch == ']' || ch == '}' || ch <= ' ' || ch == ':') { // ch <= ' ' 包含 0 + } else if (ch == ',' || ch == ']' || ch == '}' || ch <= ' ' || (flag && ch == ':')) { // ch <= ' ' 包含 0 backChar(ch); break; } else { @@ -294,14 +309,4 @@ public class JsonByteBufferReader extends JsonReader { } } - /** - * 读取字符串, 必须是"或者'包围的字符串值 - * - * @return String值 - */ - @Override - public final String readString() { - return readSmallString(); - } - } diff --git a/src/main/java/org/redkale/convert/json/JsonReader.java b/src/main/java/org/redkale/convert/json/JsonReader.java index 0aa2273e4..2455549cf 100644 --- a/src/main/java/org/redkale/convert/json/JsonReader.java +++ b/src/main/java/org/redkale/convert/json/JsonReader.java @@ -824,6 +824,14 @@ public class JsonReader extends Reader { */ @Override public String readString() { + return readString(true); + } + + public final String readStringValue() { + return readString(false); + } + + protected String readString(boolean flag) { final char[] text0 = this.text; char expected = nextGoodChar(true); int currpos = this.position; @@ -833,7 +841,7 @@ public class JsonReader extends Reader { this.position = currpos; if (text0.length > currpos + 4) { char ch = text0[currpos + 1]; - if (ch == ',' || ch <= ' ' || ch == '}' || ch == ']' || ch == ':') { + if (ch == ',' || ch <= ' ' || ch == '}' || ch == ']' || (flag && ch == ':')) { return null; } final int start = currpos - 3; @@ -842,7 +850,7 @@ public class JsonReader extends Reader { break; } ch = text0[currpos]; - if (ch == ',' || ch <= ' ' || ch == '}' || ch == ']' || ch == ':') { + if (ch == ',' || ch <= ' ' || ch == '}' || ch == ']' || (flag && ch == ':')) { break; } currpos++; @@ -863,7 +871,7 @@ public class JsonReader extends Reader { break; } char ch = text0[currpos]; - if (ch == ',' || ch <= ' ' || ch == '}' || ch == ']' || ch == ':') { + if (ch == ',' || ch <= ' ' || ch == '}' || ch == ']' || (flag && ch == ':')) { break; } currpos++; diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 5ec3e35e1..0ae11e73d 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -14,6 +14,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import java.util.logging.*; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.redkale.annotation.AutoLoad; import org.redkale.annotation.ConstructorParameters; import org.redkale.annotation.*; @@ -159,374 +160,324 @@ public final class CacheMemorySource extends AbstractCacheSource { return CompletableFuture.completedFuture(true); } - //----------- hxxx -------------- - @Override - public long hdel(final String key, String... fields) { - long count = 0; - CacheEntry entry = container.get(key); - if (entry == null || entry.mapValue == null) { - return 0; - } - for (String field : fields) { - if (entry.mapValue.remove(field) != null) { - count++; - } - } - return count; - } - - @Override - public List hkeys(final String key) { - List list = new ArrayList<>(); - CacheEntry entry = container.get(key); - if (entry == null || entry.mapValue == null) { - return list; - } - list.addAll(entry.mapValue.keySet()); - return list; - } - - @Override - public long hlen(final String key) { - CacheEntry entry = container.get(key); - if (entry == null || entry.mapValue == null) { - return 0; - } - return entry.mapValue.keySet().size(); - } - - @Override - public long hincr(final String key, String field) { - return hincrby(key, field, 1); - } - - @Override - public long hincrby(final String key, String field, long num) { - CacheEntry entry = container.get(key); - if (entry == null) { - containerLock.lock(); - try { - entry = container.get(key); - if (entry == null) { - ConcurrentHashMap map = new ConcurrentHashMap(); - map.put(field, new AtomicLong()); - entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); - if (!(val instanceof AtomicLong)) { - entry.mapLock.lock(); - try { - if (!(val instanceof AtomicLong)) { - if (val == null) { - val = new AtomicLong(); - } else { - val = new AtomicLong(((Number) val).longValue()); - } - entry.mapValue.put(field, val); - } - } finally { - entry.mapLock.unlock(); - } - } - return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num); - } - - @Override - public double hincrbyFloat(final String key, String field, double num) { - CacheEntry entry = container.get(key); - if (entry == null) { - containerLock.lock(); - try { - entry = container.get(key); - if (entry == null) { - ConcurrentHashMap map = new ConcurrentHashMap(); - map.put(field, new AtomicLong()); - entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map); - container.put(key, entry); - } - } finally { - containerLock.unlock(); - } - } - Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); - if (!(val instanceof AtomicLong)) { - entry.mapLock.lock(); - try { - if (!(val instanceof AtomicLong)) { - if (val == null) { - val = new AtomicLong(); - } else { - val = new AtomicLong(((Number) val).longValue()); - } - entry.mapValue.put(field, val); - } - } finally { - entry.mapLock.unlock(); - } - } - return Double.longBitsToDouble(((AtomicLong) entry.mapValue.get(field)).addAndGet(Double.doubleToLongBits(num))); - } - - @Override - public long hdecr(final String key, String field) { - return hincrby(key, field, -1); - } - - @Override - public long hdecrby(final String key, String field, long num) { - return hincrby(key, field, -num); - } - - @Override - public boolean hexists(final String key, String field) { - if (key == null) { - return false; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.mapValue == null) { - return false; - } - return entry.mapValue.contains(field); - } - - @Override - public void hset(final String key, final String field, final Convert convert, final Type type, final T value) { - hset(CacheEntryType.MAP, key, field, value); - } - - @Override - public boolean hsetnx(final String key, final String field, final Convert convert, final Type type, final T value) { - return hsetnx(CacheEntryType.MAP, key, field, value); - } - - @Override - public void hmset(final String key, final Serializable... values) { - for (int i = 0; i < values.length; i += 2) { - hset(CacheEntryType.MAP, key, (String) values[i], values[i + 1]); - } - } - - @Override - public void hmset(final String key, final Map map) { - map.forEach((k, v) -> hset(CacheEntryType.MAP, key, (String) k, v)); - } - - @Override - public List hmget(final String key, final Type type, final String... fields) { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.mapValue == null) { - return null; - } - List rs = new ArrayList<>(fields.length); - for (int i = 0; i < fields.length; i++) { - Serializable val = (Serializable) entry.mapValue.get(fields[i]); - if (type == String.class) { - rs.add(val == null ? null : (T) String.valueOf(val)); - } else { - rs.add((T) val); - } - } - return rs; - } - - @Override - public Map hgetall(final String key, final Type type) { - return hgetall(CacheEntryType.MAP, key); - } - - @Override - public List hvals(final String key, final Type type) { - return hvals(CacheEntryType.MAP, key); - } - - @Override - public Map hscan(final String key, final Type type, AtomicLong cursor, int limit, String pattern) { - if (key == null) { - return new HashMap(); - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.mapValue == null) { - return new HashMap(); - } - return new HashMap(entry.mapValue); - } - - @Override - public T hget(final String key, final String field, final Type type) { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired() || entry.mapValue == null) { - return null; - } - return (T) entry.mapValue.get(field); - } - - //----------- hxxx -------------- - @Override - public boolean exists(String key) { - if (key == null) { - return false; - } - CacheEntry entry = container.get(key); - if (entry == null) { - return false; - } - return !entry.isExpired(); - } - - @Override - public CompletableFuture existsAsync(final String key) { - return supplyAsync(() -> exists(key), getExecutor()); - } - - @Override - public T get(final String key, final Type type) { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired()) { - return null; - } - if (entry.isListCacheType()) { - return (T) (entry.listValue == null ? null : new ArrayList(entry.listValue)); - } - if (entry.isSetCacheType()) { - return (T) (entry.csetValue == null ? null : new HashSet(entry.csetValue)); - } - return entry.cacheType == CacheEntryType.DOUBLE ? (T) (Double) Double.longBitsToDouble(((AtomicLong) entry.objectValue).intValue()) : (T) entry.objectValue; - } - //----------- hxxx -------------- @Override public CompletableFuture hdelAsync(final String key, String... fields) { - return supplyAsync(() -> hdel(key, fields), getExecutor()); + return supplyAsync(() -> { + long count = 0; + CacheEntry entry = container.get(key); + if (entry == null || entry.mapValue == null) { + return 0L; + } + for (String field : fields) { + if (entry.mapValue.remove(field) != null) { + count++; + } + } + return count; + }, getExecutor()); } @Override public CompletableFuture> hkeysAsync(final String key) { - return supplyAsync(() -> hkeys(key), getExecutor()); + return supplyAsync(() -> { + List list = new ArrayList<>(); + CacheEntry entry = container.get(key); + if (entry == null || entry.mapValue == null) { + return list; + } + list.addAll(entry.mapValue.keySet()); + return list; + }, getExecutor()); } @Override public CompletableFuture hlenAsync(final String key) { - return supplyAsync(() -> hlen(key), getExecutor()); + return supplyAsync(() -> { + CacheEntry entry = container.get(key); + if (entry == null || entry.mapValue == null) { + return 0L; + } + return (long) entry.mapValue.keySet().size(); + }, getExecutor()); } @Override public CompletableFuture hincrAsync(final String key, String field) { - return supplyAsync(() -> hincr(key, field), getExecutor()); + return hincrbyAsync(key, field, 1); } @Override public CompletableFuture hincrbyAsync(final String key, String field, long num) { - return supplyAsync(() -> hincrby(key, field, num), getExecutor()); + return supplyAsync(() -> { + CacheEntry entry = container.get(key); + if (entry == null) { + containerLock.lock(); + try { + entry = container.get(key); + if (entry == null) { + ConcurrentHashMap map = new ConcurrentHashMap(); + map.put(field, new AtomicLong()); + entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); + if (!(val instanceof AtomicLong)) { + entry.mapLock.lock(); + try { + if (!(val instanceof AtomicLong)) { + if (val == null) { + val = new AtomicLong(); + } else { + val = new AtomicLong(((Number) val).longValue()); + } + entry.mapValue.put(field, val); + } + } finally { + entry.mapLock.unlock(); + } + } + return ((AtomicLong) entry.mapValue.get(field)).addAndGet(num); + }, getExecutor()); } @Override public CompletableFuture hincrbyFloatAsync(final String key, String field, double num) { - return supplyAsync(() -> hincrbyFloat(key, field, num), getExecutor()); + return supplyAsync(() -> { + CacheEntry entry = container.get(key); + if (entry == null) { + containerLock.lock(); + try { + entry = container.get(key); + if (entry == null) { + ConcurrentHashMap map = new ConcurrentHashMap(); + map.put(field, new AtomicLong()); + entry = new CacheEntry(CacheEntryType.MAP, key, new AtomicLong(), null, null, map); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + Serializable val = (Serializable) entry.mapValue.computeIfAbsent(field, f -> new AtomicLong()); + if (!(val instanceof AtomicLong)) { + entry.mapLock.lock(); + try { + if (!(val instanceof AtomicLong)) { + if (val == null) { + val = new AtomicLong(); + } else { + val = new AtomicLong(((Number) val).longValue()); + } + entry.mapValue.put(field, val); + } + } finally { + entry.mapLock.unlock(); + } + } + return Double.longBitsToDouble(((AtomicLong) entry.mapValue.get(field)).addAndGet(Double.doubleToLongBits(num))); + }, getExecutor()); } @Override public CompletableFuture hdecrAsync(final String key, String field) { - return supplyAsync(() -> hdecr(key, field), getExecutor()); + return hincrbyAsync(key, field, -1); } @Override public CompletableFuture hdecrbyAsync(final String key, String field, long num) { - return supplyAsync(() -> hdecrby(key, field, num), getExecutor()); + return hincrbyAsync(key, field, -num); } @Override public CompletableFuture hexistsAsync(final String key, String field) { - return supplyAsync(() -> hexists(key, field), getExecutor()); + return supplyAsync(() -> { + if (key == null) { + return false; + } + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.mapValue == null) { + return false; + } + return entry.mapValue.contains(field); + }, getExecutor()); } @Override public CompletableFuture hsetAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return runAsync(() -> hset(key, field, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> { + hset(CacheEntryType.MAP, key, field, value); + }, getExecutor()); } @Override public CompletableFuture hsetnxAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return supplyAsync(() -> hsetnx(key, field, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> { + return hsetnx(CacheEntryType.MAP, key, field, value); + }, getExecutor()); } @Override public CompletableFuture hmsetAsync(final String key, final Serializable... values) { - return runAsync(() -> hmset(key, values), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> { + for (int i = 0; i < values.length; i += 2) { + hset(CacheEntryType.MAP, key, (String) values[i], values[i + 1]); + } + }, getExecutor()); } @Override public CompletableFuture hmsetAsync(final String key, final Map map) { - return runAsync(() -> hmset(key, map), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> { + map.forEach((k, v) -> hset(CacheEntryType.MAP, key, (String) k, v)); + }, getExecutor()); } @Override public CompletableFuture> hmgetAsync(final String key, final Type type, final String... fields) { - return supplyAsync(() -> hmget(key, type, fields), getExecutor()); + return supplyAsync(() -> { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.mapValue == null) { + return null; + } + List rs = new ArrayList<>(fields.length); + for (int i = 0; i < fields.length; i++) { + Serializable val = (Serializable) entry.mapValue.get(fields[i]); + if (type == String.class) { + rs.add(val == null ? null : (T) String.valueOf(val)); + } else { + rs.add((T) val); + } + } + return rs; + }, getExecutor()); } @Override public CompletableFuture> hgetallAsync(final String key, final Type type) { - return supplyAsync(() -> hgetall(key, type), getExecutor()); + return supplyAsync(() -> { + return hgetall(CacheEntryType.MAP, key, type); + }, getExecutor()); } @Override public CompletableFuture> hvalsAsync(final String key, final Type type) { - return supplyAsync(() -> hvals(key, type), getExecutor()); + return supplyAsync(() -> { + return hvals(CacheEntryType.MAP, key, type); + }, getExecutor()); } @Override public CompletableFuture> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit, String pattern) { - return supplyAsync(() -> hscan(key, type, cursor, limit, pattern), getExecutor()); + return supplyAsync(() -> { + if (key == null) { + return new HashMap(); + } + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.mapValue == null) { + return new HashMap(); + } + if (Utility.isEmpty(pattern)) { + return new HashMap(entry.mapValue); + } else { + Predicate regx = Pattern.compile(pattern.replace("*", ".*")).asPredicate(); + Set> set = entry.mapValue.entrySet(); + return set.stream().filter(en -> regx.test(en.getKey())).collect(Collectors.toMap(en -> en.getKey(), en -> en.getValue())); + } + }, getExecutor()); } @Override public CompletableFuture hgetAsync(final String key, final String field, final Type type) { - return supplyAsync(() -> hget(key, field, type), getExecutor()); + return supplyAsync(() -> { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired() || entry.mapValue == null) { + return null; + } + Object obj = entry.mapValue.get(field); + if (obj == null) { + return null; + } + if (type == long.class || type == Long.class) { + return (T) (obj instanceof Long ? obj : Long.parseLong(obj.toString())); + } + return (T) obj; + }, getExecutor()); } //----------- hxxx -------------- @Override - public CompletableFuture getAsync(final String key, final Type type) { - return supplyAsync(() -> (T) get(key, type), getExecutor()); + public CompletableFuture existsAsync(String key) { + return supplyAsync(() -> { + if (key == null) { + return false; + } + CacheEntry entry = container.get(key); + if (entry == null) { + return false; + } + return !entry.isExpired(); + }, getExecutor()); } @Override - public T getex(final String key, final int expireSeconds, final Type type) { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.isExpired()) { - return null; - } - entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); - entry.expireSeconds = expireSeconds; - if (entry.isListCacheType()) { - return (T) (entry.listValue == null ? null : new ArrayList(entry.listValue)); - } - if (entry.isSetCacheType()) { - return (T) (entry.csetValue == null ? null : new HashSet(entry.csetValue)); - } - return (T) entry.objectValue; + public CompletableFuture getAsync(final String key, final Type type) { + return supplyAsync(() -> { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired()) { + return null; + } + if (entry.isListCacheType()) { + return (T) (entry.listValue == null ? null : new ArrayList(entry.listValue)); + } + if (entry.isSetCacheType()) { + return (T) (entry.csetValue == null ? null : new LinkedHashSet<>(entry.csetValue)); + } + if (entry.cacheType == CacheEntryType.DOUBLE) { + return (T) (Double) Double.longBitsToDouble(((AtomicLong) entry.objectValue).intValue()); + } + Object obj = entry.objectValue; + if (obj != null && obj.getClass() != type) { + return (T) JsonConvert.root().convertFrom(type, JsonConvert.root().convertToBytes(obj)); + } + return (T) obj; + }, getExecutor()); } + //----------- hxxx -------------- @Override public CompletableFuture getexAsync(final String key, final int expireSeconds, final Type type) { - return supplyAsync(() -> getex(key, expireSeconds, type), getExecutor()); + return supplyAsync(() -> { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || entry.isExpired()) { + return null; + } + entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); + entry.expireSeconds = expireSeconds; + if (entry.isListCacheType()) { + return (T) (entry.listValue == null ? null : new ArrayList(entry.listValue)); + } + if (entry.isSetCacheType()) { + return (T) (entry.csetValue == null ? null : new HashSet(entry.csetValue)); + } + return (T) entry.objectValue; + }, getExecutor()); } protected void set(CacheEntryType cacheType, String key, Object value) { @@ -561,7 +512,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } protected void hset(CacheEntryType cacheType, String key, String field, Object value) { - if (key == null) { + if (key == null || value == null) { return; } CacheEntry entry = container.get(key); @@ -593,19 +544,25 @@ public final class CacheMemorySource extends AbstractCacheSource { } } - protected Map hgetall(CacheEntryType cacheType, String key) { + protected Map hgetall(CacheEntryType cacheType, String key, final Type type) { if (key == null) { return new LinkedHashMap(); } CacheEntry entry = container.get(key); if (entry == null) { return new LinkedHashMap(); + } else if (type == long.class || type == Long.class) { + Map map = new LinkedHashMap(); + entry.mapValue.forEach((k, v) -> { + map.put(k, v instanceof Long ? v : (v == null ? null : Long.parseLong(v.toString()))); + }); + return map; } else { return new LinkedHashMap(entry.mapValue); } } - protected List hvals(CacheEntryType cacheType, String key) { + protected List hvals(CacheEntryType cacheType, String key, final Type type) { if (key == null) { return new ArrayList(); } @@ -613,86 +570,77 @@ public final class CacheMemorySource extends AbstractCacheSource { if (entry == null) { return new ArrayList(); } else { - return new ArrayList(entry.mapValue.values()); - } - } - - @Override - public void mset(Serializable... keyVals) { - if (keyVals.length % 2 != 0) { - throw new SourceException("key value must be paired"); - } - for (int i = 0; i < keyVals.length; i += 2) { - String key = keyVals[i].toString(); - Object val = keyVals[i + 1]; - if (val instanceof String) { - set(CacheEntryType.STRING, key, val); - } else if (val instanceof Number) { - set(CacheEntryType.LONG, key, ((Number) val).longValue()); + if (type == long.class || type == Long.class) { + return entry.mapValue.values().stream().map(v -> v instanceof Long ? v : (v == null ? null : Long.parseLong(v.toString()))).toList(); } else { - set(CacheEntryType.OBJECT, key, val); + return new ArrayList(entry.mapValue.values()); } } } @Override - public void mset(Map map) { - map.forEach((key, val) -> { - if (val instanceof String) { - set(CacheEntryType.STRING, (String) key, val); - } else if (val instanceof Number) { - set(CacheEntryType.LONG, (String) key, ((Number) val).longValue()); - } else { - set(CacheEntryType.OBJECT, (String) key, val); + public CompletableFuture msetAsync(Serializable... keyVals) { + return runAsync(() -> { + if (keyVals.length % 2 != 0) { + throw new SourceException("key value must be paired"); } - }); + for (int i = 0; i < keyVals.length; i += 2) { + String key = keyVals[i].toString(); + Object val = keyVals[i + 1]; + if (val instanceof String) { + set(CacheEntryType.STRING, key, val); + } else if (val instanceof Number) { + set(CacheEntryType.LONG, key, ((Number) val).longValue()); + } else { + set(CacheEntryType.OBJECT, key, val); + } + } + }, getExecutor()); } @Override - public void set(String key, Convert convert, Type type, T value) { - set(findEntryType(type), key, value); - } - - @Override - public boolean setnx(String key, Convert convert, Type type, T value) { - return setnx(findEntryType(type), key, value); - } - - @Override - public boolean setnxex(String key, int expireSeconds, Convert convert, Type type, T value) { - return setnxex(findEntryType(type), expireSeconds, key, value); - } - - @Override - public T getSet(String key, Convert convert, Type type, T value) { - T old = get(key, type); - set(findEntryType(type), key, value); - return old; - } - - @Override - public CompletableFuture msetAsync(final Serializable... keyVals) { - return runAsync(() -> mset(keyVals), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture msetAsync(final Map map) { - return runAsync(() -> mset(map), getExecutor()).whenComplete(futureCompleteConsumer); + public CompletableFuture msetAsync(Map map) { + return runAsync(() -> { + map.forEach((key, val) -> { + if (val instanceof String) { + set(CacheEntryType.STRING, (String) key, val); + } else if (val instanceof Number) { + set(CacheEntryType.LONG, (String) key, ((Number) val).longValue()); + } else { + set(CacheEntryType.OBJECT, (String) key, val); + } + }); + }, getExecutor()); } @Override public CompletableFuture setAsync(String key, Convert convert, Type type, T value) { - return runAsync(() -> set(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> { + set(findEntryType(type), key, value); + }, getExecutor()); } @Override public CompletableFuture setnxAsync(String key, Convert convert, Type type, T value) { - return supplyAsync(() -> setnx(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> { + return setnx(findEntryType(type), key, value); + }, getExecutor()); + } + + @Override + public CompletableFuture setnxexAsync(String key, int expireSeconds, Convert convert, Type type, T value) { + return supplyAsync(() -> { + return setnxex(findEntryType(type), expireSeconds, key, value); + }, getExecutor()); } @Override public CompletableFuture getSetAsync(String key, Convert convert, Type type, T value) { - return runAsync(() -> getSet(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> { + T old = get(key, type); + set(findEntryType(type), key, value); + return old; + }, getExecutor()); } protected void set(CacheEntryType cacheType, int expireSeconds, String key, Object value) { @@ -730,406 +678,339 @@ public final class CacheMemorySource extends AbstractCacheSource { } } - @Override - public void setex(String key, int expireSeconds, Convert convert, Type type, T value) { - set(findEntryType(type), expireSeconds, key, value); - } - - @Override - public CompletableFuture setexAsync(String key, int expireSeconds, Type type, T value) { - return runAsync(() -> setex(key, expireSeconds, type, value), getExecutor()).whenComplete(futureCompleteConsumer); - } - @Override public CompletableFuture setexAsync(String key, int expireSeconds, Convert convert, Type type, T value) { - return runAsync(() -> setex(key, expireSeconds, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> { + set(findEntryType(type), expireSeconds, key, value); + }, getExecutor()); } @Override - public CompletableFuture setnxexAsync(final String key, final int expireSeconds, final Type type, final T value) { - return supplyAsync(() -> setnxex(key, expireSeconds, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + public CompletableFuture expireAsync(String key, int expireSeconds) { + return runAsync(() -> { + if (key == null) { + return; + } + CacheEntry entry = container.get(key); + if (entry == null) { + return; + } + entry.expireSeconds = expireSeconds; + }, getExecutor()); } @Override - public CompletableFuture setnxexAsync(final String key, final int expireSeconds, final Convert convert, final Type type, final T value) { - return supplyAsync(() -> setnxex(key, expireSeconds, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public void expire(String key, int expireSeconds) { - if (key == null) { - return; - } - CacheEntry entry = container.get(key); - if (entry == null) { - return; - } - entry.expireSeconds = expireSeconds; - } - - @Override - public boolean persist(final String key) { - if (key == null) { - return false; - } - CacheEntry entry = container.get(key); - if (entry == null) { - return false; - } - entry.expireSeconds = 0; - return true; - } - - @Override - public boolean rename(String oldKey, String newKey) { - if (oldKey == null || newKey == null) { - return false; - } - CacheEntry entry = container.get(oldKey); - if (entry == null) { - return false; - } - entry.key = newKey; - container.put(newKey, entry); - container.remove(oldKey); - return true; - } - - @Override - public boolean renamenx(String oldKey, String newKey) { - if (oldKey == null || newKey == null) { - return false; - } - if (container.containsKey(newKey)) { - return false; - } - CacheEntry entry = container.get(oldKey); - if (entry == null) { - return false; - } - entry.key = newKey; - container.put(newKey, entry); - container.remove(oldKey); - return true; - } - - @Override - public CompletableFuture expireAsync(final String key, final int expireSeconds) { - return runAsync(() -> expire(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture persistAsync(String key) { - return supplyAsync(() -> persist(key), getExecutor()).whenComplete(futureCompleteConsumer); + public CompletableFuture persistAsync(final String key) { + return supplyAsync(() -> { + if (key == null) { + return false; + } + CacheEntry entry = container.get(key); + if (entry == null) { + return false; + } + if (entry.expireSeconds > 0) { + entry.expireSeconds = 0; + return true; + } else { + return false; + } + }, getExecutor()); } @Override public CompletableFuture renameAsync(String oldKey, String newKey) { - return supplyAsync(() -> rename(oldKey, newKey), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> { + if (oldKey == null || newKey == null) { + return false; + } + CacheEntry entry = container.get(oldKey); + if (entry == null) { + return false; + } + entry.key = newKey; + container.put(newKey, entry); + container.remove(oldKey); + return true; + }, getExecutor()); } @Override public CompletableFuture renamenxAsync(String oldKey, String newKey) { - return supplyAsync(() -> renamenx(oldKey, newKey), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public long del(final String... keys) { - if (keys == null) { - return 0; - } - int count = 0; - for (String key : keys) { - count += container.remove(key) == null ? 0 : 1; - } - return count; - } - - @Override - public long incr(final String key) { - return incrby(key, 1); - } - - @Override - public CompletableFuture incrAsync(final String key) { - return supplyAsync(() -> incr(key), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public long incrby(final String key, long num) { - CacheEntry entry = container.get(key); - if (entry == null) { - containerLock.lock(); - try { - entry = container.get(key); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null, null); - container.put(key, entry); - } - } finally { - containerLock.unlock(); + return supplyAsync(() -> { + if (oldKey == null || newKey == null) { + return false; } - } - return ((AtomicLong) entry.objectValue).addAndGet(num); - } - - @Override - public double incrbyFloat(final String key, double num) { - CacheEntry entry = container.get(key); - if (entry == null) { - containerLock.lock(); - try { - entry = container.get(key); - if (entry == null) { - entry = new CacheEntry(CacheEntryType.DOUBLE, key, new AtomicLong(), null, null, null); - container.put(key, entry); - } - } finally { - containerLock.unlock(); + if (container.containsKey(newKey)) { + return false; } - } - Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num)); - return Double.longBitsToDouble(v.intValue()); - } - - @Override - public CompletableFuture incrbyAsync(final String key, long num) { - return supplyAsync(() -> incrby(key, num), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture incrbyFloatAsync(final String key, double num) { - return supplyAsync(() -> incrbyFloat(key, num), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public long decr(final String key) { - return incrby(key, -1); - } - - @Override - public CompletableFuture decrAsync(final String key) { - return supplyAsync(() -> decr(key), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public long decrby(final String key, long num) { - return incrby(key, -num); - } - - @Override - public CompletableFuture decrbyAsync(final String key, long num) { - return supplyAsync(() -> decrby(key, num), getExecutor()).whenComplete(futureCompleteConsumer); + CacheEntry entry = container.get(oldKey); + if (entry == null) { + return false; + } + entry.key = newKey; + container.put(newKey, entry); + container.remove(oldKey); + return true; + }, getExecutor()); } @Override public CompletableFuture delAsync(final String... keys) { - return supplyAsync(() -> del(keys), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> { + if (keys == null) { + return 0L; + } + long count = 0; + for (String key : keys) { + count += container.remove(key) == null ? 0 : 1; + } + return count; + }, getExecutor()); + } + + @Override + public CompletableFuture incrAsync(final String key) { + return incrbyAsync(key, 1); + } + + @Override + public CompletableFuture incrbyAsync(final String key, long num) { + return supplyAsync(() -> { + CacheEntry entry = container.get(key); + if (entry == null) { + containerLock.lock(); + try { + entry = container.get(key); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.ATOMIC, key, new AtomicLong(), null, null, null); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + if (!(entry.objectValue instanceof AtomicLong)) { + containerLock.lock(); + try { + if (!(entry.objectValue instanceof AtomicLong)) { + entry.objectValue = new AtomicLong(Long.parseLong(entry.objectValue.toString())); + } + } finally { + containerLock.unlock(); + } + } + return ((AtomicLong) entry.objectValue).addAndGet(num); + }, getExecutor()); + } + + @Override + public CompletableFuture incrbyFloatAsync(final String key, double num) { + return supplyAsync(() -> { + CacheEntry entry = container.get(key); + if (entry == null) { + containerLock.lock(); + try { + entry = container.get(key); + if (entry == null) { + entry = new CacheEntry(CacheEntryType.DOUBLE, key, new AtomicLong(), null, null, null); + container.put(key, entry); + } + } finally { + containerLock.unlock(); + } + } + Long v = ((AtomicLong) entry.objectValue).addAndGet(Double.doubleToLongBits(num)); + return Double.longBitsToDouble(v.intValue()); + }, getExecutor()); + } + + @Override + public CompletableFuture decrAsync(final String key) { + return incrbyAsync(key, -1); + } + + @Override + public CompletableFuture decrbyAsync(final String key, long num) { + return incrbyAsync(key, -num); } @Override public CompletableFuture> sdiffAsync(final String key, final Type componentType, final String... key2s) { - return supplyAsync(() -> sdiff(key, componentType, key2s), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> { + Set rs = new HashSet<>(); + CacheEntry entry = container.get(key); + if (entry == null || entry.csetValue == null) { + return rs; + } + rs.addAll(entry.csetValue); + for (String k : key2s) { + CacheEntry en2 = container.get(k); + if (en2 != null && en2.csetValue != null) { + en2.csetValue.forEach(v -> rs.remove(v)); + } + } + return rs; + }, getExecutor()); } @Override public CompletableFuture sdiffstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyAsync(() -> sdiffstore(key, srcKey, srcKey2s), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture> smismembersAsync(final String key, final String... members) { - return supplyAsync(() -> smismembers(key, members), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public Set sdiff(final String key, final Type componentType, final String... key2s) { - Set rs = new HashSet<>(); - CacheEntry entry = container.get(key); - if (entry == null || entry.csetValue == null) { - return rs; - } - rs.addAll(entry.csetValue); - for (String k : key2s) { - CacheEntry en2 = container.get(k); - if (en2 != null && en2.csetValue != null) { - en2.csetValue.forEach(v -> rs.remove(v)); + return supplyAsync(() -> { + Set rs = sdiff(srcKey, Object.class, srcKey2s); + if (container.containsKey(key)) { + Set set = container.get(srcKey).csetValue; + set.clear(); + set.addAll(rs); + } else { + appendSetItem(CacheEntryType.SET_OBJECT, key, rs); } - } - return rs; - } - - @Override - public long sdiffstore(final String key, final String srcKey, final String... srcKey2s) { - Set rs = sdiff(srcKey, Object.class, srcKey2s); - if (container.containsKey(key)) { - Set set = container.get(srcKey).csetValue; - set.clear(); - set.addAll(rs); - } else { - appendSetItem(CacheEntryType.SET_OBJECT, key, rs); - } - return rs.size(); + return (long) rs.size(); + }, getExecutor()); } @Override public CompletableFuture> sinterAsync(final String key, final Type componentType, final String... key2s) { - return supplyAsync(() -> sinter(key, componentType, key2s), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> { + Set rs = new HashSet<>(); + CacheEntry entry = container.get(key); + if (entry == null || entry.csetValue == null) { + return rs; + } + rs.addAll(entry.csetValue); + for (String k : key2s) { + CacheEntry en2 = container.get(k); + if (en2 != null && en2.csetValue != null) { + Set removes = new HashSet<>(); + for (T v : rs) { + if (!en2.csetValue.contains(v)) { + removes.add(v); + } + } + rs.removeAll(removes); + } else { + rs.clear(); + return rs; + } + } + return rs; + }, getExecutor()); } @Override public CompletableFuture sinterstoreAsync(final String key, final String srcKey, final String... srcKey2s) { - return supplyAsync(() -> sinterstore(key, srcKey, srcKey2s), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public Set sinter(final String key, final Type componentType, final String... key2s) { - Set rs = new HashSet<>(); - CacheEntry entry = container.get(key); - if (entry == null || entry.csetValue == null) { - return rs; - } - rs.addAll(entry.csetValue); - for (String k : key2s) { - CacheEntry en2 = container.get(k); - if (en2 != null && en2.csetValue != null) { - Set removes = new HashSet<>(); - for (T v : rs) { - if (!en2.csetValue.contains(v)) { - removes.add(v); - } - } - rs.removeAll(removes); + return supplyAsync(() -> { + Set rs = sinter(srcKey, Object.class, srcKey2s); + if (container.containsKey(key)) { + Set set = container.get(srcKey).csetValue; + set.clear(); + set.addAll(rs); } else { - rs.clear(); - return rs; + appendSetItem(CacheEntryType.SET_OBJECT, key, rs); } - } - return rs; + return (long) rs.size(); + }, getExecutor()); } @Override - public long sinterstore(final String key, final String srcKey, final String... srcKey2s) { - Set rs = sinter(srcKey, Object.class, srcKey2s); - if (container.containsKey(key)) { - Set set = container.get(srcKey).csetValue; - set.clear(); - set.addAll(rs); - } else { - appendSetItem(CacheEntryType.SET_OBJECT, key, rs); - } - return rs.size(); + public CompletableFuture> smembersAsync(final String key, final Type componentType) { + return getAsync(key, componentType); } @Override - public Set smembers(final String key, final Type componentType) { - return (Set) get(key, componentType); + public CompletableFuture> lrangeAsync(final String key, final Type componentType, int start, int stop) { + return getAsync(key, componentType); } @Override - public List lrange(final String key, final Type componentType, int start, int stop) { - return (List) get(key, componentType); - } - - @Override - public Map> smembers(final Type componentType, final String... keys) { - Map> map = new HashMap<>(); - for (String key : keys) { - Set s = (Set) get(key, componentType); - if (s != null) { - map.put(key, s); + public CompletableFuture>> smembersAsync(final Type componentType, final String... keys) { + return supplyAsync(() -> { + Map> map = new HashMap<>(); + for (String key : keys) { + Set s = (Set) get(key, componentType); + if (s != null) { + map.put(key, s); + } } - } - return map; + return map; + }, getExecutor()); } @Override - public List smismembers(final String key, final String... members) { - Set s = (Set) get(key, Object.class); - List rs = new ArrayList<>(); - for (String member : members) { - rs.add(s != null && s.contains(member)); - } - return rs; - } - - @Override - public Map> lrange(final Type componentType, final String... keys) { - Map> map = new HashMap<>(); - for (String key : keys) { - List s = (List) get(key, componentType); - if (s != null) { - map.put(key, s); + public CompletableFuture> smismembersAsync(final String key, final String... members) { + return supplyAsync(() -> { + Set s = (Set) get(key, Object.class); + List rs = new ArrayList<>(); + for (String member : members) { + rs.add(s != null && s.contains(member)); } - } - return map; + return rs; + }, getExecutor()); } @Override - public Map mget(final Type componentType, final String... keys) { - Map map = new LinkedHashMap<>(); - for (String key : keys) { - map.put(key, (T) get(key, componentType)); - } - return map; + public CompletableFuture>> lrangeAsync(final Type componentType, final String... keys) { + return supplyAsync(() -> { + Map> map = new HashMap<>(); + for (String key : keys) { + List s = (List) get(key, componentType); + if (s != null) { + map.put(key, s); + } + } + return map; + }, getExecutor()); } @Override public CompletableFuture> mgetAsync(final Type componentType, final String... keys) { - return CompletableFuture.completedFuture(mget(componentType, keys)); - } - - @Override - public CompletableFuture>> lrangeAsync(Type componentType, String... keys) { - return supplyAsync(() -> lrange(componentType, keys), getExecutor()); - } - - @Override - public CompletableFuture>> smembersAsync(Type componentType, String... keys) { - return supplyAsync(() -> smembers(componentType, keys), getExecutor()); - } - - @Override - public CompletableFuture> smembersAsync(String key, Type componentType) { - return supplyAsync(() -> smembers(key, componentType), getExecutor()); - } - - @Override - public CompletableFuture> lrangeAsync(String key, Type componentType, int start, int stop) { - return supplyAsync(() -> lrange(key, componentType, start, stop), getExecutor()); - } - - @Override - public long llen(final String key) { - Collection collection = (Collection) get(key, Object.class); - return collection == null ? 0 : collection.size(); - } - - @Override - public long scard(final String key) { - Collection collection = (Collection) get(key, Object.class); - return collection == null ? 0 : collection.size(); + return supplyAsync(() -> { + Map map = new LinkedHashMap<>(); + for (String key : keys) { + Object v = get(key, componentType); + if (v != null) { + if (componentType == String.class) { + map.put(key, (T) v.toString()); + } else if (componentType == long.class || componentType == Long.class) { + map.put(key, (T) (Object) ((Number) v).longValue()); + } else { + map.put(key, (T) v); + } + } + } + return map; + }, getExecutor()); } @Override public CompletableFuture llenAsync(final String key) { - return supplyAsync(() -> llen(key), getExecutor()); + return supplyAsync(() -> { + Collection collection = (Collection) get(key, Object.class); + return collection == null ? 0L : collection.size(); + }, getExecutor()); + } + + @Override + public CompletableFuture saddAsync(final String key, final Type componentType, T... values) { + return runAsync(() -> { + appendSetItem(componentType == String.class ? CacheEntryType.SET_STRING : CacheEntryType.SET_OBJECT, key, List.of(values)); + }, getExecutor()); } @Override public CompletableFuture scardAsync(final String key) { - return supplyAsync(() -> scard(key), getExecutor()); - } - - @Override - public boolean sismember(final String key, final Type type, final T value) { - Collection list = get(key, type); - return list != null && list.contains(value); + return supplyAsync(() -> { + Collection collection = (Collection) get(key, Object.class); + return collection == null ? 0L : collection.size(); + }, getExecutor()); } @Override public CompletableFuture sismemberAsync(final String key, final Type type, final T value) { - return supplyAsync(() -> sismember(key, type, value), getExecutor()); + return supplyAsync(() -> { + Collection list = get(key, type); + return list != null && list.contains(value); + }, getExecutor()); } protected void appendListItem(CacheEntryType cacheType, String key, Object... values) { @@ -1169,266 +1050,220 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public void lpush(final String key, final Type componentType, T... values) { - for (T value : values) { - appendListItem(CacheEntryType.LIST_OBJECT, false, key, value); - } - } - - @Override - public CompletableFuture lpushAsync(final String key, final Type componentType, final T... values) { - return runAsync(() -> lpush(key, componentType, values), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public void lpushx(final String key, final Type componentType, T... values) { - if (container.containsKey(key)) { + public CompletableFuture lpushAsync(final String key, final Type componentType, T... values) { + return runAsync(() -> { for (T value : values) { appendListItem(CacheEntryType.LIST_OBJECT, false, key, value); } - } + }, getExecutor()); } @Override - public CompletableFuture lpushxAsync(final String key, final Type componentType, final T... values) { - return runAsync(() -> lpushx(key, componentType, values), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public T lpop(final String key, final Type componentType) { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType() || entry.listValue == null) { - return null; - } - if (entry.listValue.isEmpty()) { - return null; - } - Object obj = entry.listValue.pollFirst(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); - } - return (T) obj; - } - - @Override - public void ltrim(final String key, int start, int stop) { - if (key == null) { - return; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType() || entry.listValue == null) { - return; - } - if (entry.listValue.isEmpty()) { - return; - } - Iterator it = entry.listValue.iterator(); - int index = -1; - int end = stop >= 0 ? stop : entry.listValue.size() + stop; - while (it.hasNext()) { - ++index; - if (index > end) { - break; - } else if (index >= start) { - it.remove(); + public CompletableFuture lpushxAsync(final String key, final Type componentType, T... values) { + return runAsync(() -> { + if (container.containsKey(key)) { + for (T value : values) { + appendListItem(CacheEntryType.LIST_OBJECT, false, key, value); + } } - } - } - - @Override - public CompletableFuture ltrimAsync(final String key, int start, int stop) { - return runAsync(() -> ltrim(key, start, stop), getExecutor()).whenComplete(futureCompleteConsumer); + }, getExecutor()); } @Override public CompletableFuture lpopAsync(final String key, final Type componentType) { - return supplyAsync(() -> lpop(key, componentType), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isListCacheType() || entry.listValue == null) { + return null; + } + if (entry.listValue.isEmpty()) { + return null; + } + Object obj = entry.listValue.pollFirst(); + if (obj != null && componentType == long.class) { + obj = ((Number) obj).longValue(); + } + return (T) obj; + }, getExecutor()); } @Override - public T rpoplpush(final String list1, final String list2, final Type componentType) { - T val = rpop(list1, componentType); - lpush(list2, componentType, val); - return val; + public CompletableFuture ltrimAsync(final String key, int start, int stop) { + return runAsync(() -> { + if (key == null) { + return; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isListCacheType() || entry.listValue == null) { + return; + } + if (entry.listValue.isEmpty()) { + return; + } + Iterator it = entry.listValue.iterator(); + int index = -1; + int end = stop >= 0 ? stop : entry.listValue.size() + stop; + while (it.hasNext()) { + ++index; + if (index > end) { + break; + } else if (index >= start) { + it.remove(); + } + } + }, getExecutor()); } @Override public CompletableFuture rpoplpushAsync(final String key, final String key2, final Type componentType) { - return supplyAsync(() -> rpoplpush(key, key2, componentType), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public T rpop(final String key, final Type componentType) { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isListCacheType() || entry.listValue == null) { - return null; - } - if (entry.listValue.isEmpty()) { - return null; - } - Object obj = entry.listValue.pollLast(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); - } - return (T) obj; + return supplyAsync(() -> { + T val = rpop(key, componentType); + lpush(key2, componentType, val); + return val; + }, getExecutor()); } @Override public CompletableFuture rpopAsync(final String key, final Type componentType) { - return supplyAsync(() -> rpop(key, componentType), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public void rpushx(String key, Type componentType, T... values) { - if (container.containsKey(key)) { - for (T value : values) { - appendListItem(CacheEntryType.LIST_OBJECT, key, value); + return supplyAsync(() -> { + if (key == null) { + return null; } - } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isListCacheType() || entry.listValue == null) { + return null; + } + if (entry.listValue.isEmpty()) { + return null; + } + Object obj = entry.listValue.pollLast(); + if (obj != null && componentType == long.class) { + obj = ((Number) obj).longValue(); + } + return (T) obj; + }, getExecutor()); } @Override public CompletableFuture rpushxAsync(final String key, final Type componentType, final T... values) { - return runAsync(() -> rpushx(key, componentType, values), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public void rpush(String key, Type componentType, T... values) { - appendListItem(CacheEntryType.LIST_OBJECT, key, values); + return runAsync(() -> { + if (container.containsKey(key)) { + for (T value : values) { + appendListItem(CacheEntryType.LIST_OBJECT, key, value); + } + } + }, getExecutor()); } @Override public CompletableFuture rpushAsync(final String key, final Type componentType, final T... values) { - return runAsync(() -> rpush(key, componentType, values), getExecutor()).whenComplete(futureCompleteConsumer); + return runAsync(() -> { + appendListItem(CacheEntryType.LIST_OBJECT, key, values); + }, getExecutor()); } @Override - public int lrem(String key, final Type componentType, T value) { - if (key == null) { - return 0; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.listValue == null) { - return 0; - } - return entry.listValue.remove(value) ? 1 : 0; + public CompletableFuture lremAsync(final String key, final Type componentType, T value) { + return supplyAsync(() -> { + if (key == null) { + return 0L; + } + CacheEntry entry = container.get(key); + if (entry == null || entry.listValue == null) { + return 0L; + } + return entry.listValue.remove(value) ? 1L : 0L; + }, getExecutor()); } @Override - public int lremString(String key, String value) { - if (key == null) { - return 0; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.listValue == null) { - return 0; - } - return entry.listValue.remove(value) ? 1 : 0; - } - - @Override - public int lremLong(String key, long value) { - if (key == null) { - return 0; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.listValue == null) { - return 0; - } - return entry.listValue.remove(value) ? 1 : 0; - } - - @Override - public CompletableFuture lremAsync(final String key, final Type componentType, T value) { - return supplyAsync(() -> lrem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public T spop(final String key, final Type componentType) { - if (key == null) { + public CompletableFuture spopAsync(final String key, final Type componentType) { + return supplyAsync(() -> { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + return null; + } + if (entry.csetValue.isEmpty()) { + return null; + } + Iterator it = entry.csetValue.iterator(); + Object del = null; + if (it.hasNext()) { + Object obj = it.next(); + if (obj != null && componentType == long.class) { + obj = ((Number) obj).longValue(); + } + del = obj; + } + if (del != null) { + entry.csetValue.remove(del); + return (T) del; + } return null; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return null; - } - if (entry.csetValue.isEmpty()) { - return null; - } - Iterator it = entry.csetValue.iterator(); - if (it.hasNext()) { - Object obj = it.next(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); - } - it.remove(); - return (T) obj; - } - return null; + }, getExecutor()); } @Override - public Set spop(final String key, final int count, final Type componentType) { - if (key == null) { - return new LinkedHashSet<>(); - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return new LinkedHashSet<>(); - } - if (entry.csetValue.isEmpty()) { - return new LinkedHashSet<>(); - } - Iterator it = entry.csetValue.iterator(); - Set list = new LinkedHashSet<>(); - int index = 0; - while (it.hasNext()) { - Object obj = it.next(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); + public CompletableFuture> spopAsync(final String key, final int count, final Type componentType) { + return supplyAsync(() -> { + if (key == null) { + return new LinkedHashSet<>(); } - list.add((T) obj); - it.remove(); - if (++index >= count) { - break; + CacheEntry entry = container.get(key); + if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + return new LinkedHashSet<>(); } - } - return list; + if (entry.csetValue.isEmpty()) { + return new LinkedHashSet<>(); + } + Iterator it = entry.csetValue.iterator(); + Set list = new LinkedHashSet<>(); + int index = 0; + while (it.hasNext()) { + Object obj = it.next(); + if (obj != null && componentType == long.class) { + obj = ((Number) obj).longValue(); + } + list.add((T) obj); + if (++index >= count) { + break; + } + } + entry.csetValue.removeAll(list); + return list; + }, getExecutor()); } @Override - public Set< T> sscan(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) { - if (key == null) { - return new LinkedHashSet(); - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return new LinkedHashSet<>(); - } - if (entry.csetValue.isEmpty()) { - return new LinkedHashSet<>(); - } - Iterator it = entry.csetValue.iterator(); - Set list = new LinkedHashSet<>(); - int index = 0; - while (it.hasNext()) { - Object obj = it.next(); - if (obj != null && componentType == long.class) { - obj = ((Number) obj).longValue(); + public CompletableFuture> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) { + return supplyAsync(() -> { + if (key == null) { + return new LinkedHashSet(); } - list.add((T) obj); - it.remove(); - if (limit > 0 && ++index >= limit) { - break; + CacheEntry entry = container.get(key); + if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + return new LinkedHashSet<>(); } - } - return list; + if (entry.csetValue.isEmpty()) { + return new LinkedHashSet<>(); + } + Iterator it = entry.csetValue.iterator(); + Set list = new LinkedHashSet<>(); + while (it.hasNext()) { + Object obj = it.next(); + if (obj != null && componentType == long.class) { + obj = ((Number) obj).longValue(); + } + list.add((T) obj); + } + return list; + }, getExecutor()); } protected void appendSetItem(CacheEntryType cacheType, String key, Collection values) { @@ -1452,75 +1287,85 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public void zadd(String key, CacheScoredValue... values) { - List list = new ArrayList<>(); - for (CacheScoredValue v : values) { - list.add(new CacheScoredValue.NumberScoredValue(v)); - } - appendSetItem(CacheEntryType.SET_SORTED, key, list); - } - - @Override - public long zcard(String key) { - if (key == null) { - return 0L; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return 0L; - } - return entry.csetValue.size(); - } - - @Override - public long zrem(String key, String... members) { - if (key == null) { - return 0L; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return 0L; - } - Set sets = entry.csetValue; - long c = 0; - Set keys = Set.of(members); - Iterator it = sets.iterator(); - while (it.hasNext()) { - CacheScoredValue v = it.next(); - if (keys.contains(v.getValue())) { - c++; - it.remove(); + public CompletableFuture zaddAsync(String key, CacheScoredValue... values) { + return runAsync(() -> { + List list = new ArrayList<>(); + for (CacheScoredValue v : values) { + list.add(new CacheScoredValue.NumberScoredValue(v)); } - } - return c; + appendSetItem(CacheEntryType.SET_SORTED, key, list); + }, getExecutor()); } @Override - public List zmscore(String key, Class scoreType, String... members) { - List list = new ArrayList<>(); - if (key == null) { - for (int i = 0; i < members.length; i++) { - list.add(null); + public CompletableFuture zcardAsync(String key) { + return supplyAsync(() -> { + if (key == null) { + return 0L; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + return 0L; + } + return (long) entry.csetValue.size(); + }, getExecutor()); + } + + @Override + public CompletableFuture zremAsync(String key, String... members) { + return supplyAsync(() -> { + if (key == null) { + return 0L; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + return 0L; + } + Set sets = entry.csetValue; + long c = 0; + Set keys = Set.of(members); + Iterator it = sets.iterator(); + Set dels = new HashSet<>(); + while (it.hasNext()) { + CacheScoredValue v = it.next(); + if (keys.contains(v.getValue())) { + c++; + dels.add(v); + } + } + sets.removeAll(dels); + return c; + }, getExecutor()); + } + + @Override + public CompletableFuture> zmscoreAsync(String key, Class scoreType, String... members) { + return supplyAsync(() -> { + List list = new ArrayList<>(); + if (key == null) { + for (int i = 0; i < members.length; i++) { + list.add(null); + } + return list; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + for (int i = 0; i < members.length; i++) { + list.add(null); + } + return list; + } + Set keys = Set.of(members); + Set sets = entry.csetValue; + Map map = new HashMap<>(); + sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> { + map.put(v.getValue(), formatScore(scoreType, v.getScore())); + }); + for (String m : members) { + list.add(map.get(m)); } return list; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - for (int i = 0; i < members.length; i++) { - list.add(null); - } - return list; - } - Set keys = Set.of(members); - Set sets = entry.csetValue; - Map map = new HashMap<>(); - sets.stream().filter(v -> keys.contains(v.getValue())).forEach(v -> { - map.put(v.getValue(), formatScore(scoreType, v.getScore())); - }); - for (String m : members) { - list.add(map.get(m)); - } - return list; + }, getExecutor()); } private T formatScore(Class scoreType, Number score) { @@ -1538,157 +1383,93 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public T zscore(String key, Class scoreType, String member) { - if (key == null) { - return null; - } - CacheEntry entry = container.get(key); - if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { - return null; - } - Set sets = entry.csetValue; - return (T) sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null); + public CompletableFuture zscoreAsync(String key, Class scoreType, String member) { + return supplyAsync(() -> { + if (key == null) { + return null; + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + return null; + } + Set sets = entry.csetValue; + return (T) sets.stream().filter(v -> Objects.equals(member, v.getValue())).findAny().map(v -> v.getScore()).orElse(null); + }, getExecutor()); } @Override - public CompletableFuture saddAsync(final String key, final Type componentType, T... values) { - return runAsync(() -> sadd(key, componentType, values), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture zcardAsync(String key) { - return supplyAsync(() -> zcard(key), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public long srem(String key, Type type, T... values) { - if (key == null) { - return 0; - } - CacheEntry entry = container.get(key); - if (entry == null || entry.csetValue == null) { - return 0; - } - return entry.csetValue.removeAll(List.of(values)) ? 1 : 0; - } - - @Override - public CompletableFuture sremAsync(final String key, final Type componentType, final T... values) { - return supplyAsync(() -> srem(key, componentType, values), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public long dbsize() { - return container.size(); - } - - @Override - public void flushdb() { - container.clear(); - } - - @Override - public CompletableFuture flushdbAsync() { - return runAsync(() -> flushdb(), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public void flushall() { - container.clear(); - } - - @Override - public CompletableFuture flushallAsync() { - return runAsync(() -> flushall(), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public List keys(String pattern) { - if (pattern == null || pattern.isEmpty()) { - return new ArrayList<>(container.keySet()); - } else { - List rs = new ArrayList<>(); - Predicate filter = Pattern.compile(pattern).asPredicate(); - container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); - return rs; - } - } - - @Override - public List scan(AtomicLong cursor, int limit, String pattern) { - if (pattern == null || pattern.isEmpty()) { - return new ArrayList<>(container.keySet()); - } else { - List rs = new ArrayList<>(); - Predicate filter = Pattern.compile(pattern).asPredicate(); - container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); - return rs; - } - } - - @Override - public List keysStartsWith(String startsWith) { - if (startsWith == null) { - return keys(); - } - List rs = new ArrayList<>(); - container.keySet().stream().filter(x -> x.startsWith(startsWith)).forEach(x -> rs.add(x)); - return rs; - } - - @Override - public CompletableFuture> keysAsync(String pattern) { - return CompletableFuture.completedFuture(keys(pattern)); - } - - @Override - public CompletableFuture> keysStartsWithAsync(String startsWith) { - return CompletableFuture.completedFuture(keysStartsWith(startsWith)); + public CompletableFuture sremAsync(String key, Type type, T... values) { + return supplyAsync(() -> { + if (key == null) { + return 0L; + } + CacheEntry entry = container.get(key); + if (entry == null || entry.csetValue == null) { + return 0L; + } + return entry.csetValue.removeAll(List.of(values)) ? 1L : 0L; + }, getExecutor()); } @Override public CompletableFuture dbsizeAsync() { - return CompletableFuture.completedFuture((long) container.size()); + return supplyAsync(() -> { + return (long) container.size(); + }, getExecutor()); + } + + @Override + public CompletableFuture flushdbAsync() { + return runAsync(() -> { + container.clear(); + }, getExecutor()); + } + + @Override + public CompletableFuture flushallAsync() { + return runAsync(() -> { + container.clear(); + }, getExecutor()); + } + + @Override + public CompletableFuture> keysAsync(String pattern) { + return supplyAsync(() -> { + if (pattern == null || pattern.isEmpty()) { + return new ArrayList<>(container.keySet()); + } else { + List rs = new ArrayList<>(); + Predicate filter = Pattern.compile(pattern).asPredicate(); + container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); + return rs; + } + }, getExecutor()); } @Override public CompletableFuture> scanAsync(AtomicLong cursor, int limit, String pattern) { - return supplyAsync(() -> scan(cursor, limit, pattern), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> { + if (pattern == null || pattern.isEmpty()) { + return new ArrayList<>(container.keySet()); + } else { + List rs = new ArrayList<>(); + Predicate filter = Pattern.compile(pattern).asPredicate(); + container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); + return rs; + } + }, getExecutor()); } @Override - public CompletableFuture spopAsync(String key, Type componentType) { - return supplyAsync(() -> spop(key, componentType), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture> spopAsync(String key, int count, Type componentType) { - return supplyAsync(() -> spop(key, count, componentType), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) { - return supplyAsync(() -> sscan(key, componentType, cursor, limit, pattern), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture zaddAsync(String key, CacheScoredValue... values) { - return runAsync(() -> zadd(key, values), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture> zmscoreAsync(String key, Class type, String... members) { - return supplyAsync(() -> zmscore(key, type, members), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture zremAsync(String key, String... members) { - return supplyAsync(() -> zrem(key, members), getExecutor()).whenComplete(futureCompleteConsumer); - } - - @Override - public CompletableFuture zscoreAsync(String key, Class type, String member) { - return supplyAsync(() -> zscore(key, type, member), getExecutor()).whenComplete(futureCompleteConsumer); + public CompletableFuture> keysStartsWithAsync(String startsWith) { + if (startsWith == null) { + return keysAsync(); + } + return supplyAsync(() -> { + List rs = new ArrayList<>(); + container.keySet().stream().filter(x -> x.startsWith(startsWith)).forEach(x -> rs.add(x)); + return rs; + }, getExecutor()); } protected CacheEntryType findEntryType(Type type) { diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index 899064eb6..aa32c07b5 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -36,6 +36,104 @@ public interface CacheSource extends Resourcable { return isOpenAsync().join(); } + //------------------------ 字符串 String ------------------------ + default long incr(String key) { + return incrAsync(key).join(); + } + + default long incrby(String key, long num) { + return incrbyAsync(key, num).join(); + } + + default double incrbyFloat(String key, double num) { + return incrbyFloatAsync(key, num).join(); + } + + default long decr(String key) { + return decrAsync(key).join(); + } + + default long decrby(String key, long num) { + return decrbyAsync(key, num).join(); + } + + //------------------------ set ------------------------ + default void set(String key, Convert convert, Type type, T value) { + setAsync(key, convert, type, value).join(); + } + + default void set(String key, Type type, T value) { + set(key, (Convert) null, type, value); + } + + default void setString(String key, String value) { + set(key, String.class, value); + } + + default void setLong(String key, long value) { + set(key, Long.class, value); + } + + //MSET key value [key value ...] + default void mset(Serializable... keyVals) { + msetAsync(keyVals).join(); + } + + default void mset(Map map) { + msetAsync(map).join(); + } + + //------------------------ setnx ------------------------ + default boolean setnx(String key, Convert convert, Type type, T value) { + return setnxAsync(key, convert, type, value).join(); + } + + default boolean setnx(String key, Type type, T value) { + return setnx(key, (Convert) null, type, value); + } + + default boolean setnxString(String key, String value) { + return setnx(key, String.class, value); + } + + default boolean setnxLong(String key, long value) { + return setnx(key, Long.class, value); + } + + //------------------------ setex ------------------------ + default void setex(String key, int expireSeconds, Convert convert, Type type, T value) { + setexAsync(key, expireSeconds, convert, type, value).join(); + } + + default void setex(String key, int expireSeconds, Type type, T value) { + setex(key, expireSeconds, (Convert) null, type, value); + } + + default void setexString(String key, int expireSeconds, String value) { + setex(key, expireSeconds, String.class, value); + } + + default void setexLong(String key, int expireSeconds, long value) { + setex(key, expireSeconds, Long.class, value); + } + + //------------------------ setnxex ------------------------ + default boolean setnxex(String key, int expireSeconds, Convert convert, Type type, T value) { + return setnxexAsync(key, expireSeconds, convert, type, value).join(); + } + + default boolean setnxex(String key, int expireSeconds, Type type, T value) { + return setnxex(key, expireSeconds, (Convert) null, type, value); + } + + default boolean setnxexString(String key, int expireSeconds, String value) { + return setnxex(key, expireSeconds, String.class, value); + } + + default boolean setnxexLong(String key, int expireSeconds, long value) { + return setnxex(key, expireSeconds, Long.class, value); + } + //------------------------ get ------------------------ default T get(String key, Type type) { return (T) getAsync(key, type).join(); @@ -122,84 +220,11 @@ public interface CacheSource extends Resourcable { return val == null ? defValue : val; } - //------------------------ set ------------------------ - //MSET key value [key value ...] - default void mset(Serializable... keyVals) { - msetAsync(keyVals).join(); + //------------------------ 键 Keys ------------------------ + default long del(String... keys) { + return delAsync(keys).join(); } - default void mset(Map map) { - msetAsync(map).join(); - } - - default void set(String key, Convert convert, Type type, T value) { - setAsync(key, convert, type, value).join(); - } - - default void set(String key, Type type, T value) { - set(key, (Convert) null, type, value); - } - - default void setString(String key, String value) { - set(key, String.class, value); - } - - default void setLong(String key, long value) { - set(key, Long.class, value); - } - - //------------------------ setnx ------------------------ - default boolean setnx(String key, Convert convert, Type type, T value) { - return setnxAsync(key, convert, type, value).join(); - } - - default boolean setnx(String key, Type type, T value) { - return setnx(key, (Convert) null, type, value); - } - - default boolean setnxString(String key, String value) { - return setnx(key, String.class, value); - } - - default boolean setnxLong(String key, long value) { - return setnx(key, Long.class, value); - } - - //------------------------ setnxex ------------------------ - default boolean setnxex(String key, int expireSeconds, Convert convert, Type type, T value) { - return setnxexAsync(key, expireSeconds, convert, type, value).join(); - } - - default boolean setnxex(String key, int expireSeconds, Type type, T value) { - return setnxex(key, expireSeconds, (Convert) null, type, value); - } - - default boolean setnxexString(String key, int expireSeconds, String value) { - return setnxex(key, expireSeconds, String.class, value); - } - - default boolean setnxexLong(String key, int expireSeconds, long value) { - return setnxex(key, expireSeconds, Long.class, value); - } - - //------------------------ setex ------------------------ - default void setex(String key, int expireSeconds, Convert convert, Type type, T value) { - setexAsync(key, expireSeconds, convert, type, value).join(); - } - - default void setex(String key, int expireSeconds, Type type, T value) { - setex(key, expireSeconds, (Convert) null, type, value); - } - - default void setexString(String key, int expireSeconds, String value) { - setex(key, expireSeconds, String.class, value); - } - - default void setexLong(String key, int expireSeconds, long value) { - setex(key, expireSeconds, Long.class, value); - } - - //------------------------ xxxx ------------------------ default boolean exists(String key) { return existsAsync(key).join(); } @@ -208,6 +233,18 @@ public interface CacheSource extends Resourcable { expireAsync(key, expireSeconds).join(); } + default List keys(String pattern) { + return keysAsync(pattern).join(); + } + + default List keys() { + return keys(null); + } + + default List keysStartsWith(String startsWith) { + return keys(startsWith + "*"); + } + default boolean persist(String key) { return persistAsync(key).join(); } @@ -220,43 +257,6 @@ public interface CacheSource extends Resourcable { return renamenxAsync(oldKey, newKey).join(); } - default long del(String... keys) { - return delAsync(keys).join(); - } - - default long incr(String key) { - return incrAsync(key).join(); - } - - default long incrby(String key, long num) { - return incrbyAsync(key, num).join(); - } - - default double incrbyFloat(String key, double num) { - return incrbyFloatAsync(key, num).join(); - } - - default long decr(String key) { - return decrAsync(key).join(); - } - - default long decrby(String key, long num) { - return decrbyAsync(key, num).join(); - } - - //------------------------ 键 Keys ------------------------ - default List keys(String pattern) { - return keysAsync(pattern).join(); - } - - default List keys() { - return keys(null); - } - - default List keysStartsWith(String startsWith) { - return keys(startsWith + "*"); - } - default List scan(AtomicLong cursor, int limit, String pattern) { return scanAsync(cursor, limit, pattern).join(); } @@ -529,15 +529,15 @@ public interface CacheSource extends Resourcable { return rpoplpush(key, key2, Long.class); } - default int lrem(String key, Type componentType, T value) { + default long lrem(String key, Type componentType, T value) { return lremAsync(key, componentType, value).join(); } - default int lremString(String key, String value) { + default long lremString(String key, String value) { return lrem(key, String.class, value); } - default int lremLong(String key, long value) { + default long lremLong(String key, long value) { return lrem(key, Long.class, value); } @@ -767,7 +767,83 @@ public interface CacheSource extends Resourcable { //---------------------- CompletableFuture 异步版 --------------------------------- public CompletableFuture isOpenAsync(); - //------------------------ getAsync ------------------------ + //------------------------ 键 Keys ------------------------ + public CompletableFuture incrAsync(String key); + + public CompletableFuture incrbyAsync(String key, long num); + + public CompletableFuture decrAsync(String key); + + public CompletableFuture decrbyAsync(String key, long num); + + public CompletableFuture incrbyFloatAsync(String key, double num); + + //------------------------ set ------------------------ + public CompletableFuture setAsync(String key, Convert convert, Type type, T value); + + default CompletableFuture setAsync(String key, Type type, T value) { + return setAsync(key, (Convert) null, type, value); + } + + default CompletableFuture setStringAsync(String key, String value) { + return setAsync(key, String.class, value); + } + + default CompletableFuture setLongAsync(String key, long value) { + return setAsync(key, Long.class, value); + } + + //MSET key value [key value ...] + public CompletableFuture msetAsync(Serializable... keyVals); + + public CompletableFuture msetAsync(Map map); + + //------------------------ setnx ------------------------ + public CompletableFuture setnxAsync(String key, Convert convert, Type type, T value); + + default CompletableFuture setnxAsync(String key, Type type, T value) { + return setnxAsync(key, (Convert) null, type, value); + } + + default CompletableFuture setnxStringAsync(String key, String value) { + return setnxAsync(key, String.class, value); + } + + default CompletableFuture setnxLongAsync(String key, long value) { + return setnxAsync(key, Long.class, value); + } + + //------------------------ setex ------------------------ + public CompletableFuture setexAsync(String key, int expireSeconds, Convert convert, Type type, T value); + + default CompletableFuture setexAsync(String key, int expireSeconds, Type type, T value) { + return setexAsync(key, expireSeconds, (Convert) null, type, value); + } + + default CompletableFuture setexStringAsync(String key, int expireSeconds, String value) { + return setexAsync(key, expireSeconds, String.class, value); + } + + default CompletableFuture setexLongAsync(String key, int expireSeconds, long value) { + return setexAsync(key, expireSeconds, Long.class, value); + } + + //------------------------ setnxex ------------------------ + public CompletableFuture setnxexAsync(String key, int expireSeconds, Convert convert, Type type, T value); + + default CompletableFuture setnxexAsync(String key, int expireSeconds, Type type, T value) { + return setnxexAsync(key, expireSeconds, (Convert) null, type, value); + } + + default CompletableFuture setnxexStringAsync(String key, int expireSeconds, String value) { + return setnxexAsync(key, expireSeconds, String.class, value); + } + + default CompletableFuture setnxexLongAsync(String key, int expireSeconds, long value) { + return setnxexAsync(key, expireSeconds, Long.class, value); + } + + //------------------------ get ------------------------ public CompletableFuture getAsync(String key, Type type); default CompletableFuture getStringAsync(String key) { @@ -778,7 +854,7 @@ public interface CacheSource extends Resourcable { return getAsync(key, Long.class).thenApply(v -> v == null ? defValue : (Long) v); } - //------------------------ mgetAsync ------------------------ + //------------------------ mget ------------------------ public CompletableFuture> mgetAsync(Type componentType, String... keys); default CompletableFuture> mgetStringAsync(String... keys) { @@ -819,7 +895,7 @@ public interface CacheSource extends Resourcable { }); } - //------------------------ getexAsync ------------------------ + //------------------------ getex ------------------------ public CompletableFuture getexAsync(String key, int expireSeconds, Type type); default CompletableFuture getexStringAsync(String key, int expireSeconds) { @@ -830,7 +906,7 @@ public interface CacheSource extends Resourcable { return getexAsync(key, expireSeconds, Long.class).thenApply(v -> v == null ? defValue : (Long) v); } - //------------------------ getsetAsync ------------------------ + //------------------------ getset ------------------------ public CompletableFuture getSetAsync(String key, Convert convert, Type type, T value); default CompletableFuture getSetAsync(String key, Type type, T value) { @@ -845,95 +921,13 @@ public interface CacheSource extends Resourcable { return getSetAsync(key, Long.class, value).thenApply(v -> v == null ? defValue : (Long) v); } - //------------------------ setAsync ------------------------ - //MSET key value [key value ...] - public CompletableFuture msetAsync(Serializable... keyVals); + //------------------------ 键 Keys ------------------------ + public CompletableFuture delAsync(String... keys); - public CompletableFuture msetAsync(Map map); - - public CompletableFuture setAsync(String key, Convert convert, Type type, T value); - - default CompletableFuture setAsync(String key, Type type, T value) { - return setAsync(key, (Convert) null, type, value); - } - - default CompletableFuture setStringAsync(String key, String value) { - return setAsync(key, String.class, value); - } - - default CompletableFuture setLongAsync(String key, long value) { - return setAsync(key, Long.class, value); - } - - //------------------------ setnxAsync ------------------------ - public CompletableFuture setnxAsync(String key, Convert convert, Type type, T value); - - default CompletableFuture setnxAsync(String key, Type type, T value) { - return setnxAsync(key, (Convert) null, type, value); - } - - default CompletableFuture setnxStringAsync(String key, String value) { - return setnxAsync(key, String.class, value); - } - - default CompletableFuture setnxLongAsync(String key, long value) { - return setnxAsync(key, Long.class, value); - } - - //------------------------ setnxexAsync ------------------------ - public CompletableFuture setnxexAsync(String key, int expireSeconds, Convert convert, Type type, T value); - - default CompletableFuture setnxexAsync(String key, int expireSeconds, Type type, T value) { - return setnxexAsync(key, expireSeconds, (Convert) null, type, value); - } - - default CompletableFuture setnxexStringAsync(String key, int expireSeconds, String value) { - return setnxexAsync(key, expireSeconds, String.class, value); - } - - default CompletableFuture setnxexLongAsync(String key, int expireSeconds, long value) { - return setnxexAsync(key, expireSeconds, Long.class, value); - } - - //------------------------ setexAsync ------------------------ - public CompletableFuture setexAsync(String key, int expireSeconds, Convert convert, Type type, T value); - - default CompletableFuture setexAsync(String key, int expireSeconds, Type type, T value) { - return setexAsync(key, expireSeconds, (Convert) null, type, value); - } - - default CompletableFuture setexStringAsync(String key, int expireSeconds, String value) { - return setexAsync(key, expireSeconds, String.class, value); - } - - default CompletableFuture setexLongAsync(String key, int expireSeconds, long value) { - return setexAsync(key, expireSeconds, Long.class, value); - } - - //------------------------ xxxxAsync ------------------------ public CompletableFuture existsAsync(String key); public CompletableFuture expireAsync(String key, int seconds); - public CompletableFuture persistAsync(String key); - - public CompletableFuture renameAsync(String oldKey, String newKey); - - public CompletableFuture renamenxAsync(String oldKey, String newKey); - - public CompletableFuture delAsync(String... keys); - - public CompletableFuture incrAsync(String key); - - public CompletableFuture incrbyAsync(String key, long num); - - public CompletableFuture decrAsync(String key); - - public CompletableFuture decrbyAsync(String key, long num); - - public CompletableFuture incrbyFloatAsync(String key, double num); - - //------------------------ 键 Keys ------------------------ public CompletableFuture> keysAsync(String pattern); default CompletableFuture> keysAsync() { @@ -944,6 +938,12 @@ public interface CacheSource extends Resourcable { return keysAsync(startsWith + "*"); } + public CompletableFuture persistAsync(String key); + + public CompletableFuture renameAsync(String oldKey, String newKey); + + public CompletableFuture renamenxAsync(String oldKey, String newKey); + public CompletableFuture> scanAsync(AtomicLong cursor, int limit, String pattern); default CompletableFuture> scanAsync(AtomicLong cursor, int limit) { @@ -1151,13 +1151,13 @@ public interface CacheSource extends Resourcable { return rpoplpushAsync(key, key2, Long.class); } - public CompletableFuture lremAsync(String key, Type componentType, T value); + public CompletableFuture lremAsync(String key, Type componentType, T value); - default CompletableFuture lremStringAsync(String key, String value) { + default CompletableFuture lremStringAsync(String key, String value) { return lremAsync(key, String.class, value); } - default CompletableFuture lremLongAsync(String key, long value) { + default CompletableFuture lremLongAsync(String key, long value) { return lremAsync(key, Long.class, value); } @@ -1678,7 +1678,7 @@ public interface CacheSource extends Resourcable { @Deprecated(since = "2.8.0") default CompletableFuture removeListItemAsync(String key, Type componentType, T value) { - return lremAsync(key, componentType, value); + return lremAsync(key, componentType, value).thenApply(v -> v.intValue()); } @Deprecated(since = "2.8.0") @@ -1688,7 +1688,7 @@ public interface CacheSource extends Resourcable { @Deprecated(since = "2.8.0") default CompletableFuture removeStringListItemAsync(String key, String value) { - return lremStringAsync(key, value); + return lremStringAsync(key, value).thenApply(v -> v.intValue()); } @Deprecated(since = "2.8.0") @@ -1698,7 +1698,7 @@ public interface CacheSource extends Resourcable { @Deprecated(since = "2.8.0") default CompletableFuture removeLongListItemAsync(String key, long value) { - return lremLongAsync(key, value); + return lremLongAsync(key, value).thenApply(v -> v.intValue()); } @Deprecated(since = "2.8.0") @@ -1708,7 +1708,7 @@ public interface CacheSource extends Resourcable { @Deprecated(since = "2.8.0") default int removeListItem(String key, Type componentType, T value) { - return lrem(key, componentType, value); + return (int) lrem(key, componentType, value); } @Deprecated(since = "2.8.0") @@ -1718,7 +1718,7 @@ public interface CacheSource extends Resourcable { @Deprecated(since = "2.8.0") default int removeStringListItem(String key, String value) { - return lremString(key, value); + return (int) lremString(key, value); } @Deprecated(since = "2.8.0") @@ -1728,7 +1728,7 @@ public interface CacheSource extends Resourcable { @Deprecated(since = "2.8.0") default int removeLongListItem(String key, long value) { - return lremLong(key, value); + return (int) lremLong(key, value); } @Deprecated(since = "2.8.0")