diff --git a/src/main/java/org/redkale/cluster/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/CacheClusterAgent.java index 3f52b4260..f97513bb2 100644 --- a/src/main/java/org/redkale/cluster/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/CacheClusterAgent.java @@ -5,11 +5,11 @@ */ package org.redkale.cluster; -import java.net.*; +import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.*; import java.util.logging.Level; -import org.redkale.annotation.Resource; +import org.redkale.annotation.*; import org.redkale.annotation.ResourceListener; import org.redkale.boot.*; import org.redkale.convert.json.JsonConvert; @@ -141,7 +141,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { } protected void loadMqtpAddressHealth() { - List keys = source.queryKeysStartsWith("cluster.mqtp:"); + List keys = source.keysStartsWith("cluster.mqtp:"); keys.forEach(serviceName -> { try { this.mqtpAddressMap.put(serviceName, queryAddress(serviceName).get(3, TimeUnit.SECONDS)); diff --git a/src/main/java/org/redkale/net/http/WebSocketNode.java b/src/main/java/org/redkale/net/http/WebSocketNode.java index ae07e5969..b776a384e 100644 --- a/src/main/java/org/redkale/net/http/WebSocketNode.java +++ b/src/main/java/org/redkale/net/http/WebSocketNode.java @@ -270,7 +270,7 @@ public abstract class WebSocketNode { return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet().stream().map(x -> String.valueOf(x)).collect(Collectors.toList()))); } tryAcquireSemaphore(); - CompletableFuture> listFuture = this.source.queryKeysStartsWithAsync(WS_SOURCE_KEY_USERID_PREFIX); + CompletableFuture> listFuture = this.source.keysStartsWithAsync(WS_SOURCE_KEY_USERID_PREFIX); CompletableFuture> rs = listFuture.thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(WS_SOURCE_KEY_USERID_PREFIX.length())).collect(Collectors.toList()))); if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); return rs; diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index fb953c6ca..7681de399 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -12,6 +12,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.*; import java.util.logging.*; +import java.util.regex.Pattern; import org.redkale.annotation.AutoLoad; import org.redkale.annotation.ConstructorParameters; import org.redkale.annotation.*; @@ -581,20 +582,6 @@ public final class CacheMemorySource extends AbstractCacheSource { return CompletableFuture.supplyAsync(() -> getexLong(key, expireSeconds, defValue), getExecutor()); } - @Override - public void refresh(String key, final int expireSeconds) { - if (key == null) return; - CacheEntry entry = container.get(key); - if (entry == null) return; - entry.lastAccessed = (int) (System.currentTimeMillis() / 1000); - entry.expireSeconds = expireSeconds; - } - - @Override - public CompletableFuture refreshAsync(final String key, final int expireSeconds) { - return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer); - } - protected void set(CacheEntryType cacheType, String key, Object value) { if (key == null) return; CacheEntry entry = container.get(key); @@ -1536,44 +1523,38 @@ public final class CacheMemorySource extends AbstractCacheSource { } @Override - public List queryKeys() { - return new ArrayList<>(container.keySet()); + public int getKeySize() { + return container.size(); } @Override - public List queryKeysStartsWith(String startsWith) { - if (startsWith == null) return queryKeys(); + public List keys(String pattern) { + if (pattern == null || pattern.isEmpty()) { + return new ArrayList<>(container.keySet()); + } else { + List rs = new ArrayList<>(); + Predicate filter = Pattern.compile(pattern).asPredicate(); + container.keySet().stream().filter(filter).forEach(x -> rs.add(x)); + return rs; + } + } + + @Override + public List keysStartsWith(String startsWith) { + if (startsWith == null) return keys(); List rs = new ArrayList<>(); container.keySet().stream().filter(x -> x.startsWith(startsWith)).forEach(x -> rs.add(x)); return rs; } @Override - public List queryKeysEndsWith(String endsWith) { - if (endsWith == null) return queryKeys(); - List rs = new ArrayList<>(); - container.keySet().stream().filter(x -> x.endsWith(endsWith)).forEach(x -> rs.add(x)); - return rs; + public CompletableFuture> keysAsync(String pattern) { + return CompletableFuture.completedFuture(keys(pattern)); } @Override - public int getKeySize() { - return container.size(); - } - - @Override - public CompletableFuture> queryKeysAsync() { - return CompletableFuture.completedFuture(new ArrayList<>(container.keySet())); - } - - @Override - public CompletableFuture> queryKeysStartsWithAsync(String startsWith) { - return CompletableFuture.completedFuture(queryKeysStartsWith(startsWith)); - } - - @Override - public CompletableFuture> queryKeysEndsWithAsync(String endsWith) { - return CompletableFuture.completedFuture(queryKeysEndsWith(endsWith)); + public CompletableFuture> keysStartsWithAsync(String startsWith) { + return CompletableFuture.completedFuture(keysStartsWith(startsWith)); } @Override diff --git a/src/main/java/org/redkale/source/CacheSource.java b/src/main/java/org/redkale/source/CacheSource.java index b0e6f78bc..cf6e10d46 100644 --- a/src/main/java/org/redkale/source/CacheSource.java +++ b/src/main/java/org/redkale/source/CacheSource.java @@ -238,6 +238,29 @@ public interface CacheSource extends Resourcable { public Set spopLong(final String key, final int count); + //------------------------ other ------------------------ + default List keys() { + return keys(null); + } + + default List keysStartsWith(String startsWith) { + return keys(startsWith + "*"); + } + + public List keys(String pattern); + + public int getKeySize(); + + public Map getMap(final Type componentType, final String... keys); + + public Map getStringMap(final String... keys); + + public String[] getStringArray(final String... keys); + + public Map getLongMap(final String... keys); + + public Long[] getLongArray(final String... keys); + //------------------------ collection ------------------------ @Deprecated public Collection getCollection(final String key, final Type componentType); @@ -269,25 +292,6 @@ public interface CacheSource extends Resourcable { @Deprecated public Collection getexLongCollection(final String key, final int expireSeconds); - //------------------------ other ------------------------ - public List queryKeys(); - - public List queryKeysStartsWith(String startsWith); - - public List queryKeysEndsWith(String endsWith); - - public int getKeySize(); - - public Map getMap(final Type componentType, final String... keys); - - public Map getStringMap(final String... keys); - - public String[] getStringArray(final String... keys); - - public Map getLongMap(final String... keys); - - public Long[] getLongArray(final String... keys); - //---------------------- CompletableFuture 异步版 --------------------------------- default CompletableFuture isOpenAsync() { return CompletableFuture.completedFuture(isOpen()); @@ -497,6 +501,29 @@ public interface CacheSource extends Resourcable { public CompletableFuture> spopLongAsync(final String key, final int count); + //------------------------ other-Async ------------------------ + default CompletableFuture> keysAsync() { + return keysAsync(null); + } + + default CompletableFuture> keysStartsWithAsync(String startsWith) { + return keysAsync(startsWith + "*"); + } + + public CompletableFuture> keysAsync(String pattern); + + public CompletableFuture> getMapAsync(final Type componentType, final String... keys); + + public CompletableFuture getKeySizeAsync(); + + public CompletableFuture> getStringMapAsync(final String... keys); + + public CompletableFuture getStringArrayAsync(final String... keys); + + public CompletableFuture> getLongMapAsync(final String... keys); + + public CompletableFuture getLongArrayAsync(final String... keys); + //------------------------ collectionAsync ------------------------ @Deprecated public CompletableFuture> getCollectionAsync(final String key, final Type componentType); @@ -528,25 +555,6 @@ public interface CacheSource extends Resourcable { @Deprecated public CompletableFuture> getexLongCollectionAsync(final String key, final int expireSeconds); - //------------------------ other-Async ------------------------ - public CompletableFuture> getMapAsync(final Type componentType, final String... keys); - - public CompletableFuture> queryKeysAsync(); - - public CompletableFuture> queryKeysStartsWithAsync(String startsWith); - - public CompletableFuture> queryKeysEndsWithAsync(String endsWith); - - public CompletableFuture getKeySizeAsync(); - - public CompletableFuture> getStringMapAsync(final String... keys); - - public CompletableFuture getStringArrayAsync(final String... keys); - - public CompletableFuture> getLongMapAsync(final String... keys); - - public CompletableFuture getLongArrayAsync(final String... keys); - //-------------------------- 过期方法 ---------------------------------- @Deprecated default CompletableFuture refreshAsync(final String key, final int expireSeconds) { @@ -892,4 +900,34 @@ public interface CacheSource extends Resourcable { default int removeLongListItem(final String key, final long value) { return lremLong(key, value); } + + @Deprecated + default List queryKeys() { + return keys(); + } + + @Deprecated + default List queryKeysStartsWith(String startsWith) { + return keys(startsWith + "*"); + } + + @Deprecated + default List queryKeysEndsWith(String endsWith) { + return keys("*" + endsWith); + } + + @Deprecated + default CompletableFuture> queryKeysAsync() { + return keysAsync(); + } + + @Deprecated + default CompletableFuture> queryKeysStartsWithAsync(String startsWith) { + return keysAsync(startsWith + "*"); + } + + @Deprecated + default CompletableFuture> queryKeysEndsWithAsync(String endsWith) { + return keysAsync("*" + endsWith); + } }