WebSocket增加mergemsg属性功能

This commit is contained in:
Redkale
2019-03-14 15:27:05 +08:00
parent 2a19ea709b
commit e90f2e4142
8 changed files with 103 additions and 18 deletions

View File

@@ -33,6 +33,8 @@ public abstract class Convert<R extends Reader, W extends Writer> {
public abstract boolean isBinary(); public abstract boolean isBinary();
public abstract <T> T convertFrom(final Type type, final byte[] bytes);
public abstract <T> T convertFrom(final Type type, final ByteBuffer... buffers); public abstract <T> T convertFrom(final Type type, final ByteBuffer... buffers);
public abstract <T> T convertFrom(final Type type, final ConvertMask mask, final ByteBuffer... buffers); public abstract <T> T convertFrom(final Type type, final ConvertMask mask, final ByteBuffer... buffers);

View File

@@ -85,6 +85,11 @@ public final class JsonConvert extends TextConvert<JsonReader, JsonWriter> {
} }
//------------------------------ convertFrom ----------------------------------------------------------- //------------------------------ convertFrom -----------------------------------------------------------
public <T> T convertFrom(final Type type, final byte[] bytes) {
if (bytes == null) return null;
return convertFrom(type, new String(bytes, StandardCharsets.UTF_8));
}
public <T> T convertFrom(final Type type, final String text) { public <T> T convertFrom(final Type type, final String text) {
if (text == null) return null; if (text == null) return null;
return convertFrom(type, Utility.charArray(text)); return convertFrom(type, Utility.charArray(text));

View File

@@ -375,6 +375,10 @@ public final class Rest {
pushInt(mv, rws.wsmaxbody()); pushInt(mv, rws.wsmaxbody());
mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxbody", "I"); mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxbody", "I");
mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.mergemsg() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "mergemsg", "Z");
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0); mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z"); mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z");

View File

@@ -60,6 +60,13 @@ public @interface RestWebSocket {
*/ */
boolean anyuser() default false; boolean anyuser() default false;
/**
* 接收客户端的分包(last=false)消息时是否自动合并包
*
* @return 默认true
*/
boolean mergemsg() default true;
/** /**
* WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值15秒 * WebScoket服务器给客户端进行ping操作的间隔时间, 单位: 秒, 默认值15秒
* *

View File

@@ -76,11 +76,14 @@ public class WebSocketEngine {
@Comment("最大消息体长度, 小于1表示无限制") @Comment("最大消息体长度, 小于1表示无限制")
protected int wsmaxbody; protected int wsmaxbody;
@Comment("接收客户端的分包(last=false)消息时是否自动合并包")
protected boolean mergemsg = true;
@Comment("加密解密器") @Comment("加密解密器")
protected Cryptor cryptor; protected Cryptor cryptor;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int wsmaxconns,
int wsmaxconns, int wsthreads, int wsmaxbody, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) { int wsthreads, int wsmaxbody, boolean mergemsg, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) {
this.engineid = engineid; this.engineid = engineid;
this.single = single; this.single = single;
this.context = context; this.context = context;
@@ -90,6 +93,7 @@ public class WebSocketEngine {
this.wsmaxconns = wsmaxconns; this.wsmaxconns = wsmaxconns;
this.wsthreads = wsthreads; this.wsthreads = wsthreads;
this.wsmaxbody = wsmaxbody; this.wsmaxbody = wsmaxbody;
this.mergemsg = mergemsg;
this.cryptor = cryptor; this.cryptor = cryptor;
this.logger = logger; this.logger = logger;
this.index = sequence.getAndIncrement(); this.index = sequence.getAndIncrement();

View File

@@ -14,6 +14,7 @@ import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import org.redkale.convert.*; import org.redkale.convert.*;
import org.redkale.net.Cryptor; import org.redkale.net.Cryptor;
import org.redkale.util.*;
/** /**
* *
@@ -24,6 +25,8 @@ import org.redkale.net.Cryptor;
*/ */
public final class WebSocketPacket { public final class WebSocketPacket {
public static final Object MESSAGE_NIL = new Object();
static final WebSocketPacket NONE = new WebSocketPacket(); static final WebSocketPacket NONE = new WebSocketPacket();
public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]); public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]);
@@ -34,7 +37,7 @@ public final class WebSocketPacket {
public static enum FrameType { public static enum FrameType {
TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A); SERIES(0x00), TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A);
private final int value; private final int value;
@@ -48,6 +51,7 @@ public final class WebSocketPacket {
public static FrameType valueOf(int v) { public static FrameType valueOf(int v) {
switch (v) { switch (v) {
case 0x00: return SERIES;
case 0x01: return TEXT; case 0x01: return TEXT;
case 0x02: return BINARY; case 0x02: return BINARY;
case 0x08: return CLOSE; case 0x08: return CLOSE;
@@ -344,14 +348,16 @@ public final class WebSocketPacket {
* *
* @return boolean 已接收完返回true 需要继续接收body返回false * @return boolean 已接收完返回true 需要继续接收body返回false
*/ */
boolean receiveBody(WebSocket webSocket, ByteBuffer readBuffer) { boolean receiveBody(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer readBuffer) {
final boolean debug = false; //调试开关
int need = receiveLength - receiveCount; int need = receiveLength - receiveCount;
final int remain = readBuffer.remaining(); final int remain = readBuffer.remaining();
boolean over = remain >= need; boolean over = remain >= need;
this.receiveBuffers = Utility.append(this.receiveBuffers, readBuffer); this.receiveBuffers = Utility.append(this.receiveBuffers, readBuffer);
if (debug) logger.finest("receiveBody: receiveLength=" + receiveLength + ", this.receiveCount=" + this.receiveCount + ", readBuffer=" + remain);
if (over) { if (over) {
this.receiveCount = this.receiveLength; this.receiveCount = this.receiveLength;
parseReceiveMessage(webSocket, this.receiveBuffers); parseReceiveMessage(logger, runner, webSocket, this.receiveBuffers);
} else { } else {
this.receiveCount += remain; this.receiveCount += remain;
} }
@@ -385,7 +391,7 @@ public final class WebSocketPacket {
* *
* @return 返回NONE表示Buffer内容不够 返回this表示解析完成或部分解析完成返回null表示解析异常 * @return 返回NONE表示Buffer内容不够 返回this表示解析完成或部分解析完成返回null表示解析异常
*/ */
WebSocketPacket decode(final Logger logger, final WebSocket webSocket, final int wsmaxbody, WebSocketPacket decode(final Logger logger, final WebSocketRunner runner, final WebSocket webSocket, final int wsmaxbody,
final AbstractMap.SimpleEntry<String, byte[]> halfBytes, final ByteBuffer buffer) { final AbstractMap.SimpleEntry<String, byte[]> halfBytes, final ByteBuffer buffer) {
//开始 //开始
final boolean debug = false; //调试开关 final boolean debug = false; //调试开关
@@ -400,6 +406,7 @@ public final class WebSocketPacket {
final byte opcode = buffer.get(); //第一个字节 final byte opcode = buffer.get(); //第一个字节
this.last = (opcode & 0b1000_0000) != 0; this.last = (opcode & 0b1000_0000) != 0;
this.type = FrameType.valueOf(opcode & 0xF); this.type = FrameType.valueOf(opcode & 0xF);
if (type == FrameType.CLOSE) { if (type == FrameType.CLOSE) {
if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
} }
@@ -455,6 +462,7 @@ public final class WebSocketPacket {
return null; return null;
} }
this.receiveLength = length; this.receiveLength = length;
if (debug) logger.finest("this.receiveLength: " + length + ", code=" + lengthCode + ", last=" + last);
if (masked) { if (masked) {
final byte[] masks = new byte[4]; final byte[] masks = new byte[4];
buffer.get(masks); buffer.get(masks);
@@ -469,7 +477,7 @@ public final class WebSocketPacket {
}; };
} }
if (buffer.remaining() >= this.receiveLength) { //内容足够, 可以解析 if (buffer.remaining() >= this.receiveLength) { //内容足够, 可以解析
this.parseReceiveMessage(webSocket, buffer); this.parseReceiveMessage(logger, runner, webSocket, buffer);
this.receiveCount = this.receiveLength; this.receiveCount = this.receiveLength;
} else { } else {
this.receiveCount = buffer.remaining(); this.receiveCount = buffer.remaining();
@@ -478,38 +486,77 @@ public final class WebSocketPacket {
return this; return this;
} }
void parseReceiveMessage(WebSocket webSocket, ByteBuffer... buffers) { void parseReceiveMessage(final Logger logger, WebSocketRunner runner, WebSocket webSocket, ByteBuffer... buffers) {
if (webSocket._engine.cryptor != null) { if (webSocket._engine.cryptor != null) {
HttpContext context = webSocket._engine.context; HttpContext context = webSocket._engine.context;
buffers = webSocket._engine.cryptor.decrypt(buffers, context.getBufferSupplier(), context.getBufferConsumer()); buffers = webSocket._engine.cryptor.decrypt(buffers, context.getBufferSupplier(), context.getBufferConsumer());
} }
if (this.type == FrameType.TEXT) { FrameType selfType = this.type;
if (selfType == FrameType.SERIES) selfType = runner.tmpMergeFrameType;
if (selfType == FrameType.TEXT) {
Convert textConvert = webSocket.getTextConvert(); Convert textConvert = webSocket.getTextConvert();
if (textConvert == null) { if (textConvert == null) {
this.receiveMessage = new String(this.getReceiveBytes(buffers), StandardCharsets.UTF_8); this.receiveMessage = new String(this.getReceiveBytes(buffers), StandardCharsets.UTF_8);
this.receiveType = MessageType.STRING; this.receiveType = MessageType.STRING;
} else { } else {
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); if (this.last) {
if (runner.tmpMergeMessage == null) {
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
} else {
runner.tmpMergeMessage.write(this.getReceiveBytes(buffers));
try {
this.type = selfType;
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, runner.tmpMergeMessage.getBytes());
} finally {
runner.tmpMergeFrameType = null;
runner.tmpMergeMessage = null;
}
}
} else {
runner.tmpMergeFrameType = selfType;
if (runner.tmpMergeMessage == null) runner.tmpMergeMessage = new ByteArray();
runner.tmpMergeMessage.write(this.getReceiveBytes(buffers));
this.receiveMessage = MESSAGE_NIL;
}
this.receiveCount = this.receiveLength; this.receiveCount = this.receiveLength;
this.receiveType = MessageType.OBJECT; this.receiveType = MessageType.OBJECT;
} }
} else if (this.type == FrameType.BINARY) { } else if (selfType == FrameType.BINARY) {
Convert binaryConvert = webSocket.getBinaryConvert(); Convert binaryConvert = webSocket.getBinaryConvert();
if (binaryConvert == null) { if (binaryConvert == null) {
this.receiveMessage = this.getReceiveBytes(buffers); this.receiveMessage = this.getReceiveBytes(buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} else { } else {
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); if (this.last) {
if (runner.tmpMergeMessage == null) {
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
} else {
runner.tmpMergeMessage.write(this.getReceiveBytes(buffers));
try {
this.type = selfType;
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, runner.tmpMergeMessage.getBytes());
} finally {
runner.tmpMergeFrameType = null;
runner.tmpMergeMessage = null;
}
}
} else {
runner.tmpMergeFrameType = selfType;
if (runner.tmpMergeMessage == null) runner.tmpMergeMessage = new ByteArray();
runner.tmpMergeMessage.write(this.getReceiveBytes(buffers));
this.receiveMessage = MESSAGE_NIL;
}
this.receiveCount = this.receiveLength; this.receiveCount = this.receiveLength;
this.receiveType = MessageType.OBJECT; this.receiveType = MessageType.OBJECT;
} }
} else if (this.type == FrameType.PING) { } else if (selfType == FrameType.PING) {
this.receiveMessage = this.getReceiveBytes(buffers); this.receiveMessage = this.getReceiveBytes(buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} else if (this.type == FrameType.PONG) { } else if (selfType == FrameType.PONG) {
this.receiveMessage = this.getReceiveBytes(buffers); this.receiveMessage = this.getReceiveBytes(buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} else if (this.type == FrameType.CLOSE) { } else if (selfType == FrameType.CLOSE) {
this.receiveMessage = this.getReceiveBytes(buffers); this.receiveMessage = this.getReceiveBytes(buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} }

View File

@@ -15,6 +15,7 @@ import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.logging.*; import java.util.logging.*;
import org.redkale.util.ByteArray;
/** /**
* WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner * WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner
@@ -34,8 +35,14 @@ class WebSocketRunner implements Runnable {
protected final HttpContext context; protected final HttpContext context;
protected final boolean mergemsg;
volatile boolean closed = false; volatile boolean closed = false;
FrameType tmpMergeFrameType;
ByteArray tmpMergeMessage;
private final BiConsumer<WebSocket, Object> restMessageConsumer; //主要供RestWebSocket使用 private final BiConsumer<WebSocket, Object> restMessageConsumer; //主要供RestWebSocket使用
protected long lastSendTime; protected long lastSendTime;
@@ -46,6 +53,7 @@ class WebSocketRunner implements Runnable {
this.context = context; this.context = context;
this.engine = webSocket._engine; this.engine = webSocket._engine;
this.webSocket = webSocket; this.webSocket = webSocket;
this.mergemsg = webSocket._engine.mergemsg;
this.restMessageConsumer = messageConsumer; this.restMessageConsumer = messageConsumer;
this.channel = channel; this.channel = channel;
} }
@@ -53,6 +61,7 @@ class WebSocketRunner implements Runnable {
@Override @Override
public void run() { public void run() {
final boolean debug = context.getLogger().isLoggable(Level.FINEST); final boolean debug = context.getLogger().isLoggable(Level.FINEST);
final WebSocketRunner self = this;
try { try {
webSocket.onConnected(); webSocket.onConnected();
channel.setReadTimeoutSeconds(300); //读取超时5分钟 channel.setReadTimeoutSeconds(300); //读取超时5分钟
@@ -81,7 +90,7 @@ class WebSocketRunner implements Runnable {
WebSocketPacket onePacket = null; WebSocketPacket onePacket = null;
if (unfinishPacket != null) { if (unfinishPacket != null) {
if (unfinishPacket.receiveBody(webSocket, readBuffer)) { //已经接收完毕 if (unfinishPacket.receiveBody(context.getLogger(), self, webSocket, readBuffer)) { //已经接收完毕
onePacket = unfinishPacket; onePacket = unfinishPacket;
unfinishPacket = null; unfinishPacket = null;
for (ByteBuffer b : exBuffers) { for (ByteBuffer b : exBuffers) {
@@ -98,7 +107,7 @@ class WebSocketRunner implements Runnable {
if (onePacket != null) packets.add(onePacket); if (onePacket != null) packets.add(onePacket);
try { try {
while (true) { while (true) {
WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuffer); WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), self, webSocket, wsmaxbody, halfBytes, readBuffer);
if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节 if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节
if (packet != null && !packet.isReceiveFinished()) { if (packet != null && !packet.isReceiveFinished()) {
unfinishPacket = packet; unfinishPacket = packet;
@@ -134,6 +143,7 @@ class WebSocketRunner implements Runnable {
failed(null, readBuffer); failed(null, readBuffer);
return; return;
} }
if (packet.receiveMessage == WebSocketPacket.MESSAGE_NIL) continue; //last=false && mergemsg=true 的粘包
if (packet.type == FrameType.TEXT) { if (packet.type == FrameType.TEXT) {
try { try {

View File

@@ -56,6 +56,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Comment("最大消息体长度, 小于1表示无限制") @Comment("最大消息体长度, 小于1表示无限制")
public static final String WEBPARAM__WSMAXBODY = "wsmaxbody"; public static final String WEBPARAM__WSMAXBODY = "wsmaxbody";
@Comment("接收客户端的分包(last=false)消息时是否自动合并包")
public static final String WEBPARAM__WSMERGEMSG = "wsmergemsg";
@Comment("加密解密器") @Comment("加密解密器")
public static final String WEBPARAM__CRYPTOR = "cryptor"; public static final String WEBPARAM__CRYPTOR = "cryptor";
@@ -88,6 +91,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//同RestWebSocket.anyuser //同RestWebSocket.anyuser
protected boolean anyuser = false; protected boolean anyuser = false;
//同RestWebSocket.mergemsg
protected boolean mergemsg = true;
//同RestWebSocket.cryptor, 变量名不可改, 被Rest.createRestWebSocketServlet用到 //同RestWebSocket.cryptor, 变量名不可改, 被Rest.createRestWebSocketServlet用到
protected Cryptor cryptor; protected Cryptor cryptor;
@@ -157,7 +163,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service //存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",
this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, this.cryptor, this.node, this.sendConvert, logger); this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.node, this.sendConvert, logger);
this.node.init(conf); this.node.init(conf);
this.node.localEngine.init(conf); this.node.localEngine.init(conf);