diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java index 12f9426df..5a677e9ca 100644 --- a/src/org/redkale/net/http/WebSocketPacket.java +++ b/src/org/redkale/net/http/WebSocketPacket.java @@ -392,7 +392,7 @@ public final class WebSocketPacket { WebSocketPacket decodePacket(final Logger logger, final WebSocketRunner runner, final WebSocket webSocket, final int wsmaxbody, final AbstractMap.SimpleEntry halfBytes, final ByteBuffer buffer) { //开始 - final boolean debug = false; //调试开关 + final boolean debug = true; //调试开关 if (debug) logger.log(Level.FINEST, "read websocket message's length = " + buffer.remaining()); if (!buffer.hasRemaining()) return NONE; if (buffer.remaining() < 2) { @@ -402,8 +402,8 @@ public final class WebSocketPacket { return NONE; } final byte opcode = buffer.get(); //第一个字节 - this.last = (opcode & 0b1000_0000) != 0; - this.type = FrameType.valueOf(opcode & 0xF); + this.last = (opcode & 0B1000_0000) != 0; + this.type = FrameType.valueOf(opcode & 0B0000_1111); //0x00 表示一个后续帧 //0x01 表示一个文本帧 @@ -413,17 +413,17 @@ public final class WebSocketPacket { //0x9 表示一个ping //0xA 表示一个pong //0x0B-0F 为以后的控制帧保留 - final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧 - this.receiveCompress = !control && webSocket.deflater != null && (opcode & 0b0100_0000) != 0; //rsv1 为 1 + final boolean control = (opcode & 0B0000_1000) != 0; //是否控制帧 + this.receiveCompress = !control && webSocket.inflater != null && (opcode & 0B0100_0000) != 0; //rsv1 为 1 if (type == FrameType.CLOSE) { if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); } if (type == null) { - logger.log(Level.SEVERE, " receive unknown frametype(opcode=" + (opcode & 0xF) + ") from websocket client"); + logger.log(Level.SEVERE, " receive unknown frametype(opcode=" + (opcode & 0B0000_1111) + ") from websocket client"); } final boolean checkrsv = false;//暂时不校验 - if (checkrsv && (opcode & 0b0111_0000) != 0) { + if (checkrsv && (opcode & 0B0111_0000) != 0) { if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")"); return null; //rsv1 rsv2 rsv3 must be 0 } @@ -432,6 +432,7 @@ public final class WebSocketPacket { byte lengthCode = crcode; final boolean masked = (lengthCode & 0x80) == 0x80; if (masked) lengthCode ^= 0x80; //mask + if (debug) logger.log(Level.FINEST, " receive type=" + type + ", control=" + control + ", opcode=" + opcode + ", masked=" + masked + ", remaining=" + buffer.remaining()); //判断Buffer剩余内容够不够基本信息的创建 int minBufferLength = ((lengthCode <= 0x7D) ? 0 : (lengthCode == 0x7E ? 2 : 4)) + (masked ? 4 : 0); @@ -441,6 +442,7 @@ public final class WebSocketPacket { bs[1] = crcode; buffer.get(bs, 2, buffer.remaining()); halfBytes.setValue(bs); + if (debug) logger.log(Level.FINEST, " receive not enough buffer,length =" + bs.length); return NONE; } @@ -506,15 +508,18 @@ public final class WebSocketPacket { if (selfType == FrameType.TEXT) { Convert textConvert = webSocket.getTextConvert(); if (textConvert == null || (!runner.mergemsg && (series || !this.last))) { - this.receiveMessage = new String(this.getReceiveBytes(webSocket, buffers), StandardCharsets.UTF_8); + this.receiveMessage = new String(this.getReceiveBytes(webSocket, selfType, buffers), StandardCharsets.UTF_8); this.receiveType = MessageType.STRING; } else { if (this.last || !runner.mergemsg) { - if (runner.currSeriesMergeMessage == null && !this.receiveCompress) { - this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + if (runner.currSeriesMergeMessage == null) { + if (this.receiveCompress) { + this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.getReceiveBytes(webSocket, selfType, buffers)); + } else { + this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + } } else { - if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); - runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); + runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, selfType, buffers)); try { this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes()); } finally { @@ -523,7 +528,7 @@ public final class WebSocketPacket { } } else { if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); - runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); + runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, selfType, buffers)); this.receiveMessage = MESSAGE_NIL; } this.receiveCount = this.receiveLength; @@ -532,15 +537,18 @@ public final class WebSocketPacket { } else if (selfType == FrameType.BINARY) { Convert binaryConvert = webSocket.getBinaryConvert(); if (binaryConvert == null || (!runner.mergemsg && (series || !this.last))) { - this.receiveMessage = this.getReceiveBytes(webSocket, buffers); + this.receiveMessage = this.getReceiveBytes(webSocket, selfType, buffers); this.receiveType = MessageType.BYTES; } else { if (this.last || !runner.mergemsg) { - if (runner.currSeriesMergeMessage == null && !this.receiveCompress) { - this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + if (runner.currSeriesMergeMessage == null) { + if (this.receiveCompress) { + this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.getReceiveBytes(webSocket, selfType, buffers)); + } else { + this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + } } else { - if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); - runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); + runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, selfType, buffers)); try { this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes()); } finally { @@ -549,20 +557,20 @@ public final class WebSocketPacket { } } else { if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); - runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); + runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, selfType, buffers)); this.receiveMessage = MESSAGE_NIL; } this.receiveCount = this.receiveLength; this.receiveType = MessageType.OBJECT; } } else if (selfType == FrameType.PING) { - this.receiveMessage = this.getReceiveBytes(webSocket, buffers); + this.receiveMessage = this.getReceiveBytes(webSocket, selfType, buffers); this.receiveType = MessageType.BYTES; } else if (selfType == FrameType.PONG) { - this.receiveMessage = this.getReceiveBytes(webSocket, buffers); + this.receiveMessage = this.getReceiveBytes(webSocket, selfType, buffers); this.receiveType = MessageType.BYTES; } else if (selfType == FrameType.CLOSE) { - this.receiveMessage = this.getReceiveBytes(webSocket, buffers); + this.receiveMessage = this.getReceiveBytes(webSocket, selfType, buffers); this.receiveType = MessageType.BYTES; } } @@ -571,7 +579,7 @@ public final class WebSocketPacket { return this.receiveLength <= this.receiveCount; } - byte[] getReceiveBytes(WebSocket webSocket, ByteBuffer... buffers) { + byte[] getReceiveBytes(WebSocket webSocket, FrameType frameType, ByteBuffer... buffers) { final int length = this.receiveLength; if (length == 0) return new byte[0]; byte[] bs = new byte[length]; @@ -589,9 +597,9 @@ public final class WebSocketPacket { bs[i] = mask.unmask(bs[i]); } } - - if (this.receiveCompress) { - Inflater inflater = new Inflater(true); + if (bs.length > 6 && this.receiveCompress && (frameType == FrameType.BINARY || frameType == FrameType.TEXT || frameType == FrameType.SERIES)) { + Inflater inflater = webSocket.inflater; + inflater.reset(); ByteArrayOutputStream baos = new ByteArrayOutputStream(bs.length); inflater.setInput(Utility.append(bs, EOM_BYTES)); byte[] buff = new byte[1024]; @@ -603,6 +611,7 @@ public final class WebSocketPacket { } return baos.toByteArray(); } catch (Exception ex) { + ex.printStackTrace(); return bs; } } diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 23783d153..2d68d989f 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -98,6 +98,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //同RestWebSocket.cryptor, 变量名不可改, 被Rest.createRestWebSocketServlet用到 protected Cryptor cryptor; + protected boolean permessageDeflate = false; + @Resource(name = "jsonconvert") protected Convert jsonConvert; @@ -211,7 +213,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddr = request.getRemoteAddr(); webSocket._sncpAddress = this.node.localSncpAddress; - if (request.getHeader("Sec-WebSocket-Extensions", "").contains("permessage-deflate")) { + if (this.permessageDeflate && request.getHeader("Sec-WebSocket-Extensions", "").contains("permessage-deflate")) { webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); webSocket.inflater = new Inflater(true); }