From 6cd232efd23e54aff2153e579bea11996ec1d186 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sun, 28 May 2017 10:19:56 +0800 Subject: [PATCH] --- src/org/redkale/net/http/Rest.java | 3 + src/org/redkale/net/http/RestWebSocket.java | 7 + src/org/redkale/net/http/WebSocket.java | 180 ++++++------------ src/org/redkale/net/http/WebSocketEngine.java | 161 +++++++++++----- src/org/redkale/net/http/WebSocketGroup.java | 150 --------------- src/org/redkale/net/http/WebSocketNode.java | 144 +++++--------- src/org/redkale/net/http/WebSocketRunner.java | 1 - .../redkale/net/http/WebSocketServlet.java | 23 ++- .../redkale/service/WebSocketNodeService.java | 32 ++-- test/org/redkale/test/http/WebSocketDesc.java | 14 +- .../test/websocket/ChatWebSocketServlet.java | 4 +- .../test/websocket/VideoWebSocketServlet.java | 16 +- test/org/redkale/test/ws/ChatService.java | 2 +- test/org/redkale/test/ws/ChatWebSocket.java | 12 +- 14 files changed, 264 insertions(+), 485 deletions(-) delete mode 100644 src/org/redkale/net/http/WebSocketGroup.java diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index 110321edb..4384b9d3f 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -257,6 +257,9 @@ public final class Rest { mv.visitIntInsn(BIPUSH, rws.liveinterval()); } mv.visitFieldInsn(PUTFIELD, newDynName, "liveinterval", "I"); + mv.visitVarInsn(ALOAD, 0); + mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0); + mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z"); mv.visitInsn(RETURN); mv.visitMaxs(2, 1); mv.visitEnd(); diff --git a/src/org/redkale/net/http/RestWebSocket.java b/src/org/redkale/net/http/RestWebSocket.java index 3ac3c2997..fc5b396a8 100644 --- a/src/org/redkale/net/http/RestWebSocket.java +++ b/src/org/redkale/net/http/RestWebSocket.java @@ -37,6 +37,13 @@ public @interface RestWebSocket { */ String catalog() default ""; + /** + * 是否单用户单连接, 默认单用户单连接 + * + * @return 是否单用户单连接 + */ + boolean single() default true; + /** * WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值:60秒 * diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index cee8cb4b0..5dc515b9b 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -10,6 +10,7 @@ import java.io.*; import java.net.*; import java.util.*; import java.util.concurrent.*; +import java.util.stream.Stream; import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; import org.redkale.util.Comment; @@ -20,7 +21,7 @@ import org.redkale.util.Comment; * WebSocket 有两种模式: * 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下: * 1.1 onOpen 若返回null,视为WebSocket的连接不合法,强制关闭WebSocket连接;通常用于判断登录态。 - * 1.2 createGroupid 若返回null,视为WebSocket的连接不合法,强制关闭WebSocket连接;通常用于判断用户权限是否符合。 + * 1.2 createUserid 若返回null,视为WebSocket的连接不合法,强制关闭WebSocket连接;通常用于判断用户权限是否符合。 * 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。 * 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。 * 1.5 onClose WebSocket被关闭后回调此方法。 @@ -28,7 +29,7 @@ import org.redkale.util.Comment; * * 2) 原始二进制模式: 此模式有别于HTML5规范,可以视为原始的TCP连接。通常用于音频视频通讯场景。其流程顺序如下: * 2.1 onOpen 若返回null,视为WebSocket的连接不合法,强制关闭WebSocket连接;通常用于判断登录态。 - * 2.2 createGroupid 若返回null,视为WebSocket的连接不合法,强制关闭WebSocket连接;通常用于判断用户权限是否符合。 + * 2.2 createWebSocketid 若返回null,视为WebSocket的连接不合法,强制关闭WebSocket连接;通常用于判断用户权限是否符合。 * 2.3 onRead WebSocket成功连接后回调此方法, 由此方法处理原始的TCP连接, 需要业务代码去控制WebSocket的关闭。 * 二进制模式下 以上方法都应该被重载。 * @@ -69,11 +70,9 @@ public abstract class WebSocket { WebSocketEngine _engine; //不可能为空 - WebSocketGroup _group; //不可能为空 - String _sessionid; //不可能为空 - G _groupid; //不可能为空 + G _userid; //不可能为空 SocketAddress _remoteAddress;//不可能为空 @@ -87,8 +86,6 @@ public abstract class WebSocket { private Map attributes = new HashMap<>(); //非线程安全 - protected long websocketid = Math.abs(System.nanoTime()); //唯一ID - protected WebSocket() { } @@ -187,131 +184,56 @@ public abstract class WebSocket { */ CompletableFuture sendPacket(WebSocketPacket packet) { CompletableFuture rs = this._runner.sendMessage(packet); - if (_engine.finest) _engine.logger.finest("wsgroupid:" + getGroupid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")"); + if (_engine.finest) _engine.logger.finest("userid:" + userid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")"); return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs; } //---------------------------------------------------------------- /** - * 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 + * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * - * @param message 不可为空 - * @param groupids Serializable[] + * @param message 不可为空 + * @param last 是否最后一条 + * @param userids Serializable[] * * @return 为0表示成功, 其他值表示异常 */ - public final CompletableFuture sendEachMessage(Object message, G... groupids) { - return sendEachMessage(message, true, groupids); - } - - /** - * 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 - * - * @param message 不可为空 - * @param last 是否最后一条 - * @param groupids Serializable[] - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendEachMessage(Object message, boolean last, G... groupids) { - return sendMessage(false, message, last, groupids); - } - - /** - * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 - * - * @param message 不可为空 - * @param groupids Serializable[] - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendRecentMessage(Object message, G... groupids) { - return sendMessage(true, message, true, groupids); - } - - /** - * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 - * - * @param groupids Serializable[] - * @param message 不可为空 - * @param last 是否最后一条 - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendRecentMessage(Object message, boolean last, G... groupids) { - return sendMessage(true, message, last, groupids); - } - - /** - * 给指定groupid的WebSocketGroup下WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 - * - * @param recent 是否只发最近接入的WebSocket - * @param message 不可为空 - * @param last 是否最后一条 - * @param groupids Serializable[] - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendMessage(boolean recent, Object message, boolean last, G... groupids) { + public final CompletableFuture sendMessage(Object message, boolean last, G... userids) { if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(recent, json, last, groupids)); + return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(json, last, userids)); } - CompletableFuture rs = _engine.node.sendMessage(recent, message, last, groupids); - if (_engine.finest) _engine.logger.finest("wsgroupid:" + Arrays.toString(groupids) + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); + CompletableFuture rs = _engine.node.sendMessage(message, last, userids); + if (_engine.finest) _engine.logger.finest("userids:" + Arrays.toString(userids) + " send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); return rs; } /** - * 广播消息, 给所有人的所有接入的WebSocket节点发消息 + * 广播消息, 给所有人发消息 * * @param message 消息内容 * * @return 为0表示成功, 其他值表示部分发送异常 */ - public final CompletableFuture broadcastEachMessage(final Object message) { - return broadcastMessage(false, message, true); - } - - /** - * 广播消息, 给所有人最近接入的WebSocket节点发消息 - * - * @param message 消息内容 - * - * @return 为0表示成功, 其他值表示部分发送异常 - */ - public final CompletableFuture broadcastRecentMessage(final Object message) { - return broadcastMessage(true, message, true); + public final CompletableFuture broadcastMessage(final Object message) { + return broadcastMessage(message, true); } /** * 广播消息, 给所有人发消息 * - * @param recent 是否只发送给最近接入的WebSocket节点 - * @param message 消息内容 - * - * @return 为0表示成功, 其他值表示部分发送异常 - */ - public final CompletableFuture broadcastMessage(final boolean recent, final Object message) { - return broadcastMessage(recent, message, true); - } - - /** - * 广播消息, 给所有人发消息 - * - * @param recent 是否只发送给最近接入的WebSocket节点 * @param message 消息内容 * @param last 是否最后一条 * * @return 为0表示成功, 其他值表示部分发送异常 */ - public final CompletableFuture broadcastMessage(final boolean recent, final Object message, final boolean last) { + public final CompletableFuture broadcastMessage(final Object message, final boolean last) { if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(recent, json, last)); + return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(json, last)); } - CompletableFuture rs = _engine.node.broadcastMessage(recent, message, last); - if (_engine.finest) _engine.logger.finest("broadcast " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); + CompletableFuture rs = _engine.node.broadcastMessage(message, last); + if (_engine.finest) _engine.logger.finest("broadcast send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); return rs; } @@ -319,13 +241,13 @@ public abstract class WebSocket { * 获取用户在线的SNCP节点地址列表,不是分布式则返回元素数量为1,且元素值为null的列表
* InetSocketAddress 为 SNCP节点地址 * - * @param groupid Serializable + * @param userid Serializable * * @return 地址列表 */ - public CompletableFuture> getRpcNodeAddresses(final Serializable groupid) { + public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { if (_engine.node == null) return CompletableFuture.completedFuture(null); - return _engine.node.getRpcNodeAddresses(groupid); + return _engine.node.getRpcNodeAddresses(userid); } /** @@ -333,13 +255,13 @@ public abstract class WebSocket { * Map.key 为 SNCP节点地址, 含值为null的key表示没有分布式 * Map.value 为 用户客户端的IP * - * @param groupid Serializable + * @param userid Serializable * * @return 地址集合 */ - public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable groupid) { + public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable userid) { if (_engine.node == null) return CompletableFuture.completedFuture(null); - return _engine.node.getRpcNodeWebSocketAddresses(groupid); + return _engine.node.getRpcNodeWebSocketAddresses(userid); } /** @@ -379,12 +301,12 @@ public abstract class WebSocket { } /** - * 获取当前WebSocket所属的groupid + * 获取当前WebSocket所属的userid * - * @return groupid + * @return userid */ - public final G getGroupid() { - return _groupid; + public final G userid() { + return _userid; } /** @@ -416,37 +338,41 @@ public abstract class WebSocket { //------------------------------------------------------------------- /** - * 获取当前WebSocket所属的WebSocketGroup, 不会为null + * 获取指定userid的WebSocket数组, 没有返回null
+ * 此方法用于单用户多连接模式 * - * @return WebSocketGroup + * @param userid Serializable + * + * @return WebSocket集合 */ - protected final WebSocketGroup getWebSocketGroup() { - return _group; + protected final Stream getWebSockets(G userid) { + return _engine.getWebSockets(userid); } - + /** - * 获取指定groupid的WebSocketGroup, 没有返回null + * 获取指定userid的WebSocket数组, 没有返回null
+ * 此方法用于单用户单连接模式 * - * @param groupid Serializable + * @param userid Serializable * - * @return WebSocketGroup + * @return WebSocket */ - protected final WebSocketGroup getWebSocketGroup(G groupid) { - return _engine.getWebSocketGroup(groupid); + protected final WebSocket findWebSocket(G userid) { + return _engine.findWebSocket(userid); } - + /** - * 获取当前进程节点所有在线的WebSocketGroup + * 获取当前进程节点所有在线的WebSocket * * @return WebSocketGroup列表 */ - protected final Collection getWebSocketGroups() { - return _engine.getWebSocketGroups(); + protected final Collection getWebSockets() { + return _engine.getWebSockets(); } //------------------------------------------------------------------- /** - * 返回sessionid, null表示连接不合法或异常,默认实现是request.getSessionid(true),通常需要重写该方法 + * 返回sessionid, null表示连接不合法或异常,默认实现是request.sessionid(true),通常需要重写该方法 * * @param request HttpRequest * @@ -457,11 +383,11 @@ public abstract class WebSocket { } /** - * 创建groupid, null表示异常, 必须实现该方法, 通常为用户ID为groupid + * 创建userid, null表示异常, 必须实现该方法 * - * @return groupid + * @return userid */ - protected abstract CompletableFuture createGroupid(); + protected abstract CompletableFuture createUserid(); /** * 标记为WebSocketBinary才需要重写此方法 @@ -538,6 +464,6 @@ public abstract class WebSocket { @Override public String toString() { - return this.websocketid + "@" + _remoteAddr; + return this.userid() + "@" + _remoteAddr; } } diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 3d6678a8f..6f8de451a 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -11,6 +11,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.logging.*; +import java.util.stream.*; import org.redkale.convert.json.JsonConvert; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import org.redkale.util.*; @@ -42,8 +43,13 @@ public final class WebSocketEngine { //JsonConvert protected final JsonConvert convert; - //在线用户ID对应的WebSocket组,当WebSocketGroup内没有WebSocket会从containers删掉 - private final Map containers = new ConcurrentHashMap<>(); + protected final boolean single; //是否单用户单连接 + + //在线用户ID对应的WebSocket组,用于单用户单连接模式 + private final Map websockets = new ConcurrentHashMap<>(); + + //在线用户ID对应的WebSocket组,用于单用户多连接模式 + private final Map> websockets2 = new ConcurrentHashMap<>(); //用于PING的定时器 private ScheduledThreadPoolExecutor scheduler; @@ -56,8 +62,9 @@ public final class WebSocketEngine { private int liveinterval; - protected WebSocketEngine(String engineid, HttpContext context, int liveinterval, WebSocketNode node, Logger logger) { + protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Logger logger) { this.engineid = engineid; + this.single = single; this.context = context; this.convert = context.getJsonConvert(); this.node = node; @@ -78,7 +85,7 @@ public final class WebSocketEngine { }); long delay = (interval - System.currentTimeMillis() / 1000 % interval) + index * 5; scheduler.scheduleWithFixedDelay(() -> { - getWebSocketGroups().stream().forEach(x -> x.sendEachPing()); + getWebSockets().forEach(x -> x.sendPing()); }, delay, interval, TimeUnit.SECONDS); if (finest) logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(delay:" + delay + ", interval:" + interval + "s) scheduler executor"); } @@ -87,96 +94,154 @@ public final class WebSocketEngine { if (scheduler != null) scheduler.shutdownNow(); } - void add(WebSocket socket) { //非线程安全, 在常规场景中无需锁 - WebSocketGroup group = containers.get(socket._groupid); - if (group == null) { - group = new WebSocketGroup(context, socket._groupid); - containers.putIfAbsent(socket._groupid, group); - if (node != null) node.connect(socket._groupid); + void add(WebSocket socket) { + if (single) { + websockets.put(socket._userid, socket); + } else { //非线程安全, 在常规场景中无需锁 + List list = websockets2.get(socket._userid); + if (list == null) { + list = new CopyOnWriteArrayList<>(); + websockets2.put(socket._userid, list); + } + list.add(socket); } - group.add(socket); + if (node != null) node.connect(socket._userid); } - void remove(WebSocket socket) { //非线程安全, 在常规场景中无需锁 - final WebSocketGroup group = containers.get(socket._groupid); - if (group == null) { - if (node != null) node.disconnect(socket._groupid); - return; - } - group.remove(socket); - if (group.isEmpty()) { - containers.remove(socket._groupid); - if (node != null) node.disconnect(socket._groupid); + void remove(WebSocket socket) { + Serializable userid = socket._userid; + if (single) { + websockets.remove(userid); + if (node != null) node.disconnect(userid); + } else { //非线程安全, 在常规场景中无需锁 + List list = websockets2.get(userid); + if (list != null) { + list.remove(socket); + if (list.isEmpty()) { + websockets2.remove(userid); + if (node != null) node.disconnect(userid); + } + } } } - public CompletableFuture broadcastMessage(final boolean recent, final Object message, final boolean last) { + public CompletableFuture broadcastMessage(final Object message, final boolean last) { if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(recent, json, last)); + return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(json, last)); } - final Collection groups = getWebSocketGroups(); - final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && groups.size() > 1; + final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null); if (more) { final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); packet.setSendBuffers(packet.encode(context.getBufferSupplier())); CompletableFuture future = null; - for (WebSocketGroup group : groups) { - if (group == null) continue; - future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + if (single) { + for (WebSocket websocket : websockets.values()) { + future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b); + } + } else { + for (List list : websockets2.values()) { + for (WebSocket websocket : list) { + future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b); + } + } } if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } else { CompletableFuture future = null; - for (WebSocketGroup group : groups) { - if (group == null) continue; - future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + if (single) { + for (WebSocket websocket : websockets.values()) { + future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b); + } + } else { + for (List list : websockets2.values()) { + for (WebSocket websocket : list) { + future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b); + } + } } return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } } - CompletableFuture sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) { + public CompletableFuture sendMessage(final Object message, final boolean last, final Serializable... userids) { if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> sendMessage(recent, json, last, groupids)); + return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids)); } - final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && groupids.length > 1; + final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1; if (more) { final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); packet.setSendBuffers(packet.encode(context.getBufferSupplier())); CompletableFuture future = null; - for (Serializable groupid : groupids) { - WebSocketGroup group = getWebSocketGroup(groupid); - if (group == null) continue; - future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + if (single) { + for (Serializable userid : userids) { + WebSocket websocket = websockets.get(userid); + if (websocket == null) continue; + future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b); + } + } else { + for (Serializable userid : userids) { + List list = websockets2.get(userid); + if (list == null) continue; + for (WebSocket websocket : list) { + future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b); + } + } } if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } else { CompletableFuture future = null; - for (Serializable groupid : groupids) { - WebSocketGroup group = getWebSocketGroup(groupid); - if (group == null) continue; - future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + if (single) { + for (Serializable userid : userids) { + WebSocket websocket = websockets.get(userid); + if (websocket == null) continue; + future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b); + } + } else { + for (Serializable userid : userids) { + List list = websockets2.get(userid); + if (list == null) continue; + for (WebSocket websocket : list) { + future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b); + } + } } return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } } - Collection getWebSocketGroups() { - return containers.values(); + Collection getWebSockets() { + if (single) return websockets.values(); + List list = new ArrayList<>(); + websockets2.values().forEach(x -> list.addAll(x)); + return list; } - public WebSocketGroup getWebSocketGroup(Serializable groupid) { - return containers.get(groupid); + //适用于单用户单连接模式 + public WebSocket findWebSocket(Serializable userid) { + if (single) return websockets.get(userid); + List list = websockets2.get(userid); + return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1); } - public boolean existsWebSocketGroup(Serializable groupid) { - return containers.containsKey(groupid); + //适用于单用户多连接模式 + public Stream getWebSockets(Serializable userid) { + if (single) { + WebSocket websocket = websockets.get(userid); + return websocket == null ? Stream.empty() : Stream.of(websocket); + } else { + List list = websockets2.get(userid); + return list == null ? Stream.empty() : list.stream(); + } + } + + public boolean existsWebSocket(Serializable userid) { + return single ? websockets.containsKey(userid) : websockets2.containsKey(userid); } public String getEngineid() { diff --git a/src/org/redkale/net/http/WebSocketGroup.java b/src/org/redkale/net/http/WebSocketGroup.java deleted file mode 100644 index c58f4e88d..000000000 --- a/src/org/redkale/net/http/WebSocketGroup.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net.http; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.Stream; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - */ -public final class WebSocketGroup { - - private final Serializable groupid; - - private final HttpContext context; - - private WebSocket recentWebSocket; - - private final List list = new CopyOnWriteArrayList<>(); - - private final Map attributes = new HashMap<>(); - - WebSocketGroup(HttpContext context, Serializable groupid) { - this.context = context; - this.groupid = groupid; - } - - public Serializable getGroupid() { - return groupid; - } - - public Stream getWebSockets() { - return list.stream(); - } - - void remove(WebSocket socket) { - list.remove(socket); - } - - void add(WebSocket socket) { - socket._group = this; - this.recentWebSocket = socket; - list.add(socket); - } - - void setRecentWebSocket(WebSocket socket) { - this.recentWebSocket = socket; - } - - public final boolean isEmpty() { - return list.isEmpty(); - } - - public final int size() { - return list.size(); - } - - /** - * 最近发送消息的WebSocket - * - * @return WebSocket - */ - public final WebSocket getRecentWebSocket() { - return recentWebSocket; - } - - @SuppressWarnings("unchecked") - public final T getAttribute(String name) { - return (T) attributes.get(name); - } - - public final void removeAttribute(String name) { - attributes.remove(name); - } - - public final void setAttribute(String name, Object value) { - attributes.put(name, value); - } - - public final CompletableFuture send(boolean recent, Object message, boolean last) { - if (recent) { - return recentWebSocket.send(message, last); - } else { - return sendEach(message, last); - } - } - - final CompletableFuture send(boolean recent, final WebSocketPacket packet) { - if (recent) { - return recentWebSocket.send(packet); - } else { - return sendEach(packet); - } - } - - public final CompletableFuture sendEach(Object message) { - return sendEach(message, true); - } - - public final CompletableFuture sendEach(final WebSocketPacket packet) { - CompletableFuture future = null; - final boolean more = packet.sendBuffers == null && list.size() > 1; - if (more) packet.setSendBuffers(packet.encode(context.getBufferSupplier())); - for (WebSocket s : list) { - future = future == null ? s.sendPacket(packet) : future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b); - } - if (more && future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); - return future == null ? CompletableFuture.completedFuture(0) : future; - } - - public final CompletableFuture sendEachPing() { - return sendEach(WebSocketPacket.DEFAULT_PING_PACKET); - } - - public final CompletableFuture sendRecent(Object message) { - return sendRecent(message, true); - } - - public final CompletableFuture sendRecent(WebSocketPacket packet) { - return recentWebSocket.send(packet); - } - - public final CompletableFuture sendEach(Object message, boolean last) { - if (message instanceof WebSocketPacket) { - return sendEach((WebSocketPacket) message); - } else if (message != null && !(message instanceof byte[]) && !(message instanceof CharSequence)) { - message = recentWebSocket._jsonConvert.convertTo(message); - } - return sendEach(new WebSocketPacket((Serializable) message, last)); - } - - public final CompletableFuture sendRecent(Object message, boolean last) { - return recentWebSocket.send(message, last); - } - - @Override - public String toString() { - return "{groupid: " + groupid + ", list.size: " + (list == null ? -1 : list.size()) + "}"; - } - -} diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index cf43268a8..7d04227aa 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -56,29 +56,29 @@ public abstract class WebSocketNode { public final void postDestroy(AnyValue conf) { if (this.localEngine == null) return; //关掉所有本地本地WebSocket - this.localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid())); + this.localEngine.getWebSockets().forEach(g -> disconnect(g.userid())); if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress); } - protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); + protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid); - protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last, Serializable groupid); + protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable userid); - protected abstract CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, boolean recent, Object message, boolean last); + protected abstract CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last); - protected abstract CompletableFuture connect(Serializable groupid, InetSocketAddress addr); + protected abstract CompletableFuture connect(Serializable userid, InetSocketAddress addr); - protected abstract CompletableFuture disconnect(Serializable groupid, InetSocketAddress addr); + protected abstract CompletableFuture disconnect(Serializable userid, InetSocketAddress addr); //-------------------------------------------------------------------------------- - final CompletableFuture connect(final Serializable groupid) { - if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + this.localEngine.getEngineid() + ")."); - return connect(groupid, localSncpAddress); + final CompletableFuture connect(final Serializable userid) { + if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + userid + " on " + this.localEngine.getEngineid() + ")."); + return connect(userid, localSncpAddress); } - final CompletableFuture disconnect(final Serializable groupid) { - if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this.localEngine.getEngineid() + ")."); - return disconnect(groupid, localSncpAddress); + final CompletableFuture disconnect(final Serializable userid) { + if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + userid + " on " + this.localEngine.getEngineid() + ")."); + return disconnect(userid, localSncpAddress); } //-------------------------------------------------------------------------------- @@ -87,14 +87,14 @@ public abstract class WebSocketNode { * 该方法仅供内部调用 * * @param targetAddress InetSocketAddress - * @param groupid Serializable + * @param userid Serializable * * @return 客户端地址列表 */ - protected CompletableFuture> remoteWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) { + protected CompletableFuture> remoteWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) { if (remoteNode == null) return CompletableFuture.completedFuture(null); try { - return remoteNode.getWebSocketAddresses(targetAddress, groupid); + return remoteNode.getWebSocketAddresses(targetAddress, userid); } catch (Exception e) { logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e); return CompletableFuture.completedFuture(null); @@ -105,12 +105,12 @@ public abstract class WebSocketNode { * 获取用户在线的SNCP节点地址列表,不是分布式则返回元素数量为1,且元素值为null的列表
* InetSocketAddress 为 SNCP节点地址 * - * @param groupid Serializable + * @param userid Serializable * * @return 地址列表 */ - public CompletableFuture> getRpcNodeAddresses(final Serializable groupid) { - if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(groupid); + public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { + if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(userid); List rs = new ArrayList<>(); rs.add(this.localSncpAddress); return CompletableFuture.completedFuture(rs); @@ -121,18 +121,18 @@ public abstract class WebSocketNode { * Map.key 为 SNCP节点地址, 含值为null的key表示没有分布式 * Map.value 为 用户客户端的IP * - * @param groupid Serializable + * @param userid Serializable * * @return 地址集合 */ - public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable groupid) { - CompletableFuture> sncpFuture = getRpcNodeAddresses(groupid); + public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable userid) { + CompletableFuture> sncpFuture = getRpcNodeAddresses(userid); return sncpFuture.thenCompose((Collection addrs) -> { - if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); + if (finest) logger.finest("websocket found userid:" + userid + " on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(new HashMap<>()); CompletableFuture>> future = null; for (final InetSocketAddress nodeAddress : addrs) { - CompletableFuture>> mapFuture = getWebSocketAddresses(nodeAddress, groupid) + CompletableFuture>> mapFuture = getWebSocketAddresses(nodeAddress, userid) .thenCompose((List list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list))); future = future == null ? mapFuture : future.thenCombine(mapFuture, (a, b) -> Utility.merge(a, b)); } @@ -141,99 +141,58 @@ public abstract class WebSocketNode { } //-------------------------------------------------------------------------------- - public final CompletableFuture sendEachMessage(Serializable groupid, Object message, final Serializable... groupids) { - return sendMessage(false, message, true, groupids); - } - - public final CompletableFuture sendEachMessage(Serializable groupid, Object message, boolean last, final Serializable... groupids) { - return sendMessage(false, message, last, groupids); - } - - public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, final Serializable... groupids) { - return sendMessage(true, message, true, groupids); - } - - public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, boolean last, final Serializable... groupids) { - return sendMessage(true, message, last, groupids); - } - - public final CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message, final Serializable... groupids) { - return sendMessage(recent, message, true, groupids); + public final CompletableFuture sendMessage(Object message, final Serializable... userids) { + return sendMessage(message, true, userids); } /** * 向指定用户发送消息,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 * - * @param groupids Serializable[] - * @param recent 是否只发送给最近接入的WebSocket节点 - * @param message 消息内容 - * @param last 是否最后一条 + * @param message 消息内容 + * @param last 是否最后一条 + * @param userids Serializable[] * * @return 为0表示成功, 其他值表示部分发送异常 */ //最近连接发送逻辑还没有理清楚 - public final CompletableFuture sendMessage(final boolean recent, final Object message, final boolean last, final Serializable... groupids) { - if (groupids == null || groupids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + public final CompletableFuture sendMessage(final Object message, final boolean last, final Serializable... userids) { + if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 - return this.localEngine.sendMessage(recent, message, last, groupids); + return this.localEngine.sendMessage(message, last, userids); } CompletableFuture future = null; - for (Serializable groupid : groupids) { - future = future == null ? sendOneMessage(recent, message, last, groupid) - : future.thenCombine(sendOneMessage(recent, message, last, groupid), (a, b) -> a | b); + for (Serializable userid : userids) { + future = future == null ? sendOneMessage(message, last, userid) + : future.thenCombine(sendOneMessage(message, last, userid), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } /** - * 广播消息, 给所有人的所有接入的WebSocket节点发消息 + * 广播消息, 给所有人发消息 * * @param message 消息内容 * * @return 为0表示成功, 其他值表示部分发送异常 */ - public final CompletableFuture broadcastEachMessage(final Object message) { - return broadcastMessage(false, message, true); - } - - /** - * 广播消息, 给所有人最近接入的WebSocket节点发消息 - * - * @param message 消息内容 - * - * @return 为0表示成功, 其他值表示部分发送异常 - */ - public final CompletableFuture broadcastRecentMessage(final Object message) { - return broadcastMessage(true, message, true); + public final CompletableFuture broadcastMessage(final Object message) { + return broadcastMessage(message, true); } /** * 广播消息, 给所有人发消息 * - * @param recent 是否只发送给最近接入的WebSocket节点 - * @param message 消息内容 - * - * @return 为0表示成功, 其他值表示部分发送异常 - */ - public final CompletableFuture broadcastMessage(final boolean recent, final Object message) { - return broadcastMessage(recent, message, true); - } - - /** - * 广播消息, 给所有人发消息 - * - * @param recent 是否只发送给最近接入的WebSocket节点 * @param message 消息内容 * @param last 是否最后一条 * * @return 为0表示成功, 其他值表示部分发送异常 */ - public final CompletableFuture broadcastMessage(final boolean recent, final Object message, final boolean last) { + public final CompletableFuture broadcastMessage(final Object message, final boolean last) { if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 - return this.localEngine.broadcastMessage(recent, message, last); + return this.localEngine.broadcastMessage(message, last); } - CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(recent, message, last); + CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(message, last); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync("redkale_sncpnodes"); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (finest) logger.finest("websocket broadcast message on " + addrs); @@ -241,38 +200,33 @@ public abstract class WebSocketNode { CompletableFuture future = null; for (InetSocketAddress addr : addrs) { if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.broadcastMessage(addr, recent, message, last) - : future.thenCombine(remoteNode.broadcastMessage(addr, recent, message, last), (a, b) -> a | b); + future = future == null ? remoteNode.broadcastMessage(addr, message, last) + : future.thenCombine(remoteNode.broadcastMessage(addr, message, last), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(0) : future; }); return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b); } - private CompletableFuture sendOneMessage(final boolean recent, final Object message, final boolean last, final Serializable groupid) { - if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); + private CompletableFuture sendOneMessage(final Object message, final boolean last, final Serializable userid) { + if (finest) logger.finest("websocket want send message {userid:" + userid + ", content:'" + message + "'} from locale node to locale engine"); CompletableFuture localFuture = null; - final WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid); - if (group != null) localFuture = group.send(recent, message, last); - if (recent && localFuture != null) { //已经给最近连接发送的消息 - if (finest) logger.finest("websocket want send recent message success"); - return localFuture; - } + if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid); if (this.sncpNodeAddresses == null || this.remoteNode == null) { if (finest) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture; } //远程节点发送消息 - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(groupid); + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(userid); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { - if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs); + if (finest) logger.finest("websocket found userid:" + userid + " on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); CompletableFuture future = null; for (InetSocketAddress addr : addrs) { if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.sendMessage(addr, recent, message, last, groupid) - : future.thenCombine(remoteNode.sendMessage(addr, recent, message, last, groupid), (a, b) -> a | b); + future = future == null ? remoteNode.sendMessage(addr, message, last, userid) + : future.thenCombine(remoteNode.sendMessage(addr, message, last, userid), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(0) : future; }); diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 519421af5..4256249a4 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -116,7 +116,6 @@ class WebSocketRunner implements Runnable { if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); return; } - webSocket._group.setRecentWebSocket(webSocket); if (packet.type == FrameType.TEXT) { Object message = convert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 34efa96c6..57dc21c4e 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -31,10 +31,7 @@ import org.redkale.util.*; * / \ * / \ * / \ - * WebSocketGroup1 WebSocketGroup2 - * / \ / \ - * / \ / \ - * WebSocket1 WebSocket2 WebSocket3 WebSocket4 + * WebSocket1 WebSocket2 * * * @@ -62,6 +59,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl protected Type messageTextType; //RestWebSocket时会被修改 + protected boolean single = true; //是否单用户单连接 + protected int liveinterval = DEFAILT_LIVEINTERVAL; @Resource @@ -98,7 +97,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service - this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", context, liveinterval, this.node, logger); + this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, this.node, logger); this.node.init(conf); this.node.localEngine.init(conf); } @@ -163,19 +162,19 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public void completed(Integer result, Void attachment) { HttpContext context = response.getContext(); - CompletableFuture groupFuture = webSocket.createGroupid(); - if (groupFuture == null) { - if (debug) logger.finest("WebSocket connect abort, Create groupid abort. request = " + request); + CompletableFuture userFuture = webSocket.createUserid(); + if (userFuture == null) { + if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request); response.finish(true); return; } - groupFuture.whenComplete((groupid, ex2) -> { - if (groupid == null || ex2 != null) { - if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create groupid abort. request = " + request, ex2); + userFuture.whenComplete((userid, ex2) -> { + if (userid == null || ex2 != null) { + if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); response.finish(true); return; } - webSocket._groupid = groupid; + webSocket._userid = userid; WebSocketServlet.this.node.localEngine.add(webSocket); WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel(), wsbinary); webSocket._runner = runner; diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 6c6a34368..bf8b85c23 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -40,57 +40,51 @@ public class WebSocketNodeService extends WebSocketNode implements Service { if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); return CompletableFuture.supplyAsync(() -> { final List rs = new ArrayList<>(); - final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); - if (group != null) group.getWebSockets().forEach(x -> rs.add(x.getRemoteAddr())); + this.localEngine.getWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr())); return rs; }); } @Override - public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress addr, boolean recent, Object message, boolean last, Serializable groupid) { + public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress addr, Object message, boolean last, Serializable userid) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); - if (group == null || group.isEmpty()) { - if (finest) logger.finest("receive websocket message {engineid:'" + this.localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); - return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - } - return group.send(recent, message, last); + return this.localEngine.sendMessage(message, last, userid); } @Override - public CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress addr, boolean recent, Object message, boolean last) { + public CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress addr, Object message, boolean last) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.broadcastMessage(recent, message, last); + return this.localEngine.broadcastMessage(message, last); } /** * 当用户连接到节点,需要更新到CacheSource * - * @param groupid String + * @param userid String * @param sncpAddr InetSocketAddress * * @return 无返回值 */ @Override - public CompletableFuture connect(Serializable groupid, InetSocketAddress sncpAddr) { - CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(groupid, sncpAddr); + public CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr) { + CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr); future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr)); - if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + sncpAddr); + if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); return future; } /** * 当用户从一个节点断掉了所有的连接,需要从CacheSource中删除 * - * @param groupid String + * @param userid String * @param sncpAddr InetSocketAddress * * @return 无返回值 */ @Override - public CompletableFuture disconnect(Serializable groupid, InetSocketAddress sncpAddr) { - CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(groupid, sncpAddr); - if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + sncpAddr); + public CompletableFuture disconnect(Serializable userid, InetSocketAddress sncpAddr) { + CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(userid, sncpAddr); + if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr); return future; } } diff --git a/test/org/redkale/test/http/WebSocketDesc.java b/test/org/redkale/test/http/WebSocketDesc.java index 174cb3868..71aa26820 100644 --- a/test/org/redkale/test/http/WebSocketDesc.java +++ b/test/org/redkale/test/http/WebSocketDesc.java @@ -96,18 +96,6 @@ public interface WebSocketDesc { public void close(); - //获取当前WebSocket所属的WebSocketGroup, 不会为null - /* protected */ WebSocketGroup getWebSocketGroup(); - - - //获取指定groupid的WebSocketGroup, 没有返回null - /* protected */ WebSocketGroup getWebSocketGroup(Serializable groupid); - - - //获取当前进程节点所有在线的WebSocketGroup - /* protected */ Collection getWebSocketGroups(); - - //获取在线用户的节点地址列表 /* protected */ Collection getOnlineNodes(Serializable groupid); @@ -138,7 +126,7 @@ public interface WebSocketDesc { default void onPong(byte[] bytes) { } - + //接收二进制消息响应事件,可能会接收到二进制消息需要重写该方法 default void onMessage(byte[] bytes) { } diff --git a/test/org/redkale/test/websocket/ChatWebSocketServlet.java b/test/org/redkale/test/websocket/ChatWebSocketServlet.java index 085fd6ca5..93ec0cb19 100644 --- a/test/org/redkale/test/websocket/ChatWebSocketServlet.java +++ b/test/org/redkale/test/websocket/ChatWebSocketServlet.java @@ -59,11 +59,11 @@ public class ChatWebSocketServlet extends WebSocketServlet { icounter.incrementAndGet(); counter.incrementAndGet(); if (debug) System.out.println("收到消息: " + message); - super.getWebSocketGroup().getWebSockets().forEach(x -> x.send(message)); + super.getWebSockets().forEach(x -> x.send(message)); } @Override - protected CompletableFuture createGroupid() { + protected CompletableFuture createUserid() { return CompletableFuture.completedFuture("2"); } diff --git a/test/org/redkale/test/websocket/VideoWebSocketServlet.java b/test/org/redkale/test/websocket/VideoWebSocketServlet.java index a7aa602c7..9d1a298d1 100644 --- a/test/org/redkale/test/websocket/VideoWebSocketServlet.java +++ b/test/org/redkale/test/websocket/VideoWebSocketServlet.java @@ -80,31 +80,25 @@ public class VideoWebSocketServlet extends WebSocketServlet { } super.send(("{'type':'user_list','users':[" + sb + "]}").replace('\'', '"')); String msg = ("{'type':'discover_user','user':{'userid':'" + this.getSessionid() + "','username':'" + users.get(this.getSessionid()) + "'}}").replace('\'', '"'); - super.getWebSocketGroup().getWebSockets().filter(x -> x != this).forEach(x -> { - x.send(msg); - }); + super.broadcastMessage(msg); } } @Override public void onMessage(Object text, boolean last) { //System.out.println("接收到消息: " + text); - super.getWebSocketGroup().getWebSockets().filter(x -> x != this).forEach(x -> { - x.send(text); - }); + super.broadcastMessage(text, last); } @Override public void onClose(int code, String reason) { sessions.remove(this.getSessionid()); String msg = ("{'type':'remove_user','user':{'userid':'" + this.getSessionid() + "','username':'" + users.get(this.getSessionid()) + "'}}").replace('\'', '"'); - super.getWebSocketGroup().getWebSockets().filter(x -> x != this).forEach(x -> { - x.send(msg); - }); - } + super.broadcastMessage(msg); + } @Override - protected CompletableFuture createGroupid() { + protected CompletableFuture createUserid() { return CompletableFuture.completedFuture("2"); } }; diff --git a/test/org/redkale/test/ws/ChatService.java b/test/org/redkale/test/ws/ChatService.java index c71ede47a..90ac79620 100644 --- a/test/org/redkale/test/ws/ChatService.java +++ b/test/org/redkale/test/ws/ChatService.java @@ -21,7 +21,7 @@ public class ChatService implements Service { protected final AtomicInteger idcreator = new AtomicInteger(10000); - public int createGroupid() { + public int createUserid() { int v = idcreator.incrementAndGet(); setIdcreator(v); return v; diff --git a/test/org/redkale/test/ws/ChatWebSocket.java b/test/org/redkale/test/ws/ChatWebSocket.java index edd1b1926..98583e9ff 100644 --- a/test/org/redkale/test/ws/ChatWebSocket.java +++ b/test/org/redkale/test/ws/ChatWebSocket.java @@ -24,21 +24,21 @@ public class ChatWebSocket extends WebSocket { protected ChatService service; @Override - protected CompletableFuture createGroupid() { - return CompletableFuture.completedFuture(service.createGroupid()); + protected CompletableFuture createUserid() { + return CompletableFuture.completedFuture(service.createUserid()); } @RestOnMessage(name = "sendmessage") public void onChatMessage(ChatMessage message, Map extmap) { - message.fromuserid = getGroupid(); - message.fromusername = "用户" + getGroupid(); + message.fromuserid = userid(); + message.fromusername = "用户" + userid(); System.out.println("获取消息: message: " + message + ", map: " + extmap); - super.broadcastEachMessage(message); + super.broadcastMessage(message); } @RestOnMessage(name = "joinroom") public void onJoinRoom(int roomid) { - service.joinRoom(getGroupid(), roomid); + service.joinRoom(userid(), roomid); System.out.println("加入房间: roomid: " + roomid); }