diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index e0b94249e..03ef81815 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -32,6 +32,9 @@ public abstract class WebSocketNode { @Comment("存储用户ID的key前缀") public static final String SOURCE_SNCP_USERID_PREFIX = "sncpws_uid:"; + @Comment("存储用户数的key") + public static final String SOURCE_SNCP_USERCOUNT_KEY = "sncpws_usercount"; + @Comment("存储当前SNCP节点列表的key") public static final String SOURCE_SNCP_ADDRS_KEY = "sncpws_addrs"; @@ -58,6 +61,7 @@ public abstract class WebSocketNode { protected WebSocketEngine localEngine; public void init(AnyValue conf) { + if(sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class); } public void destroy(AnyValue conf) { @@ -178,9 +182,7 @@ public abstract class WebSocketNode { if (this.localEngine != null && this.sncpNodeAddresses == null) { return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); } - return this.sncpNodeAddresses.getKeySizeAsync().thenCompose(count -> { - return sncpNodeAddresses.existsAsync(SOURCE_SNCP_ADDRS_KEY).thenApply(exists -> exists ? (count - 1) : count); - }); + return this.sncpNodeAddresses.getLongAsync(SOURCE_SNCP_USERCOUNT_KEY, 0L).thenApply(v -> v.intValue()); } /** diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 947b9b634..10f0f1fdd 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -69,6 +69,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr) { CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); + future = future.thenAccept((a) -> sncpNodeAddresses.incr(SOURCE_SNCP_USERCOUNT_KEY)); future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr)); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); return future; @@ -85,6 +86,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture disconnect(Serializable userid, InetSocketAddress sncpAddr) { CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); + future = future.thenAccept((a) -> sncpNodeAddresses.decr(SOURCE_SNCP_USERCOUNT_KEY)); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr); return future; } diff --git a/src/org/redkale/source/CacheMemorySource.java b/src/org/redkale/source/CacheMemorySource.java index 3ddada3d5..918e0e331 100644 --- a/src/org/redkale/source/CacheMemorySource.java +++ b/src/org/redkale/source/CacheMemorySource.java @@ -385,7 +385,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - @RpcMultiRun + //@RpcMultiRun public long getLongAndRefresh(String key, final int expireSeconds, long defValue) { if (key == null) return defValue; CacheEntry entry = container.get(key); @@ -551,7 +551,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - @RpcMultiRun + //@RpcMultiRun public long incr(final String key) { return incr(key, 1); } @@ -563,7 +563,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - @RpcMultiRun + //@RpcMultiRun public long incr(final String key, long num) { CacheEntry entry = container.get(key); if (entry == null) { @@ -585,7 +585,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - @RpcMultiRun + //@RpcMultiRun public long decr(final String key) { return incr(key, -1); } @@ -597,7 +597,7 @@ public class CacheMemorySource extends AbstractService impleme } @Override - @RpcMultiRun + //@RpcMultiRun public long decr(final String key, long num) { return incr(key, -num); } diff --git a/test/org/redkale/test/sncp/SncpTest.java b/test/org/redkale/test/sncp/SncpTest.java index 0ec4b221d..fcdaca4ab 100644 --- a/test/org/redkale/test/sncp/SncpTest.java +++ b/test/org/redkale/test/sncp/SncpTest.java @@ -94,6 +94,7 @@ public class SncpTest { SncpTestBean callbean = new SncpTestBean(); callbean.setId(1); callbean.setContent("数据X"); + service.queryLongResult("f", 3,33L); service.insert(callbean); System.out.println("bean.id应该会被修改(id不会是1): " + callbean); diff --git a/test/org/redkale/test/sncp/SncpTestIService.java b/test/org/redkale/test/sncp/SncpTestIService.java index 91d69588c..3f2da2cb6 100644 --- a/test/org/redkale/test/sncp/SncpTestIService.java +++ b/test/org/redkale/test/sncp/SncpTestIService.java @@ -17,8 +17,10 @@ public interface SncpTestIService extends Service { public String queryResult(SncpTestBean bean); + public long queryLongResult(String a, int b, long value); + public CompletableFuture queryResultAsync(SncpTestBean bean); - + public void insert(@RpcCall(DataCallArrayAttribute.class) SncpTestBean... beans); public String updateBean(@RpcCall(SncpTestServiceImpl.CallAttribute.class) SncpTestBean bean); diff --git a/test/org/redkale/test/sncp/SncpTestServiceImpl.java b/test/org/redkale/test/sncp/SncpTestServiceImpl.java index a68d34198..9657596a8 100644 --- a/test/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/test/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -41,6 +41,12 @@ public class SncpTestServiceImpl implements SncpTestIService { } + @Override + @RpcMultiRun + public long queryLongResult(String a, int b, long value) { + return value + 1; + } + public static class CallAttribute implements Attribute { @Override