CacheSource的增删改查操作增加Type参数
This commit is contained in:
@@ -60,7 +60,7 @@ public abstract class WebSocketNode {
|
||||
protected Semaphore semaphore;
|
||||
|
||||
public void init(AnyValue conf) {
|
||||
if (sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class);
|
||||
//if (sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class);
|
||||
if (localEngine != null) {
|
||||
int wsthreads = localEngine.wsthreads;
|
||||
if (wsthreads == 0) wsthreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||
@@ -82,7 +82,7 @@ public abstract class WebSocketNode {
|
||||
//关掉所有本地本地WebSocket
|
||||
this.localEngine.getLocalWebSockets().forEach(g -> g.close());
|
||||
if (sncpNodeAddresses != null && localSncpAddress != null) {
|
||||
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, localSncpAddress);
|
||||
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, localSncpAddress);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,7 +151,7 @@ public abstract class WebSocketNode {
|
||||
public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
|
||||
if (this.sncpNodeAddresses != null) {
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
|
||||
CompletableFuture<Collection<InetSocketAddress>> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
|
||||
if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore());
|
||||
return result;
|
||||
}
|
||||
@@ -235,7 +235,7 @@ public abstract class WebSocketNode {
|
||||
}
|
||||
//远程节点关闭
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
|
||||
@@ -513,7 +513,7 @@ public abstract class WebSocketNode {
|
||||
final Object remoteMessage = formatRemoteMessage(message);
|
||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(wsrange, message, last);
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY);
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs);
|
||||
@@ -544,7 +544,7 @@ public abstract class WebSocketNode {
|
||||
//远程节点发送消息
|
||||
final Object remoteMessage = formatRemoteMessage(message);
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (addrs == null || addrs.isEmpty()) {
|
||||
@@ -577,7 +577,7 @@ public abstract class WebSocketNode {
|
||||
}
|
||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastAction(action);
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY);
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs);
|
||||
@@ -597,7 +597,7 @@ public abstract class WebSocketNode {
|
||||
* 向指定用户发送操作,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param action 操作参数
|
||||
* @param action 操作参数
|
||||
* @param userids Serializable[]
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
@@ -628,7 +628,7 @@ public abstract class WebSocketNode {
|
||||
}
|
||||
//远程节点发送操作
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (addrs == null || addrs.isEmpty()) {
|
||||
|
||||
@@ -90,8 +90,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
||||
@Override
|
||||
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
|
||||
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr));
|
||||
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr);
|
||||
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, sncpAddr));
|
||||
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
|
||||
return future;
|
||||
@@ -108,7 +108,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
||||
@Override
|
||||
public CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr) {
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
|
||||
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr);
|
||||
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
|
||||
return future;
|
||||
@@ -126,8 +126,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
||||
@Override
|
||||
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) {
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, sncpAddr);
|
||||
future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, sncpAddr));
|
||||
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, InetSocketAddress.class, sncpAddr);
|
||||
future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, InetSocketAddress.class, sncpAddr));
|
||||
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr);
|
||||
return future;
|
||||
|
||||
@@ -27,9 +27,9 @@ import org.redkale.util.*;
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Local
|
||||
@AutoLoad(false)
|
||||
@SuppressWarnings("unchecked")
|
||||
@ResourceType(CacheSource.class)
|
||||
public class CacheMemorySource<V extends Object> extends AbstractService implements CacheSource<V>, Service, AutoCloseable, Resourcable {
|
||||
|
||||
@@ -340,6 +340,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return (V) entry.objectValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T get(final String key, final Type type) {
|
||||
return (T) get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getString(String key) {
|
||||
if (key == null) return null;
|
||||
@@ -361,6 +366,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.supplyAsync(() -> get(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<T> getAsync(final String key, final Type type) {
|
||||
return CompletableFuture.supplyAsync(() -> (T) get(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<String> getStringAsync(final String key) {
|
||||
return CompletableFuture.supplyAsync(() -> getString(key), getExecutor());
|
||||
@@ -385,6 +395,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return (V) entry.objectValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T getAndRefresh(final String key, final int expireSeconds, final Type type) {
|
||||
return (T) getAndRefresh(key, expireSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -415,6 +430,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<T> getAndRefreshAsync(final String key, final int expireSeconds, final Type type) {
|
||||
return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds, type), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<String> getStringAndRefreshAsync(final String key, final int expireSeconds) {
|
||||
@@ -462,6 +482,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
set(CacheEntryType.OBJECT, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void set(String key, Type type, T value) {
|
||||
set(CacheEntryType.OBJECT, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void setString(String key, String value) {
|
||||
@@ -480,6 +505,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.runAsync(() -> set(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Void> setAsync(String key, Type type, T value) {
|
||||
return CompletableFuture.runAsync(() -> set(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> setStringAsync(String key, String value) {
|
||||
@@ -511,6 +541,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
set(CacheEntryType.OBJECT, expireSeconds, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void set(final int expireSeconds, String key, Type type, T value) {
|
||||
set(CacheEntryType.OBJECT, expireSeconds, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void setString(int expireSeconds, String key, String value) {
|
||||
@@ -529,6 +564,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Void> setAsync(int expireSeconds, String key, Type type, T value) {
|
||||
return CompletableFuture.runAsync(() -> set(expireSeconds, key, type, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> setStringAsync(int expireSeconds, String key, String value) {
|
||||
@@ -632,6 +672,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return (Collection<V>) get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Collection<T> getCollection(final String key, final Type componentType) {
|
||||
return (Collection<T>) get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> getStringCollection(final String key) {
|
||||
return (Collection<String>) get(key);
|
||||
@@ -647,6 +692,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.supplyAsync(() -> getCollection(key), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Collection<V>> getCollectionAsync(final String key, final Type componentType) {
|
||||
return CompletableFuture.supplyAsync(() -> getCollection(key, componentType), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Collection<String>> getStringCollectionAsync(final String key) {
|
||||
return CompletableFuture.supplyAsync(() -> getStringCollection(key), getExecutor());
|
||||
@@ -674,6 +724,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return (Collection<V>) getAndRefresh(key, expireSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Collection<T> getCollectionAndRefresh(final String key, final int expireSeconds, final Type componentType) {
|
||||
return (Collection<T>) getAndRefresh(key, expireSeconds, componentType);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public Collection<String> getStringCollectionAndRefresh(final String key, final int expireSeconds) {
|
||||
@@ -686,11 +741,22 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return list != null && list.contains(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean existsSetItem(final String key, final Type type, final T value) {
|
||||
Collection list = getCollection(key);
|
||||
return list != null && list.contains(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> existsSetItemAsync(final String key, final V value) {
|
||||
return CompletableFuture.supplyAsync(() -> existsSetItem(key, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Boolean> existsSetItemAsync(final String key, final Type type, final T value) {
|
||||
return CompletableFuture.supplyAsync(() -> existsSetItem(key, type, value), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean existsStringSetItem(final String key, final String value) {
|
||||
Collection<String> list = getStringCollection(key);
|
||||
@@ -725,6 +791,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Collection<T>> getCollectionAndRefreshAsync(final String key, final int expireSeconds, final Type componentType) {
|
||||
return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds, componentType), getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Collection<String>> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds) {
|
||||
@@ -757,6 +828,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
appendListItem(CacheEntryType.OBJECT_LIST, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void appendListItem(String key, Type componentType, T value) {
|
||||
appendListItem(CacheEntryType.OBJECT_LIST, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendStringListItem(String key, String value) {
|
||||
@@ -775,6 +851,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Void> appendListItemAsync(final String key, final Type componentType, final T value) {
|
||||
return CompletableFuture.runAsync(() -> appendListItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> appendStringListItemAsync(final String key, final String value) {
|
||||
@@ -796,6 +877,14 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
entry.listValue.remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void removeListItem(String key, final Type componentType, T value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.listValue == null) return;
|
||||
entry.listValue.remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeStringListItem(String key, String value) {
|
||||
@@ -820,6 +909,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Void> removeListItemAsync(final String key, final Type componentType, T value) {
|
||||
return CompletableFuture.runAsync(() -> removeListItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> removeStringListItemAsync(final String key, final String value) {
|
||||
@@ -852,6 +946,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void appendSetItem(String key, final Type componentType, T value) {
|
||||
appendSetItem(CacheEntryType.OBJECT_SET, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void appendStringSetItem(String key, String value) {
|
||||
@@ -870,6 +969,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Void> appendSetItemAsync(final String key, final Type componentType, T value) {
|
||||
return CompletableFuture.runAsync(() -> appendSetItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> appendStringSetItemAsync(final String key, final String value) {
|
||||
@@ -891,6 +995,14 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
entry.csetValue.remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void removeSetItem(String key, Type type, T value) {
|
||||
if (key == null) return;
|
||||
CacheEntry entry = container.get(key);
|
||||
if (entry == null || entry.csetValue == null) return;
|
||||
entry.csetValue.remove(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public void removeStringSetItem(String key, String value) {
|
||||
@@ -915,6 +1027,11 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
|
||||
return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Void> removeSetItemAsync(final String key, final Type componentType, final T value) {
|
||||
return CompletableFuture.runAsync(() -> removeSetItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
@RpcMultiRun
|
||||
public CompletableFuture<Void> removeStringSetItemAsync(final String key, final String value) {
|
||||
|
||||
@@ -42,6 +42,8 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public V get(final String key);
|
||||
|
||||
public <T> T get(final String key, final Type type);
|
||||
|
||||
default V getIfAbsent(final String key, Function<String, ? extends V> mappingFunction) {
|
||||
V rs = get(key);
|
||||
if (rs == null) {
|
||||
@@ -53,6 +55,8 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public V getAndRefresh(final String key, final int expireSeconds);
|
||||
|
||||
public <T> T getAndRefresh(final String key, final int expireSeconds, final Type type);
|
||||
|
||||
default V getAndRefreshIfAbsent(final String key, final int expireSeconds, Function<String, ? extends V> mappingFunction) {
|
||||
V rs = getAndRefresh(key, expireSeconds);
|
||||
if (rs == null) {
|
||||
@@ -66,8 +70,12 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public void set(final String key, final V value);
|
||||
|
||||
public <T> void set(final String key, final Type type, final T value);
|
||||
|
||||
public void set(final int expireSeconds, final String key, final V value);
|
||||
|
||||
public <T> void set(final int expireSeconds, final String key, final Type type, final T value);
|
||||
|
||||
public void setExpireSeconds(final String key, final int expireSeconds);
|
||||
|
||||
public void remove(final String key);
|
||||
@@ -82,10 +90,14 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public Collection<V> getCollection(final String key);
|
||||
|
||||
public <T> Collection<T> getCollection(final String key, final Type componentType);
|
||||
|
||||
public int getCollectionSize(final String key);
|
||||
|
||||
public Collection<V> getCollectionAndRefresh(final String key, final int expireSeconds);
|
||||
|
||||
public <T> Collection<T> getCollectionAndRefresh(final String key, final int expireSeconds, final Type componentType);
|
||||
|
||||
public void appendListItem(final String key, final V value);
|
||||
|
||||
public void removeListItem(final String key, final V value);
|
||||
@@ -96,6 +108,16 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public void removeSetItem(final String key, final V value);
|
||||
|
||||
public <T> void appendListItem(final String key, final Type componentType, final T value);
|
||||
|
||||
public <T> void removeListItem(final String key, final Type componentType, final T value);
|
||||
|
||||
public <T> boolean existsSetItem(final String key, final Type componentType, final T value);
|
||||
|
||||
public <T> void appendSetItem(final String key, final Type componentType, final T value);
|
||||
|
||||
public <T> void removeSetItem(final String key, final Type componentType, final T value);
|
||||
|
||||
public List<String> queryKeys();
|
||||
|
||||
public List<String> queryKeysStartsWith(String startsWith);
|
||||
@@ -153,6 +175,8 @@ public interface CacheSource<V extends Object> {
|
||||
//---------------------- CompletableFuture 异步版 ---------------------------------
|
||||
public CompletableFuture<Boolean> existsAsync(final String key);
|
||||
|
||||
public <T> CompletableFuture<T> getAsync(final String key, final Type type);
|
||||
|
||||
public CompletableFuture<V> getAsync(final String key);
|
||||
|
||||
default CompletableFuture<V> getIfAbsentAsync(final String key, Function<String, ? extends V> mappingFunction) {
|
||||
@@ -170,6 +194,8 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public CompletableFuture<V> getAndRefreshAsync(final String key, final int expireSeconds);
|
||||
|
||||
public <T> CompletableFuture<T> getAndRefreshAsync(final String key, final int expireSeconds, final Type type);
|
||||
|
||||
default CompletableFuture<V> getAndRefreshIfAbsentAsync(final String key, final int expireSeconds, Function<String, ? extends V> mappingFunction) {
|
||||
return getAndRefreshAsync(key, expireSeconds).thenCompose((V rs) -> {
|
||||
if (rs == null) {
|
||||
@@ -187,8 +213,12 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public CompletableFuture<Void> setAsync(final String key, final V value);
|
||||
|
||||
public <T> CompletableFuture<Void> setAsync(final String key, final Type type, final T value);
|
||||
|
||||
public CompletableFuture<Void> setAsync(final int expireSeconds, final String key, final V value);
|
||||
|
||||
public <T> CompletableFuture<Void> setAsync(final int expireSeconds, final String key, final Type type, final T value);
|
||||
|
||||
public CompletableFuture<Void> setExpireSecondsAsync(final String key, final int expireSeconds);
|
||||
|
||||
public CompletableFuture<Void> removeAsync(final String key);
|
||||
@@ -203,10 +233,14 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public CompletableFuture<Collection<V>> getCollectionAsync(final String key);
|
||||
|
||||
public <T> CompletableFuture<Collection<T>> getCollectionAsync(final String key, final Type componentType);
|
||||
|
||||
public CompletableFuture<Integer> getCollectionSizeAsync(final String key);
|
||||
|
||||
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final String key, final int expireSeconds);
|
||||
|
||||
public <T> CompletableFuture<Collection<T>> getCollectionAndRefreshAsync(final String key, final int expireSeconds, final Type componentType);
|
||||
|
||||
public CompletableFuture<Void> appendListItemAsync(final String key, final V value);
|
||||
|
||||
public CompletableFuture<Void> removeListItemAsync(final String key, final V value);
|
||||
@@ -217,6 +251,16 @@ public interface CacheSource<V extends Object> {
|
||||
|
||||
public CompletableFuture<Void> removeSetItemAsync(final String key, final V value);
|
||||
|
||||
public <T> CompletableFuture<Void> appendListItemAsync(final String key, final Type componentType, final T value);
|
||||
|
||||
public <T> CompletableFuture<Void> removeListItemAsync(final String key, final Type componentType, final T value);
|
||||
|
||||
public <T> CompletableFuture<Boolean> existsSetItemAsync(final String key, final Type componentType, final T value);
|
||||
|
||||
public <T> CompletableFuture<Void> appendSetItemAsync(final String key, final Type componentType, final T value);
|
||||
|
||||
public <T> CompletableFuture<Void> removeSetItemAsync(final String key, final Type componentType, final T value);
|
||||
|
||||
public CompletableFuture<List<String>> queryKeysAsync();
|
||||
|
||||
public CompletableFuture<List<String>> queryKeysStartsWithAsync(String startsWith);
|
||||
|
||||
Reference in New Issue
Block a user