From 63ef83ec622e2c7adf5286891bc0d90a0ec2a4a8 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 16 Apr 2020 20:19:49 +0800 Subject: [PATCH] =?UTF-8?q?WebSocket=E6=94=AF=E6=8C=81permessage-deflate?= =?UTF-8?q?=E5=8D=95=E5=90=91=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/WebSocket.java | 9 ++- src/org/redkale/net/http/WebSocketEngine.java | 8 +- src/org/redkale/net/http/WebSocketPacket.java | 74 +++++++++++++------ src/org/redkale/net/http/WebSocketRunner.java | 4 +- .../redkale/net/http/WebSocketServlet.java | 7 ++ src/org/redkale/util/Redkale.java | 4 +- 6 files changed, 73 insertions(+), 33 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 42456f469..1174fead7 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -14,6 +14,7 @@ import java.util.concurrent.*; import java.util.function.*; import java.util.logging.*; import java.util.stream.Stream; +import java.util.zip.*; import org.redkale.convert.Convert; import org.redkale.net.AsyncConnection; import org.redkale.util.Comment; @@ -103,7 +104,11 @@ public abstract class WebSocket { java.lang.reflect.Type _messageTextType; //不可能为空 - private long createtime = System.currentTimeMillis(); + Deflater deflater; //压缩 + + Inflater inflater; //解压 + + long createtime = System.currentTimeMillis(); private long pingtime; @@ -894,6 +899,8 @@ public abstract class WebSocket { * 显式地关闭WebSocket */ public final void close() { + if (this.deflater != null) this.deflater.end(); + if (this.inflater != null) this.inflater.end(); if (this._runner != null) { CompletableFuture future = this._runner.closeRunner(CLOSECODE_SERVERCLOSE, "user close"); if (future != null) future.join(); diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 8e03af683..a3f9292eb 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -244,7 +244,7 @@ public class WebSocketEngine { if (bufferSupplier == null) { bufferSupplier = websocket.getBufferSupplier(); bufferConsumer = websocket.getBufferConsumer(); - packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); + packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor)); } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } @@ -255,7 +255,7 @@ public class WebSocketEngine { if (bufferSupplier == null) { bufferSupplier = websocket.getBufferSupplier(); bufferConsumer = websocket.getBufferConsumer(); - packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); + packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor)); } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } @@ -321,7 +321,7 @@ public class WebSocketEngine { if (bufferSupplier == null) { bufferSupplier = websocket.getBufferSupplier(); bufferConsumer = websocket.getBufferConsumer(); - packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); + packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor)); } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } @@ -333,7 +333,7 @@ public class WebSocketEngine { if (bufferSupplier == null) { bufferSupplier = websocket.getBufferSupplier(); bufferConsumer = websocket.getBufferConsumer(); - packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); + packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor)); } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java index af41d3c2d..35a1317f6 100644 --- a/src/org/redkale/net/http/WebSocketPacket.java +++ b/src/org/redkale/net/http/WebSocketPacket.java @@ -9,9 +9,10 @@ import org.redkale.util.Utility; import java.io.*; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.AbstractMap; +import java.util.*; import java.util.function.*; import java.util.logging.*; +import java.util.zip.*; import org.redkale.convert.*; import org.redkale.net.Cryptor; import org.redkale.util.*; @@ -29,6 +30,8 @@ public final class WebSocketPacket { static final WebSocketPacket NONE = new WebSocketPacket(); + private static final byte[] EOM_BYTES = new byte[]{0, 0, -1, -1}; + public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]); public static enum MessageType { @@ -82,6 +85,8 @@ public final class WebSocketPacket { //---------------接收------------------------ MessageType receiveType; + boolean receiveCompress; + int receiveCount; int receiveLength; @@ -223,7 +228,7 @@ public final class WebSocketPacket { * * @return ByteBuffer[] */ - ByteBuffer[] encode(final Supplier supplier, final Consumer consumer, final Cryptor cryptor) { + ByteBuffer[] encodePacket(final Supplier supplier, final Consumer consumer, final Cryptor cryptor) { final byte opcode = (byte) (this.type.getValue() | 0x80); if (this.sendConvert != null) { Supplier newsupplier = new Supplier() { @@ -391,7 +396,7 @@ public final class WebSocketPacket { * * @return 返回NONE表示Buffer内容不够; 返回this表示解析完成或部分解析完成;返回null表示解析异常; */ - WebSocketPacket decode(final Logger logger, final WebSocketRunner runner, final WebSocket webSocket, final int wsmaxbody, + WebSocketPacket decodePacket(final Logger logger, final WebSocketRunner runner, final WebSocket webSocket, final int wsmaxbody, final AbstractMap.SimpleEntry halfBytes, final ByteBuffer buffer) { //开始 final boolean debug = false; //调试开关 @@ -407,6 +412,17 @@ public final class WebSocketPacket { this.last = (opcode & 0b1000_0000) != 0; this.type = FrameType.valueOf(opcode & 0xF); + //0x00 表示一个后续帧 + //0x01 表示一个文本帧 + //0x02 表示一个二进制帧 + //0x03-07 为以后的非控制帧保留 + //0x8 表示一个连接关闭 + //0x9 表示一个ping + //0xA 表示一个pong + //0x0B-0F 为以后的控制帧保留 + final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧 + this.receiveCompress = !control && webSocket.deflater != null && (opcode & 0b0100_0000) != 0; //rsv1 为 1 + if (type == FrameType.CLOSE) { if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); } @@ -418,15 +434,6 @@ public final class WebSocketPacket { if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")"); return null; //rsv1 rsv2 rsv3 must be 0 } - //0x00 表示一个后续帧 - //0x01 表示一个文本帧 - //0x02 表示一个二进制帧 - //0x03-07 为以后的非控制帧保留 - //0x8 表示一个连接关闭 - //0x9 表示一个ping - //0xA 表示一个pong - //0x0B-0F 为以后的控制帧保留 - final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧 final byte crcode = buffer.get(); //第二个字节 byte lengthCode = crcode; @@ -506,14 +513,15 @@ public final class WebSocketPacket { if (selfType == FrameType.TEXT) { Convert textConvert = webSocket.getTextConvert(); if (textConvert == null || (!runner.mergemsg && (series || !this.last))) { - this.receiveMessage = new String(this.getReceiveBytes(buffers), StandardCharsets.UTF_8); + this.receiveMessage = new String(this.getReceiveBytes(webSocket, buffers), StandardCharsets.UTF_8); this.receiveType = MessageType.STRING; } else { if (this.last || !runner.mergemsg) { - if (runner.currSeriesMergeMessage == null) { + if (runner.currSeriesMergeMessage == null && !this.receiveCompress) { this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); } else { - runner.currSeriesMergeMessage.write(this.getReceiveBytes(buffers)); + if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); + runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); try { this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes()); } finally { @@ -522,7 +530,7 @@ public final class WebSocketPacket { } } else { if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); - runner.currSeriesMergeMessage.write(this.getReceiveBytes(buffers)); + runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); this.receiveMessage = MESSAGE_NIL; } this.receiveCount = this.receiveLength; @@ -531,14 +539,15 @@ public final class WebSocketPacket { } else if (selfType == FrameType.BINARY) { Convert binaryConvert = webSocket.getBinaryConvert(); if (binaryConvert == null || (!runner.mergemsg && (series || !this.last))) { - this.receiveMessage = this.getReceiveBytes(buffers); + this.receiveMessage = this.getReceiveBytes(webSocket, buffers); this.receiveType = MessageType.BYTES; } else { if (this.last || !runner.mergemsg) { - if (runner.currSeriesMergeMessage == null) { + if (runner.currSeriesMergeMessage == null && !this.receiveCompress) { this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); } else { - runner.currSeriesMergeMessage.write(this.getReceiveBytes(buffers)); + if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); + runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); try { this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes()); } finally { @@ -547,20 +556,20 @@ public final class WebSocketPacket { } } else { if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); - runner.currSeriesMergeMessage.write(this.getReceiveBytes(buffers)); + runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers)); this.receiveMessage = MESSAGE_NIL; } this.receiveCount = this.receiveLength; this.receiveType = MessageType.OBJECT; } } else if (selfType == FrameType.PING) { - this.receiveMessage = this.getReceiveBytes(buffers); + this.receiveMessage = this.getReceiveBytes(webSocket, buffers); this.receiveType = MessageType.BYTES; } else if (selfType == FrameType.PONG) { - this.receiveMessage = this.getReceiveBytes(buffers); + this.receiveMessage = this.getReceiveBytes(webSocket, buffers); this.receiveType = MessageType.BYTES; } else if (selfType == FrameType.CLOSE) { - this.receiveMessage = this.getReceiveBytes(buffers); + this.receiveMessage = this.getReceiveBytes(webSocket, buffers); this.receiveType = MessageType.BYTES; } } @@ -569,7 +578,7 @@ public final class WebSocketPacket { return this.receiveLength <= this.receiveCount; } - byte[] getReceiveBytes(ByteBuffer... buffers) { + byte[] getReceiveBytes(WebSocket webSocket, ByteBuffer... buffers) { final int length = this.receiveLength; if (length == 0) return new byte[0]; byte[] bs = new byte[length]; @@ -587,6 +596,23 @@ public final class WebSocketPacket { bs[i] = mask.unmask(bs[i]); } } + + if (this.receiveCompress) { + Inflater inflater = new Inflater(true); + ByteArrayOutputStream baos = new ByteArrayOutputStream(bs.length); + inflater.setInput(Utility.append(bs, EOM_BYTES)); + byte[] buff = new byte[1024]; + try { + while (!inflater.finished()) { + int count = inflater.inflate(buff); + if (count == 0) break; + baos.write(buff, 0, count); + } + return baos.toByteArray(); + } catch (Exception ex) { + return bs; + } + } return bs; } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 22e18e652..350d43453 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -108,7 +108,7 @@ class WebSocketRunner implements Runnable { if (onePacket != null) packets.add(onePacket); try { while (true) { - WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), self, webSocket, wsmaxbody, halfBytes, readBuffer); + WebSocketPacket packet = new WebSocketPacket().decodePacket(context.getLogger(), self, webSocket, wsmaxbody, halfBytes, readBuffer); if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节 if (packet != null && !packet.isReceiveFinished()) { unfinishPacket = packet; @@ -230,7 +230,7 @@ class WebSocketRunner implements Runnable { //System.out.println("推送消息"); final CompletableFuture futureResult = new CompletableFuture<>(); try { - ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor); + ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encodePacket(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor); //if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet); CompletionHandler handler = new CompletionHandler() { diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 6ab912479..23783d153 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -15,6 +15,7 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.function.*; import java.util.logging.*; +import java.util.zip.*; import javax.annotation.*; import org.redkale.convert.Convert; import org.redkale.net.Cryptor; @@ -210,6 +211,10 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddr = request.getRemoteAddr(); webSocket._sncpAddress = this.node.localSncpAddress; + if (request.getHeader("Sec-WebSocket-Extensions", "").contains("permessage-deflate")) { + webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); + webSocket.inflater = new Inflater(true); + } initRestWebSocket(webSocket); CompletableFuture sessionFuture = webSocket.onOpen(request); if (sessionFuture == null) { @@ -233,6 +238,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl response.setHeader("Connection", "Upgrade"); response.addHeader("Upgrade", "websocket"); response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes)); + if (webSocket.deflater != null) response.addHeader("Sec-WebSocket-Extensions", "permessage-deflate"); + response.sendBody((ByteBuffer) null, null, new CompletionHandler() { WebSocketRunner temprunner = null; diff --git a/src/org/redkale/util/Redkale.java b/src/org/redkale/util/Redkale.java index a1cdd25ef..10b298a3e 100644 --- a/src/org/redkale/util/Redkale.java +++ b/src/org/redkale/util/Redkale.java @@ -17,7 +17,7 @@ public final class Redkale { } public static String getDotedVersion() { - return "2.0.0"; + return "2.1.0"; } public static int getMajorVersion() { @@ -25,6 +25,6 @@ public final class Redkale { } public static int getMinorVersion() { - return 0; + return 1; } }