diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index d587f62c3..bdb042ef4 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -60,7 +60,7 @@ public abstract class WebSocketNode { protected Semaphore semaphore; public void init(AnyValue conf) { - if (sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class); + //if (sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class); if (localEngine != null) { int wsthreads = localEngine.wsthreads; if (wsthreads == 0) wsthreads = Runtime.getRuntime().availableProcessors() * 8; @@ -82,7 +82,7 @@ public abstract class WebSocketNode { //关掉所有本地本地WebSocket this.localEngine.getLocalWebSockets().forEach(g -> g.close()); if (sncpNodeAddresses != null && localSncpAddress != null) { - sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, localSncpAddress); + sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, localSncpAddress); } } @@ -151,7 +151,7 @@ public abstract class WebSocketNode { public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { if (this.sncpNodeAddresses != null) { tryAcquireSemaphore(); - CompletableFuture> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); + CompletableFuture> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore()); return result; } @@ -235,7 +235,7 @@ public abstract class WebSocketNode { } //远程节点关闭 tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs); @@ -513,7 +513,7 @@ public abstract class WebSocketNode { final Object remoteMessage = formatRemoteMessage(message); CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(wsrange, message, last); tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs); @@ -544,7 +544,7 @@ public abstract class WebSocketNode { //远程节点发送消息 final Object remoteMessage = formatRemoteMessage(message); tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (addrs == null || addrs.isEmpty()) { @@ -577,7 +577,7 @@ public abstract class WebSocketNode { } CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastAction(action); tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs); @@ -597,7 +597,7 @@ public abstract class WebSocketNode { * 向指定用户发送操作,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 * - * @param action 操作参数 + * @param action 操作参数 * @param userids Serializable[] * * @return 为0表示成功, 其他值表示部分发送异常 @@ -628,7 +628,7 @@ public abstract class WebSocketNode { } //远程节点发送操作 tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (addrs == null || addrs.isEmpty()) { diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 0659df6c0..49627679f 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -90,8 +90,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr) { tryAcquireSemaphore(); - CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); - future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr)); + CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr); + future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, sncpAddr)); if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); return future; @@ -108,7 +108,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture disconnect(Serializable userid, InetSocketAddress sncpAddr) { tryAcquireSemaphore(); - CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); + CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr); if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr); return future; @@ -126,8 +126,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) { tryAcquireSemaphore(); - CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, sncpAddr); - future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, sncpAddr)); + CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, InetSocketAddress.class, sncpAddr); + future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, InetSocketAddress.class, sncpAddr)); if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr); return future; diff --git a/src/org/redkale/source/CacheMemorySource.java b/src/org/redkale/source/CacheMemorySource.java index 7e7213193..3814f505a 100644 --- a/src/org/redkale/source/CacheMemorySource.java +++ b/src/org/redkale/source/CacheMemorySource.java @@ -27,9 +27,9 @@ import org.redkale.util.*; * * @author zhangjx */ -@SuppressWarnings("unchecked") @Local @AutoLoad(false) +@SuppressWarnings("unchecked") @ResourceType(CacheSource.class) public class CacheMemorySource extends AbstractService implements CacheSource, Service, AutoCloseable, Resourcable { @@ -340,6 +340,11 @@ public class CacheMemorySource extends AbstractService impleme return (V) entry.objectValue; } + @Override + public T get(final String key, final Type type) { + return (T) get(key); + } + @Override public String getString(String key) { if (key == null) return null; @@ -361,6 +366,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.supplyAsync(() -> get(key), getExecutor()); } + @Override + public CompletableFuture getAsync(final String key, final Type type) { + return CompletableFuture.supplyAsync(() -> (T) get(key), getExecutor()); + } + @Override public CompletableFuture getStringAsync(final String key) { return CompletableFuture.supplyAsync(() -> getString(key), getExecutor()); @@ -385,6 +395,11 @@ public class CacheMemorySource extends AbstractService impleme return (V) entry.objectValue; } + @Override + public T getAndRefresh(final String key, final int expireSeconds, final Type type) { + return (T) getAndRefresh(key, expireSeconds); + } + @Override @RpcMultiRun @SuppressWarnings("unchecked") @@ -415,6 +430,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds), getExecutor()); } + @Override + public CompletableFuture getAndRefreshAsync(final String key, final int expireSeconds, final Type type) { + return CompletableFuture.supplyAsync(() -> getAndRefresh(key, expireSeconds, type), getExecutor()); + } + @Override @RpcMultiRun public CompletableFuture getStringAndRefreshAsync(final String key, final int expireSeconds) { @@ -462,6 +482,11 @@ public class CacheMemorySource extends AbstractService impleme set(CacheEntryType.OBJECT, key, value); } + @Override + public void set(String key, Type type, T value) { + set(CacheEntryType.OBJECT, key, value); + } + @Override @RpcMultiRun public void setString(String key, String value) { @@ -480,6 +505,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.runAsync(() -> set(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } + @Override + public CompletableFuture setAsync(String key, Type type, T value) { + return CompletableFuture.runAsync(() -> set(key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override @RpcMultiRun public CompletableFuture setStringAsync(String key, String value) { @@ -511,6 +541,11 @@ public class CacheMemorySource extends AbstractService impleme set(CacheEntryType.OBJECT, expireSeconds, key, value); } + @Override + public void set(final int expireSeconds, String key, Type type, T value) { + set(CacheEntryType.OBJECT, expireSeconds, key, value); + } + @Override @RpcMultiRun public void setString(int expireSeconds, String key, String value) { @@ -529,6 +564,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.runAsync(() -> set(expireSeconds, key, value), getExecutor()).whenComplete(futureCompleteConsumer); } + @Override + public CompletableFuture setAsync(int expireSeconds, String key, Type type, T value) { + return CompletableFuture.runAsync(() -> set(expireSeconds, key, type, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override @RpcMultiRun public CompletableFuture setStringAsync(int expireSeconds, String key, String value) { @@ -632,6 +672,11 @@ public class CacheMemorySource extends AbstractService impleme return (Collection) get(key); } + @Override + public Collection getCollection(final String key, final Type componentType) { + return (Collection) get(key); + } + @Override public Collection getStringCollection(final String key) { return (Collection) get(key); @@ -647,6 +692,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.supplyAsync(() -> getCollection(key), getExecutor()); } + @Override + public CompletableFuture> getCollectionAsync(final String key, final Type componentType) { + return CompletableFuture.supplyAsync(() -> getCollection(key, componentType), getExecutor()); + } + @Override public CompletableFuture> getStringCollectionAsync(final String key) { return CompletableFuture.supplyAsync(() -> getStringCollection(key), getExecutor()); @@ -674,6 +724,11 @@ public class CacheMemorySource extends AbstractService impleme return (Collection) getAndRefresh(key, expireSeconds); } + @Override + public Collection getCollectionAndRefresh(final String key, final int expireSeconds, final Type componentType) { + return (Collection) getAndRefresh(key, expireSeconds, componentType); + } + @Override @RpcMultiRun public Collection getStringCollectionAndRefresh(final String key, final int expireSeconds) { @@ -686,11 +741,22 @@ public class CacheMemorySource extends AbstractService impleme return list != null && list.contains(value); } + @Override + public boolean existsSetItem(final String key, final Type type, final T value) { + Collection list = getCollection(key); + return list != null && list.contains(value); + } + @Override public CompletableFuture existsSetItemAsync(final String key, final V value) { return CompletableFuture.supplyAsync(() -> existsSetItem(key, value), getExecutor()); } + @Override + public CompletableFuture existsSetItemAsync(final String key, final Type type, final T value) { + return CompletableFuture.supplyAsync(() -> existsSetItem(key, type, value), getExecutor()); + } + @Override public boolean existsStringSetItem(final String key, final String value) { Collection list = getStringCollection(key); @@ -725,6 +791,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds), getExecutor()); } + @Override + public CompletableFuture> getCollectionAndRefreshAsync(final String key, final int expireSeconds, final Type componentType) { + return CompletableFuture.supplyAsync(() -> getCollectionAndRefresh(key, expireSeconds, componentType), getExecutor()); + } + @Override @RpcMultiRun public CompletableFuture> getStringCollectionAndRefreshAsync(final String key, final int expireSeconds) { @@ -757,6 +828,11 @@ public class CacheMemorySource extends AbstractService impleme appendListItem(CacheEntryType.OBJECT_LIST, key, value); } + @Override + public void appendListItem(String key, Type componentType, T value) { + appendListItem(CacheEntryType.OBJECT_LIST, key, value); + } + @Override @RpcMultiRun public void appendStringListItem(String key, String value) { @@ -775,6 +851,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.runAsync(() -> appendListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } + @Override + public CompletableFuture appendListItemAsync(final String key, final Type componentType, final T value) { + return CompletableFuture.runAsync(() -> appendListItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override @RpcMultiRun public CompletableFuture appendStringListItemAsync(final String key, final String value) { @@ -796,6 +877,14 @@ public class CacheMemorySource extends AbstractService impleme entry.listValue.remove(value); } + @Override + public void removeListItem(String key, final Type componentType, T value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || entry.listValue == null) return; + entry.listValue.remove(value); + } + @Override @RpcMultiRun public void removeStringListItem(String key, String value) { @@ -820,6 +909,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.runAsync(() -> removeListItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } + @Override + public CompletableFuture removeListItemAsync(final String key, final Type componentType, T value) { + return CompletableFuture.runAsync(() -> removeListItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override @RpcMultiRun public CompletableFuture removeStringListItemAsync(final String key, final String value) { @@ -852,6 +946,11 @@ public class CacheMemorySource extends AbstractService impleme appendSetItem(CacheEntryType.OBJECT_SET, key, value); } + @Override + public void appendSetItem(String key, final Type componentType, T value) { + appendSetItem(CacheEntryType.OBJECT_SET, key, value); + } + @Override @RpcMultiRun public void appendStringSetItem(String key, String value) { @@ -870,6 +969,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.runAsync(() -> appendSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } + @Override + public CompletableFuture appendSetItemAsync(final String key, final Type componentType, T value) { + return CompletableFuture.runAsync(() -> appendSetItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override @RpcMultiRun public CompletableFuture appendStringSetItemAsync(final String key, final String value) { @@ -891,6 +995,14 @@ public class CacheMemorySource extends AbstractService impleme entry.csetValue.remove(value); } + @Override + public void removeSetItem(String key, Type type, T value) { + if (key == null) return; + CacheEntry entry = container.get(key); + if (entry == null || entry.csetValue == null) return; + entry.csetValue.remove(value); + } + @Override @RpcMultiRun public void removeStringSetItem(String key, String value) { @@ -915,6 +1027,11 @@ public class CacheMemorySource extends AbstractService impleme return CompletableFuture.runAsync(() -> removeSetItem(key, value), getExecutor()).whenComplete(futureCompleteConsumer); } + @Override + public CompletableFuture removeSetItemAsync(final String key, final Type componentType, final T value) { + return CompletableFuture.runAsync(() -> removeSetItem(key, componentType, value), getExecutor()).whenComplete(futureCompleteConsumer); + } + @Override @RpcMultiRun public CompletableFuture removeStringSetItemAsync(final String key, final String value) { diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index 6ff1e3f57..d868a8521 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -42,6 +42,8 @@ public interface CacheSource { public V get(final String key); + public T get(final String key, final Type type); + default V getIfAbsent(final String key, Function mappingFunction) { V rs = get(key); if (rs == null) { @@ -53,6 +55,8 @@ public interface CacheSource { public V getAndRefresh(final String key, final int expireSeconds); + public T getAndRefresh(final String key, final int expireSeconds, final Type type); + default V getAndRefreshIfAbsent(final String key, final int expireSeconds, Function mappingFunction) { V rs = getAndRefresh(key, expireSeconds); if (rs == null) { @@ -66,8 +70,12 @@ public interface CacheSource { public void set(final String key, final V value); + public void set(final String key, final Type type, final T value); + public void set(final int expireSeconds, final String key, final V value); + public void set(final int expireSeconds, final String key, final Type type, final T value); + public void setExpireSeconds(final String key, final int expireSeconds); public void remove(final String key); @@ -82,10 +90,14 @@ public interface CacheSource { public Collection getCollection(final String key); + public Collection getCollection(final String key, final Type componentType); + public int getCollectionSize(final String key); public Collection getCollectionAndRefresh(final String key, final int expireSeconds); + public Collection getCollectionAndRefresh(final String key, final int expireSeconds, final Type componentType); + public void appendListItem(final String key, final V value); public void removeListItem(final String key, final V value); @@ -96,6 +108,16 @@ public interface CacheSource { public void removeSetItem(final String key, final V value); + public void appendListItem(final String key, final Type componentType, final T value); + + public void removeListItem(final String key, final Type componentType, final T value); + + public boolean existsSetItem(final String key, final Type componentType, final T value); + + public void appendSetItem(final String key, final Type componentType, final T value); + + public void removeSetItem(final String key, final Type componentType, final T value); + public List queryKeys(); public List queryKeysStartsWith(String startsWith); @@ -153,6 +175,8 @@ public interface CacheSource { //---------------------- CompletableFuture 异步版 --------------------------------- public CompletableFuture existsAsync(final String key); + public CompletableFuture getAsync(final String key, final Type type); + public CompletableFuture getAsync(final String key); default CompletableFuture getIfAbsentAsync(final String key, Function mappingFunction) { @@ -170,6 +194,8 @@ public interface CacheSource { public CompletableFuture getAndRefreshAsync(final String key, final int expireSeconds); + public CompletableFuture getAndRefreshAsync(final String key, final int expireSeconds, final Type type); + default CompletableFuture getAndRefreshIfAbsentAsync(final String key, final int expireSeconds, Function mappingFunction) { return getAndRefreshAsync(key, expireSeconds).thenCompose((V rs) -> { if (rs == null) { @@ -187,8 +213,12 @@ public interface CacheSource { public CompletableFuture setAsync(final String key, final V value); + public CompletableFuture setAsync(final String key, final Type type, final T value); + public CompletableFuture setAsync(final int expireSeconds, final String key, final V value); + public CompletableFuture setAsync(final int expireSeconds, final String key, final Type type, final T value); + public CompletableFuture setExpireSecondsAsync(final String key, final int expireSeconds); public CompletableFuture removeAsync(final String key); @@ -203,10 +233,14 @@ public interface CacheSource { public CompletableFuture> getCollectionAsync(final String key); + public CompletableFuture> getCollectionAsync(final String key, final Type componentType); + public CompletableFuture getCollectionSizeAsync(final String key); public CompletableFuture> getCollectionAndRefreshAsync(final String key, final int expireSeconds); + public CompletableFuture> getCollectionAndRefreshAsync(final String key, final int expireSeconds, final Type componentType); + public CompletableFuture appendListItemAsync(final String key, final V value); public CompletableFuture removeListItemAsync(final String key, final V value); @@ -217,6 +251,16 @@ public interface CacheSource { public CompletableFuture removeSetItemAsync(final String key, final V value); + public CompletableFuture appendListItemAsync(final String key, final Type componentType, final T value); + + public CompletableFuture removeListItemAsync(final String key, final Type componentType, final T value); + + public CompletableFuture existsSetItemAsync(final String key, final Type componentType, final T value); + + public CompletableFuture appendSetItemAsync(final String key, final Type componentType, final T value); + + public CompletableFuture removeSetItemAsync(final String key, final Type componentType, final T value); + public CompletableFuture> queryKeysAsync(); public CompletableFuture> queryKeysStartsWithAsync(String startsWith);