diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 7f98dcab5..6f1e38f89 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -32,9 +32,6 @@ public abstract class WebSocketNode { @Comment("存储用户ID的key前缀") public static final String SOURCE_SNCP_USERID_PREFIX = "sncpws_uid:"; - @Comment("存储用户数的key") - public static final String SOURCE_SNCP_USERCOUNT_KEY = "sncpws_usercount"; - @Comment("存储当前SNCP节点列表的key") public static final String SOURCE_SNCP_ADDRS_KEY = "sncpws_addrs"; @@ -211,7 +208,7 @@ public abstract class WebSocketNode { return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); } tryAcquireSemaphore(); - CompletableFuture rs = this.sncpNodeAddresses.getLongAsync(SOURCE_SNCP_USERCOUNT_KEY, 0L).thenApply(v -> v.intValue()); + CompletableFuture rs = this.sncpNodeAddresses.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> v.size()); if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); return rs; } @@ -574,7 +571,7 @@ public abstract class WebSocketNode { protected boolean tryAcquireSemaphore() { if (this.semaphore == null) return true; try { - System.out.println("---------this.semaphore.tryAcquire" ); + System.out.println("---------this.semaphore.tryAcquire"); return this.semaphore.tryAcquire(6, TimeUnit.SECONDS); } catch (Exception e) { return false; @@ -583,6 +580,6 @@ public abstract class WebSocketNode { protected void releaseSemaphore() { if (this.semaphore != null) this.semaphore.release(); - System.out.println("---------this.semaphore.release: " + this.semaphore ); + System.out.println("---------this.semaphore.release: " + this.semaphore); } } diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 759ea82dd..750dbac28 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -79,7 +79,6 @@ public class WebSocketNodeService extends WebSocketNode implements Service { public CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr) { tryAcquireSemaphore(); CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); - future = future.thenAccept((a) -> sncpNodeAddresses.incr(SOURCE_SNCP_USERCOUNT_KEY)); future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr)); if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); @@ -97,9 +96,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture disconnect(Serializable userid, InetSocketAddress sncpAddr) { tryAcquireSemaphore(); - CompletableFuture future = CompletableFuture.allOf( - sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr), - sncpNodeAddresses.decrAsync(SOURCE_SNCP_USERCOUNT_KEY)); + CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr); return future;