diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 4af3357aa..b53fbd129 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -13,21 +13,21 @@ import org.redkale.net.*; /** * 一个WebSocket连接对应一个WebSocket实体,即一个WebSocket会绑定一个TCP连接。 - * WebSocket 有两种模式: - * 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下: - * 1.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。 - * 1.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。 - * 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。 - * 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。 - * 1.5 onClose WebSocket被关闭后回调此方法。 - * + * WebSocket 有两种模式: + * 1) 普通模式: 协议上符合HTML5规范, 其流程顺序如下: + * 1.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。 + * 1.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。 + * 1.3 onConnected WebSocket成功连接后在准备接收数据前回调此方法。 + * 1.4 onMessage/onFragment+ WebSocket接收到消息后回调此消息类方法。 + * 1.5 onClose WebSocket被关闭后回调此方法。 + * * 此模式下 以上方法都应该被重载。 - * - * 2) 原始二进制模式: 此模式有别于HTML5规范,可以视为原始的TCP连接。通常用于音频视频通讯场景。期流程顺序如下: - * 2.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。 - * 2.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。 - * 2.3 onRead WebSocket成功连接后回调此方法, 由此方法处理原始的TCP连接, 同时业务代码去控制WebSocket的关闭。 - * + * + * 2) 原始二进制模式: 此模式有别于HTML5规范,可以视为原始的TCP连接。通常用于音频视频通讯场景。期流程顺序如下: + * 2.1 onOpen 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断登录态。 + * 2.2 createGroupid 如果方法返回null,则视为WebSocket的连接不合法,框架会强制关闭WebSocket连接;通常用于判断用户权限是否符合。 + * 2.3 onRead WebSocket成功连接后回调此方法, 由此方法处理原始的TCP连接, 同时业务代码去控制WebSocket的关闭。 + * * 此模式下 以上方法都应该被重载。 *

* @@ -56,17 +56,15 @@ public abstract class WebSocket { public static final int RETCODE_WSOFFLINE = 1 << 8; //256 - WebSocketRunner runner; + WebSocketRunner runner; //不可能为空 - WebSocketEngine engine; + WebSocketEngine _engine; //不可能为空 - WebSocketGroup group; + WebSocketGroup _group; //不可能为空 - WebSocketNode node; + Serializable _sessionid; //不可能为空 - Serializable sessionid; - - Serializable groupid; + Serializable _groupid; //不可能为空 private final long createtime = System.currentTimeMillis(); @@ -80,7 +78,7 @@ public abstract class WebSocket { * 发送消息体, 包含二进制/文本 *

* @param packet - * @return + * @return */ public final int send(WebSocketPacket packet) { if (this.runner != null) return this.runner.sendMessage(packet); @@ -98,7 +96,7 @@ public abstract class WebSocket { * 发送单一的文本消息 *

* @param text 不可为空 - * @return + * @return */ public final int send(String text) { return send(text, true); @@ -109,7 +107,7 @@ public abstract class WebSocket { *

* @param text 不可为空 * @param last 是否最后一条 - * @return + * @return */ public final int send(String text, boolean last) { return send(new WebSocketPacket(text, last)); @@ -119,7 +117,7 @@ public abstract class WebSocket { * 发送单一的二进制消息 *

* @param data - * @return + * @return */ public final int send(byte[] data) { return send(data, true); @@ -142,7 +140,7 @@ public abstract class WebSocket { *

* @param data 不可为空 * @param last 是否最后一条 - * @return + * @return */ public final int send(byte[] data, boolean last) { return send(new WebSocketPacket(data, last)); @@ -152,8 +150,8 @@ public abstract class WebSocket { * 发送消息, 消息类型是String或byte[] *

* @param message 不可为空, 只能是String或者byte[] - * @param last 是否最后一条 - * @return + * @param last 是否最后一条 + * @return */ public final int send(Serializable message, boolean last) { return send(new WebSocketPacket(message, last)); @@ -253,13 +251,13 @@ public abstract class WebSocket { } private int sendMessage(Serializable groupid, boolean recent, String text, boolean last) { - if (node == null) return RETCODE_NODESERVICE_NULL; - return node.sendMessage(groupid, recent, text, last); + if (_engine.node == null) return RETCODE_NODESERVICE_NULL; + return _engine.node.sendMessage(groupid, recent, text, last); } private int sendMessage(Serializable groupid, boolean recent, byte[] data, boolean last) { - if (node == null) return RETCODE_NODESERVICE_NULL; - return node.sendMessage(groupid, recent, data, last); + if (_engine.node == null) return RETCODE_NODESERVICE_NULL; + return _engine.node.sendMessage(groupid, recent, data, last); } /** @@ -301,7 +299,7 @@ public abstract class WebSocket { * @return */ public final Serializable getGroupid() { - return groupid; + return _groupid; } /** @@ -310,7 +308,7 @@ public abstract class WebSocket { * @return */ public final Serializable getSessionid() { - return sessionid; + return _sessionid; } //------------------------------------------------------------------- @@ -320,7 +318,7 @@ public abstract class WebSocket { * @return */ protected final WebSocketGroup getWebSocketGroup() { - return group; + return _group; } /** @@ -330,11 +328,11 @@ public abstract class WebSocket { * @return */ protected final WebSocketGroup getWebSocketGroup(Serializable groupid) { - return engine.getWebSocketGroup(groupid); + return _engine.getWebSocketGroup(groupid); } protected final Collection getWebSocketGroups() { - return engine.getWebSocketGroups(); + return _engine.getWebSocketGroups(); } //------------------------------------------------------------------- diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 4a16c1e17..4d9509a51 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -27,6 +27,8 @@ public final class WebSocketEngine { private final String engineid; + protected final WebSocketNode node; + private final Map containers = new ConcurrentHashMap<>(); private ScheduledThreadPoolExecutor scheduler; @@ -35,8 +37,9 @@ public final class WebSocketEngine { protected final boolean finest; - protected WebSocketEngine(String engineid, Logger logger) { + protected WebSocketEngine(String engineid, WebSocketNode node, Logger logger) { this.engineid = engineid; + this.node = node; this.logger = logger; this.index = sequence.getAndIncrement(); this.finest = logger.isLoggable(Level.FINEST); @@ -60,19 +63,26 @@ public final class WebSocketEngine { } void add(WebSocket socket) { - WebSocketGroup group = containers.get(socket.groupid); + WebSocketGroup group = containers.get(socket._groupid); if (group == null) { - group = new WebSocketGroup(socket.groupid); - containers.put(socket.groupid, group); + group = new WebSocketGroup(socket._groupid); + containers.putIfAbsent(socket._groupid, group); } group.add(socket); + if (node != null) node.connect(socket._groupid, engineid); } void remove(WebSocket socket) { - WebSocketGroup group = containers.get(socket.groupid); - if (group == null) return; + final WebSocketGroup group = containers.get(socket._groupid); + if (group == null) { + if (node != null) node.disconnect(socket._groupid, engineid); + return; + } group.remove(socket); - if (group.isEmpty()) containers.remove(socket.groupid); + if (group.isEmpty()) { + containers.remove(socket._groupid); + if (node != null) node.disconnect(socket._groupid, engineid); + } } Collection getWebSocketGroups() { diff --git a/src/org/redkale/net/http/WebSocketGroup.java b/src/org/redkale/net/http/WebSocketGroup.java index 6ddb35814..8d00ef97e 100644 --- a/src/org/redkale/net/http/WebSocketGroup.java +++ b/src/org/redkale/net/http/WebSocketGroup.java @@ -42,7 +42,7 @@ public final class WebSocketGroup { } void add(WebSocket socket) { - socket.group = this; + socket._group = this; this.recentWebSocket = socket; list.add(socket); } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 27f4aa495..90842a2c4 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -48,7 +48,7 @@ public class WebSocketRunner implements Runnable { public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel, final boolean wsbinary) { this.context = context; - this.engine = webSocket.engine; + this.engine = webSocket._engine; this.webSocket = webSocket; this.channel = channel; this.wsbinary = wsbinary; @@ -63,7 +63,6 @@ public class WebSocketRunner implements Runnable { public void run() { final boolean debug = this.coder.debugable; try { - if (webSocket.node != null) webSocket.node.connect(webSocket.groupid, webSocket.engine.getEngineid()); webSocket.onConnected(); channel.setReadTimeoutSecond(300); //读取超时5分钟 if (channel.isOpen()) { @@ -119,7 +118,7 @@ public class WebSocketRunner implements Runnable { readBuffer.clear(); channel.read(readBuffer, null, this); } - webSocket.group.setRecentWebSocket(webSocket); + webSocket._group.setRecentWebSocket(webSocket); try { if (packet.type == FrameType.TEXT) { webSocket.onMessage(packet.getPayload()); @@ -244,10 +243,6 @@ public class WebSocketRunner implements Runnable { readBuffer = null; writeBuffer = null; engine.remove(webSocket); - if (webSocket.node != null) { - WebSocketGroup group = webSocket.getWebSocketGroup(); - if (group == null || group.isEmpty()) webSocket.node.disconnect(webSocket.groupid, webSocket.engine.getEngineid()); - } webSocket.onClose(0, null); } } @@ -370,27 +365,28 @@ public class WebSocketRunner implements Runnable { private Logger logger; /** - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-------+-+-------------+-------------------------------+ - |F|R|R|R| opcode|M| Payload len | Extended payload length | - |I|S|S|S| (4) |A| (7) | (16/64) | - |N|V|V|V| |S| | (if payload len==126/127) | - | |1|2|3| |K| | | - +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + - | Extended payload length continued, if payload len == 127 | - + - - - - - - - - - - - - - - - +-------------------------------+ - | |Masking-key, if MASK set to 1 | - +-------------------------------+-------------------------------+ - | Masking-key (continued) | Payload Data | - +-------------------------------- - - - - - - - - - - - - - - - + - : Payload Data continued : - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - | Payload Data continued | - +-----------------------------------------------------------------------+ - @param buffer - @param exbuffers - @return + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-------+-+-------------+-------------------------------+ + * |F|R|R|R| opcode|M| Payload len | Extended payload length | + * |I|S|S|S| (4) |A| (7) | (16/64) | + * |N|V|V|V| |S| | (if payload len==126/127) | + * | |1|2|3| |K| | | + * +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + + * | Extended payload length continued, if payload len == 127 | + * + - - - - - - - - - - - - - - - +-------------------------------+ + * | |Masking-key, if MASK set to 1 | + * +-------------------------------+-------------------------------+ + * | Masking-key (continued) | Payload Data | + * +-------------------------------- - - - - - - - - - - - - - - - + + * : Payload Data continued : + * + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + * | Payload Data continued | + * +-----------------------------------------------------------------------+ + * + * @param buffer + * @param exbuffers + * @return */ public WebSocketPacket decode(final ByteBuffer buffer, ByteBuffer... exbuffers) { final boolean debug = this.debugable; diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 01664bb35..b0c81cf29 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -63,7 +63,7 @@ public abstract class WebSocketServlet extends HttpServlet { @Override public void init(Context context, AnyValue conf) { InetSocketAddress addr = context.getServerAddress(); - this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-" + name(), logger); + this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-" + name(), this.node, logger); this.node.putWebSocketEngine(engine); this.node.init(conf); this.engine.init(conf); @@ -97,15 +97,14 @@ public abstract class WebSocketServlet extends HttpServlet { return; } final WebSocket webSocket = this.createWebSocket(); - webSocket.engine = engine; - webSocket.node = node; + webSocket._engine = engine; Serializable sessionid = webSocket.onOpen(request); if (sessionid == null) { if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request); response.finish(true); return; } - webSocket.sessionid = sessionid; + webSocket._sessionid = sessionid; request.setKeepAlive(true); byte[] bytes = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes(); synchronized (digest) { @@ -127,7 +126,7 @@ public abstract class WebSocketServlet extends HttpServlet { response.finish(true); return; } - webSocket.groupid = groupid; + webSocket._groupid = groupid; engine.add(webSocket); context.submit(new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary)); response.finish(true);