diff --git a/src/main/java/org/redkale/cluster/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/CacheClusterAgent.java index beb97fd85..3fbc19e7d 100644 --- a/src/main/java/org/redkale/cluster/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/CacheClusterAgent.java @@ -237,7 +237,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { } private CompletableFuture> queryAddress(final String serviceName) { - final CompletableFuture> future = source.hmapAsync(serviceName, AddressEntry.class, new AtomicInteger(), 10000); + final CompletableFuture> future = source.hscanAsync(serviceName, AddressEntry.class, new AtomicInteger(), 10000); return future.thenApply(map -> { final Set set = new HashSet<>(); map.forEach((n, v) -> { diff --git a/src/main/java/org/redkale/source/AbstractCacheSource.java b/src/main/java/org/redkale/source/AbstractCacheSource.java index 12ce4e62f..e6afdbe4e 100644 --- a/src/main/java/org/redkale/source/AbstractCacheSource.java +++ b/src/main/java/org/redkale/source/AbstractCacheSource.java @@ -3,7 +3,7 @@ package org.redkale.source; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; import java.util.function.Supplier; import org.redkale.annotation.AutoLoad; import org.redkale.annotation.ResourceListener; @@ -114,4 +114,8 @@ public abstract class AbstractCacheSource extends AbstractService implements Cac protected CompletableFuture supplyAsync(Supplier supplier) { return CompletableFuture.supplyAsync(supplier); } + + protected CompletableFuture supplyAsync(Supplier supplier, Executor executor) { + return CompletableFuture.supplyAsync(supplier, executor); + } } diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 1a5f3e069..94b0a5306 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -393,12 +393,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public Map hmap(final String key, final Type type, AtomicInteger cursor, int limit) { - return hmap(key, type, cursor, limit, null); - } - - @Override - public Map hmap(final String key, final Type type, AtomicInteger cursor, int limit, String pattern) { + public Map hscan(final String key, final Type type, AtomicInteger cursor, int limit, String pattern) { if (key == null) { return new HashMap(); } @@ -460,7 +455,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture existsAsync(final String key) { - return CompletableFuture.supplyAsync(() -> exists(key), getExecutor()); + return supplyAsync(() -> exists(key), getExecutor()); } @Override @@ -522,47 +517,47 @@ public final class CacheMemorySource extends AbstractCacheSource { //----------- hxxx -------------- @Override public CompletableFuture hdelAsync(final String key, String... fields) { - return CompletableFuture.supplyAsync(() -> hdel(key, fields), getExecutor()); + return supplyAsync(() -> hdel(key, fields), getExecutor()); } @Override public CompletableFuture> hkeysAsync(final String key) { - return CompletableFuture.supplyAsync(() -> hkeys(key), getExecutor()); + return supplyAsync(() -> hkeys(key), getExecutor()); } @Override public CompletableFuture hlenAsync(final String key) { - return CompletableFuture.supplyAsync(() -> hlen(key), getExecutor()); + return supplyAsync(() -> hlen(key), getExecutor()); } @Override public CompletableFuture hincrAsync(final String key, String field) { - return CompletableFuture.supplyAsync(() -> hincr(key, field), getExecutor()); + return supplyAsync(() -> hincr(key, field), getExecutor()); } @Override public CompletableFuture hincrbyAsync(final String key, String field, long num) { - return CompletableFuture.supplyAsync(() -> hincrby(key, field, num), getExecutor()); + return supplyAsync(() -> hincrby(key, field, num), getExecutor()); } @Override public CompletableFuture hincrbyFloatAsync(final String key, String field, double num) { - return CompletableFuture.supplyAsync(() -> hincrbyFloat(key, field, num), getExecutor()); + return supplyAsync(() -> hincrbyFloat(key, field, num), getExecutor()); } @Override public CompletableFuture hdecrAsync(final String key, String field) { - return CompletableFuture.supplyAsync(() -> hdecr(key, field), getExecutor()); + return supplyAsync(() -> hdecr(key, field), getExecutor()); } @Override public CompletableFuture hdecrbyAsync(final String key, String field, long num) { - return CompletableFuture.supplyAsync(() -> hdecrby(key, field, num), getExecutor()); + return supplyAsync(() -> hdecrby(key, field, num), getExecutor()); } @Override public CompletableFuture hexistsAsync(final String key, String field) { - return CompletableFuture.supplyAsync(() -> hexists(key, field), getExecutor()); + return supplyAsync(() -> hexists(key, field), getExecutor()); } @Override @@ -587,22 +582,22 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture hsetnxAsync(final String key, final String field, final Type type, final T value) { - return CompletableFuture.supplyAsync(() -> hsetnx(key, field, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> hsetnx(key, field, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture hsetnxAsync(final String key, final String field, final Convert convert, final Type type, final T value) { - return CompletableFuture.supplyAsync(() -> hsetnx(key, field, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> hsetnx(key, field, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture hsetnxStringAsync(final String key, final String field, final String value) { - return CompletableFuture.supplyAsync(() -> hsetnxString(key, field, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> hsetnxString(key, field, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture hsetnxLongAsync(final String key, final String field, final long value) { - return CompletableFuture.supplyAsync(() -> hsetnxLong(key, field, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> hsetnxLong(key, field, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -617,83 +612,78 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture> hmgetAsync(final String key, final Type type, final String... fields) { - return CompletableFuture.supplyAsync(() -> hmget(key, type, fields), getExecutor()); + return supplyAsync(() -> hmget(key, type, fields), getExecutor()); } @Override public CompletableFuture> hgetallAsync(final String key, final Type type) { - return CompletableFuture.supplyAsync(() -> hgetall(key, type), getExecutor()); + return supplyAsync(() -> hgetall(key, type), getExecutor()); } @Override public CompletableFuture> hgetallStringAsync(final String key) { - return CompletableFuture.supplyAsync(() -> hgetallString(key), getExecutor()); + return supplyAsync(() -> hgetallString(key), getExecutor()); } @Override public CompletableFuture> hgetallLongAsync(final String key) { - return CompletableFuture.supplyAsync(() -> hgetallLong(key), getExecutor()); + return supplyAsync(() -> hgetallLong(key), getExecutor()); } @Override public CompletableFuture> hvalsAsync(final String key, final Type type) { - return CompletableFuture.supplyAsync(() -> hvals(key, type), getExecutor()); + return supplyAsync(() -> hvals(key, type), getExecutor()); } @Override public CompletableFuture> hvalsStringAsync(final String key) { - return CompletableFuture.supplyAsync(() -> hvalsString(key), getExecutor()); + return supplyAsync(() -> hvalsString(key), getExecutor()); } @Override public CompletableFuture> hvalsLongAsync(final String key) { - return CompletableFuture.supplyAsync(() -> hvalsLong(key), getExecutor()); + return supplyAsync(() -> hvalsLong(key), getExecutor()); } @Override - public CompletableFuture> hmapAsync(final String key, final Type type, AtomicInteger cursor, int limit) { - return CompletableFuture.supplyAsync(() -> hmap(key, type, cursor, limit), getExecutor()); - } - - @Override - public CompletableFuture> hmapAsync(final String key, final Type type, AtomicInteger cursor, int limit, String pattern) { - return CompletableFuture.supplyAsync(() -> hmap(key, type, cursor, limit, pattern), getExecutor()); + public CompletableFuture> hscanAsync(final String key, final Type type, AtomicInteger cursor, int limit, String pattern) { + return supplyAsync(() -> hscan(key, type, cursor, limit, pattern), getExecutor()); } @Override public CompletableFuture hgetAsync(final String key, final String field, final Type type) { - return CompletableFuture.supplyAsync(() -> hget(key, field, type), getExecutor()); + return supplyAsync(() -> hget(key, field, type), getExecutor()); } @Override public CompletableFuture hgetStringAsync(final String key, final String field) { - return CompletableFuture.supplyAsync(() -> hgetString(key, field), getExecutor()); + return supplyAsync(() -> hgetString(key, field), getExecutor()); } @Override public CompletableFuture hgetLongAsync(final String key, final String field, long defValue) { - return CompletableFuture.supplyAsync(() -> hgetLong(key, field, defValue), getExecutor()); + return supplyAsync(() -> hgetLong(key, field, defValue), getExecutor()); } //----------- hxxx -------------- @Override public CompletableFuture getAsync(final String key, final Type type) { - return CompletableFuture.supplyAsync(() -> (T) get(key, type), getExecutor()); + return supplyAsync(() -> (T) get(key, type), getExecutor()); } @Override public CompletableFuture getStringAsync(final String key) { - return CompletableFuture.supplyAsync(() -> getString(key), getExecutor()); + return supplyAsync(() -> getString(key), getExecutor()); } @Override public CompletableFuture getLongAsync(final String key, long defValue) { - return CompletableFuture.supplyAsync(() -> getLong(key, defValue), getExecutor()); + return supplyAsync(() -> getLong(key, defValue), getExecutor()); } @Override public CompletableFuture getSetLongAsync(final String key, long value, long defValue) { - return CompletableFuture.supplyAsync(() -> getSetLong(key, value, defValue), getExecutor()); + return supplyAsync(() -> getSetLong(key, value, defValue), getExecutor()); } @Override @@ -748,17 +738,17 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture getexAsync(final String key, final int expireSeconds, final Type type) { - return CompletableFuture.supplyAsync(() -> getex(key, expireSeconds, type), getExecutor()); + return supplyAsync(() -> getex(key, expireSeconds, type), getExecutor()); } @Override public CompletableFuture getexStringAsync(final String key, final int expireSeconds) { - return CompletableFuture.supplyAsync(() -> getexString(key, expireSeconds), getExecutor()); + return supplyAsync(() -> getexString(key, expireSeconds), getExecutor()); } @Override public CompletableFuture getexLongAsync(final String key, final int expireSeconds, long defValue) { - return CompletableFuture.supplyAsync(() -> getexLong(key, expireSeconds, defValue), getExecutor()); + return supplyAsync(() -> getexLong(key, expireSeconds, defValue), getExecutor()); } protected void set(CacheEntryType cacheType, String key, Object value) { @@ -981,12 +971,12 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setnxAsync(String key, Type type, T value) { - return CompletableFuture.supplyAsync(() -> setnx(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnx(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setnxAsync(String key, Convert convert, Type type, T value) { - return CompletableFuture.supplyAsync(() -> setnx(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnx(key, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1006,7 +996,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setnxStringAsync(String key, String value) { - return CompletableFuture.supplyAsync(() -> setnxString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnxString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1021,7 +1011,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setnxLongAsync(String key, long value) { - return CompletableFuture.supplyAsync(() -> setnxLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnxLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } protected void set(CacheEntryType cacheType, int expireSeconds, String key, Object value) { @@ -1101,27 +1091,27 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setnxexStringAsync(String key, int expireSeconds, String value) { - return CompletableFuture.supplyAsync(() -> setnxexString(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnxexString(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setnxexLongAsync(String key, int expireSeconds, long value) { - return CompletableFuture.supplyAsync(() -> setnxexLong(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnxexLong(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setnxexBytesAsync(String key, int expireSeconds, byte[] value) { - return CompletableFuture.supplyAsync(() -> setnxexBytes(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnxexBytes(key, expireSeconds, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setnxexAsync(final String key, final int expireSeconds, final Type type, final T value) { - return CompletableFuture.supplyAsync(() -> setnxex(key, expireSeconds, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnxex(key, expireSeconds, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture setnxexAsync(final String key, final int expireSeconds, final Convert convert, final Type type, final T value) { - return CompletableFuture.supplyAsync(() -> setnxex(key, expireSeconds, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnxex(key, expireSeconds, convert, type, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1189,17 +1179,17 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture persistAsync(String key) { - return CompletableFuture.supplyAsync(() -> persist(key), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> persist(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture renameAsync(String oldKey, String newKey) { - return CompletableFuture.supplyAsync(() -> rename(oldKey, newKey), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> rename(oldKey, newKey), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture renamenxAsync(String oldKey, String newKey) { - return CompletableFuture.supplyAsync(() -> renamenx(oldKey, newKey), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> renamenx(oldKey, newKey), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1221,7 +1211,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture incrAsync(final String key) { - return CompletableFuture.supplyAsync(() -> incr(key), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> incr(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1263,12 +1253,12 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture incrbyAsync(final String key, long num) { - return CompletableFuture.supplyAsync(() -> incrby(key, num), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> incrby(key, num), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture incrbyFloatAsync(final String key, double num) { - return CompletableFuture.supplyAsync(() -> incrbyFloat(key, num), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> incrbyFloat(key, num), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1278,7 +1268,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture decrAsync(final String key) { - return CompletableFuture.supplyAsync(() -> decr(key), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> decr(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1288,12 +1278,12 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture decrbyAsync(final String key, long num) { - return CompletableFuture.supplyAsync(() -> decrby(key, num), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> decrby(key, num), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture delAsync(final String... keys) { - return CompletableFuture.supplyAsync(() -> del(keys), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> del(keys), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1391,22 +1381,22 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture>> lrangeAsync(Type componentType, String... keys) { - return CompletableFuture.supplyAsync(() -> lrange(componentType, keys), getExecutor()); + return supplyAsync(() -> lrange(componentType, keys), getExecutor()); } @Override public CompletableFuture>> smembersAsync(Type componentType, String... keys) { - return CompletableFuture.supplyAsync(() -> smembers(componentType, keys), getExecutor()); + return supplyAsync(() -> smembers(componentType, keys), getExecutor()); } @Override public CompletableFuture> smembersAsync(String key, Type componentType) { - return CompletableFuture.supplyAsync(() -> smembers(key, componentType), getExecutor()); + return supplyAsync(() -> smembers(key, componentType), getExecutor()); } @Override public CompletableFuture> lrangeAsync(String key, Type componentType) { - return CompletableFuture.supplyAsync(() -> lrange(key, componentType), getExecutor()); + return supplyAsync(() -> lrange(key, componentType), getExecutor()); } @Override @@ -1423,12 +1413,12 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture llenAsync(final String key) { - return CompletableFuture.supplyAsync(() -> llen(key), getExecutor()); + return supplyAsync(() -> llen(key), getExecutor()); } @Override public CompletableFuture scardAsync(final String key) { - return CompletableFuture.supplyAsync(() -> scard(key), getExecutor()); + return supplyAsync(() -> scard(key), getExecutor()); } @Override @@ -1439,7 +1429,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture sismemberAsync(final String key, final Type type, final T value) { - return CompletableFuture.supplyAsync(() -> sismember(key, type, value), getExecutor()); + return supplyAsync(() -> sismember(key, type, value), getExecutor()); } @Override @@ -1450,7 +1440,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture sismemberStringAsync(final String key, final String value) { - return CompletableFuture.supplyAsync(() -> sismemberString(key, value), getExecutor()); + return supplyAsync(() -> sismemberString(key, value), getExecutor()); } @Override @@ -1461,7 +1451,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture sismemberLongAsync(final String key, final long value) { - return CompletableFuture.supplyAsync(() -> sismemberLong(key, value), getExecutor()); + return supplyAsync(() -> sismemberLong(key, value), getExecutor()); } protected void appendListItem(CacheEntryType cacheType, String key, Object value) { @@ -1552,17 +1542,17 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture lremAsync(final String key, final Type componentType, T value) { - return CompletableFuture.supplyAsync(() -> lrem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> lrem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture lremStringAsync(final String key, final String value) { - return CompletableFuture.supplyAsync(() -> lremString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> lremString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture lremLongAsync(final String key, final long value) { - return CompletableFuture.supplyAsync(() -> lremLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> lremLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1726,17 +1716,17 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture sremAsync(final String key, final Type componentType, final T value) { - return CompletableFuture.supplyAsync(() -> srem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> srem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture sremStringAsync(final String key, final String value) { - return CompletableFuture.supplyAsync(() -> sremString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> sremString(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture sremLongAsync(final String key, final long value) { - return CompletableFuture.supplyAsync(() -> sremLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> sremLong(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1760,12 +1750,12 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture getBytesAsync(final String key) { - return CompletableFuture.supplyAsync(() -> getBytes(key), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> getBytes(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture getSetBytesAsync(final String key, byte[] value) { - return CompletableFuture.supplyAsync(() -> getSetBytes(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> getSetBytes(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1784,7 +1774,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture getexBytesAsync(final String key, final int expireSeconds) { - return CompletableFuture.supplyAsync(() -> getexBytes(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> getexBytes(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1804,7 +1794,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override public CompletableFuture setnxBytesAsync(final String key, byte[] value) { - return CompletableFuture.supplyAsync(() -> setnxBytes(key, value), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> setnxBytes(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } @Override @@ -1834,6 +1824,18 @@ public final class CacheMemorySource extends AbstractCacheSource { } } + @Override + public List scan(AtomicInteger cursor, int limit, String pattern) { + if (pattern == null || pattern.isEmpty()) { + return new ArrayList<>(container.keySet()); + } else { + List rs = new ArrayList<>(); + Predicate filter = Pattern.compile(pattern).asPredicate(); + container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); + return rs; + } + } + @Override public List keysStartsWith(String startsWith) { if (startsWith == null) { @@ -1859,34 +1861,39 @@ public final class CacheMemorySource extends AbstractCacheSource { return CompletableFuture.completedFuture((long) container.size()); } + @Override + public CompletableFuture> scanAsync(AtomicInteger cursor, int limit, String pattern) { + return supplyAsync(() -> scan(cursor, limit, pattern), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override public CompletableFuture spopAsync(String key, Type componentType) { - return CompletableFuture.supplyAsync(() -> spop(key, componentType), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> spop(key, componentType), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture> spopAsync(String key, int count, Type componentType) { - return CompletableFuture.supplyAsync(() -> spop(key, count, componentType), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> spop(key, count, componentType), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture spopStringAsync(String key) { - return CompletableFuture.supplyAsync(() -> spopString(key), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> spopString(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture> spopStringAsync(String key, int count) { - return CompletableFuture.supplyAsync(() -> spopString(key, count), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> spopString(key, count), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture spopLongAsync(String key) { - return CompletableFuture.supplyAsync(() -> spopLong(key), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> spopLong(key), getExecutor()).whenComplete(futureCompleteConsumer); } @Override public CompletableFuture> spopLongAsync(String key, int count) { - return CompletableFuture.supplyAsync(() -> spopLong(key, count), getExecutor()).whenComplete(futureCompleteConsumer); + return supplyAsync(() -> spopLong(key, count), getExecutor()).whenComplete(futureCompleteConsumer); } public static enum CacheEntryType { @@ -2004,55 +2011,55 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture> getexCollectionAsync(final String key, final int expireSeconds, final Type componentType) { - return CompletableFuture.supplyAsync(() -> getexCollection(key, expireSeconds, componentType), getExecutor()); + return supplyAsync(() -> getexCollection(key, expireSeconds, componentType), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getexStringCollectionAsync(final String key, final int expireSeconds) { - return CompletableFuture.supplyAsync(() -> getexStringCollection(key, expireSeconds), getExecutor()); + return supplyAsync(() -> getexStringCollection(key, expireSeconds), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getexLongCollectionAsync(final String key, final int expireSeconds) { - return CompletableFuture.supplyAsync(() -> getexLongCollection(key, expireSeconds), getExecutor()); + return supplyAsync(() -> getexLongCollection(key, expireSeconds), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture>> getCollectionMapAsync(boolean set, Type componentType, String... keys) { - return CompletableFuture.supplyAsync(() -> getCollectionMap(set, componentType, keys), getExecutor()); + return supplyAsync(() -> getCollectionMap(set, componentType, keys), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getStringCollectionAsync(final String key) { - return CompletableFuture.supplyAsync(() -> getStringCollection(key), getExecutor()); + return supplyAsync(() -> getStringCollection(key), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture>> getStringCollectionMapAsync(final boolean set, final String... keys) { - return CompletableFuture.supplyAsync(() -> getStringCollectionMap(set, keys), getExecutor()); + return supplyAsync(() -> getStringCollectionMap(set, keys), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getLongCollectionAsync(final String key) { - return CompletableFuture.supplyAsync(() -> getLongCollection(key), getExecutor()); + return supplyAsync(() -> getLongCollection(key), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture>> getLongCollectionMapAsync(final boolean set, final String... keys) { - return CompletableFuture.supplyAsync(() -> getLongCollectionMap(set, keys), getExecutor()); + return supplyAsync(() -> getLongCollectionMap(set, keys), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture> getCollectionAsync(String key, Type componentType) { - return CompletableFuture.supplyAsync(() -> getCollection(key, componentType), getExecutor()); + return supplyAsync(() -> getCollection(key, componentType), getExecutor()); } @Override @@ -2119,13 +2126,13 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture> getLongMapAsync(final String... keys) { - return CompletableFuture.supplyAsync(() -> getLongMap(keys), getExecutor()); + return supplyAsync(() -> getLongMap(keys), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture getLongArrayAsync(final String... keys) { - return CompletableFuture.supplyAsync(() -> getLongArray(keys), getExecutor()); + return supplyAsync(() -> getLongArray(keys), getExecutor()); } @Override @@ -2154,13 +2161,13 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture> getStringMapAsync(final String... keys) { - return CompletableFuture.supplyAsync(() -> getStringMap(keys), getExecutor()); + return supplyAsync(() -> getStringMap(keys), getExecutor()); } @Override @Deprecated(since = "2.8.0") public CompletableFuture getStringArrayAsync(final String... keys) { - return CompletableFuture.supplyAsync(() -> getStringArray(keys), getExecutor()); + return supplyAsync(() -> getStringArray(keys), getExecutor()); } @Override @@ -2176,7 +2183,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture> getMapAsync(final Type componentType, final String... keys) { - return CompletableFuture.supplyAsync(() -> getMap(componentType, keys), getExecutor()); + return supplyAsync(() -> getMap(componentType, keys), getExecutor()); } @Override @@ -2208,7 +2215,7 @@ public final class CacheMemorySource extends AbstractCacheSource { @Override @Deprecated(since = "2.8.0") public CompletableFuture getCollectionSizeAsync(final String key) { - return CompletableFuture.supplyAsync(() -> getCollectionSize(key), getExecutor()); + return supplyAsync(() -> getCollectionSize(key), getExecutor()); } @Override diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index 9ccee7c32..77d600bd2 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -248,9 +248,11 @@ public interface CacheSource extends Resourcable { public List hmget(final String key, final Type type, final String... fields); - public Map hmap(final String key, final Type type, AtomicInteger cursor, int limit); + default Map hscan(final String key, final Type type, AtomicInteger cursor, int limit) { + return hscan(key, type, cursor, limit, null); + } - public Map hmap(final String key, final Type type, AtomicInteger cursor, int limit, String pattern); + public Map hscan(final String key, final Type type, AtomicInteger cursor, int limit, String pattern); //------------------------ list ------------------------ public int llen(final String key); @@ -331,6 +333,12 @@ public interface CacheSource extends Resourcable { public List keys(String pattern); + default List scan(AtomicInteger cursor, int limit) { + return scan(cursor, limit, null); + } + + public List scan(AtomicInteger cursor, int limit, String pattern); + public long dbsize(); //---------------------- CompletableFuture 异步版 --------------------------------- @@ -553,9 +561,11 @@ public interface CacheSource extends Resourcable { public CompletableFuture> hmgetAsync(final String key, final Type type, final String... fields); - public CompletableFuture> hmapAsync(final String key, final Type type, AtomicInteger cursor, int limit); + default CompletableFuture> hscanAsync(final String key, final Type type, AtomicInteger cursor, int limit) { + return hscanAsync(key, type, cursor, limit, null); + } - public CompletableFuture> hmapAsync(final String key, final Type type, AtomicInteger cursor, int limit, String pattern); + public CompletableFuture> hscanAsync(final String key, final Type type, AtomicInteger cursor, int limit, String pattern); //------------------------ listAsync ------------------------ public CompletableFuture llenAsync(final String key); @@ -632,6 +642,12 @@ public interface CacheSource extends Resourcable { public CompletableFuture> keysAsync(String pattern); + default CompletableFuture> scanAsync(AtomicInteger cursor, int limit) { + return scanAsync(cursor, limit, null); + } + + public CompletableFuture> scanAsync(AtomicInteger cursor, int limit, String pattern); + public CompletableFuture dbsizeAsync(); //-------------------------- 过期方法 ----------------------------------