diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 1174fead7..d9c50dcf0 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -522,7 +522,7 @@ public abstract class WebSocket { * * @return 地址列表 */ - public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { + public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { if (_engine.node == null) return CompletableFuture.completedFuture(null); return _engine.node.getRpcNodeAddresses(userid); } @@ -536,7 +536,7 @@ public abstract class WebSocket { * * @return 地址集合 */ - public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable userid) { + public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable userid) { if (_engine.node == null) return CompletableFuture.completedFuture(null); return _engine.node.getRpcNodeWebSocketAddresses(userid); } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index e1848c1c1..89aaad3c7 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -42,6 +42,8 @@ public abstract class WebSocketNode { @Resource(name = Application.RESNAME_SNCP_ADDR) protected InetSocketAddress localSncpAddress; //为SncpServer的服务address + protected WebSocketAddress wsaddress; + protected String name; //如果不是分布式(没有SNCP) 值为null @@ -51,11 +53,11 @@ public abstract class WebSocketNode { @Resource(name = "$_sendconvert") protected Convert sendConvert; - //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合, key: groupid + //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合, key: groupid //集合包含 localSncpAddress - //如果不是分布式(没有SNCP),sncpNodeAddresses 将不会被用到 + //如果不是分布式(没有SNCP),source 将不会被用到 @Resource(name = "$") - protected CacheSource sncpNodeAddresses; + protected CacheSource source; //当前节点的本地WebSocketEngine protected WebSocketEngine localEngine; @@ -69,14 +71,18 @@ public abstract class WebSocketNode { public void init(AnyValue conf) { this.tryAcquireSeconds = Integer.getInteger("WebSocketNode.tryAcquireSeconds", 12); - if (sncpNodeAddresses != null && "memory".equals(sncpNodeAddresses.getType())) { - sncpNodeAddresses.initValueType(InetSocketAddress.class); + if (source != null && "memory".equals(source.getType())) { + source.initValueType(WebSocketAddress.class); } if (localEngine != null) { int wsthreads = localEngine.wsthreads; if (wsthreads == 0) wsthreads = Runtime.getRuntime().availableProcessors() * 8; if (wsthreads > 0) this.semaphore = new Semaphore(wsthreads); } + String mqtopic = this.messageAgent == null ? null : this.messageAgent.generateWebSocketRespTopic(this); + if (mqtopic != null || this.localSncpAddress != null) { + this.wsaddress = new WebSocketAddress(mqtopic, localSncpAddress); + } } public void destroy(AnyValue conf) { @@ -97,45 +103,45 @@ public abstract class WebSocketNode { if (this.localEngine == null) return; //关掉所有本地本地WebSocket this.localEngine.getLocalWebSockets().forEach(g -> g.close()); - if (sncpNodeAddresses != null && localSncpAddress != null) { - sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, localSncpAddress); + if (source != null && wsaddress != null) { + source.removeSetItem(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class, this.wsaddress); } } - protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid); + protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid); - protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids); + protected abstract CompletableFuture sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids); - protected abstract CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last); + protected abstract CompletableFuture broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last); - protected abstract CompletableFuture sendAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids); + protected abstract CompletableFuture sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids); - protected abstract CompletableFuture broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action); + protected abstract CompletableFuture broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action); - protected abstract CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr); + protected abstract CompletableFuture connect(Serializable userid, WebSocketAddress wsaddr); - protected abstract CompletableFuture disconnect(Serializable userid, InetSocketAddress sncpAddr); + protected abstract CompletableFuture disconnect(Serializable userid, WebSocketAddress wsaddr); - protected abstract CompletableFuture changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress sncpAddr); + protected abstract CompletableFuture changeUserid(Serializable fromuserid, Serializable touserid, WebSocketAddress wsaddr); - protected abstract CompletableFuture existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress); + protected abstract CompletableFuture existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress); - protected abstract CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress); + protected abstract CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress); //-------------------------------------------------------------------------------- final CompletableFuture connect(final Serializable userid) { - if (logger.isLoggable(Level.FINEST)) logger.finest(localSncpAddress + " receive websocket connect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); - return connect(userid, localSncpAddress); + if (logger.isLoggable(Level.FINEST)) logger.finest(this.wsaddress + " receive websocket connect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); + return connect(userid, this.wsaddress); } final CompletableFuture disconnect(final Serializable userid) { - if (logger.isLoggable(Level.FINEST)) logger.finest(localSncpAddress + " receive websocket disconnect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); - return disconnect(userid, localSncpAddress); + if (logger.isLoggable(Level.FINEST)) logger.finest(this.wsaddress + " receive websocket disconnect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); + return disconnect(userid, this.wsaddress); } final CompletableFuture changeUserid(Serializable olduserid, final Serializable newuserid) { - if (logger.isLoggable(Level.FINEST)) logger.finest(localSncpAddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); - return changeUserid(olduserid, newuserid, localSncpAddress); + if (logger.isLoggable(Level.FINEST)) logger.finest(this.wsaddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); + return changeUserid(olduserid, newuserid, this.wsaddress); } public final String getName() { @@ -147,15 +153,16 @@ public abstract class WebSocketNode { * 获取目标地址
* 该方法仅供内部调用 * + * @param topic RpcTargetTopic * @param targetAddress InetSocketAddress * @param userid Serializable * * @return 客户端地址列表 */ - protected CompletableFuture> remoteWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) { + protected CompletableFuture> remoteWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) { if (remoteNode == null) return CompletableFuture.completedFuture(null); try { - return remoteNode.getWebSocketAddresses(targetAddress, userid); + return remoteNode.getWebSocketAddresses(topic, targetAddress, userid); } catch (Exception e) { logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e); return CompletableFuture.completedFuture(null); @@ -164,21 +171,21 @@ public abstract class WebSocketNode { /** * 获取用户在线的SNCP节点地址列表,不是分布式则返回元素数量为1,且元素值为null的列表
- * InetSocketAddress 为 SNCP节点地址 + * WebSocketAddress 为 SNCP节点地址 * * @param userid Serializable * * @return 地址列表 */ - public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { - if (this.sncpNodeAddresses != null) { + public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { + if (this.source != null) { tryAcquireSemaphore(); - CompletableFuture> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); + CompletableFuture> result = this.source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class); if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore()); return result; } - List rs = new ArrayList<>(); - rs.add(this.localSncpAddress); + List rs = new ArrayList<>(); + rs.add(this.wsaddress); return CompletableFuture.completedFuture(rs); } @@ -191,14 +198,14 @@ public abstract class WebSocketNode { * * @return 地址集合 */ - public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable userid) { - CompletableFuture> sncpFuture = getRpcNodeAddresses(userid); - return sncpFuture.thenCompose((Collection addrs) -> { + public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable userid) { + CompletableFuture> sncpFuture = getRpcNodeAddresses(userid); + return sncpFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(new HashMap<>()); - CompletableFuture>> future = null; - for (final InetSocketAddress nodeAddress : addrs) { - CompletableFuture>> mapFuture = getWebSocketAddresses(nodeAddress, userid) + CompletableFuture>> future = null; + for (final WebSocketAddress nodeAddress : addrs) { + CompletableFuture>> mapFuture = getWebSocketAddresses(nodeAddress.getTopic(), nodeAddress.getAddr(), userid) .thenCompose((List list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list))); future = future == null ? mapFuture : future.thenCombine(mapFuture, (a, b) -> Utility.merge(a, b)); } @@ -213,11 +220,11 @@ public abstract class WebSocketNode { * @return boolean */ public CompletableFuture getUserSize() { - if (this.localEngine != null && this.sncpNodeAddresses == null) { + if (this.localEngine != null && this.source == null) { return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); } tryAcquireSemaphore(); - CompletableFuture rs = this.sncpNodeAddresses.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> v.size()); + CompletableFuture rs = this.source.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> v.size()); if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); return rs; } @@ -229,11 +236,11 @@ public abstract class WebSocketNode { * @return boolean */ public CompletableFuture> getUserSet() { - if (this.localEngine != null && this.sncpNodeAddresses == null) { + if (this.localEngine != null && this.source == null) { return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet().stream().map(x -> String.valueOf(x)).collect(Collectors.toList()))); } tryAcquireSemaphore(); - CompletableFuture> rs = this.sncpNodeAddresses.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(SOURCE_SNCP_USERID_PREFIX.length())).collect(Collectors.toList()))); + CompletableFuture> rs = this.source.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(SOURCE_SNCP_USERID_PREFIX.length())).collect(Collectors.toList()))); if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); return rs; } @@ -250,23 +257,23 @@ public abstract class WebSocketNode { if (userid instanceof WebSocketUserAddress) return existsWebSocket((WebSocketUserAddress) userid); CompletableFuture localFuture = null; if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid)); - if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return localFuture; } //远程节点关闭 tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); + CompletableFuture> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); - CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { //if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false); CompletableFuture future = null; - for (InetSocketAddress addr : addrs) { - if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.existsWebSocket(userid, addr) - : future.thenCombine(remoteNode.existsWebSocket(userid, addr), (a, b) -> a | b); + for (WebSocketAddress addr : addrs) { + if (addr == null || addr.equals(wsaddress)) continue; + future = future == null ? remoteNode.existsWebSocket(userid, addr.getTopic(), addr.getAddr()) + : future.thenCombine(remoteNode.existsWebSocket(userid, addr.getTopic(), addr.getAddr()), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(false) : future; }); @@ -284,23 +291,23 @@ public abstract class WebSocketNode { public CompletableFuture existsWebSocket(final WebSocketUserAddress userAddress) { CompletableFuture localFuture = null; if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userAddress.userid())); - if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return localFuture; } - Collection addrs = userAddress.sncpAddresses(); + Collection addrs = userAddress.addresses(); if (addrs != null) addrs = new ArrayList<>(addrs); //不能修改参数内部值 - if (userAddress.sncpAddress() != null) { + if (userAddress.address() != null) { if (addrs == null) addrs = new ArrayList<>(); - addrs.add(userAddress.sncpAddress()); + addrs.add(userAddress.address()); } if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false); CompletableFuture future = null; - for (InetSocketAddress addr : addrs) { - if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.existsWebSocket(userAddress.userid(), addr) - : future.thenCombine(remoteNode.existsWebSocket(userAddress.userid(), addr), (a, b) -> a | b); + for (WebSocketAddress addr : addrs) { + if (addr == null || addr.equals(wsaddress)) continue; + future = future == null ? remoteNode.existsWebSocket(userAddress.userid(), addr.getTopic(), addr.getAddr()) + : future.thenCombine(remoteNode.existsWebSocket(userAddress.userid(), addr.getTopic(), addr.getAddr()), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(false) : future; } @@ -332,35 +339,35 @@ public abstract class WebSocketNode { private CompletableFuture forceCloseWebSocket(final Serializable userid, final WebSocketUserAddress userAddress) { CompletableFuture localFuture = null; if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userAddress == null ? userid : userAddress.userid())); - if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return localFuture; } //远程节点关闭 - CompletableFuture> addrsFuture; + CompletableFuture> addrsFuture; if (userAddress == null) { tryAcquireSemaphore(); - addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); + addrsFuture = source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); } else { - Collection addrs = userAddress.sncpAddresses(); + Collection addrs = userAddress.addresses(); if (addrs != null) addrs = new ArrayList<>(addrs); //不能修改参数内部值 - if (userAddress.sncpAddress() != null) { + if (userAddress.address() != null) { if (addrs == null) addrs = new ArrayList<>(); - addrs.add(userAddress.sncpAddress()); + addrs.add(userAddress.address()); } if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); addrsFuture = CompletableFuture.completedFuture(addrs); } - CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); CompletableFuture future = null; - for (InetSocketAddress addr : addrs) { - if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.forceCloseWebSocket(userid, addr) - : future.thenCombine(remoteNode.forceCloseWebSocket(userid, addr), (a, b) -> a + b); + for (WebSocketAddress addr : addrs) { + if (addr == null || addr.equals(wsaddress)) continue; + future = future == null ? remoteNode.forceCloseWebSocket(userid, addr.getTopic(), addr.getAddr()) + : future.thenCombine(remoteNode.forceCloseWebSocket(userid, addr.getTopic(), addr.getAddr()), (a, b) -> a + b); } return future == null ? CompletableFuture.completedFuture(0) : future; }); @@ -511,7 +518,7 @@ public abstract class WebSocketNode { } if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, userids)); final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); - if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return this.localEngine.sendLocalMessage(message, last, userids); } final Object remoteMessage = formatRemoteMessage(message); @@ -526,16 +533,16 @@ public abstract class WebSocketNode { keyuser.put(keys[i], userids[i]); } tryAcquireSemaphore(); - CompletableFuture>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(true, InetSocketAddress.class, keys); + CompletableFuture>> addrsFuture = source.getCollectionMapAsync(true, WebSocketAddress.class, keys); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); - rsfuture = addrsFuture.thenCompose((Map> addrs) -> { + rsfuture = addrsFuture.thenCompose((Map> addrs) -> { if (addrs == null || addrs.isEmpty()) { if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node "); return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } - Map> addrUsers = new HashMap<>(); + Map> addrUsers = new HashMap<>(); addrs.forEach((key, as) -> { - for (InetSocketAddress a : as) { + for (WebSocketAddress a : as) { addrUsers.computeIfAbsent(a, k -> new ArrayList<>()).add(keyuser.get(key)); } }); @@ -543,7 +550,7 @@ public abstract class WebSocketNode { logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found message-addr-userids: " + addrUsers); } CompletableFuture future = null; - for (Map.Entry> en : addrUsers.entrySet()) { + for (Map.Entry> en : addrUsers.entrySet()) { Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]); future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids) : future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids), (a, b) -> a | b); @@ -570,17 +577,17 @@ public abstract class WebSocketNode { if (useraddrs == null || useraddrs.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, useraddrs)); final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); - if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return this.localEngine.sendLocalMessage(message, last, userAddressToUserids(useraddrs)); } final Object remoteMessage = formatRemoteMessage(message); - final Map> addrUsers = userAddressToAddrMap(useraddrs); + final Map> addrUsers = userAddressToAddrMap(useraddrs); if (logger.isLoggable(Level.FINEST)) { logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found message-addr-userids: " + addrUsers); } CompletableFuture future = null; - for (Map.Entry> en : addrUsers.entrySet()) { + for (Map.Entry> en : addrUsers.entrySet()) { Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]); future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids) : future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids), (a, b) -> a | b); @@ -596,7 +603,7 @@ public abstract class WebSocketNode { } CompletableFuture localFuture = null; if (this.localEngine != null) localFuture = localEngine.sendLocalMessage(message, last, userid); - if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; @@ -604,40 +611,40 @@ public abstract class WebSocketNode { //远程节点发送消息 final Object remoteMessage = formatRemoteMessage(message); tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); + CompletableFuture> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); - CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (addrs == null || addrs.isEmpty()) { if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node "); return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } - if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs); + if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + wsaddress + ") found userid:" + userid + " on " + addrs); CompletableFuture future = null; - for (InetSocketAddress addr : addrs) { - if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid) - : future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b); + for (WebSocketAddress addr : addrs) { + if (addr == null || addr.equals(wsaddress)) continue; + future = future == null ? remoteNode.sendMessage(addr.getTopic(), addr.getAddr(), remoteMessage, last, userid) + : future.thenCombine(remoteNode.sendMessage(addr.getTopic(), addr.getAddr(), remoteMessage, last, userid), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; }); return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } - protected CompletableFuture sendOneAddrMessage(final InetSocketAddress sncpAddr, final Object message, final boolean last, final Serializable... userids) { - if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneAddrMessage(sncpAddr, msg, last, userids)); + protected CompletableFuture sendOneAddrMessage(final WebSocketAddress addr, final Object message, final boolean last, final Serializable... userids) { + if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneAddrMessage(addr, msg, last, userids)); if (logger.isLoggable(Level.FINEST) && this.localEngine == null) { //只打印远程模式的 - logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + sncpAddr + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + addr + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); } - if (Objects.equals(sncpAddr, this.localSncpAddress)) { + if (Objects.equals(addr, this.wsaddress)) { return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalMessage(message, last, userids); } - if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } final Object remoteMessage = formatRemoteMessage(message); - return remoteNode.sendMessage(sncpAddr, remoteMessage, last, userids); + return remoteNode.sendMessage(addr.getTopic(), addr.getAddr(), remoteMessage, last, userids); } protected Serializable[] userAddressToUserids(WebSocketUserAddress... useraddrs) { @@ -649,14 +656,14 @@ public abstract class WebSocketNode { return set.toArray(new Serializable[set.size()]); } - protected Map> userAddressToAddrMap(WebSocketUserAddress... useraddrs) { - final Map> addrUsers = new HashMap<>(); + protected Map> userAddressToAddrMap(WebSocketUserAddress... useraddrs) { + final Map> addrUsers = new HashMap<>(); for (WebSocketUserAddress userAddress : useraddrs) { - if (userAddress.sncpAddress() != null) { - addrUsers.computeIfAbsent(userAddress.sncpAddress(), k -> new ArrayList<>()).add(userAddress.userid()); + if (userAddress.address() != null) { + addrUsers.computeIfAbsent(userAddress.address(), k -> new ArrayList<>()).add(userAddress.userid()); } - if (userAddress.sncpAddresses() != null) { - for (InetSocketAddress addr : userAddress.sncpAddresses()) { + if (userAddress.addresses() != null) { + for (WebSocketAddress addr : userAddress.addresses()) { if (addr != null) { addrUsers.computeIfAbsent(addr, k -> new ArrayList<>()).add(userAddress.userid()); } @@ -773,22 +780,22 @@ public abstract class WebSocketNode { public CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message0, final boolean last) { if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(wsrange, convert, msg, last)); final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); - if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return this.localEngine.broadcastLocalMessage(wsrange, message, last); } final Object remoteMessage = formatRemoteMessage(message); CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last); tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class); + CompletableFuture> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); - CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); CompletableFuture future = null; - for (InetSocketAddress addr : addrs) { - if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.broadcastMessage(addr, wsrange, remoteMessage, last) - : future.thenCombine(remoteNode.broadcastMessage(addr, wsrange, remoteMessage, last), (a, b) -> a | b); + for (WebSocketAddress addr : addrs) { + if (addr == null || addr.equals(wsaddress)) continue; + future = future == null ? remoteNode.broadcastMessage(addr.getTopic(), addr.getAddr(), wsrange, remoteMessage, last) + : future.thenCombine(remoteNode.broadcastMessage(addr.getTopic(), addr.getAddr(), wsrange, remoteMessage, last), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(0) : future; }); @@ -804,21 +811,21 @@ public abstract class WebSocketNode { */ @Local public CompletableFuture broadcastAction(final WebSocketAction action) { - if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return this.localEngine.broadcastLocalAction(action); } CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalAction(action); tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class); + CompletableFuture> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); - CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); CompletableFuture future = null; - for (InetSocketAddress addr : addrs) { - if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.broadcastAction(addr, action) - : future.thenCombine(remoteNode.broadcastAction(addr, action), (a, b) -> a | b); + for (WebSocketAddress addr : addrs) { + if (addr == null || addr.equals(wsaddress)) continue; + future = future == null ? remoteNode.broadcastAction(addr.getTopic(), addr.getAddr(), action) + : future.thenCombine(remoteNode.broadcastAction(addr.getTopic(), addr.getAddr(), action), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(0) : future; }); @@ -844,7 +851,7 @@ public abstract class WebSocketNode { } return sendAction(action, useraddrs); } - if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return this.localEngine.sendLocalAction(action, userids); } CompletableFuture rsfuture; @@ -858,16 +865,16 @@ public abstract class WebSocketNode { keyuser.put(keys[i], userids[i]); } tryAcquireSemaphore(); - CompletableFuture>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(true, InetSocketAddress.class, keys); + CompletableFuture>> addrsFuture = source.getCollectionMapAsync(true, WebSocketAddress.class, keys); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); - rsfuture = addrsFuture.thenCompose((Map> addrs) -> { + rsfuture = addrsFuture.thenCompose((Map> addrs) -> { if (addrs == null || addrs.isEmpty()) { if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node "); return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } - Map> addrUsers = new HashMap<>(); + Map> addrUsers = new HashMap<>(); addrs.forEach((key, as) -> { - for (InetSocketAddress a : as) { + for (WebSocketAddress a : as) { addrUsers.computeIfAbsent(a, k -> new ArrayList<>()).add(keyuser.get(key)); } }); @@ -875,7 +882,7 @@ public abstract class WebSocketNode { logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found action-userid-addrs: " + addrUsers); } CompletableFuture future = null; - for (Map.Entry> en : addrUsers.entrySet()) { + for (Map.Entry> en : addrUsers.entrySet()) { Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]); future = future == null ? sendOneAddrAction(en.getKey(), action, oneaddrUserids) : future.thenCombine(sendOneAddrAction(en.getKey(), action, oneaddrUserids), (a, b) -> a | b); @@ -898,16 +905,16 @@ public abstract class WebSocketNode { @Local public CompletableFuture sendAction(final WebSocketAction action, final WebSocketUserAddress... useraddrs) { if (useraddrs == null || useraddrs.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + if (this.localEngine != null && this.source == null) { //本地模式且没有分布式 return this.localEngine.sendLocalAction(action, userAddressToUserids(useraddrs)); } - final Map> addrUsers = userAddressToAddrMap(useraddrs); + final Map> addrUsers = userAddressToAddrMap(useraddrs); if (logger.isLoggable(Level.FINEST)) { logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found action-userid-addrs: " + addrUsers); } CompletableFuture future = null; - for (Map.Entry> en : addrUsers.entrySet()) { + for (Map.Entry> en : addrUsers.entrySet()) { Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]); future = future == null ? sendOneAddrAction(en.getKey(), action, oneaddrUserids) : future.thenCombine(sendOneAddrAction(en.getKey(), action, oneaddrUserids), (a, b) -> a | b); @@ -921,45 +928,45 @@ public abstract class WebSocketNode { } CompletableFuture localFuture = null; if (this.localEngine != null) localFuture = localEngine.sendLocalAction(action, userid); - if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; } //远程节点发送操作 tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); + CompletableFuture> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class); if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); - CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (addrs == null || addrs.isEmpty()) { if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node "); return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs); CompletableFuture future = null; - for (InetSocketAddress addr : addrs) { - if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.sendAction(addr, action, userid) - : future.thenCombine(remoteNode.sendAction(addr, action, userid), (a, b) -> a | b); + for (WebSocketAddress addr : addrs) { + if (addr == null || addr.equals(wsaddress)) continue; + future = future == null ? remoteNode.sendAction(addr.getTopic(), addr.getAddr(), action, userid) + : future.thenCombine(remoteNode.sendAction(addr.getTopic(), addr.getAddr(), action, userid), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; }); return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } - protected CompletableFuture sendOneAddrAction(final InetSocketAddress sncpAddr, final WebSocketAction action, final Serializable... userids) { + protected CompletableFuture sendOneAddrAction(final WebSocketAddress addr, final WebSocketAction action, final Serializable... userids) { if (logger.isLoggable(Level.FINEST) && this.localEngine == null) { //只打印远程模式的 - logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + sncpAddr + ", action:" + action + " from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); + logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + addr + ", action:" + action + " from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); } - if (Objects.equals(sncpAddr, this.localSncpAddress)) { + if (Objects.equals(addr, this.wsaddress)) { return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalAction(action, userids); } - if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (this.source == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); } - return remoteNode.sendAction(sncpAddr, action, userids); + return remoteNode.sendAction(addr.getTopic(), addr.getAddr(), action, userids); } protected Object formatRemoteMessage(Object message) { diff --git a/src/org/redkale/net/http/WebSocketUserAddress.java b/src/org/redkale/net/http/WebSocketUserAddress.java index d47c91b20..7055d5154 100644 --- a/src/org/redkale/net/http/WebSocketUserAddress.java +++ b/src/org/redkale/net/http/WebSocketUserAddress.java @@ -20,76 +20,57 @@ public interface WebSocketUserAddress extends Serializable { Serializable userid(); - String mqtopic(); + WebSocketAddress address(); - Collection mqtopics(); - - InetSocketAddress sncpAddress(); - - Collection sncpAddresses(); + Collection addresses(); public static WebSocketUserAddress create(WebSocketUserAddress userAddress) { return new SimpleWebSocketUserAddress(userAddress); } - public static WebSocketUserAddress createTopic(Serializable userid, String mqtopic) { - return new SimpleWebSocketUserAddress(userid, mqtopic, null, null, null); + public static WebSocketUserAddress createTopic(Serializable userid, String mqtopic, InetSocketAddress sncpAddress) { + return new SimpleWebSocketUserAddress(userid, mqtopic, sncpAddress); } - public static WebSocketUserAddress createTopic(Serializable userid, Collection mqtopics) { - return new SimpleWebSocketUserAddress(userid, null, mqtopics, null, null); + public static WebSocketUserAddress create(Serializable userid, WebSocketAddress address) { + return new SimpleWebSocketUserAddress(userid, address); } - public static WebSocketUserAddress create(Serializable userid, InetSocketAddress sncpAddress) { - return new SimpleWebSocketUserAddress(userid, null, null, sncpAddress, null); - } - - public static WebSocketUserAddress create(Serializable userid, Collection sncpAddresses) { - return new SimpleWebSocketUserAddress(userid, null, null, null, sncpAddresses); + public static WebSocketUserAddress create(Serializable userid, Collection addresses) { + return new SimpleWebSocketUserAddress(userid, addresses); } public static class SimpleWebSocketUserAddress implements WebSocketUserAddress { private Serializable userid; - private String mqtopic; + private WebSocketAddress address; - private Collection mqtopics; - - private InetSocketAddress sncpAddress; - - private Collection sncpAddresses; + private Collection addresses; public SimpleWebSocketUserAddress() { } public SimpleWebSocketUserAddress(Serializable userid, String mqtopic, InetSocketAddress sncpAddress) { this.userid = userid; - this.mqtopic = mqtopic; - this.sncpAddress = sncpAddress; + this.address = new WebSocketAddress(mqtopic, sncpAddress); } - public SimpleWebSocketUserAddress(Serializable userid, Collection mqtopics, Collection sncpAddresses) { + public SimpleWebSocketUserAddress(Serializable userid, WebSocketAddress address) { this.userid = userid; - this.mqtopics = mqtopics; - this.sncpAddresses = sncpAddresses; + this.address = address; } - public SimpleWebSocketUserAddress(Serializable userid, String mqtopic, Collection mqtopics, InetSocketAddress sncpAddress, Collection sncpAddresses) { + public SimpleWebSocketUserAddress(Serializable userid, Collection addresses) { this.userid = userid; - this.mqtopic = mqtopic; - this.mqtopics = mqtopics; - this.sncpAddress = sncpAddress; - this.sncpAddresses = sncpAddresses; + this.addresses = addresses; } public SimpleWebSocketUserAddress(WebSocketUserAddress userAddress) { if (userAddress == null) return; this.userid = userAddress.userid(); - this.mqtopic = userAddress.mqtopic(); - this.mqtopics = userAddress.mqtopics(); - this.sncpAddress = userAddress.sncpAddress(); - this.sncpAddresses = userAddress.sncpAddresses(); + this.address = userAddress.address(); + this.addresses = userAddress.addresses(); } @Override @@ -98,23 +79,13 @@ public interface WebSocketUserAddress extends Serializable { } @Override - public String mqtopic() { - return mqtopic; + public WebSocketAddress address() { + return address; } @Override - public Collection mqtopics() { - return mqtopics; - } - - @Override - public InetSocketAddress sncpAddress() { - return sncpAddress; - } - - @Override - public Collection sncpAddresses() { - return sncpAddresses; + public Collection addresses() { + return addresses; } public Serializable getUserid() { @@ -125,36 +96,20 @@ public interface WebSocketUserAddress extends Serializable { this.userid = userid; } - public String getMqtopic() { - return mqtopic; + public WebSocketAddress getAddress() { + return address; } - public void setMqtopic(String mqtopic) { - this.mqtopic = mqtopic; + public void setAddress(WebSocketAddress address) { + this.address = address; } - public Collection getMqtopics() { - return mqtopics; + public Collection getAddresses() { + return addresses; } - public void setMqtopics(Collection mqtopics) { - this.mqtopics = mqtopics; - } - - public InetSocketAddress getSncpAddress() { - return sncpAddress; - } - - public void setSncpAddress(InetSocketAddress sncpAddress) { - this.sncpAddress = sncpAddress; - } - - public Collection getSncpAddresses() { - return sncpAddresses; - } - - public void setSncpAddresses(Collection sncpAddresses) { - this.sncpAddresses = sncpAddresses; + public void setAddresses(Collection addresses) { + this.addresses = addresses; } @Override diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index f54702283..44adbb41f 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -281,7 +281,9 @@ public final class SncpClient { if (messageAgent != null) { //MQ模式 final byte[] reqbytes = writer.toArray(); fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength); - MessageRecord message = new MessageRecord(ConvertType.BSON, this.topic, null, reqbytes); + String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic; + if (targetTopic == null) targetTopic = this.topic; + MessageRecord message = new MessageRecord(ConvertType.BSON, targetTopic, null, reqbytes); return messageAgent.sendRemoteSncp(null, message).thenApply(msg -> { ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); checkResult(seqid, action, buffer); @@ -511,6 +513,8 @@ public final class SncpClient { protected final int addressSourceParamIndex; + protected final int topicTargetParamIndex; + protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture protected final Creator futureCreator; @@ -526,6 +530,7 @@ public final class SncpClient { this.paramClass = method.getParameterTypes(); this.method = method; Annotation[][] anns = method.getParameterAnnotations(); + int tpoicAddrIndex = -1; int targetAddrIndex = -1; int sourceAddrIndex = -1; int handlerAttachIndex = -1; @@ -559,6 +564,8 @@ public final class SncpClient { targetAddrIndex = i; } else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) { sourceAddrIndex = i; + } else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) { + tpoicAddrIndex = i; } } for (Annotation ann : anns[i]) { @@ -575,6 +582,7 @@ public final class SncpClient { } } } + this.topicTargetParamIndex = tpoicAddrIndex; this.addressTargetParamIndex = targetAddrIndex; this.addressSourceParamIndex = sourceAddrIndex; this.handlerFuncParamIndex = handlerFuncIndex; diff --git a/src/org/redkale/service/RpcTargetTopic.java b/src/org/redkale/service/RpcTargetTopic.java new file mode 100644 index 000000000..b94cf8a25 --- /dev/null +++ b/src/org/redkale/service/RpcTargetTopic.java @@ -0,0 +1,28 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.service; + +import java.lang.annotation.*; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * SNCP协议中标记为目标topic参数, 该注解只能标记在类型为String的参数上。 + * + * + * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +@Inherited +@Documented +@Target({PARAMETER}) +@Retention(RUNTIME) +public @interface RpcTargetTopic { + +} diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 6b397aecf..2c13764d2 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -41,8 +41,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture> getWebSocketAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { - if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteWebSocketAddresses(targetAddress, groupid); + public CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) { + if ((topic == null || !topic.equals(wsaddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) return remoteWebSocketAddresses(topic, targetAddress, groupid); if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); ExecutorService executor = null; @@ -60,25 +60,25 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) { + public CompletableFuture sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); return this.localEngine.sendLocalMessage(message, last, userids); } @Override - public CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) { + public CompletableFuture broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); return this.localEngine.broadcastLocalMessage(wsrange, message, last); } @Override - public CompletableFuture sendAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) { + public CompletableFuture sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); return this.localEngine.sendLocalAction(action, userids); } @Override - public CompletableFuture broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) { + public CompletableFuture broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); return this.localEngine.broadcastLocalAction(action); } @@ -86,35 +86,35 @@ public class WebSocketNodeService extends WebSocketNode implements Service { /** * 当用户连接到节点,需要更新到CacheSource * - * @param userid Serializable - * @param sncpAddr InetSocketAddress + * @param userid Serializable + * @param wsaddr WebSocketAddress * * @return 无返回值 */ @Override - public CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr) { + public CompletableFuture connect(Serializable userid, WebSocketAddress wsaddr) { tryAcquireSemaphore(); - CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr); - future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, sncpAddr)); + CompletableFuture future = source.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr); + future = future.thenAccept((a) -> source.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class, wsaddr)); if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); - if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + wsaddr); return future; } /** * 当用户从一个节点断掉了所有的连接,需要从CacheSource中删除 * - * @param userid Serializable - * @param sncpAddr InetSocketAddress + * @param userid Serializable + * @param wsaddr WebSocketAddress * * @return 无返回值 */ @Override - public CompletableFuture disconnect(Serializable userid, InetSocketAddress sncpAddr) { + public CompletableFuture disconnect(Serializable userid, WebSocketAddress wsaddr) { tryAcquireSemaphore(); - CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr); + CompletableFuture future = source.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr); if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); - if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr); + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + wsaddr); return future; } @@ -123,17 +123,17 @@ public class WebSocketNodeService extends WebSocketNode implements Service { * * @param olduserid Serializable * @param newuserid Serializable - * @param sncpAddr InetSocketAddress + * @param wsaddr WebSocketAddress * * @return 无返回值 */ @Override - public CompletableFuture changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) { + public CompletableFuture changeUserid(Serializable olduserid, Serializable newuserid, WebSocketAddress wsaddr) { tryAcquireSemaphore(); - CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, InetSocketAddress.class, sncpAddr); - future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, InetSocketAddress.class, sncpAddr)); + CompletableFuture future = source.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, WebSocketAddress.class, wsaddr); + future = future.thenAccept((a) -> source.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, WebSocketAddress.class, wsaddr)); if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); - if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr); + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + wsaddr); return future; } @@ -141,12 +141,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service { * 判断用户是否有WebSocket * * @param userid Serializable + * @param topic RpcTargetTopic * @param targetAddress InetSocketAddress * * @return 无返回值 */ @Override - public CompletableFuture existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress) { + public CompletableFuture existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress); if (localEngine == null) return CompletableFuture.completedFuture(false); return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid)); @@ -156,12 +157,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service { * 强制关闭用户的WebSocket * * @param userid Serializable + * @param topic RpcTargetTopic * @param targetAddress InetSocketAddress * * @return 无返回值 */ @Override - public CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress) { + public CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) { //不能从sncpNodeAddresses中移除,因为engine.forceCloseWebSocket 会调用到disconnect if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + targetAddress); if (localEngine == null) return CompletableFuture.completedFuture(0);