diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 92098d452..39f835cc5 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -58,7 +58,7 @@ public abstract class WebSocketNode { this.localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid())); } - protected abstract CompletableFuture> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); + protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last); @@ -79,17 +79,18 @@ public abstract class WebSocketNode { //-------------------------------------------------------------------------------- /** - * 获取目标地址 + * 获取目标地址
+ * 该方法只能被内部调用 * * @param targetAddress * @param groupid * * @return */ - protected CompletableFuture> remoteOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { + protected CompletableFuture> remoteWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { if (remoteNode == null) return CompletableFuture.completedFuture(null); try { - return remoteNode.getOnlineRemoteAddresses(targetAddress, groupid); + return remoteNode.getWebSocketAddresses(targetAddress, groupid); } catch (Exception e) { logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e); return CompletableFuture.completedFuture(null); @@ -104,7 +105,7 @@ public abstract class WebSocketNode { * * @return 地址列表 */ - public CompletableFuture> getOnlineNodes(final Serializable groupid) { + public CompletableFuture> getSncpAddresses(final Serializable groupid) { if (this.sncpAddressNodes != null) return this.sncpAddressNodes.getCollectionAsync(groupid); List rs = new ArrayList<>(); if (this.localSncpAddress != null) rs.add(this.localSncpAddress); @@ -121,9 +122,9 @@ public abstract class WebSocketNode { * @return 地址集合 */ //异步待优化 - public CompletableFuture>> getOnlineRemoteAddress(final Serializable groupid) { + public CompletableFuture>> getSncpNodeWebSocketAddresses(final Serializable groupid) { final CompletableFuture>> rs = new CompletableFuture<>(); - CompletableFuture> nodesFuture = getOnlineNodes(groupid); + CompletableFuture> nodesFuture = getSncpAddresses(groupid); if (nodesFuture == null) return CompletableFuture.completedFuture(null); nodesFuture.whenComplete((nodes, e) -> { if (e != null) { @@ -131,7 +132,7 @@ public abstract class WebSocketNode { } else { final Map> map = new HashMap(); for (final InetSocketAddress nodeAddress : nodes) { - List list = getOnlineRemoteAddresses(nodeAddress, groupid).join(); + List list = getWebSocketAddresses(nodeAddress, groupid).join(); if (list == null) list = new ArrayList(); map.put(nodeAddress, list); } @@ -173,44 +174,38 @@ public abstract class WebSocketNode { * * @return 为0表示成功, 其他值表示异常 */ - //异步待优化 + //最近连接发送逻辑还没有理清楚 public final CompletableFuture sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) { - return CompletableFuture.supplyAsync(() -> { - if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); - int rscode = RETCODE_GROUP_EMPTY; - WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid); - if (group != null) rscode = group.send(recent, message, last).join(); //临时, 要改 - if (recent && rscode == 0) { //已经给最近连接发送的消息 - if (finest) logger.finest("websocket want send recent message success"); - return rscode; - } - if (this.sncpAddressNodes == null || this.remoteNode == null) { - if (finest) logger.finest("websocket remote node is null"); - //没有CacheSource就不会有分布式节点 - return rscode; - } - //-----------------------发送远程的----------------------------- - Collection addrs = sncpAddressNodes.getCollectionAsync(groupid).join(); + if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); + CompletableFuture localFuture = null; + final WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid); + if (group != null) localFuture = group.send(recent, message, last); + if (recent && localFuture != null) { //已经给最近连接发送的消息 + if (finest) logger.finest("websocket want send recent message success"); + return localFuture; + } + if (this.sncpAddressNodes == null || this.remoteNode == null) { + if (finest) logger.finest("websocket remote node is null"); + //没有CacheSource就不会有分布式节点 + return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; + } + //远程节点发送消息 + CompletableFuture> addrsFuture = sncpAddressNodes.getCollectionAsync(groupid); + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); - if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点),所以正常情况下addrs不会为空。 - if (recent) { - InetSocketAddress one = null; - for (InetSocketAddress addr : addrs) { - one = addr; - } - rscode = remoteNode.sendMessage(one, groupid, recent, message, last).join(); + if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); + CompletableFuture future = null; + for (InetSocketAddress addr : addrs) { + if (addr.equals(localSncpAddress)) continue; + if (future == null) { + future = remoteNode.sendMessage(addr, groupid, recent, message, last); } else { - for (InetSocketAddress addr : addrs) { - if (!addr.equals(localSncpAddress)) { - rscode |= remoteNode.sendMessage(addr, groupid, recent, message, last).join(); - } - } + future = future.thenCombine(remoteNode.sendMessage(addr, groupid, recent, message, last), (a, b) -> a | b); } - } else { - rscode = RETCODE_GROUP_EMPTY; } - return rscode; + return future == null ? CompletableFuture.completedFuture(0) : future; }); + return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } } diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 7e758a9a4..a10da4437 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -35,13 +35,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture> getOnlineRemoteAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { - if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid); + public CompletableFuture> getWebSocketAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { + if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteWebSocketAddresses(targetAddress, groupid); if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); return CompletableFuture.supplyAsync(() -> { final List rs = new ArrayList<>(); final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); - if (group != null) group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr())); + if (group != null) group.getWebSockets().forEach(x -> rs.add(x.getRemoteAddr())); return rs; }); }