diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 4beef2eed..31c85bef2 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -262,6 +262,59 @@ public abstract class WebSocket { return rs; } + /** + * 广播消息, 给所有人的所有接入的WebSocket节点发消息 + * + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastEachMessage(final Object message) { + return broadcastMessage(false, message, true); + } + + /** + * 广播消息, 给所有人最近接入的WebSocket节点发消息 + * + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastRecentMessage(final Object message) { + return broadcastMessage(true, message, true); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param recent 是否只发送给最近接入的WebSocket节点 + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final boolean recent, final Object message) { + return broadcastMessage(recent, message, true); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param recent 是否只发送给最近接入的WebSocket节点 + * @param message 消息内容 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final boolean recent, final Object message, final boolean last) { + if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); + if (message instanceof CompletableFuture) { + return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(recent, json, last)); + } + CompletableFuture rs = _engine.node.broadcastMessage(recent, message, last); + if (_engine.finest) _engine.logger.finest("broadcast " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); + return rs; + } + /** * 获取当前WebSocket下的属性,非线程安全 * diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 142024819..8e982a3cb 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -107,11 +107,39 @@ public final class WebSocketEngine { } } + public CompletableFuture broadcastMessage(final boolean recent, final Object message, final boolean last) { + if (message instanceof CompletableFuture) { + return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(recent, json, last)); + } + final Collection groups = getWebSocketGroups(); + final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && groups.size() > 1; + if (more) { + final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message + : ((message == null || message instanceof CharSequence || message instanceof byte[]) + ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); + packet.setSendBuffers(packet.encode(context.getBufferSupplier())); + CompletableFuture future = null; + for (WebSocketGroup group : groups) { + if (group == null) continue; + future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + } + if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + } else { + CompletableFuture future = null; + for (WebSocketGroup group : groups) { + if (group == null) continue; + future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + } + } + CompletableFuture sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) { if (message instanceof CompletableFuture) { return ((CompletableFuture) message).thenCompose((json) -> sendMessage(recent, json, last, groupids)); } - final boolean more = !(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null || groupids.length > 1; + final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && groupids.length > 1; if (more) { final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index c0b87e4c3..0d54ca9b5 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -41,7 +41,7 @@ public abstract class WebSocketNode { //存放所有用户分布在节点上的队列信息,Set 为 sncpnode 的集合, key: groupid //集合包含 localSncpAddress //如果不是分布式(没有SNCP),sncpNodeAddresses 将不会被用到 - @Resource(name = "$") + @Resource(name = "$_nodes") protected CacheSource sncpNodeAddresses; //当前节点的本地WebSocketEngine @@ -57,12 +57,15 @@ public abstract class WebSocketNode { if (this.localEngine == null) return; //关掉所有本地本地WebSocket this.localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid())); + if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress); } protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last, Serializable groupid); + protected abstract CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last); + protected abstract CompletableFuture connect(Serializable groupid, InetSocketAddress addr); protected abstract CompletableFuture disconnect(Serializable groupid, InetSocketAddress addr); @@ -167,7 +170,7 @@ public abstract class WebSocketNode { * @param message 消息内容 * @param last 是否最后一条 * - * @return 为0表示成功, 其他值表示异常 + * @return 为0表示成功, 其他值表示部分发送异常 */ //最近连接发送逻辑还没有理清楚 public final CompletableFuture sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) { @@ -183,6 +186,69 @@ public abstract class WebSocketNode { return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } + /** + * 广播消息, 给所有人的所有接入的WebSocket节点发消息 + * + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastEachMessage(final Object message) { + return broadcastMessage(false, message, true); + } + + /** + * 广播消息, 给所有人最近接入的WebSocket节点发消息 + * + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastRecentMessage(final Object message) { + return broadcastMessage(true, message, true); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param recent 是否只发送给最近接入的WebSocket节点 + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final boolean recent, final Object message) { + return broadcastMessage(recent, message, true); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param recent 是否只发送给最近接入的WebSocket节点 + * @param message 消息内容 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final boolean recent, final Object message, final boolean last) { + if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + return this.localEngine.broadcastMessage(recent, message, last); + } + + CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(recent, message, last); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync("redkale_sncpnodes"); + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + if (finest) logger.finest("websocket broadcast message on " + addrs); + if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); + CompletableFuture future = null; + for (InetSocketAddress addr : addrs) { + future = future == null ? remoteNode.broadcastMessage(addr, recent, message, last) + : future.thenCombine(remoteNode.broadcastMessage(addr, recent, message, last), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(0) : future; + }); + return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); + } + private CompletableFuture sendOneMessage(final boolean recent, final Object message, final boolean last, final Serializable groupid) { if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); CompletableFuture localFuture = null; diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 31d5dfdf0..7f7dff4c7 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -57,6 +57,12 @@ public class WebSocketNodeService extends WebSocketNode implements Service { return group.send(recent, message, last); } + @Override + public CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress addr, boolean recent, Object message, boolean last) { + if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + return this.localEngine.broadcastMessage(recent, message, last); + } + /** * 当用户连接到节点,需要更新到CacheSource * @@ -68,6 +74,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture connect(Serializable groupid, InetSocketAddress sncpAddr) { CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(groupid, sncpAddr); + future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr)); if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + sncpAddr); return future; } diff --git a/test/org/redkale/test/ws/ChatWebSocket.java b/test/org/redkale/test/ws/ChatWebSocket.java index e322e5484..35f37061d 100644 --- a/test/org/redkale/test/ws/ChatWebSocket.java +++ b/test/org/redkale/test/ws/ChatWebSocket.java @@ -30,8 +30,8 @@ public class ChatWebSocket extends WebSocket { @RestOnMessage(name = "sendmessage") public void onChatMessage(ChatMessage message, Map extmap) { - System.out.println("获取消息: message: " + message + ", map: " + extmap); - super.send(message); + System.out.println("获取消息: message: " + message + ", map: " + extmap); + super.broadcastEachMessage(message); //给所有人广播 } @RestOnMessage(name = "joinroom")