WebSocket支持permessage-deflate单向功能

This commit is contained in:
Redkale
2020-04-16 20:19:49 +08:00
parent 51c50415e1
commit 63ef83ec62
6 changed files with 73 additions and 33 deletions

View File

@@ -14,6 +14,7 @@ import java.util.concurrent.*;
import java.util.function.*; import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.zip.*;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.net.AsyncConnection; import org.redkale.net.AsyncConnection;
import org.redkale.util.Comment; import org.redkale.util.Comment;
@@ -103,7 +104,11 @@ public abstract class WebSocket<G extends Serializable, T> {
java.lang.reflect.Type _messageTextType; //不可能为空 java.lang.reflect.Type _messageTextType; //不可能为空
private long createtime = System.currentTimeMillis(); Deflater deflater; //压缩
Inflater inflater; //解压
long createtime = System.currentTimeMillis();
private long pingtime; private long pingtime;
@@ -894,6 +899,8 @@ public abstract class WebSocket<G extends Serializable, T> {
* 显式地关闭WebSocket * 显式地关闭WebSocket
*/ */
public final void close() { public final void close() {
if (this.deflater != null) this.deflater.end();
if (this.inflater != null) this.inflater.end();
if (this._runner != null) { if (this._runner != null) {
CompletableFuture<Void> future = this._runner.closeRunner(CLOSECODE_SERVERCLOSE, "user close"); CompletableFuture<Void> future = this._runner.closeRunner(CLOSECODE_SERVERCLOSE, "user close");
if (future != null) future.join(); if (future != null) future.join();

View File

@@ -244,7 +244,7 @@ public class WebSocketEngine {
if (bufferSupplier == null) { if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier(); bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer(); bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor));
} }
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
@@ -255,7 +255,7 @@ public class WebSocketEngine {
if (bufferSupplier == null) { if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier(); bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer(); bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor));
} }
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
@@ -321,7 +321,7 @@ public class WebSocketEngine {
if (bufferSupplier == null) { if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier(); bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer(); bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor));
} }
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
@@ -333,7 +333,7 @@ public class WebSocketEngine {
if (bufferSupplier == null) { if (bufferSupplier == null) {
bufferSupplier = websocket.getBufferSupplier(); bufferSupplier = websocket.getBufferSupplier();
bufferConsumer = websocket.getBufferConsumer(); bufferConsumer = websocket.getBufferConsumer();
packet.setSendBuffers(packet.encode(bufferSupplier, bufferConsumer, cryptor)); packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor));
} }
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
} }

View File

@@ -9,9 +9,10 @@ import org.redkale.util.Utility;
import java.io.*; import java.io.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.AbstractMap; import java.util.*;
import java.util.function.*; import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.zip.*;
import org.redkale.convert.*; import org.redkale.convert.*;
import org.redkale.net.Cryptor; import org.redkale.net.Cryptor;
import org.redkale.util.*; import org.redkale.util.*;
@@ -29,6 +30,8 @@ public final class WebSocketPacket {
static final WebSocketPacket NONE = new WebSocketPacket(); static final WebSocketPacket NONE = new WebSocketPacket();
private static final byte[] EOM_BYTES = new byte[]{0, 0, -1, -1};
public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]); public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]);
public static enum MessageType { public static enum MessageType {
@@ -82,6 +85,8 @@ public final class WebSocketPacket {
//---------------接收------------------------ //---------------接收------------------------
MessageType receiveType; MessageType receiveType;
boolean receiveCompress;
int receiveCount; int receiveCount;
int receiveLength; int receiveLength;
@@ -223,7 +228,7 @@ public final class WebSocketPacket {
* *
* @return ByteBuffer[] * @return ByteBuffer[]
*/ */
ByteBuffer[] encode(final Supplier<ByteBuffer> supplier, final Consumer<ByteBuffer> consumer, final Cryptor cryptor) { ByteBuffer[] encodePacket(final Supplier<ByteBuffer> supplier, final Consumer<ByteBuffer> consumer, final Cryptor cryptor) {
final byte opcode = (byte) (this.type.getValue() | 0x80); final byte opcode = (byte) (this.type.getValue() | 0x80);
if (this.sendConvert != null) { if (this.sendConvert != null) {
Supplier<ByteBuffer> newsupplier = new Supplier<ByteBuffer>() { Supplier<ByteBuffer> newsupplier = new Supplier<ByteBuffer>() {
@@ -391,7 +396,7 @@ public final class WebSocketPacket {
* *
* @return 返回NONE表示Buffer内容不够 返回this表示解析完成或部分解析完成返回null表示解析异常 * @return 返回NONE表示Buffer内容不够 返回this表示解析完成或部分解析完成返回null表示解析异常
*/ */
WebSocketPacket decode(final Logger logger, final WebSocketRunner runner, final WebSocket webSocket, final int wsmaxbody, WebSocketPacket decodePacket(final Logger logger, final WebSocketRunner runner, final WebSocket webSocket, final int wsmaxbody,
final AbstractMap.SimpleEntry<String, byte[]> halfBytes, final ByteBuffer buffer) { final AbstractMap.SimpleEntry<String, byte[]> halfBytes, final ByteBuffer buffer) {
//开始 //开始
final boolean debug = false; //调试开关 final boolean debug = false; //调试开关
@@ -407,6 +412,17 @@ public final class WebSocketPacket {
this.last = (opcode & 0b1000_0000) != 0; this.last = (opcode & 0b1000_0000) != 0;
this.type = FrameType.valueOf(opcode & 0xF); this.type = FrameType.valueOf(opcode & 0xF);
//0x00 表示一个后续帧
//0x01 表示一个文本帧
//0x02 表示一个二进制帧
//0x03-07 为以后的非控制帧保留
//0x8 表示一个连接关闭
//0x9 表示一个ping
//0xA 表示一个pong
//0x0B-0F 为以后的控制帧保留
final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧
this.receiveCompress = !control && webSocket.deflater != null && (opcode & 0b0100_0000) != 0; //rsv1 为 1
if (type == FrameType.CLOSE) { if (type == FrameType.CLOSE) {
if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
} }
@@ -418,15 +434,6 @@ public final class WebSocketPacket {
if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")"); if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")");
return null; //rsv1 rsv2 rsv3 must be 0 return null; //rsv1 rsv2 rsv3 must be 0
} }
//0x00 表示一个后续帧
//0x01 表示一个文本帧
//0x02 表示一个二进制帧
//0x03-07 为以后的非控制帧保留
//0x8 表示一个连接关闭
//0x9 表示一个ping
//0xA 表示一个pong
//0x0B-0F 为以后的控制帧保留
final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧
final byte crcode = buffer.get(); //第二个字节 final byte crcode = buffer.get(); //第二个字节
byte lengthCode = crcode; byte lengthCode = crcode;
@@ -506,14 +513,15 @@ public final class WebSocketPacket {
if (selfType == FrameType.TEXT) { if (selfType == FrameType.TEXT) {
Convert textConvert = webSocket.getTextConvert(); Convert textConvert = webSocket.getTextConvert();
if (textConvert == null || (!runner.mergemsg && (series || !this.last))) { if (textConvert == null || (!runner.mergemsg && (series || !this.last))) {
this.receiveMessage = new String(this.getReceiveBytes(buffers), StandardCharsets.UTF_8); this.receiveMessage = new String(this.getReceiveBytes(webSocket, buffers), StandardCharsets.UTF_8);
this.receiveType = MessageType.STRING; this.receiveType = MessageType.STRING;
} else { } else {
if (this.last || !runner.mergemsg) { if (this.last || !runner.mergemsg) {
if (runner.currSeriesMergeMessage == null) { if (runner.currSeriesMergeMessage == null && !this.receiveCompress) {
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
} else { } else {
runner.currSeriesMergeMessage.write(this.getReceiveBytes(buffers)); if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray();
runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers));
try { try {
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes()); this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes());
} finally { } finally {
@@ -522,7 +530,7 @@ public final class WebSocketPacket {
} }
} else { } else {
if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray();
runner.currSeriesMergeMessage.write(this.getReceiveBytes(buffers)); runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers));
this.receiveMessage = MESSAGE_NIL; this.receiveMessage = MESSAGE_NIL;
} }
this.receiveCount = this.receiveLength; this.receiveCount = this.receiveLength;
@@ -531,14 +539,15 @@ public final class WebSocketPacket {
} else if (selfType == FrameType.BINARY) { } else if (selfType == FrameType.BINARY) {
Convert binaryConvert = webSocket.getBinaryConvert(); Convert binaryConvert = webSocket.getBinaryConvert();
if (binaryConvert == null || (!runner.mergemsg && (series || !this.last))) { if (binaryConvert == null || (!runner.mergemsg && (series || !this.last))) {
this.receiveMessage = this.getReceiveBytes(buffers); this.receiveMessage = this.getReceiveBytes(webSocket, buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} else { } else {
if (this.last || !runner.mergemsg) { if (this.last || !runner.mergemsg) {
if (runner.currSeriesMergeMessage == null) { if (runner.currSeriesMergeMessage == null && !this.receiveCompress) {
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
} else { } else {
runner.currSeriesMergeMessage.write(this.getReceiveBytes(buffers)); if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray();
runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers));
try { try {
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes()); this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, runner.currSeriesMergeMessage.getBytes());
} finally { } finally {
@@ -547,20 +556,20 @@ public final class WebSocketPacket {
} }
} else { } else {
if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray(); if (runner.currSeriesMergeMessage == null) runner.currSeriesMergeMessage = new ByteArray();
runner.currSeriesMergeMessage.write(this.getReceiveBytes(buffers)); runner.currSeriesMergeMessage.write(this.getReceiveBytes(webSocket, buffers));
this.receiveMessage = MESSAGE_NIL; this.receiveMessage = MESSAGE_NIL;
} }
this.receiveCount = this.receiveLength; this.receiveCount = this.receiveLength;
this.receiveType = MessageType.OBJECT; this.receiveType = MessageType.OBJECT;
} }
} else if (selfType == FrameType.PING) { } else if (selfType == FrameType.PING) {
this.receiveMessage = this.getReceiveBytes(buffers); this.receiveMessage = this.getReceiveBytes(webSocket, buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} else if (selfType == FrameType.PONG) { } else if (selfType == FrameType.PONG) {
this.receiveMessage = this.getReceiveBytes(buffers); this.receiveMessage = this.getReceiveBytes(webSocket, buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} else if (selfType == FrameType.CLOSE) { } else if (selfType == FrameType.CLOSE) {
this.receiveMessage = this.getReceiveBytes(buffers); this.receiveMessage = this.getReceiveBytes(webSocket, buffers);
this.receiveType = MessageType.BYTES; this.receiveType = MessageType.BYTES;
} }
} }
@@ -569,7 +578,7 @@ public final class WebSocketPacket {
return this.receiveLength <= this.receiveCount; return this.receiveLength <= this.receiveCount;
} }
byte[] getReceiveBytes(ByteBuffer... buffers) { byte[] getReceiveBytes(WebSocket webSocket, ByteBuffer... buffers) {
final int length = this.receiveLength; final int length = this.receiveLength;
if (length == 0) return new byte[0]; if (length == 0) return new byte[0];
byte[] bs = new byte[length]; byte[] bs = new byte[length];
@@ -587,6 +596,23 @@ public final class WebSocketPacket {
bs[i] = mask.unmask(bs[i]); bs[i] = mask.unmask(bs[i]);
} }
} }
if (this.receiveCompress) {
Inflater inflater = new Inflater(true);
ByteArrayOutputStream baos = new ByteArrayOutputStream(bs.length);
inflater.setInput(Utility.append(bs, EOM_BYTES));
byte[] buff = new byte[1024];
try {
while (!inflater.finished()) {
int count = inflater.inflate(buff);
if (count == 0) break;
baos.write(buff, 0, count);
}
return baos.toByteArray();
} catch (Exception ex) {
return bs;
}
}
return bs; return bs;
} }

View File

@@ -108,7 +108,7 @@ class WebSocketRunner implements Runnable {
if (onePacket != null) packets.add(onePacket); if (onePacket != null) packets.add(onePacket);
try { try {
while (true) { while (true) {
WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), self, webSocket, wsmaxbody, halfBytes, readBuffer); WebSocketPacket packet = new WebSocketPacket().decodePacket(context.getLogger(), self, webSocket, wsmaxbody, halfBytes, readBuffer);
if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节 if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节
if (packet != null && !packet.isReceiveFinished()) { if (packet != null && !packet.isReceiveFinished()) {
unfinishPacket = packet; unfinishPacket = packet;
@@ -230,7 +230,7 @@ class WebSocketRunner implements Runnable {
//System.out.println("推送消息"); //System.out.println("推送消息");
final CompletableFuture<Integer> futureResult = new CompletableFuture<>(); final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
try { try {
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor); ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encodePacket(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor);
//if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet); //if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet);
CompletionHandler<Integer, ByteBuffer[]> handler = new CompletionHandler<Integer, ByteBuffer[]>() { CompletionHandler<Integer, ByteBuffer[]> handler = new CompletionHandler<Integer, ByteBuffer[]>() {

View File

@@ -15,6 +15,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.*; import java.util.function.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.zip.*;
import javax.annotation.*; import javax.annotation.*;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.net.Cryptor; import org.redkale.net.Cryptor;
@@ -210,6 +211,10 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr(); webSocket._remoteAddr = request.getRemoteAddr();
webSocket._sncpAddress = this.node.localSncpAddress; webSocket._sncpAddress = this.node.localSncpAddress;
if (request.getHeader("Sec-WebSocket-Extensions", "").contains("permessage-deflate")) {
webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
webSocket.inflater = new Inflater(true);
}
initRestWebSocket(webSocket); initRestWebSocket(webSocket);
CompletableFuture<String> sessionFuture = webSocket.onOpen(request); CompletableFuture<String> sessionFuture = webSocket.onOpen(request);
if (sessionFuture == null) { if (sessionFuture == null) {
@@ -233,6 +238,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
response.setHeader("Connection", "Upgrade"); response.setHeader("Connection", "Upgrade");
response.addHeader("Upgrade", "websocket"); response.addHeader("Upgrade", "websocket");
response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes)); response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes));
if (webSocket.deflater != null) response.addHeader("Sec-WebSocket-Extensions", "permessage-deflate");
response.sendBody((ByteBuffer) null, null, new CompletionHandler<Integer, Void>() { response.sendBody((ByteBuffer) null, null, new CompletionHandler<Integer, Void>() {
WebSocketRunner temprunner = null; WebSocketRunner temprunner = null;

View File

@@ -17,7 +17,7 @@ public final class Redkale {
} }
public static String getDotedVersion() { public static String getDotedVersion() {
return "2.0.0"; return "2.1.0";
} }
public static int getMajorVersion() { public static int getMajorVersion() {
@@ -25,6 +25,6 @@ public final class Redkale {
} }
public static int getMinorVersion() { public static int getMinorVersion() {
return 0; return 1;
} }
} }