From 33da94960cf6b5e9a55d73c199691676073afaaa Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 22 May 2017 13:05:50 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocket.java | 235 +++++------------- src/org/redkale/net/http/WebSocketNode.java | 92 ++----- src/org/redkale/net/http/WebSocketRunner.java | 4 +- .../test/websocket/ChatWebSocketServlet.java | 2 +- .../test/websocket/VideoWebSocketServlet.java | 2 +- 5 files changed, 82 insertions(+), 253 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index c04d0b230..e5306ef96 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -94,67 +94,21 @@ public abstract class WebSocket { //---------------------------------------------------------------- public final CompletableFuture sendPing() { //if (_engine.finest) _engine.logger.finest(this + " on "+_engine.getEngineid()+" ping..."); - return send(WebSocketPacket.DEFAULT_PING_PACKET); + return sendPacket(WebSocketPacket.DEFAULT_PING_PACKET); } public final CompletableFuture sendPing(byte[] data) { - return send(new WebSocketPacket(FrameType.PING, data)); + return sendPacket(new WebSocketPacket(FrameType.PING, data)); } public final CompletableFuture sendPong(byte[] data) { - return send(new WebSocketPacket(FrameType.PONG, data)); + return sendPacket(new WebSocketPacket(FrameType.PONG, data)); } public final long getCreatetime() { return createtime; } - /** - * 给自身发送单一的文本消息 - * - * @param text 不可为空 - * - * @return 0表示成功, 非0表示错误码 - */ - public final CompletableFuture send(String text) { - return send(text, true); - } - - /** - * 给自身发送文本消息 - * - * @param text 不可为空 - * @param last 是否最后一条 - * - * @return 0表示成功, 非0表示错误码 - */ - public final CompletableFuture send(String text, boolean last) { - return sendPacket(new WebSocketPacket(text, last)); - } - - /** - * 给自身发送单一的二进制消息 - * - * @param data byte[] - * - * @return 0表示成功, 非0表示错误码 - */ - public final CompletableFuture send(byte[] data) { - return send(data, true); - } - - /** - * 给自身发送二进制消息 - * - * @param data 不可为空 - * @param last 是否最后一条 - * - * @return 0表示成功, 非0表示错误码 - */ - public final CompletableFuture send(byte[] data, boolean last) { - return sendPacket(new WebSocketPacket(data, last)); - } - /** * 给自身发送消息, 消息类型是String或byte[]或可JavaBean对象 * @@ -215,39 +169,14 @@ public abstract class WebSocket { * @return 0表示成功, 非0表示错误码 */ CompletableFuture sendPacket(WebSocketPacket packet) { - CompletableFuture rs = null; - if (this._runner != null) rs = this._runner.sendMessage(packet); + CompletableFuture rs = this._runner.sendMessage(packet); if (_engine.finest) _engine.logger.finest("wsgroupid:" + getGroupid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")"); return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs; } //---------------------------------------------------------------- /** - * 给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息 - * - * @param groupid groupid - * @param text 不可为空 - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendEachMessage(Serializable groupid, String text) { - return sendEachMessage(groupid, text, true); - } - - /** - * 给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息 - * - * @param groupid groupid - * @param data 不可为空 - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendEachMessage(Serializable groupid, byte[] data) { - return sendEachMessage(groupid, data, true); - } - - /** - * 给指定groupid的WebSocketGroup下所有WebSocket节点发送可JavaBean对象消息 + * 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * * @param groupid groupid * @param message 不可为空 @@ -259,33 +188,7 @@ public abstract class WebSocket { } /** - * 给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息 - * - * @param groupid groupid - * @param text 不可为空 - * @param last 是否最后一条 - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendEachMessage(Serializable groupid, String text, boolean last) { - return sendMessage(groupid, false, text, last); - } - - /** - * 给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息 - * - * @param groupid groupid - * @param data 不可为空 - * @param last 是否最后一条 - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendEachMessage(Serializable groupid, byte[] data, boolean last) { - return sendMessage(groupid, false, data, last); - } - - /** - * 给指定groupid的WebSocketGroup下所有WebSocket节点发送可JavaBean对象消息 + * 给指定groupid的WebSocketGroup下所有WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * * @param groupid groupid * @param message 不可为空 @@ -298,31 +201,7 @@ public abstract class WebSocket { } /** - * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息 - * - * @param groupid groupid - * @param text 不可为空 - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendRecentMessage(Serializable groupid, String text) { - return sendRecentMessage(groupid, text, true); - } - - /** - * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息 - * - * @param groupid groupid - * @param data 不可为空 - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendRecentMessage(Serializable groupid, byte[] data) { - return sendRecentMessage(groupid, data, true); - } - - /** - * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送可JavaBean对象消息 + * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * * @param groupid groupid * @param message 不可为空 @@ -334,33 +213,7 @@ public abstract class WebSocket { } /** - * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息 - * - * @param groupid groupid - * @param text 不可为空 - * @param last 是否最后一条 - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendRecentMessage(Serializable groupid, String text, boolean last) { - return sendMessage(groupid, true, text, last); - } - - /** - * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息 - * - * @param groupid groupid - * @param data 不可为空 - * @param last 是否最后一条 - * - * @return 为0表示成功, 其他值表示异常 - */ - public final CompletableFuture sendRecentMessage(Serializable groupid, byte[] data, boolean last) { - return sendMessage(groupid, true, data, last); - } - - /** - * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送可JavaBean对象消息 + * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * * @param groupid groupid * @param message 不可为空 @@ -372,21 +225,17 @@ public abstract class WebSocket { return sendMessage(groupid, true, message, last); } - private CompletableFuture sendMessage(Serializable groupid, boolean recent, String text, boolean last) { - if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); - CompletableFuture rs = _engine.node.sendMessage(groupid, recent, text, last); - if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + text + ")"); - return rs; - } - - private CompletableFuture sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { - if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); - CompletableFuture rs = _engine.node.sendMessage(groupid, recent, data, last); - if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(byte[" + data.length + "])"); - return rs; - } - - private CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { + /** + * 给指定groupid的WebSocketGroup下WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 + * + * @param groupid groupid + * @param recent 是否只发最近接入的WebSocket + * @param message 不可为空 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示异常 + */ + public final CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); CompletableFuture rs = _engine.node.sendMessage(groupid, recent, message, last); if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); @@ -522,30 +371,60 @@ public abstract class WebSocket { public void onRead(AsyncConnection channel) { } + /** + * WebSokcet连接成功后的回调方法 + */ public void onConnected() { } + /** + * ping后的回调方法 + * + * @param bytes 数据 + */ public void onPing(byte[] bytes) { } + /** + * pong后的回调方法 + * + * @param bytes 数据 + */ public void onPong(byte[] bytes) { } - public void onMessage(T message) { + /** + * 接收到消息的回调方法 + * + * @param message 消息 + * @param last 是否最后一条 + */ + public void onMessage(T message, boolean last) { } - public void onMessage(byte[] bytes) { - } - - public void onFragment(T message, boolean last) { - } - - public void onFragment(byte[] bytes, boolean last) { + /** + * 接收到二进制消息的回调方法 + * + * @param bytes 消息 + * @param last 是否最后一条 + */ + public void onMessage(byte[] bytes, boolean last) { } + /** + * 关闭的回调方法,调用此方法时WebSocket已经被关闭 + * + * @param code 结果码,非0表示非正常关闭 + * @param reason 关闭原因 + */ public void onClose(int code, String reason) { } + /** + * 获取最后一次发送消息的时间 + * + * @return long + */ public long getLastSendTime() { return this._runner == null ? 0 : this._runner.lastSendTime; } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index c4fb1eb84..92098d452 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -141,6 +141,27 @@ public abstract class WebSocketNode { return rs; } + //-------------------------------------------------------------------------------- + public final CompletableFuture sendEachMessage(Serializable groupid, Object message) { + return sendMessage(groupid, false, message, true); + } + + public final CompletableFuture sendEachMessage(Serializable groupid, Object message, boolean last) { + return sendMessage(groupid, false, message, last); + } + + public final CompletableFuture sendRecentMessage(Serializable groupid, Object message) { + return sendMessage(groupid, true, message, true); + } + + public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, boolean last) { + return sendMessage(groupid, true, message, last); + } + + public final CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message) { + return sendMessage(groupid, recent, message, true); + } + /** * 向指定用户发送消息,先发送本地连接,再发送远程连接
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接 @@ -192,75 +213,4 @@ public abstract class WebSocketNode { }); } - //-------------------------------------------------------------------------------- - public final CompletableFuture sendEachMessage(Serializable groupid, String text) { - return sendMessage(groupid, false, (Object) text, true); - } - - public final CompletableFuture sendEachMessage(Serializable groupid, String text, boolean last) { - return sendMessage(groupid, false, (Object) text, last); - } - - public final CompletableFuture sendRecentMessage(Serializable groupid, String text) { - return sendMessage(groupid, true, (Object) text, true); - } - - public final CompletableFuture sendRecentMessage(Serializable groupid, String text, boolean last) { - return sendMessage(groupid, true, (Object) text, last); - } - - public final CompletableFuture sendMessage(Serializable groupid, boolean recent, String text) { - return sendMessage(groupid, recent, (Object) text, true); - } - - public final CompletableFuture sendMessage(Serializable groupid, boolean recent, String text, boolean last) { - return sendMessage(groupid, recent, (Object) text, last); - } - - //-------------------------------------------------------------------------------- - public final CompletableFuture sendEachMessage(Serializable groupid, byte[] data) { - return sendMessage(groupid, false, (Object) data, true); - } - - public final CompletableFuture sendEachMessage(Serializable groupid, byte[] data, boolean last) { - return sendMessage(groupid, false, (Object) data, last); - } - - public final CompletableFuture sendRecentMessage(Serializable groupid, byte[] data) { - return sendMessage(groupid, true, (Object) data, true); - } - - public final CompletableFuture sendRecentMessage(Serializable groupid, byte[] data, boolean last) { - return sendMessage(groupid, true, (Object) data, last); - } - - public final CompletableFuture sendMessage(Serializable groupid, boolean recent, byte[] data) { - return sendMessage(groupid, recent, data, true); - } - - public final CompletableFuture sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { - return sendMessage(groupid, recent, (Object) data, last); - } - - //-------------------------------------------------------------------------------- - public final CompletableFuture sendEachMessage(Serializable groupid, Object message) { - return sendMessage(groupid, false, message, true); - } - - public final CompletableFuture sendEachMessage(Serializable groupid, Object message, boolean last) { - return sendMessage(groupid, false, message, last); - } - - public final CompletableFuture sendRecentMessage(Serializable groupid, Object message) { - return sendMessage(groupid, true, message, true); - } - - public final CompletableFuture sendRecentMessage(Serializable groupid, Object message, boolean last) { - return sendMessage(groupid, true, message, last); - } - - public final CompletableFuture sendMessage(Serializable groupid, boolean recent, Object message) { - return sendMessage(groupid, recent, message, true); - } - } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index ed40c7c5c..02fb03ef7 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -122,14 +122,14 @@ public class WebSocketRunner implements Runnable { readBuffer.clear(); channel.read(readBuffer, null, this); } - webSocket.onMessage(message); + webSocket.onMessage(message, packet.last); } else if (packet.type == FrameType.BINARY) { byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers); if (readBuffer != null) { readBuffer.clear(); channel.read(readBuffer, null, this); } - webSocket.onMessage(message); + webSocket.onMessage(message, packet.last); } else if (packet.type == FrameType.PONG) { byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers); if (readBuffer != null) { diff --git a/test/org/redkale/test/websocket/ChatWebSocketServlet.java b/test/org/redkale/test/websocket/ChatWebSocketServlet.java index 19b4d150d..cff4f8605 100644 --- a/test/org/redkale/test/websocket/ChatWebSocketServlet.java +++ b/test/org/redkale/test/websocket/ChatWebSocketServlet.java @@ -56,7 +56,7 @@ public class ChatWebSocketServlet extends WebSocketServlet { return new WebSocket() { @Override - public void onMessage(ChatMessage message) { + public void onMessage(ChatMessage message, boolean last) { icounter.incrementAndGet(); counter.incrementAndGet(); if (debug) System.out.println("收到消息: " + message); diff --git a/test/org/redkale/test/websocket/VideoWebSocketServlet.java b/test/org/redkale/test/websocket/VideoWebSocketServlet.java index 22b63684a..a7aa602c7 100644 --- a/test/org/redkale/test/websocket/VideoWebSocketServlet.java +++ b/test/org/redkale/test/websocket/VideoWebSocketServlet.java @@ -87,7 +87,7 @@ public class VideoWebSocketServlet extends WebSocketServlet { } @Override - public void onMessage(Object text) { + public void onMessage(Object text, boolean last) { //System.out.println("接收到消息: " + text); super.getWebSocketGroup().getWebSockets().filter(x -> x != this).forEach(x -> { x.send(text);