From fda9c30dc4b8f24ca9f69103581e9212698cd4b0 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 22 May 2017 19:22:24 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocket.java | 82 +++++++++++-------- src/org/redkale/net/http/WebSocketEngine.java | 11 +++ src/org/redkale/net/http/WebSocketNode.java | 52 +++++++----- .../redkale/net/http/WebSocketServlet.java | 2 +- .../redkale/service/WebSocketNodeService.java | 2 +- .../test/websocket/ChatWebSocketServlet.java | 11 ++- 6 files changed, 99 insertions(+), 61 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index e5306ef96..eb85ccee8 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -36,9 +36,10 @@ import org.redkale.util.Comment; * 详情见: https://redkale.org * * @author zhangjx - * @param 泛型 + * @param Groupid的泛型 + * @param Message的泛型 */ -public abstract class WebSocket { +public abstract class WebSocket { @Comment("消息不合法") public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2 @@ -72,7 +73,7 @@ public abstract class WebSocket { Serializable _sessionid; //不可能为空 - Serializable _groupid; //不可能为空 + G _groupid; //不可能为空 SocketAddress _remoteAddress;//不可能为空 @@ -129,6 +130,15 @@ public abstract class WebSocket { * @return 0表示成功, 非0表示错误码 */ public final CompletableFuture send(Object message, boolean last) { + if (message instanceof CompletableFuture) { + return ((CompletableFuture) message).thenCompose((json) -> { + if (json == null || json instanceof CharSequence || json instanceof byte[]) { + return sendPacket(new WebSocketPacket((Serializable) json, last)); + } else { + return sendPacket(new WebSocketPacket(_jsonConvert, json, last)); + } + }); + } if (message == null || message instanceof CharSequence || message instanceof byte[]) { return sendPacket(new WebSocketPacket((Serializable) message, last)); } else { @@ -145,7 +155,7 @@ public abstract class WebSocket { * @return 0表示成功, 非0表示错误码 */ public final CompletableFuture send(JsonConvert convert, Object message) { - return sendPacket(new WebSocketPacket(convert == null ? _jsonConvert : convert, message, true)); + return send(convert, message, true); } /** @@ -158,6 +168,9 @@ public abstract class WebSocket { * @return 0表示成功, 非0表示错误码 */ public final CompletableFuture send(JsonConvert convert, Object message, boolean last) { + if (message instanceof CompletableFuture) { + return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(convert == null ? _jsonConvert : convert, json, last))); + } return sendPacket(new WebSocketPacket(convert == null ? _jsonConvert : convert, message, last)); } @@ -178,67 +191,70 @@ public abstract class WebSocket { /** * 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * - * @param groupid groupid - * @param message 不可为空 + * @param message 不可为空 + * @param groupids Serializable[] * * @return 为0表示成功, 其他值表示异常 */ - public final CompletableFuture sendEachMessage(Serializable groupid, Object message) { - return sendEachMessage(groupid, message, true); + public final CompletableFuture sendEachMessage(Object message, G... groupids) { + return sendEachMessage(message, true, groupids); } /** * 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * - * @param groupid groupid - * @param message 不可为空 - * @param last 是否最后一条 + * @param message 不可为空 + * @param last 是否最后一条 + * @param groupids Serializable[] * * @return 为0表示成功, 其他值表示异常 */ - public final CompletableFuture sendEachMessage(Serializable groupid, Object message, boolean last) { - return sendMessage(groupid, false, message, last); + public final CompletableFuture sendEachMessage(Object message, boolean last, G... groupids) { + return sendMessage(false, message, last, groupids); } /** * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * - * @param groupid groupid - * @param message 不可为空 + * @param message 不可为空 + * @param groupids Serializable[] * * @return 为0表示成功, 其他值表示异常 */ - public final CompletableFuture sendRecentMessage(Serializable groupid, Object message) { - return sendMessage(groupid, true, message, true); + public final CompletableFuture sendRecentMessage(Object message, G... groupids) { + return sendMessage(true, message, true, groupids); } /** * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * - * @param groupid groupid - * @param message 不可为空 - * @param last 是否最后一条 + * @param groupids Serializable[] + * @param message 不可为空 + * @param last 是否最后一条 * * @return 为0表示成功, 其他值表示异常 */ - public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, boolean last) { - return sendMessage(groupid, true, message, last); + public final CompletableFuture sendRecentMessage(Object message, boolean last, G... groupids) { + return sendMessage(true, message, last, groupids); } /** * 给指定groupid的WebSocketGroup下WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * - * @param groupid groupid - * @param recent 是否只发最近接入的WebSocket - * @param message 不可为空 - * @param last 是否最后一条 + * @param recent 是否只发最近接入的WebSocket + * @param message 不可为空 + * @param last 是否最后一条 + * @param groupids Serializable[] * * @return 为0表示成功, 其他值表示异常 */ - public final CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { + public final CompletableFuture sendMessage(boolean recent, Object message, boolean last, G... groupids) { if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); - CompletableFuture rs = _engine.node.sendMessage(groupid, recent, message, last); - if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); + if (message instanceof CompletableFuture) { + return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(recent, json, last, groupids)); + } + CompletableFuture rs = _engine.node.sendMessage(recent, message, last, groupids); + if (_engine.finest) _engine.logger.finest("wsgroupid:" + Arrays.toString(groupids) + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); return rs; } @@ -283,7 +299,7 @@ public abstract class WebSocket { * * @return groupid */ - public final Serializable getGroupid() { + public final G getGroupid() { return _groupid; } @@ -327,11 +343,11 @@ public abstract class WebSocket { /** * 获取指定groupid的WebSocketGroup, 没有返回null * - * @param groupid groupid + * @param groupid Serializable * * @return WebSocketGroup */ - protected final WebSocketGroup getWebSocketGroup(Serializable groupid) { + protected final WebSocketGroup getWebSocketGroup(G groupid) { return _engine.getWebSocketGroup(groupid); } @@ -361,7 +377,7 @@ public abstract class WebSocket { * * @return groupid */ - protected abstract CompletableFuture createGroupid(); + protected abstract CompletableFuture createGroupid(); /** * 标记为WebSocketBinary才需要重写此方法 diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 5a497eb89..83a13b482 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -11,6 +11,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.logging.*; +import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import org.redkale.util.*; /** @@ -97,6 +98,16 @@ public final class WebSocketEngine { } } + CompletableFuture sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) { + CompletableFuture future = null; + for (Serializable groupid : groupids) { + WebSocketGroup group = getWebSocketGroup(groupid); + if (group == null) continue; + future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + } + Collection getWebSocketGroups() { return containers.values(); } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 3660ce5f8..b390da0ba 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -61,7 +61,7 @@ public abstract class WebSocketNode { protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); - protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last); + protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last, Serializable groupid); protected abstract CompletableFuture connect(Serializable groupid, InetSocketAddress addr); @@ -73,7 +73,7 @@ public abstract class WebSocketNode { return connect(groupid, localSncpAddress); } - final CompletableFuture disconnect(Serializable groupid) { + final CompletableFuture disconnect(final Serializable groupid) { if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this.localEngine.getEngineid() + ")."); return disconnect(groupid, localSncpAddress); } @@ -138,39 +138,52 @@ public abstract class WebSocketNode { } //-------------------------------------------------------------------------------- - public final CompletableFuture sendEachMessage(Serializable groupid, Object message) { - return sendMessage(groupid, false, message, true); + public final CompletableFuture sendEachMessage(Serializable groupid, Object message, final Serializable... groupids) { + return sendMessage(false, message, true, groupids); } - public final CompletableFuture sendEachMessage(Serializable groupid, Object message, boolean last) { - return sendMessage(groupid, false, message, last); + public final CompletableFuture sendEachMessage(Serializable groupid, Object message, boolean last, final Serializable... groupids) { + return sendMessage(false, message, last, groupids); } - public final CompletableFuture sendRecentMessage(Serializable groupid, Object message) { - return sendMessage(groupid, true, message, true); + public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, final Serializable... groupids) { + return sendMessage(true, message, true, groupids); } - public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, boolean last) { - return sendMessage(groupid, true, message, last); + public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, boolean last, final Serializable... groupids) { + return sendMessage(true, message, last, groupids); } - public final CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message) { - return sendMessage(groupid, recent, message, true); + public final CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message, final Serializable... groupids) { + return sendMessage(recent, message, true, groupids); } /** * 向指定用户发送消息,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 * - * @param groupid String - * @param recent 是否只发送给最近接入的WebSocket节点 - * @param message 消息内容 - * @param last 是否最后一条 + * @param groupids Serializable[] + * @param recent 是否只发送给最近接入的WebSocket节点 + * @param message 消息内容 + * @param last 是否最后一条 * * @return 为0表示成功, 其他值表示异常 */ //最近连接发送逻辑还没有理清楚 - public final CompletableFuture sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) { + public final CompletableFuture sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) { + if (groupids == null || groupids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 + return this.localEngine.sendMessage(recent, message, last, groupids); + } + CompletableFuture future = null; + for (Serializable groupid : groupids) { + future = future == null ? sendOneMessage(recent, message, last, groupid) + : future.thenCombine(sendOneMessage(recent, message, last, groupid), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + } + + private CompletableFuture sendOneMessage(final boolean recent, final Object message, final boolean last, final Serializable groupid) { if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); CompletableFuture localFuture = null; final WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid); @@ -192,12 +205,11 @@ public abstract class WebSocketNode { CompletableFuture future = null; for (InetSocketAddress addr : addrs) { if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.sendMessage(addr, groupid, recent, message, last) - : future.thenCombine(remoteNode.sendMessage(addr, groupid, recent, message, last), (a, b) -> a | b); + future = future == null ? remoteNode.sendMessage(addr, recent, message, last, groupid) + : future.thenCombine(remoteNode.sendMessage(addr, recent, message, last, groupid), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(0) : future; }); return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } - } diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 459918d24..4b0088a18 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -191,7 +191,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return null; } - protected abstract WebSocket createWebSocket(); + protected abstract WebSocket createWebSocket(); private static MessageDigest getMessageDigest() { try { diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index df1dbe145..31d5dfdf0 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -47,7 +47,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) { + public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress addr, boolean recent, Object message, boolean last, Serializable groupid) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); if (group == null || group.isEmpty()) { diff --git a/test/org/redkale/test/websocket/ChatWebSocketServlet.java b/test/org/redkale/test/websocket/ChatWebSocketServlet.java index cff4f8605..085fd6ca5 100644 --- a/test/org/redkale/test/websocket/ChatWebSocketServlet.java +++ b/test/org/redkale/test/websocket/ChatWebSocketServlet.java @@ -8,7 +8,6 @@ package org.redkale.test.websocket; import org.redkale.net.http.WebServlet; import org.redkale.net.http.WebSocketServlet; import org.redkale.net.http.WebSocket; -import java.io.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.*; import org.redkale.convert.json.JsonConvert; @@ -51,12 +50,12 @@ public class ChatWebSocketServlet extends WebSocketServlet { } @Override - protected WebSocket createWebSocket() { + protected WebSocket createWebSocket() { - return new WebSocket() { + return new WebSocket() { @Override - public void onMessage(ChatMessage message, boolean last) { + public void onMessage(ChatMessage message, boolean last) { icounter.incrementAndGet(); counter.incrementAndGet(); if (debug) System.out.println("收到消息: " + message); @@ -64,8 +63,8 @@ public class ChatWebSocketServlet extends WebSocketServlet { } @Override - protected CompletableFuture createGroupid() { - return CompletableFuture.completedFuture("2"); + protected CompletableFuture createGroupid() { + return CompletableFuture.completedFuture("2"); } };