From b27bbb7836fb2d57177d1add19469f661328d1ee Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Fri, 31 Mar 2017 08:11:46 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocket.java | 85 +++++++++++++++++-- src/org/redkale/net/http/WebSocketEngine.java | 4 +- src/org/redkale/net/http/WebSocketGroup.java | 13 +-- src/org/redkale/net/http/WebSocketNode.java | 48 ++++++++--- .../redkale/net/http/WebSocketServlet.java | 5 ++ .../redkale/service/WebSocketNodeService.java | 2 +- 6 files changed, 132 insertions(+), 25 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 79fd2cad2..3dfaeb50b 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -10,6 +10,7 @@ import java.io.*; import java.net.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; import org.redkale.util.Comment; @@ -76,6 +77,8 @@ public abstract class WebSocket { String _remoteAddr;//不可能为空 + JsonConvert _jsonConvert; //不可能为空 + private final long createtime = System.currentTimeMillis(); private final Map attributes = new ConcurrentHashMap<>(); @@ -162,15 +165,30 @@ public abstract class WebSocket { } /** - * 给自身发送消息, 消息类型是String或byte[] + * 给自身发送消息, 消息类型是String或byte[]或可JSON化对象 * - * @param message 不可为空, 只能是String或者byte[] + * @param message 不可为空, 只能是String或byte[]或可JSON化对象 + * + * @return 0表示成功, 非0表示错误码 + */ + public final int send(Object message) { + return send(message, true); + } + + /** + * 给自身发送消息, 消息类型是String或byte[]或可JSON化对象 + * + * @param message 不可为空, 只能是String或byte[]或可JSON化对象 * @param last 是否最后一条 * * @return 0表示成功, 非0表示错误码 */ - public final int send(Serializable message, boolean last) { - return send(new WebSocketPacket(message, last)); + public final int send(Object message, boolean last) { + if (message == null || message instanceof CharSequence || message instanceof byte[]) { + return send(new WebSocketPacket((Serializable) message, last)); + } else { + return send(new WebSocketPacket(_jsonConvert.convertTo(message), last)); + } } //---------------------------------------------------------------- @@ -195,7 +213,19 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示异常 */ public final int sendEachMessage(Serializable groupid, byte[] data) { - return WebSocket.this.sendEachMessage(groupid, data, true); + return sendEachMessage(groupid, data, true); + } + + /** + * 给指定groupid的WebSocketGroup下所有WebSocket节点发送可JSON化对象消息 + * + * @param groupid groupid + * @param message 不可为空 + * + * @return 为0表示成功, 其他值表示异常 + */ + public final int sendEachMessage(Serializable groupid, Object message) { + return sendEachMessage(groupid, message, true); } /** @@ -224,6 +254,19 @@ public abstract class WebSocket { return sendMessage(groupid, false, data, last); } + /** + * 给指定groupid的WebSocketGroup下所有WebSocket节点发送可JSON化对象消息 + * + * @param groupid groupid + * @param message 不可为空 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示异常 + */ + public final int sendEachMessage(Serializable groupid, Object message, boolean last) { + return sendMessage(groupid, false, message, last); + } + /** * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息 * @@ -248,6 +291,18 @@ public abstract class WebSocket { return sendRecentMessage(groupid, data, true); } + /** + * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送可JSON化对象消息 + * + * @param groupid groupid + * @param message 不可为空 + * + * @return 为0表示成功, 其他值表示异常 + */ + public final int sendRecentMessage(Serializable groupid, Object message) { + return sendMessage(groupid, true, message, true); + } + /** * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息 * @@ -274,6 +329,19 @@ public abstract class WebSocket { return sendMessage(groupid, true, data, last); } + /** + * 给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送可JSON化对象消息 + * + * @param groupid groupid + * @param message 不可为空 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示异常 + */ + public final int sendRecentMessage(Serializable groupid, Object message, boolean last) { + return sendMessage(groupid, true, message, last); + } + private int sendMessage(Serializable groupid, boolean recent, String text, boolean last) { if (_engine.node == null) return RETCODE_NODESERVICE_NULL; int rs = _engine.node.sendMessage(groupid, recent, text, last); @@ -288,6 +356,13 @@ public abstract class WebSocket { return rs; } + private int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { + if (_engine.node == null) return RETCODE_NODESERVICE_NULL; + int rs = _engine.node.sendMessage(groupid, recent, message, last); + if (_engine.finest) _engine.logger.finest("wsgroupid:" + groupid + " " + (recent ? "recent " : "") + "send websocket result is " + rs + " on " + this + " by message(" + _jsonConvert.convertTo(message) + ")"); + return rs; + } + /** * 获取指定groupid在线用户的节点地址列表 * diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 9d1e58898..a68990a5e 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -15,7 +15,9 @@ import org.redkale.util.*; /** * - *

详情见: https://redkale.org + *

+ * 详情见: https://redkale.org + * * @author zhangjx */ public final class WebSocketEngine { diff --git a/src/org/redkale/net/http/WebSocketGroup.java b/src/org/redkale/net/http/WebSocketGroup.java index f6656e9aa..8cb110076 100644 --- a/src/org/redkale/net/http/WebSocketGroup.java +++ b/src/org/redkale/net/http/WebSocketGroup.java @@ -83,7 +83,7 @@ public final class WebSocketGroup { attributes.put(name, value); } - public final int send(boolean recent, Serializable message, boolean last) { + public final int send(boolean recent, Object message, boolean last) { if (recent) { return recentWebSocket.send(message, last); } else { @@ -91,7 +91,7 @@ public final class WebSocketGroup { } } - public final int sendEach(Serializable message) { + public final int sendEach(Object message) { return sendEach(message, true); } @@ -111,7 +111,7 @@ public final class WebSocketGroup { return rs; } - public final int sendRecent(Serializable message) { + public final int sendRecent(Object message) { return sendRecent(message, true); } @@ -119,7 +119,10 @@ public final class WebSocketGroup { return recentWebSocket.send(packet); } - public final int sendEach(Serializable message, boolean last) { + public final int sendEach(Object message, boolean last) { + if (message != null && !(message instanceof byte[]) && !(message instanceof CharSequence)) { + message = recentWebSocket._jsonConvert.convertTo(message); + } int rs = 0; for (WebSocket s : list) { rs |= s.send(message, last); @@ -127,7 +130,7 @@ public final class WebSocketGroup { return rs; } - public final int sendRecent(Serializable message, boolean last) { + public final int sendRecent(Object message, boolean last) { return recentWebSocket.send(message, last); } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 610915328..6d5300719 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -60,7 +60,7 @@ public abstract class WebSocketNode { protected abstract List getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid); - protected abstract int sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Serializable message, boolean last); + protected abstract int sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Object message, boolean last); protected abstract void connect(Serializable groupid, InetSocketAddress addr); @@ -133,7 +133,7 @@ public abstract class WebSocketNode { engines.put(engine.getEngineid(), engine); } - public final int sendMessage(Serializable groupid, boolean recent, Serializable message, boolean last) { + public final int sendMessage(Serializable groupid, boolean recent, Object message, boolean last) { final Set engineids = localNodes.get(groupid); if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids); int rscode = RETCODE_GROUP_EMPTY; @@ -187,44 +187,44 @@ public abstract class WebSocketNode { //-------------------------------------------------------------------------------- public final int sendEachMessage(Serializable groupid, String text) { - return sendMessage(groupid, false, text); + return sendMessage(groupid, false, (Object) text, true); } public final int sendEachMessage(Serializable groupid, String text, boolean last) { - return sendMessage(groupid, false, text, last); + return sendMessage(groupid, false, (Object) text, last); } public final int sendRecentMessage(Serializable groupid, String text) { - return sendMessage(groupid, true, text); + return sendMessage(groupid, true, (Object) text, true); } public final int sendRecentMessage(Serializable groupid, String text, boolean last) { - return sendMessage(groupid, true, text, last); + return sendMessage(groupid, true, (Object) text, last); } public final int sendMessage(Serializable groupid, boolean recent, String text) { - return sendMessage(groupid, recent, text, true); + return sendMessage(groupid, recent, (Object) text, true); } public final int sendMessage(Serializable groupid, boolean recent, String text, boolean last) { - return sendMessage(groupid, recent, (Serializable) text, last); + return sendMessage(groupid, recent, (Object) text, last); } //-------------------------------------------------------------------------------- public final int sendEachMessage(Serializable groupid, byte[] data) { - return sendMessage(groupid, false, data); + return sendMessage(groupid, false, (Object) data, true); } public final int sendEachMessage(Serializable groupid, byte[] data, boolean last) { - return sendMessage(groupid, false, data, last); + return sendMessage(groupid, false, (Object) data, last); } public final int sendRecentMessage(Serializable groupid, byte[] data) { - return sendMessage(groupid, true, data); + return sendMessage(groupid, true, (Object) data, true); } public final int sendRecentMessage(Serializable groupid, byte[] data, boolean last) { - return sendMessage(groupid, true, data, last); + return sendMessage(groupid, true, (Object) data, last); } public final int sendMessage(Serializable groupid, boolean recent, byte[] data) { @@ -232,6 +232,28 @@ public abstract class WebSocketNode { } public final int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { - return sendMessage(groupid, recent, (Serializable) data, last); + return sendMessage(groupid, recent, (Object) data, last); } + + //-------------------------------------------------------------------------------- + public final int sendEachMessage(Serializable groupid, Object message) { + return sendMessage(groupid, false, message, true); + } + + public final int sendEachMessage(Serializable groupid, Object message, boolean last) { + return sendMessage(groupid, false, message, last); + } + + public final int sendRecentMessage(Serializable groupid, Object message) { + return sendMessage(groupid, true, message, true); + } + + public final int sendRecentMessage(Serializable groupid, Object message, boolean last) { + return sendMessage(groupid, true, message, last); + } + + public final int sendMessage(Serializable groupid, boolean recent, Object message) { + return sendMessage(groupid, recent, message, true); + } + } diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 0acfa50e5..40bce9686 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -12,6 +12,7 @@ import java.security.*; import java.util.*; import java.util.logging.*; import javax.annotation.*; +import org.redkale.convert.json.JsonConvert; import org.redkale.service.WebSocketNodeService; import org.redkale.util.*; @@ -62,6 +63,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Comment("是否用于二进制流传输") protected final boolean wsbinary = getClass().getAnnotation(WebSocketBinary.class) != null; + @Resource + protected JsonConvert jsonConvert; + @Resource(name = "$") protected WebSocketNode node; @@ -109,6 +113,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } final WebSocket webSocket = this.createWebSocket(); webSocket._engine = engine; + webSocket._jsonConvert = jsonConvert; webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddr = request.getRemoteAddr(); Serializable sessionid = webSocket.onOpen(request); diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 77dafd8ba..412057ba3 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -49,7 +49,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public int sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Serializable message, boolean last) { + public int sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) { final Set engineids = localNodes.get(groupid); if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY; int code = RETCODE_GROUP_EMPTY;