diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 78af60580..185633da9 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -217,6 +217,7 @@ public abstract class WebSocketNode { */ @Local public CompletableFuture existsWebSocket(final Serializable userid) { + if (userid instanceof WebSocketUserAddress) return existsWebSocket((WebSocketUserAddress) userid); CompletableFuture localFuture = null; if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid)); if (this.sncpNodeAddresses == null || this.remoteNode == null) { @@ -242,6 +243,37 @@ public abstract class WebSocketNode { return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } + /** + * 判断指定用户是否WebSocket在线 + * + * @param userAddress WebSocketUserAddress + * + * @return boolean + */ + @Local + public CompletableFuture existsWebSocket(final WebSocketUserAddress userAddress) { + CompletableFuture localFuture = null; + if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userAddress.userid())); + if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); + //没有CacheSource就不会有分布式节点 + return localFuture; + } + Collection addrs = userAddress.sncpAddresses(); + if (addrs != null) addrs = new ArrayList<>(addrs); //不能修改参数内部值 + if (userAddress.sncpAddress() != null) { + if (addrs == null) addrs = new ArrayList<>(); + addrs.add(userAddress.sncpAddress()); + } + CompletableFuture future = null; + for (InetSocketAddress addr : addrs) { + if (addr == null || addr.equals(localSncpAddress)) continue; + future = future == null ? remoteNode.existsWebSocket(userAddress.userid(), addr) + : future.thenCombine(remoteNode.existsWebSocket(userAddress.userid(), addr), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(false) : future; + } + /** * 强制关闭用户WebSocket *