diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 13e245886..7d34d8a38 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -104,6 +104,8 @@ public abstract class WebSocketNode { protected abstract CompletableFuture changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress sncpAddr); + protected abstract CompletableFuture existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress); + protected abstract CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress); //-------------------------------------------------------------------------------- @@ -186,23 +188,6 @@ public abstract class WebSocketNode { }); } - /** - * 判断指定用户是否WebSocket在线 - * - * @param userid Serializable - * - * @return boolean - */ - public CompletableFuture existsWebSocket(final Serializable userid) { - if (this.localEngine != null && this.sncpNodeAddresses == null) { - return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid)); - } - tryAcquireSemaphore(); - CompletableFuture rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid); - if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); - return rs; - } - /** * 获取在线用户总数 * @@ -219,6 +204,59 @@ public abstract class WebSocketNode { return rs; } + /** + * @deprecated + * + * 判断指定用户是否WebSocket在线 + * + * @param userid Serializable + * + * @return boolean + */ + private CompletableFuture existsWebSocket2(final Serializable userid) { + if (this.localEngine != null && this.sncpNodeAddresses == null) { + return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid)); + } + tryAcquireSemaphore(); + CompletableFuture rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid); + if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); + return rs; + } + + /** + * 判断指定用户是否WebSocket在线 + * + * @param userid Serializable + * + * @return boolean + */ + @Local + public CompletableFuture existsWebSocket(final Serializable userid) { + CompletableFuture localFuture = null; + if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid)); + if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); + //没有CacheSource就不会有分布式节点 + return localFuture; + } + //远程节点关闭 + tryAcquireSemaphore(); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs); + if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false); + CompletableFuture future = null; + for (InetSocketAddress addr : addrs) { + if (addr == null || addr.equals(localSncpAddress)) continue; + future = future == null ? remoteNode.existsWebSocket(userid, addr) + : future.thenCombine(remoteNode.existsWebSocket(userid, addr), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(false) : future; + }); + return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); + } + /** * 强制关闭用户WebSocket * diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 49627679f..5d68c85e1 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -133,6 +133,21 @@ public class WebSocketNodeService extends WebSocketNode implements Service { return future; } + /** + * 判断用户是否有WebSocket + * + * @param userid Serializable + * @param targetAddress InetSocketAddress + * + * @return 无返回值 + */ + @Override + public CompletableFuture existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress) { + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress); + if (localEngine == null) return CompletableFuture.completedFuture(false); + return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid)); + } + /** * 强制关闭用户的WebSocket *