diff --git a/src/main/java/org/redkale/cluster/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/CacheClusterAgent.java index 3fbc19e7d..af33b757c 100644 --- a/src/main/java/org/redkale/cluster/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/CacheClusterAgent.java @@ -8,7 +8,7 @@ package org.redkale.cluster; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import java.util.logging.Level; import org.redkale.annotation.*; import org.redkale.annotation.ResourceListener; @@ -237,7 +237,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { } private CompletableFuture> queryAddress(final String serviceName) { - final CompletableFuture> future = source.hscanAsync(serviceName, AddressEntry.class, new AtomicInteger(), 10000); + final CompletableFuture> future = source.hscanAsync(serviceName, AddressEntry.class, new AtomicLong(), 10000); return future.thenApply(map -> { final Set set = new HashSet<>(); map.forEach((n, v) -> { diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 7479c7727..3c5cfd6b0 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -9,7 +9,7 @@ import java.io.Serializable; import java.lang.reflect.Type; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import java.util.logging.*; @@ -393,7 +393,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public Map hscan(final String key, final Type type, AtomicInteger cursor, int limit, String pattern) { + public Map hscan(final String key, final Type type, AtomicLong cursor, int limit, String pattern) { if (key == null) { return new HashMap(); } @@ -646,7 +646,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public CompletableFuture> hscanAsync(final String key, final Type type, AtomicInteger cursor, int limit, String pattern) { + public CompletableFuture> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit, String pattern) { return supplyAsync(() -> hscan(key, type, cursor, limit, pattern), getExecutor()); } @@ -1628,6 +1628,35 @@ public final class CacheMemorySource extends AbstractCacheSource { return list; } + @Override + public Set< T> sscan(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) { + if (key == null) { + return new LinkedHashSet(); + } + CacheEntry entry = container.get(key); + if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) { + return new LinkedHashSet<>(); + } + if (entry.csetValue.isEmpty()) { + return new LinkedHashSet<>(); + } + Iterator it = entry.csetValue.iterator(); + Set list = new LinkedHashSet<>(); + int index = 0; + while (it.hasNext()) { + Object obj = it.next(); + if (obj != null && componentType == long.class) { + obj = ((Number) obj).longValue(); + } + list.add((T) obj); + it.remove(); + if (limit > 0 && ++index >= limit) { + break; + } + } + return list; + } + protected void appendSetItem(CacheEntryType cacheType, String key, Object value) { if (key == null) { return; @@ -1845,7 +1874,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public List scan(AtomicInteger cursor, int limit, String pattern) { + public List scan(AtomicLong cursor, int limit, String pattern) { if (pattern == null || pattern.isEmpty()) { return new ArrayList<>(container.keySet()); } else { @@ -1882,7 +1911,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public CompletableFuture> scanAsync(AtomicInteger cursor, int limit, String pattern) { + public CompletableFuture> scanAsync(AtomicLong cursor, int limit, String pattern) { return supplyAsync(() -> scan(cursor, limit, pattern), getExecutor()).whenComplete(futureCompleteConsumer); } @@ -1896,6 +1925,11 @@ public final class CacheMemorySource extends AbstractCacheSource { return supplyAsync(() -> spop(key, count, componentType), getExecutor()).whenComplete(futureCompleteConsumer); } + @Override + public CompletableFuture> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) { + return supplyAsync(() -> sscan(key, componentType, cursor, limit, pattern), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override public CompletableFuture spopStringAsync(String key) { return supplyAsync(() -> spopString(key), getExecutor()).whenComplete(futureCompleteConsumer); diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index 1dfbe4fe7..3a7fb1c0d 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -9,7 +9,7 @@ import java.io.Serializable; import java.lang.reflect.Type; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.redkale.annotation.Component; import org.redkale.convert.Convert; import org.redkale.util.*; @@ -248,11 +248,11 @@ public interface CacheSource extends Resourcable { public List hmget(final String key, final Type type, final String... fields); - default Map hscan(final String key, final Type type, AtomicInteger cursor, int limit) { + default Map hscan(final String key, final Type type, AtomicLong cursor, int limit) { return hscan(key, type, cursor, limit, null); } - public Map hscan(final String key, final Type type, AtomicInteger cursor, int limit, String pattern); + public Map hscan(final String key, final Type type, AtomicLong cursor, int limit, String pattern); //------------------------ list ------------------------ public int llen(final String key); @@ -296,6 +296,28 @@ public interface CacheSource extends Resourcable { public Set spop(final String key, final int count, final Type componentType); + public Set sscan(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern); + + default Set sscan(final String key, final Type componentType, AtomicLong cursor, int limit) { + return sscan(key, componentType, cursor, limit, null); + } + + default Set sscanString(final String key, AtomicLong cursor, int limit, String pattern) { + return sscan(key, String.class, cursor, limit, pattern); + } + + default Set sscanLong(final String key, AtomicLong cursor, int limit, String pattern) { + return sscan(key, Long.class, cursor, limit, pattern); + } + + default Set sscanString(final String key, AtomicLong cursor, int limit) { + return sscan(key, String.class, cursor, limit, null); + } + + default Set sscanLong(final String key, AtomicLong cursor, int limit) { + return sscan(key, Long.class, cursor, limit, null); + } + //---------- set-string ---------- default Set smembersString(final String key) { return smembers(key, String.class); @@ -333,11 +355,11 @@ public interface CacheSource extends Resourcable { public List keys(String pattern); - default List scan(AtomicInteger cursor, int limit) { + default List scan(AtomicLong cursor, int limit) { return scan(cursor, limit, null); } - public List scan(AtomicInteger cursor, int limit, String pattern); + public List scan(AtomicLong cursor, int limit, String pattern); public long dbsize(); @@ -565,11 +587,11 @@ public interface CacheSource extends Resourcable { public CompletableFuture> hmgetAsync(final String key, final Type type, final String... fields); - default CompletableFuture> hscanAsync(final String key, final Type type, AtomicInteger cursor, int limit) { + default CompletableFuture> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit) { return hscanAsync(key, type, cursor, limit, null); } - public CompletableFuture> hscanAsync(final String key, final Type type, AtomicInteger cursor, int limit, String pattern); + public CompletableFuture> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit, String pattern); //------------------------ listAsync ------------------------ public CompletableFuture llenAsync(final String key); @@ -609,6 +631,28 @@ public interface CacheSource extends Resourcable { public CompletableFuture> spopAsync(final String key, final int count, final Type componentType); + public CompletableFuture> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern); + + default CompletableFuture> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit) { + return sscanAsync(key, componentType, cursor, limit, null); + } + + default CompletableFuture> sscanStringAsync(final String key, AtomicLong cursor, int limit, String pattern) { + return sscanAsync(key, String.class, cursor, limit, pattern); + } + + default CompletableFuture> sscanLongAsync(final String key, AtomicLong cursor, int limit, String pattern) { + return sscanAsync(key, Long.class, cursor, limit, pattern); + } + + default CompletableFuture> sscanStringAsync(final String key, AtomicLong cursor, int limit) { + return sscanAsync(key, String.class, cursor, limit); + } + + default CompletableFuture> sscanLongAsync(final String key, AtomicLong cursor, int limit) { + return sscanAsync(key, Long.class, cursor, limit); + } + //---------- set-string ---------- default CompletableFuture> smembersStringAsync(final String key) { return smembersAsync(key, String.class); @@ -646,11 +690,11 @@ public interface CacheSource extends Resourcable { public CompletableFuture> keysAsync(String pattern); - default CompletableFuture> scanAsync(AtomicInteger cursor, int limit) { + default CompletableFuture> scanAsync(AtomicLong cursor, int limit) { return scanAsync(cursor, limit, null); } - public CompletableFuture> scanAsync(AtomicInteger cursor, int limit, String pattern); + public CompletableFuture> scanAsync(AtomicLong cursor, int limit, String pattern); public CompletableFuture dbsizeAsync();