CacheSource增加sscan方法

This commit is contained in:
redkale
2023-06-08 11:34:34 +08:00
parent 685ae89685
commit 2e35abccd0
3 changed files with 94 additions and 16 deletions

View File

@@ -8,7 +8,7 @@ package org.redkale.cluster;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.*;
import java.util.logging.Level;
import org.redkale.annotation.*;
import org.redkale.annotation.ResourceListener;
@@ -237,7 +237,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
}
private CompletableFuture<Set<InetSocketAddress>> queryAddress(final String serviceName) {
final CompletableFuture<Map<String, AddressEntry>> future = source.hscanAsync(serviceName, AddressEntry.class, new AtomicInteger(), 10000);
final CompletableFuture<Map<String, AddressEntry>> future = source.hscanAsync(serviceName, AddressEntry.class, new AtomicLong(), 10000);
return future.thenApply(map -> {
final Set<InetSocketAddress> set = new HashSet<>();
map.forEach((n, v) -> {

View File

@@ -9,7 +9,7 @@ import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import java.util.logging.*;
@@ -393,7 +393,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public <T> Map<String, T> hscan(final String key, final Type type, AtomicInteger cursor, int limit, String pattern) {
public <T> Map<String, T> hscan(final String key, final Type type, AtomicLong cursor, int limit, String pattern) {
if (key == null) {
return new HashMap();
}
@@ -646,7 +646,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public <T> CompletableFuture<Map<String, T>> hscanAsync(final String key, final Type type, AtomicInteger cursor, int limit, String pattern) {
public <T> CompletableFuture<Map<String, T>> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit, String pattern) {
return supplyAsync(() -> hscan(key, type, cursor, limit, pattern), getExecutor());
}
@@ -1628,6 +1628,35 @@ public final class CacheMemorySource extends AbstractCacheSource {
return list;
}
@Override
public <T> Set< T> sscan(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) {
if (key == null) {
return new LinkedHashSet();
}
CacheEntry entry = container.get(key);
if (entry == null || !entry.isSetCacheType() || entry.csetValue == null) {
return new LinkedHashSet<>();
}
if (entry.csetValue.isEmpty()) {
return new LinkedHashSet<>();
}
Iterator it = entry.csetValue.iterator();
Set<T> list = new LinkedHashSet<>();
int index = 0;
while (it.hasNext()) {
Object obj = it.next();
if (obj != null && componentType == long.class) {
obj = ((Number) obj).longValue();
}
list.add((T) obj);
it.remove();
if (limit > 0 && ++index >= limit) {
break;
}
}
return list;
}
protected void appendSetItem(CacheEntryType cacheType, String key, Object value) {
if (key == null) {
return;
@@ -1845,7 +1874,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public List<String> scan(AtomicInteger cursor, int limit, String pattern) {
public List<String> scan(AtomicLong cursor, int limit, String pattern) {
if (pattern == null || pattern.isEmpty()) {
return new ArrayList<>(container.keySet());
} else {
@@ -1882,7 +1911,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public CompletableFuture<List<String>> scanAsync(AtomicInteger cursor, int limit, String pattern) {
public CompletableFuture<List<String>> scanAsync(AtomicLong cursor, int limit, String pattern) {
return supplyAsync(() -> scan(cursor, limit, pattern), getExecutor()).whenComplete(futureCompleteConsumer);
}
@@ -1896,6 +1925,11 @@ public final class CacheMemorySource extends AbstractCacheSource {
return supplyAsync(() -> spop(key, count, componentType), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public <T> CompletableFuture<Set<T>> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern) {
return supplyAsync(() -> sscan(key, componentType, cursor, limit, pattern), getExecutor()).whenComplete(futureCompleteConsumer);
}
@Override
public CompletableFuture<String> spopStringAsync(String key) {
return supplyAsync(() -> spopString(key), getExecutor()).whenComplete(futureCompleteConsumer);

View File

@@ -9,7 +9,7 @@ import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.annotation.Component;
import org.redkale.convert.Convert;
import org.redkale.util.*;
@@ -248,11 +248,11 @@ public interface CacheSource extends Resourcable {
public <T> List<T> hmget(final String key, final Type type, final String... fields);
default <T> Map<String, T> hscan(final String key, final Type type, AtomicInteger cursor, int limit) {
default <T> Map<String, T> hscan(final String key, final Type type, AtomicLong cursor, int limit) {
return hscan(key, type, cursor, limit, null);
}
public <T> Map<String, T> hscan(final String key, final Type type, AtomicInteger cursor, int limit, String pattern);
public <T> Map<String, T> hscan(final String key, final Type type, AtomicLong cursor, int limit, String pattern);
//------------------------ list ------------------------
public int llen(final String key);
@@ -296,6 +296,28 @@ public interface CacheSource extends Resourcable {
public <T> Set<T> spop(final String key, final int count, final Type componentType);
public <T> Set<T> sscan(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern);
default <T> Set<T> sscan(final String key, final Type componentType, AtomicLong cursor, int limit) {
return sscan(key, componentType, cursor, limit, null);
}
default Set<String> sscanString(final String key, AtomicLong cursor, int limit, String pattern) {
return sscan(key, String.class, cursor, limit, pattern);
}
default Set<Long> sscanLong(final String key, AtomicLong cursor, int limit, String pattern) {
return sscan(key, Long.class, cursor, limit, pattern);
}
default Set<String> sscanString(final String key, AtomicLong cursor, int limit) {
return sscan(key, String.class, cursor, limit, null);
}
default Set<Long> sscanLong(final String key, AtomicLong cursor, int limit) {
return sscan(key, Long.class, cursor, limit, null);
}
//---------- set-string ----------
default Set<String> smembersString(final String key) {
return smembers(key, String.class);
@@ -333,11 +355,11 @@ public interface CacheSource extends Resourcable {
public List<String> keys(String pattern);
default List<String> scan(AtomicInteger cursor, int limit) {
default List<String> scan(AtomicLong cursor, int limit) {
return scan(cursor, limit, null);
}
public List<String> scan(AtomicInteger cursor, int limit, String pattern);
public List<String> scan(AtomicLong cursor, int limit, String pattern);
public long dbsize();
@@ -565,11 +587,11 @@ public interface CacheSource extends Resourcable {
public <T> CompletableFuture<List<T>> hmgetAsync(final String key, final Type type, final String... fields);
default <T> CompletableFuture<Map<String, T>> hscanAsync(final String key, final Type type, AtomicInteger cursor, int limit) {
default <T> CompletableFuture<Map<String, T>> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit) {
return hscanAsync(key, type, cursor, limit, null);
}
public <T> CompletableFuture<Map<String, T>> hscanAsync(final String key, final Type type, AtomicInteger cursor, int limit, String pattern);
public <T> CompletableFuture<Map<String, T>> hscanAsync(final String key, final Type type, AtomicLong cursor, int limit, String pattern);
//------------------------ listAsync ------------------------
public CompletableFuture<Integer> llenAsync(final String key);
@@ -609,6 +631,28 @@ public interface CacheSource extends Resourcable {
public <T> CompletableFuture<Set<T>> spopAsync(final String key, final int count, final Type componentType);
public <T> CompletableFuture<Set<T>> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit, String pattern);
default <T> CompletableFuture<Set<T>> sscanAsync(final String key, final Type componentType, AtomicLong cursor, int limit) {
return sscanAsync(key, componentType, cursor, limit, null);
}
default CompletableFuture<Set<String>> sscanStringAsync(final String key, AtomicLong cursor, int limit, String pattern) {
return sscanAsync(key, String.class, cursor, limit, pattern);
}
default CompletableFuture<Set<Long>> sscanLongAsync(final String key, AtomicLong cursor, int limit, String pattern) {
return sscanAsync(key, Long.class, cursor, limit, pattern);
}
default CompletableFuture<Set<String>> sscanStringAsync(final String key, AtomicLong cursor, int limit) {
return sscanAsync(key, String.class, cursor, limit);
}
default CompletableFuture<Set<Long>> sscanLongAsync(final String key, AtomicLong cursor, int limit) {
return sscanAsync(key, Long.class, cursor, limit);
}
//---------- set-string ----------
default CompletableFuture<Set<String>> smembersStringAsync(final String key) {
return smembersAsync(key, String.class);
@@ -646,11 +690,11 @@ public interface CacheSource extends Resourcable {
public CompletableFuture<List<String>> keysAsync(String pattern);
default CompletableFuture<List<String>> scanAsync(AtomicInteger cursor, int limit) {
default CompletableFuture<List<String>> scanAsync(AtomicLong cursor, int limit) {
return scanAsync(cursor, limit, null);
}
public CompletableFuture<List<String>> scanAsync(AtomicInteger cursor, int limit, String pattern);
public CompletableFuture<List<String>> scanAsync(AtomicLong cursor, int limit, String pattern);
public CompletableFuture<Long> dbsizeAsync();