This commit is contained in:
Redkale
2020-04-17 11:08:30 +08:00
parent a88285d935
commit 758972890e
2 changed files with 38 additions and 27 deletions

View File

@@ -392,7 +392,7 @@ public final class WebSocketPacket {
WebSocketPacket decodePacket(final Logger logger, final WebSocketRunner runner, final WebSocket webSocket, final int wsmaxbody, WebSocketPacket decodePacket(final Logger logger, final WebSocketRunner runner, final WebSocket webSocket, final int wsmaxbody,
final AbstractMap.SimpleEntry<String, byte[]> halfBytes, final ByteBuffer buffer) { final AbstractMap.SimpleEntry<String, byte[]> halfBytes, final ByteBuffer buffer) {
//开始 //开始
final boolean debug = false; //调试开关 final boolean debug = true; //调试开关
if (debug) logger.log(Level.FINEST, "read websocket message's length = " + buffer.remaining()); if (debug) logger.log(Level.FINEST, "read websocket message's length = " + buffer.remaining());
if (!buffer.hasRemaining()) return NONE; if (!buffer.hasRemaining()) return NONE;
if (buffer.remaining() < 2) { if (buffer.remaining() < 2) {
@@ -402,8 +402,8 @@ public final class WebSocketPacket {
return NONE; return NONE;
} }
final byte opcode = buffer.get(); //第一个字节 final byte opcode = buffer.get(); //第一个字节
this.last = (opcode & 0b1000_0000) != 0; this.last = (opcode & 0B1000_0000) != 0;
this.type = FrameType.valueOf(opcode & 0xF); this.type = FrameType.valueOf(opcode & 0B0000_1111);
//0x00 表示一个后续帧 //0x00 表示一个后续帧
//0x01 表示一个文本帧 //0x01 表示一个文本帧
@@ -413,17 +413,17 @@ public final class WebSocketPacket {
//0x9 表示一个ping //0x9 表示一个ping
//0xA 表示一个pong //0xA 表示一个pong
//0x0B-0F 为以后的控制帧保留 //0x0B-0F 为以后的控制帧保留
final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧 final boolean control = (opcode & 0B0000_1000) != 0; //是否控制帧
this.receiveCompress = !control && webSocket.deflater != null && (opcode & 0b0100_0000) != 0; //rsv1 为 1 this.receiveCompress = !control && webSocket.inflater != null && (opcode & 0B0100_0000) != 0; //rsv1 为 1
if (type == FrameType.CLOSE) { if (type == FrameType.CLOSE) {
if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
} }
if (type == null) { if (type == null) {
logger.log(Level.SEVERE, " receive unknown frametype(opcode=" + (opcode & 0xF) + ") from websocket client"); logger.log(Level.SEVERE, " receive unknown frametype(opcode=" + (opcode & 0B0000_1111) + ") from websocket client");
} }
final boolean checkrsv = false;//暂时不校验 final boolean checkrsv = false;//暂时不校验
if (checkrsv && (opcode & 0b0111_0000) != 0) { if (checkrsv && (opcode & 0B0111_0000) != 0) {
if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")"); if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")");
return null; //rsv1 rsv2 rsv3 must be 0 return null; //rsv1 rsv2 rsv3 must be 0
} }
@@ -432,6 +432,7 @@ public final class WebSocketPacket {
byte lengthCode = crcode; byte lengthCode = crcode;
final boolean masked = (lengthCode & 0x80) == 0x80; final boolean masked = (lengthCode & 0x80) == 0x80;
if (masked) lengthCode ^= 0x80; //mask if (masked) lengthCode ^= 0x80; //mask
if (debug) logger.log(Level.FINEST, " receive type=" + type + ", control=" + control + ", opcode=" + opcode + ", masked=" + masked + ", remaining=" + buffer.remaining());
//判断Buffer剩余内容够不够基本信息的创建 //判断Buffer剩余内容够不够基本信息的创建
int minBufferLength = ((lengthCode <= 0x7D) ? 0 : (lengthCode == 0x7E ? 2 : 4)) + (masked ? 4 : 0); int minBufferLength = ((lengthCode <= 0x7D) ? 0 : (lengthCode == 0x7E ? 2 : 4)) + (masked ? 4 : 0);
@@ -441,6 +442,7 @@ public final class WebSocketPacket {
bs[1] = crcode; bs[1] = crcode;
buffer.get(bs, 2, buffer.remaining()); buffer.get(bs, 2, buffer.remaining());
halfBytes.setValue(bs); halfBytes.setValue(bs);
if (debug) logger.log(Level.FINEST, " receive not enough bufferlength =" + bs.length);
return NONE; return NONE;
} }
@@ -506,15 +508,18 @@ public final class WebSocketPacket {
if (selfType == FrameType.TEXT) { if (selfType == FrameType.TEXT) {
Convert textConvert = webSocket.getTextConvert(); Convert textConvert = webSocket.getTextConvert();
if (textConvert == null || (!runner.mergemsg && (series || !this.last))) { if (textConvert == null || (!runner.mergemsg && (series || !this.last))) {
this.receiveMessage = new String(this.getReceiveBytes(webSocket, buffers), StandardCharsets.UTF_8); this.receiveMessage = new String(this.getReceiveBytes(webSocket, selfType, buffers), StandardCharsets.UTF_8);
this.receiveType = MessageType.STRING; this.receiveType = MessageType.STRING;
} else { } else {
if (this.last || !runner.mergemsg) { if (this.last || !runner.mergemsg) {
if (runner.currSeriesMergeMessage == null && !this.receiveCompress) { if (runner.currSeriesMergeMessage == null) {
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); if (this.receiveCompress) {
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.getReceiveBytes(webSocket, selfType, buffers));
} else {
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
}
} else { } else {
if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, selfType, buffers));
runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers));
try { try {
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes()); this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes());
} finally { } finally {
@@ -523,7 +528,7 @@ public final class WebSocketPacket {
} }
} else { } else {
if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray();
runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, selfType, buffers));
this.receiveMessage = MESSAGE_NIL; this.receiveMessage = MESSAGE_NIL;
} }
this.receiveCount = this.receiveLength; this.receiveCount = this.receiveLength;
@@ -532,15 +537,18 @@ public final class WebSocketPacket {
} else if (selfType == FrameType.BINARY) { } else if (selfType == FrameType.BINARY) {
Convert binaryConvert = webSocket.getBinaryConvert(); Convert binaryConvert = webSocket.getBinaryConvert();
if (binaryConvert == null || (!runner.mergemsg && (series || !this.last))) { if (binaryConvert == null || (!runner.mergemsg && (series || !this.last))) {
this.receiveMessage = this.getReceiveBytes(webSocket, buffers); this.receiveMessage = this.getReceiveBytes(webSocket, selfType, buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} else { } else {
if (this.last || !runner.mergemsg) { if (this.last || !runner.mergemsg) {
if (runner.currSeriesMergeMessage == null && !this.receiveCompress) { if (runner.currSeriesMergeMessage == null) {
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); if (this.receiveCompress) {
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.getReceiveBytes(webSocket, selfType, buffers));
} else {
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
}
} else { } else {
if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, selfType, buffers));
runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers));
try { try {
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes()); this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes());
} finally { } finally {
@@ -549,20 +557,20 @@ public final class WebSocketPacket {
} }
} else { } else {
if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray();
runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, selfType, buffers));
this.receiveMessage = MESSAGE_NIL; this.receiveMessage = MESSAGE_NIL;
} }
this.receiveCount = this.receiveLength; this.receiveCount = this.receiveLength;
this.receiveType = MessageType.OBJECT; this.receiveType = MessageType.OBJECT;
} }
} else if (selfType == FrameType.PING) { } else if (selfType == FrameType.PING) {
this.receiveMessage = this.getReceiveBytes(webSocket, buffers); this.receiveMessage = this.getReceiveBytes(webSocket, selfType, buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} else if (selfType == FrameType.PONG) { } else if (selfType == FrameType.PONG) {
this.receiveMessage = this.getReceiveBytes(webSocket, buffers); this.receiveMessage = this.getReceiveBytes(webSocket, selfType, buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} else if (selfType == FrameType.CLOSE) { } else if (selfType == FrameType.CLOSE) {
this.receiveMessage = this.getReceiveBytes(webSocket, buffers); this.receiveMessage = this.getReceiveBytes(webSocket, selfType, buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} }
} }
@@ -571,7 +579,7 @@ public final class WebSocketPacket {
return this.receiveLength <= this.receiveCount; return this.receiveLength <= this.receiveCount;
} }
byte[] getReceiveBytes(WebSocket webSocket, ByteBuffer... buffers) { byte[] getReceiveBytes(WebSocket webSocket, FrameType frameType, ByteBuffer... buffers) {
final int length = this.receiveLength; final int length = this.receiveLength;
if (length == 0) return new byte[0]; if (length == 0) return new byte[0];
byte[] bs = new byte[length]; byte[] bs = new byte[length];
@@ -589,9 +597,9 @@ public final class WebSocketPacket {
bs[i] = mask.unmask(bs[i]); bs[i] = mask.unmask(bs[i]);
} }
} }
if (bs.length > 6 && this.receiveCompress && (frameType == FrameType.BINARY || frameType == FrameType.TEXT || frameType == FrameType.SERIES)) {
if (this.receiveCompress) { Inflater inflater = webSocket.inflater;
Inflater inflater = new Inflater(true); inflater.reset();
ByteArrayOutputStream baos = new ByteArrayOutputStream(bs.length); ByteArrayOutputStream baos = new ByteArrayOutputStream(bs.length);
inflater.setInput(Utility.append(bs, EOM_BYTES)); inflater.setInput(Utility.append(bs, EOM_BYTES));
byte[] buff = new byte[1024]; byte[] buff = new byte[1024];
@@ -603,6 +611,7 @@ public final class WebSocketPacket {
} }
return baos.toByteArray(); return baos.toByteArray();
} catch (Exception ex) { } catch (Exception ex) {
ex.printStackTrace();
return bs; return bs;
} }
} }

View File

@@ -98,6 +98,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//同RestWebSocket.cryptor, 变量名不可改, 被Rest.createRestWebSocketServlet用到 //同RestWebSocket.cryptor, 变量名不可改, 被Rest.createRestWebSocketServlet用到
protected Cryptor cryptor; protected Cryptor cryptor;
protected boolean permessageDeflate = false;
@Resource(name = "jsonconvert") @Resource(name = "jsonconvert")
protected Convert jsonConvert; protected Convert jsonConvert;
@@ -211,7 +213,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr(); webSocket._remoteAddr = request.getRemoteAddr();
webSocket._sncpAddress = this.node.localSncpAddress; webSocket._sncpAddress = this.node.localSncpAddress;
if (request.getHeader("Sec-WebSocket-Extensions", "").contains("permessage-deflate")) { if (this.permessageDeflate && request.getHeader("Sec-WebSocket-Extensions", "").contains("permessage-deflate")) {
webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
webSocket.inflater = new Inflater(true); webSocket.inflater = new Inflater(true);
} }