diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 39f835cc5..3fdacea7f 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -34,14 +34,15 @@ public abstract class WebSocketNode { @Resource(name = Application.RESNAME_SNCP_ADDR) protected InetSocketAddress localSncpAddress; //为SncpServer的服务address + //如果不是分布式(没有SNCP) 值为null @RpcRemote protected WebSocketNode remoteNode; //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合, key: groupid - //包含 localSncpAddress - //如果不是分布式(没有SNCP),sncpAddressNodes 将不会被用到 + //集合包含 localSncpAddress + //如果不是分布式(没有SNCP),sncpNodeAddresses 将不会被用到 @Resource(name = "$") - protected CacheSource sncpAddressNodes; + protected CacheSource sncpNodeAddresses; //当前节点的本地WebSocketEngine protected WebSocketEngine localEngine; @@ -80,7 +81,7 @@ public abstract class WebSocketNode { //-------------------------------------------------------------------------------- /** * 获取目标地址
- * 该方法只能被内部调用 + * 该方法仅供内部调用 * * @param targetAddress * @param groupid @@ -98,48 +99,53 @@ public abstract class WebSocketNode { } /** - * 获取用户在线的SNCP节点地址列表,不是分布式则返回空列表
+ * 获取用户在线的SNCP节点地址列表,不是分布式则返回元素数量为1,且元素值为null的列表
* InetSocketAddress 为 SNCP节点地址 * * @param groupid groupid * * @return 地址列表 */ - public CompletableFuture> getSncpAddresses(final Serializable groupid) { - if (this.sncpAddressNodes != null) return this.sncpAddressNodes.getCollectionAsync(groupid); + public CompletableFuture> getSncpNodeAddresses(final Serializable groupid) { + if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(groupid); List rs = new ArrayList<>(); - if (this.localSncpAddress != null) rs.add(this.localSncpAddress); + rs.add(this.localSncpAddress); return CompletableFuture.completedFuture(rs); } /** * 获取在线用户的详细连接信息
- * Map.key 为 SNCP节点地址 + * Map.key 为 SNCP节点地址, 含值为null的key表示没有分布式 * Map.value 为 用户客户端的IP * * @param groupid groupid * * @return 地址集合 */ - //异步待优化 public CompletableFuture>> getSncpNodeWebSocketAddresses(final Serializable groupid) { - final CompletableFuture>> rs = new CompletableFuture<>(); - CompletableFuture> nodesFuture = getSncpAddresses(groupid); - if (nodesFuture == null) return CompletableFuture.completedFuture(null); - nodesFuture.whenComplete((nodes, e) -> { - if (e != null) { - rs.completeExceptionally(e); - } else { - final Map> map = new HashMap(); - for (final InetSocketAddress nodeAddress : nodes) { - List list = getWebSocketAddresses(nodeAddress, groupid).join(); - if (list == null) list = new ArrayList(); - map.put(nodeAddress, list); + CompletableFuture> sncpFuture = getSncpNodeAddresses(groupid); + return sncpFuture.thenCompose((Collection addrs) -> { + if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); + if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(new HashMap<>()); + CompletableFuture>> future = null; + for (final InetSocketAddress nodeAddress : addrs) { + CompletableFuture>> mapFuture = getWebSocketAddresses(nodeAddress, groupid) + .thenCompose((List list) -> { + Map> map = new HashMap<>(); + map.put(nodeAddress, list); + return CompletableFuture.completedFuture(map); + }); + if (future == null) { + future = mapFuture; + } else { + future = future.thenCombine(mapFuture, (a, b) -> { + a.putAll(b); + return a; + }); } - rs.complete(map); } + return future == null ? CompletableFuture.completedFuture(new HashMap<>()) : future; }); - return rs; } //-------------------------------------------------------------------------------- @@ -184,19 +190,19 @@ public abstract class WebSocketNode { if (finest) logger.finest("websocket want send recent message success"); return localFuture; } - if (this.sncpAddressNodes == null || this.remoteNode == null) { + if (this.sncpNodeAddresses == null || this.remoteNode == null) { if (finest) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; } //远程节点发送消息 - CompletableFuture> addrsFuture = sncpAddressNodes.getCollectionAsync(groupid); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(groupid); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); CompletableFuture future = null; for (InetSocketAddress addr : addrs) { - if (addr.equals(localSncpAddress)) continue; + if (addr == null || addr.equals(localSncpAddress)) continue; if (future == null) { future = remoteNode.sendMessage(addr, groupid, recent, message, last); } else { diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index a10da4437..df1dbe145 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -67,7 +67,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { */ @Override public CompletableFuture connect(Serializable groupid, InetSocketAddress sncpAddr) { - CompletableFuture future = sncpAddressNodes.appendSetItemAsync(groupid, sncpAddr); + CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(groupid, sncpAddr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + sncpAddr); return future; } @@ -82,7 +82,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { */ @Override public CompletableFuture disconnect(Serializable groupid, InetSocketAddress sncpAddr) { - CompletableFuture future = sncpAddressNodes.removeSetItemAsync(groupid, sncpAddr); + CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(groupid, sncpAddr); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + sncpAddr); return future; }