优化CacheSource

This commit is contained in:
Redkale
2022-12-23 10:48:27 +08:00
parent 989f1cddf8
commit 31eb6f8701
4 changed files with 101 additions and 82 deletions

View File

@@ -5,11 +5,11 @@
*/
package org.redkale.cluster;
import java.net.*;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import org.redkale.annotation.Resource;
import org.redkale.annotation.*;
import org.redkale.annotation.ResourceListener;
import org.redkale.boot.*;
import org.redkale.convert.json.JsonConvert;
@@ -141,7 +141,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
}
protected void loadMqtpAddressHealth() {
List<String> keys = source.queryKeysStartsWith("cluster.mqtp:");
List<String> keys = source.keysStartsWith("cluster.mqtp:");
keys.forEach(serviceName -> {
try {
this.mqtpAddressMap.put(serviceName, queryAddress(serviceName).get(3, TimeUnit.SECONDS));

View File

@@ -270,7 +270,7 @@ public abstract class WebSocketNode {
return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet().stream().map(x -> String.valueOf(x)).collect(Collectors.toList())));
}
tryAcquireSemaphore();
CompletableFuture<List<String>> listFuture = this.source.queryKeysStartsWithAsync(WS_SOURCE_KEY_USERID_PREFIX);
CompletableFuture<List<String>> listFuture = this.source.keysStartsWithAsync(WS_SOURCE_KEY_USERID_PREFIX);
CompletableFuture<Set<String>> rs = listFuture.thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(WS_SOURCE_KEY_USERID_PREFIX.length())).collect(Collectors.toList())));
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;

View File

@@ -12,6 +12,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
import java.util.logging.*;
import java.util.regex.Pattern;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ConstructorParameters;
import org.redkale.annotation.*;
@@ -581,20 +582,6 @@ public final class CacheMemorySource extends AbstractCacheSource {
return CompletableFuture.supplyAsync(() -> getexLong(key, expireSeconds, defValue), getExecutor());
}
@Override
public void refresh(String key, final int expireSeconds) {
if (key == null) return;
CacheEntry entry = container.get(key);
if (entry == null) return;
entry.lastAccessed = (int) (System.currentTimeMillis() / 1000);
entry.expireSeconds = expireSeconds;
}
@Override
public CompletableFuture<Void> refreshAsync(final String key, final int expireSeconds) {
return CompletableFuture.runAsync(() -> refresh(key, expireSeconds), getExecutor()).whenComplete(futureCompleteConsumer);
}
protected void set(CacheEntryType cacheType, String key, Object value) {
if (key == null) return;
CacheEntry entry = container.get(key);
@@ -1536,44 +1523,38 @@ public final class CacheMemorySource extends AbstractCacheSource {
}
@Override
public List<String> queryKeys() {
return new ArrayList<>(container.keySet());
public int getKeySize() {
return container.size();
}
@Override
public List<String> queryKeysStartsWith(String startsWith) {
if (startsWith == null) return queryKeys();
public List<String> keys(String pattern) {
if (pattern == null || pattern.isEmpty()) {
return new ArrayList<>(container.keySet());
} else {
List<String> rs = new ArrayList<>();
Predicate<String> filter = Pattern.compile(pattern).asPredicate();
container.keySet().stream().filter(filter).forEach(x -> rs.add(x));
return rs;
}
}
@Override
public List<String> keysStartsWith(String startsWith) {
if (startsWith == null) return keys();
List<String> rs = new ArrayList<>();
container.keySet().stream().filter(x -> x.startsWith(startsWith)).forEach(x -> rs.add(x));
return rs;
}
@Override
public List<String> queryKeysEndsWith(String endsWith) {
if (endsWith == null) return queryKeys();
List<String> rs = new ArrayList<>();
container.keySet().stream().filter(x -> x.endsWith(endsWith)).forEach(x -> rs.add(x));
return rs;
public CompletableFuture<List<String>> keysAsync(String pattern) {
return CompletableFuture.completedFuture(keys(pattern));
}
@Override
public int getKeySize() {
return container.size();
}
@Override
public CompletableFuture<List<String>> queryKeysAsync() {
return CompletableFuture.completedFuture(new ArrayList<>(container.keySet()));
}
@Override
public CompletableFuture<List<String>> queryKeysStartsWithAsync(String startsWith) {
return CompletableFuture.completedFuture(queryKeysStartsWith(startsWith));
}
@Override
public CompletableFuture<List<String>> queryKeysEndsWithAsync(String endsWith) {
return CompletableFuture.completedFuture(queryKeysEndsWith(endsWith));
public CompletableFuture<List<String>> keysStartsWithAsync(String startsWith) {
return CompletableFuture.completedFuture(keysStartsWith(startsWith));
}
@Override

View File

@@ -238,6 +238,29 @@ public interface CacheSource extends Resourcable {
public Set<Long> spopLong(final String key, final int count);
//------------------------ other ------------------------
default List<String> keys() {
return keys(null);
}
default List<String> keysStartsWith(String startsWith) {
return keys(startsWith + "*");
}
public List<String> keys(String pattern);
public int getKeySize();
public <T> Map<String, T> getMap(final Type componentType, final String... keys);
public Map<String, String> getStringMap(final String... keys);
public String[] getStringArray(final String... keys);
public Map<String, Long> getLongMap(final String... keys);
public Long[] getLongArray(final String... keys);
//------------------------ collection ------------------------
@Deprecated
public <T> Collection<T> getCollection(final String key, final Type componentType);
@@ -269,25 +292,6 @@ public interface CacheSource extends Resourcable {
@Deprecated
public Collection<Long> getexLongCollection(final String key, final int expireSeconds);
//------------------------ other ------------------------
public List<String> queryKeys();
public List<String> queryKeysStartsWith(String startsWith);
public List<String> queryKeysEndsWith(String endsWith);
public int getKeySize();
public <T> Map<String, T> getMap(final Type componentType, final String... keys);
public Map<String, String> getStringMap(final String... keys);
public String[] getStringArray(final String... keys);
public Map<String, Long> getLongMap(final String... keys);
public Long[] getLongArray(final String... keys);
//---------------------- CompletableFuture 异步版 ---------------------------------
default CompletableFuture<Boolean> isOpenAsync() {
return CompletableFuture.completedFuture(isOpen());
@@ -497,6 +501,29 @@ public interface CacheSource extends Resourcable {
public CompletableFuture<Set<Long>> spopLongAsync(final String key, final int count);
//------------------------ other-Async ------------------------
default CompletableFuture<List<String>> keysAsync() {
return keysAsync(null);
}
default CompletableFuture<List<String>> keysStartsWithAsync(String startsWith) {
return keysAsync(startsWith + "*");
}
public CompletableFuture<List<String>> keysAsync(String pattern);
public <T> CompletableFuture<Map<String, T>> getMapAsync(final Type componentType, final String... keys);
public CompletableFuture<Integer> getKeySizeAsync();
public CompletableFuture<Map<String, String>> getStringMapAsync(final String... keys);
public CompletableFuture<String[]> getStringArrayAsync(final String... keys);
public CompletableFuture<Map<String, Long>> getLongMapAsync(final String... keys);
public CompletableFuture<Long[]> getLongArrayAsync(final String... keys);
//------------------------ collectionAsync ------------------------
@Deprecated
public <T> CompletableFuture<Collection<T>> getCollectionAsync(final String key, final Type componentType);
@@ -528,25 +555,6 @@ public interface CacheSource extends Resourcable {
@Deprecated
public CompletableFuture<Collection<Long>> getexLongCollectionAsync(final String key, final int expireSeconds);
//------------------------ other-Async ------------------------
public <T> CompletableFuture<Map<String, T>> getMapAsync(final Type componentType, final String... keys);
public CompletableFuture<List<String>> queryKeysAsync();
public CompletableFuture<List<String>> queryKeysStartsWithAsync(String startsWith);
public CompletableFuture<List<String>> queryKeysEndsWithAsync(String endsWith);
public CompletableFuture<Integer> getKeySizeAsync();
public CompletableFuture<Map<String, String>> getStringMapAsync(final String... keys);
public CompletableFuture<String[]> getStringArrayAsync(final String... keys);
public CompletableFuture<Map<String, Long>> getLongMapAsync(final String... keys);
public CompletableFuture<Long[]> getLongArrayAsync(final String... keys);
//-------------------------- 过期方法 ----------------------------------
@Deprecated
default CompletableFuture<Void> refreshAsync(final String key, final int expireSeconds) {
@@ -892,4 +900,34 @@ public interface CacheSource extends Resourcable {
default int removeLongListItem(final String key, final long value) {
return lremLong(key, value);
}
@Deprecated
default List<String> queryKeys() {
return keys();
}
@Deprecated
default List<String> queryKeysStartsWith(String startsWith) {
return keys(startsWith + "*");
}
@Deprecated
default List<String> queryKeysEndsWith(String endsWith) {
return keys("*" + endsWith);
}
@Deprecated
default CompletableFuture<List<String>> queryKeysAsync() {
return keysAsync();
}
@Deprecated
default CompletableFuture<List<String>> queryKeysStartsWithAsync(String startsWith) {
return keysAsync(startsWith + "*");
}
@Deprecated
default CompletableFuture<List<String>> queryKeysEndsWithAsync(String endsWith) {
return keysAsync("*" + endsWith);
}
}