From e90f2e4142e77a88b5e224863040eed2dd26b548 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 14 Mar 2019 15:27:05 +0800 Subject: [PATCH] =?UTF-8?q?WebSocket=E5=A2=9E=E5=8A=A0mergemsg=E5=B1=9E?= =?UTF-8?q?=E6=80=A7=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/convert/Convert.java | 2 + src/org/redkale/convert/json/JsonConvert.java | 5 ++ src/org/redkale/net/http/Rest.java | 4 + src/org/redkale/net/http/RestWebSocket.java | 7 ++ src/org/redkale/net/http/WebSocketEngine.java | 8 +- src/org/redkale/net/http/WebSocketPacket.java | 73 +++++++++++++++---- src/org/redkale/net/http/WebSocketRunner.java | 14 +++- .../redkale/net/http/WebSocketServlet.java | 8 +- 8 files changed, 103 insertions(+), 18 deletions(-) diff --git a/src/org/redkale/convert/Convert.java b/src/org/redkale/convert/Convert.java index 4531d7a95..3c35a95d8 100644 --- a/src/org/redkale/convert/Convert.java +++ b/src/org/redkale/convert/Convert.java @@ -33,6 +33,8 @@ public abstract class Convert { public abstract boolean isBinary(); + public abstract T convertFrom(final Type type, final byte[] bytes); + public abstract T convertFrom(final Type type, final ByteBuffer... buffers); public abstract T convertFrom(final Type type, final ConvertMask mask, final ByteBuffer... buffers); diff --git a/src/org/redkale/convert/json/JsonConvert.java b/src/org/redkale/convert/json/JsonConvert.java index 3588e8bad..94821ce57 100644 --- a/src/org/redkale/convert/json/JsonConvert.java +++ b/src/org/redkale/convert/json/JsonConvert.java @@ -85,6 +85,11 @@ public final class JsonConvert extends TextConvert { } //------------------------------ convertFrom ----------------------------------------------------------- + public T convertFrom(final Type type, final byte[] bytes) { + if (bytes == null) return null; + return convertFrom(type, new String(bytes, StandardCharsets.UTF_8)); + } + public T convertFrom(final Type type, final String text) { if (text == null) return null; return convertFrom(type, Utility.charArray(text)); diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index a6f6c05da..cb90eb5d5 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -375,6 +375,10 @@ public final class Rest { pushInt(mv, rws.wsmaxbody()); mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxbody", "I"); + mv.visitVarInsn(ALOAD, 0); + mv.visitInsn(rws.mergemsg() ? ICONST_1 : ICONST_0); + mv.visitFieldInsn(PUTFIELD, newDynName, "mergemsg", "Z"); + mv.visitVarInsn(ALOAD, 0); mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0); mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z"); diff --git a/src/org/redkale/net/http/RestWebSocket.java b/src/org/redkale/net/http/RestWebSocket.java index 1f921f868..0cdc18b44 100644 --- a/src/org/redkale/net/http/RestWebSocket.java +++ b/src/org/redkale/net/http/RestWebSocket.java @@ -60,6 +60,13 @@ public @interface RestWebSocket { */ boolean anyuser() default false; + /** + * 接收客户端的分包(last=false)消息时是否自动合并包 + * + * @return 默认true + */ + boolean mergemsg() default true; + /** * WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值:15秒 * diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 99c57ddb1..948c8c9e7 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -76,11 +76,14 @@ public class WebSocketEngine { @Comment("最大消息体长度, 小于1表示无限制") protected int wsmaxbody; + @Comment("接收客户端的分包(last=false)消息时是否自动合并包") + protected boolean mergemsg = true; + @Comment("加密解密器") protected Cryptor cryptor; - protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, - int wsmaxconns, int wsthreads, int wsmaxbody, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) { + protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int wsmaxconns, + int wsthreads, int wsmaxbody, boolean mergemsg, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) { this.engineid = engineid; this.single = single; this.context = context; @@ -90,6 +93,7 @@ public class WebSocketEngine { this.wsmaxconns = wsmaxconns; this.wsthreads = wsthreads; this.wsmaxbody = wsmaxbody; + this.mergemsg = mergemsg; this.cryptor = cryptor; this.logger = logger; this.index = sequence.getAndIncrement(); diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java index 62e9ef497..75eb7d162 100644 --- a/src/org/redkale/net/http/WebSocketPacket.java +++ b/src/org/redkale/net/http/WebSocketPacket.java @@ -14,6 +14,7 @@ import java.util.function.*; import java.util.logging.*; import org.redkale.convert.*; import org.redkale.net.Cryptor; +import org.redkale.util.*; /** * @@ -24,6 +25,8 @@ import org.redkale.net.Cryptor; */ public final class WebSocketPacket { + public static final Object MESSAGE_NIL = new Object(); + static final WebSocketPacket NONE = new WebSocketPacket(); public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]); @@ -34,7 +37,7 @@ public final class WebSocketPacket { public static enum FrameType { - TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A); + SERIES(0x00), TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A); private final int value; @@ -48,6 +51,7 @@ public final class WebSocketPacket { public static FrameType valueOf(int v) { switch (v) { + case 0x00: return SERIES; case 0x01: return TEXT; case 0x02: return BINARY; case 0x08: return CLOSE; @@ -344,14 +348,16 @@ public final class WebSocketPacket { * * @return boolean 已接收完返回true, 需要继续接收body返回false; */ - boolean receiveBody(WebSocket webSocket, ByteBuffer readBuffer) { + boolean receiveBody(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer readBuffer) { + final boolean debug = false; //调试开关 int need = receiveLength - receiveCount; final int remain = readBuffer.remaining(); boolean over = remain >= need; this.receiveBuffers = Utility.append(this.receiveBuffers, readBuffer); + if (debug) logger.finest("receiveBody: receiveLength=" + receiveLength + ", this.receiveCount=" + this.receiveCount + ", readBuffer=" + remain); if (over) { this.receiveCount = this.receiveLength; - parseReceiveMessage(webSocket, this.receiveBuffers); + parseReceiveMessage(logger, runner, webSocket, this.receiveBuffers); } else { this.receiveCount += remain; } @@ -385,7 +391,7 @@ public final class WebSocketPacket { * * @return 返回NONE表示Buffer内容不够; 返回this表示解析完成或部分解析完成;返回null表示解析异常; */ - WebSocketPacket decode(final Logger logger, final WebSocket webSocket, final int wsmaxbody, + WebSocketPacket decode(final Logger logger, final WebSocketRunner runner, final WebSocket webSocket, final int wsmaxbody, final AbstractMap.SimpleEntry halfBytes, final ByteBuffer buffer) { //开始 final boolean debug = false; //调试开关 @@ -400,6 +406,7 @@ public final class WebSocketPacket { final byte opcode = buffer.get(); //第一个字节 this.last = (opcode & 0b1000_0000) != 0; this.type = FrameType.valueOf(opcode & 0xF); + if (type == FrameType.CLOSE) { if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); } @@ -455,6 +462,7 @@ public final class WebSocketPacket { return null; } this.receiveLength = length; + if (debug) logger.finest("this.receiveLength: " + length + ", code=" + lengthCode + ", last=" + last); if (masked) { final byte[] masks = new byte[4]; buffer.get(masks); @@ -469,7 +477,7 @@ public final class WebSocketPacket { }; } if (buffer.remaining() >= this.receiveLength) { //内容足够, 可以解析 - this.parseReceiveMessage(webSocket, buffer); + this.parseReceiveMessage(logger, runner, webSocket, buffer); this.receiveCount = this.receiveLength; } else { this.receiveCount = buffer.remaining(); @@ -478,38 +486,77 @@ public final class WebSocketPacket { return this; } - void parseReceiveMessage(WebSocket webSocket, ByteBuffer... buffers) { + void parseReceiveMessage(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer... buffers) { if (webSocket._engine.cryptor != null) { HttpContext context = webSocket._engine.context; buffers = webSocket._engine.cryptor.decrypt(buffers, context.getBufferSupplier(), context.getBufferConsumer()); } - if (this.type == FrameType.TEXT) { + FrameType selfType = this.type; + if (selfType == FrameType.SERIES) selfType = runner.tmpMergeFrameType; + + if (selfType == FrameType.TEXT) { Convert textConvert = webSocket.getTextConvert(); if (textConvert == null) { this.receiveMessage = new String(this.getReceiveBytes(buffers), StandardCharsets.UTF_8); this.receiveType = MessageType.STRING; } else { - this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + if (this.last) { + if (runner.tmpMergeMessage == null) { + this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + } else { + runner.tmpMergeMessage.write(this.getReceiveBytes(buffers)); + try { + this.type = selfType; + this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, runner.tmpMergeMessage.getBytes()); + } finally { + runner.tmpMergeFrameType = null; + runner.tmpMergeMessage = null; + } + } + } else { + runner.tmpMergeFrameType = selfType; + if (runner.tmpMergeMessage == null) runner.tmpMergeMessage = new ByteArray(); + runner.tmpMergeMessage.write(this.getReceiveBytes(buffers)); + this.receiveMessage = MESSAGE_NIL; + } this.receiveCount = this.receiveLength; this.receiveType = MessageType.OBJECT; } - } else if (this.type == FrameType.BINARY) { + } else if (selfType == FrameType.BINARY) { Convert binaryConvert = webSocket.getBinaryConvert(); if (binaryConvert == null) { this.receiveMessage = this.getReceiveBytes(buffers); this.receiveType = MessageType.BYTES; } else { - this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + if (this.last) { + if (runner.tmpMergeMessage == null) { + this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + } else { + runner.tmpMergeMessage.write(this.getReceiveBytes(buffers)); + try { + this.type = selfType; + this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, runner.tmpMergeMessage.getBytes()); + } finally { + runner.tmpMergeFrameType = null; + runner.tmpMergeMessage = null; + } + } + } else { + runner.tmpMergeFrameType = selfType; + if (runner.tmpMergeMessage == null) runner.tmpMergeMessage = new ByteArray(); + runner.tmpMergeMessage.write(this.getReceiveBytes(buffers)); + this.receiveMessage = MESSAGE_NIL; + } this.receiveCount = this.receiveLength; this.receiveType = MessageType.OBJECT; } - } else if (this.type == FrameType.PING) { + } else if (selfType == FrameType.PING) { this.receiveMessage = this.getReceiveBytes(buffers); this.receiveType = MessageType.BYTES; - } else if (this.type == FrameType.PONG) { + } else if (selfType == FrameType.PONG) { this.receiveMessage = this.getReceiveBytes(buffers); this.receiveType = MessageType.BYTES; - } else if (this.type == FrameType.CLOSE) { + } else if (selfType == FrameType.CLOSE) { this.receiveMessage = this.getReceiveBytes(buffers); this.receiveType = MessageType.BYTES; } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index ba757b50d..99903efb1 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -15,6 +15,7 @@ import java.util.AbstractMap.SimpleEntry; import java.util.concurrent.*; import java.util.function.BiConsumer; import java.util.logging.*; +import org.redkale.util.ByteArray; /** * WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner @@ -34,8 +35,14 @@ class WebSocketRunner implements Runnable { protected final HttpContext context; + protected final boolean mergemsg; + volatile boolean closed = false; + FrameType tmpMergeFrameType; + + ByteArray tmpMergeMessage; + private final BiConsumer restMessageConsumer; //主要供RestWebSocket使用 protected long lastSendTime; @@ -46,6 +53,7 @@ class WebSocketRunner implements Runnable { this.context = context; this.engine = webSocket._engine; this.webSocket = webSocket; + this.mergemsg = webSocket._engine.mergemsg; this.restMessageConsumer = messageConsumer; this.channel = channel; } @@ -53,6 +61,7 @@ class WebSocketRunner implements Runnable { @Override public void run() { final boolean debug = context.getLogger().isLoggable(Level.FINEST); + final WebSocketRunner self = this; try { webSocket.onConnected(); channel.setReadTimeoutSeconds(300); //读取超时5分钟 @@ -81,7 +90,7 @@ class WebSocketRunner implements Runnable { WebSocketPacket onePacket = null; if (unfinishPacket != null) { - if (unfinishPacket.receiveBody(webSocket, readBuffer)) { //已经接收完毕 + if (unfinishPacket.receiveBody(context.getLogger(), self, webSocket, readBuffer)) { //已经接收完毕 onePacket = unfinishPacket; unfinishPacket = null; for (ByteBuffer b : exBuffers) { @@ -98,7 +107,7 @@ class WebSocketRunner implements Runnable { if (onePacket != null) packets.add(onePacket); try { while (true) { - WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuffer); + WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), self, webSocket, wsmaxbody, halfBytes, readBuffer); if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节 if (packet != null && !packet.isReceiveFinished()) { unfinishPacket = packet; @@ -134,6 +143,7 @@ class WebSocketRunner implements Runnable { failed(null, readBuffer); return; } + if (packet.receiveMessage == WebSocketPacket.MESSAGE_NIL) continue; //last=false && mergemsg=true 的粘包 if (packet.type == FrameType.TEXT) { try { diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 6da8ec642..a44d89965 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -56,6 +56,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Comment("最大消息体长度, 小于1表示无限制") public static final String WEBPARAM__WSMAXBODY = "wsmaxbody"; + @Comment("接收客户端的分包(last=false)消息时是否自动合并包") + public static final String WEBPARAM__WSMERGEMSG = "wsmergemsg"; + @Comment("加密解密器") public static final String WEBPARAM__CRYPTOR = "cryptor"; @@ -88,6 +91,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //同RestWebSocket.anyuser protected boolean anyuser = false; + //同RestWebSocket.mergemsg + protected boolean mergemsg = true; + //同RestWebSocket.cryptor, 变量名不可改, 被Rest.createRestWebSocketServlet用到 protected Cryptor cryptor; @@ -157,7 +163,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", - this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, this.cryptor, this.node, this.sendConvert, logger); + this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.node, this.sendConvert, logger); this.node.init(conf); this.node.localEngine.init(conf);