From 95c8ae23346710ee168ff6eb7317c6194322f71e Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Fri, 30 Aug 2019 08:05:28 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocketNode.java | 32 +++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 78af60580..185633da9 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -217,6 +217,7 @@ public abstract class WebSocketNode { */ @Local public CompletableFuture existsWebSocket(final Serializable userid) { + if (userid instanceof WebSocketUserAddress) return existsWebSocket((WebSocketUserAddress) userid); CompletableFuture localFuture = null; if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid)); if (this.sncpNodeAddresses == null || this.remoteNode == null) { @@ -242,6 +243,37 @@ public abstract class WebSocketNode { return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } + /** + * 判断指定用户是否WebSocket在线 + * + * @param userAddress WebSocketUserAddress + * + * @return boolean + */ + @Local + public CompletableFuture existsWebSocket(final WebSocketUserAddress userAddress) { + CompletableFuture localFuture = null; + if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userAddress.userid())); + if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); + //没有CacheSource就不会有分布式节点 + return localFuture; + } + Collection addrs = userAddress.sncpAddresses(); + if (addrs != null) addrs = new ArrayList<>(addrs); //不能修改参数内部值 + if (userAddress.sncpAddress() != null) { + if (addrs == null) addrs = new ArrayList<>(); + addrs.add(userAddress.sncpAddress()); + } + CompletableFuture future = null; + for (InetSocketAddress addr : addrs) { + if (addr == null || addr.equals(localSncpAddress)) continue; + future = future == null ? remoteNode.existsWebSocket(userAddress.userid(), addr) + : future.thenCombine(remoteNode.existsWebSocket(userAddress.userid(), addr), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(false) : future; + } + /** * 强制关闭用户WebSocket *