diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 64ea41aee..78af60580 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -319,88 +319,88 @@ public abstract class WebSocketNode { * 向指定用户发送消息,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 * - * @param message 消息内容 - * @param userids Stream + * @param message 消息内容 + * @param useridOrAddrs Stream * * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public final CompletableFuture sendMessage(Object message, final Stream userids) { - return sendMessage((Convert) null, message, true, userids); + public final CompletableFuture sendMessage(Object message, final Stream useridOrAddrs) { + return sendMessage((Convert) null, message, true, useridOrAddrs); } /** * 向指定用户发送消息,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 * - * @param message 消息内容 - * @param userids Serializable[] + * @param message 消息内容 + * @param useridOrAddrs Serializable[] * * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public final CompletableFuture sendMessage(Object message, final Serializable... userids) { - return sendMessage((Convert) null, message, true, userids); + public final CompletableFuture sendMessage(Object message, final Serializable... useridOrAddrs) { + return sendMessage((Convert) null, message, true, useridOrAddrs); } /** * 向指定用户发送消息,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 * - * @param convert Convert - * @param message 消息内容 - * @param userids Stream + * @param convert Convert + * @param message 消息内容 + * @param useridOrAddrs Stream * * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public final CompletableFuture sendMessage(final Convert convert, Object message, final Stream userids) { - return sendMessage(convert, message, true, userids); + public final CompletableFuture sendMessage(final Convert convert, Object message, final Stream useridOrAddrs) { + return sendMessage(convert, message, true, useridOrAddrs); } /** * 向指定用户发送消息,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 * - * @param convert Convert - * @param message 消息内容 - * @param userids Serializable[] + * @param convert Convert + * @param message 消息内容 + * @param useridOrAddrs Serializable[] * * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public final CompletableFuture sendMessage(final Convert convert, Object message, final Serializable... userids) { - return sendMessage(convert, message, true, userids); + public final CompletableFuture sendMessage(final Convert convert, Object message, final Serializable... useridOrAddrs) { + return sendMessage(convert, message, true, useridOrAddrs); } /** * 向指定用户发送消息,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 * - * @param message 消息内容 - * @param last 是否最后一条 - * @param userids Stream + * @param message 消息内容 + * @param last 是否最后一条 + * @param useridOrAddrs Stream * * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public final CompletableFuture sendMessage(final Object message, final boolean last, final Stream userids) { - return sendMessage((Convert) null, message, last, userids); + public final CompletableFuture sendMessage(final Object message, final boolean last, final Stream useridOrAddrs) { + return sendMessage((Convert) null, message, last, useridOrAddrs); } /** * 向指定用户发送消息,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 * - * @param message 消息内容 - * @param last 是否最后一条 - * @param userids Serializable[] + * @param message 消息内容 + * @param last 是否最后一条 + * @param useridOrAddrs Serializable[] * * @return 为0表示成功, 其他值表示部分发送异常 */ @Local - public final CompletableFuture sendMessage(final Object message, final boolean last, final Serializable... userids) { - return sendMessage((Convert) null, message, last, userids); + public final CompletableFuture sendMessage(final Object message, final boolean last, final Serializable... useridOrAddrs) { + return sendMessage((Convert) null, message, last, useridOrAddrs); } /** @@ -438,6 +438,13 @@ public abstract class WebSocketNode { @Local public CompletableFuture sendMessage(final Convert convert, final Object message0, final boolean last, final Serializable... userids) { if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + if (userids[0] instanceof WebSocketUserAddress) { + WebSocketUserAddress[] useraddrs = new WebSocketUserAddress[userids.length]; + for (int i = 0; i < useraddrs.length; i++) { + useraddrs[i] = (WebSocketUserAddress) userids[i]; + } + return sendMessage(convert, message0, last, useraddrs); + } if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, userids)); final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 @@ -483,6 +490,41 @@ public abstract class WebSocketNode { return rsfuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : rsfuture; } + /** + * 向指定用户发送消息,先发送本地连接,再发送远程连接
+ * 如果当前WebSocketNode是远程模式,此方法只发送远程连接 + * + * @param convert Convert + * @param message0 消息内容 + * @param last 是否最后一条 + * @param useraddrs WebSocketUserAddress[] + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + @Local + public CompletableFuture sendMessage(final Convert convert, final Object message0, final boolean last, final WebSocketUserAddress... useraddrs) { + if (useraddrs == null || useraddrs.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, useraddrs)); + final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); + if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + return this.localEngine.sendLocalMessage(message, last, userAddressToUserids(useraddrs)); + } + + final Object remoteMessage = formatRemoteMessage(message); + final Map> addrUsers = userAddressToAddrMap(useraddrs); + if (logger.isLoggable(Level.FINEST)) { + logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found message-addr-userids: " + addrUsers); + } + CompletableFuture future = null; + for (Map.Entry> en : addrUsers.entrySet()) { + Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]); + future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids) + : future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + + } + protected CompletableFuture sendOneUserMessage(final Object message, final boolean last, final Serializable userid) { if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneUserMessage(msg, last, userid)); if (logger.isLoggable(Level.FINEST)) { @@ -534,6 +576,32 @@ public abstract class WebSocketNode { return remoteNode.sendMessage(sncpAddr, remoteMessage, last, userids); } + protected Serializable[] userAddressToUserids(WebSocketUserAddress... useraddrs) { + if (useraddrs == null || useraddrs.length == 1) return new Serializable[0]; + Set set = new HashSet<>(); + for (WebSocketUserAddress userAddress : useraddrs) { + set.add(userAddress.userid()); + } + return set.toArray(new Serializable[set.size()]); + } + + protected Map> userAddressToAddrMap(WebSocketUserAddress... useraddrs) { + final Map> addrUsers = new HashMap<>(); + for (WebSocketUserAddress userAddress : useraddrs) { + if (userAddress.sncpAddress() != null) { + addrUsers.computeIfAbsent(userAddress.sncpAddress(), k -> new ArrayList<>()).add(userAddress.userid()); + } + if (userAddress.sncpAddresses() != null) { + for (InetSocketAddress addr : userAddress.sncpAddresses()) { + if (addr != null) { + addrUsers.computeIfAbsent(addr, k -> new ArrayList<>()).add(userAddress.userid()); + } + } + } + } + return addrUsers; + } + /** * 广播消息, 给所有人发消息 * @@ -705,6 +773,13 @@ public abstract class WebSocketNode { @Local public CompletableFuture sendAction(final WebSocketAction action, final Serializable... userids) { if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + if (userids[0] instanceof WebSocketUserAddress) { + WebSocketUserAddress[] useraddrs = new WebSocketUserAddress[userids.length]; + for (int i = 0; i < useraddrs.length; i++) { + useraddrs[i] = (WebSocketUserAddress) userids[i]; + } + return sendAction(action, useraddrs); + } if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 return this.localEngine.sendLocalAction(action, userids); } @@ -747,6 +822,35 @@ public abstract class WebSocketNode { return rsfuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : rsfuture; } + /** + * 向指定用户发送操作,先发送本地连接,再发送远程连接
+ * 如果当前WebSocketNode是远程模式,此方法只发送远程连接 + * + * @param action 操作参数 + * @param useraddrs WebSocketUserAddress[] + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + @Local + public CompletableFuture sendAction(final WebSocketAction action, final WebSocketUserAddress... useraddrs) { + if (useraddrs == null || useraddrs.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + return this.localEngine.sendLocalAction(action, userAddressToUserids(useraddrs)); + } + + final Map> addrUsers = userAddressToAddrMap(useraddrs); + if (logger.isLoggable(Level.FINEST)) { + logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found action-userid-addrs: " + addrUsers); + } + CompletableFuture future = null; + for (Map.Entry> en : addrUsers.entrySet()) { + Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]); + future = future == null ? sendOneAddrAction(en.getKey(), action, oneaddrUserids) + : future.thenCombine(sendOneAddrAction(en.getKey(), action, oneaddrUserids), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + } + protected CompletableFuture sendOneUserAction(final WebSocketAction action, final Serializable userid) { if (logger.isLoggable(Level.FINEST)) { logger.finest("websocket want send action {userid:" + userid + ", action:" + action + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); diff --git a/src/org/redkale/net/http/WebSocketUserAddress.java b/src/org/redkale/net/http/WebSocketUserAddress.java index 85adc2896..9c2881d9b 100644 --- a/src/org/redkale/net/http/WebSocketUserAddress.java +++ b/src/org/redkale/net/http/WebSocketUserAddress.java @@ -13,12 +13,10 @@ import org.redkale.convert.json.JsonConvert; /** * userid 与 sncpaddress组合对象 * - *

- * 详情见: https://redkale.org * * @author zhangjx */ -public interface WebSocketUserAddress { +public interface WebSocketUserAddress extends Serializable { Serializable userid();