修改WebSocketPacket解析协议方式,并修复多报并发请求导致后面的消息丢失的BUG
This commit is contained in:
@@ -8,6 +8,8 @@ package org.redkale.net.http;
|
|||||||
import org.redkale.util.Utility;
|
import org.redkale.util.Utility;
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.AbstractMap;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
import org.redkale.convert.*;
|
import org.redkale.convert.*;
|
||||||
@@ -21,8 +23,16 @@ import org.redkale.convert.*;
|
|||||||
*/
|
*/
|
||||||
public final class WebSocketPacket {
|
public final class WebSocketPacket {
|
||||||
|
|
||||||
|
private static final Charset UTF_8 = Charset.forName("UTF-8");
|
||||||
|
|
||||||
|
static final WebSocketPacket NONE = new WebSocketPacket();
|
||||||
|
|
||||||
public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]);
|
public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]);
|
||||||
|
|
||||||
|
public static enum MessageType {
|
||||||
|
STRING, BYTES, OBJECT;
|
||||||
|
}
|
||||||
|
|
||||||
public static enum FrameType {
|
public static enum FrameType {
|
||||||
|
|
||||||
TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A);
|
TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A);
|
||||||
@@ -57,14 +67,24 @@ public final class WebSocketPacket {
|
|||||||
|
|
||||||
protected boolean last = true;
|
protected boolean last = true;
|
||||||
|
|
||||||
|
//---------------发送------------------------
|
||||||
Object sendJson;
|
Object sendJson;
|
||||||
|
|
||||||
Convert sendConvert;
|
Convert sendConvert;
|
||||||
|
|
||||||
boolean mapconvable;
|
boolean sendMapconvable;
|
||||||
|
|
||||||
ByteBuffer[] sendBuffers;
|
ByteBuffer[] sendBuffers;
|
||||||
|
|
||||||
|
//---------------接收------------------------
|
||||||
|
MessageType receiveType;
|
||||||
|
|
||||||
|
int receiveCount;
|
||||||
|
|
||||||
|
int receiveLength;
|
||||||
|
|
||||||
|
Object receiveMessage;
|
||||||
|
|
||||||
ConvertMask receiveMasker;
|
ConvertMask receiveMasker;
|
||||||
|
|
||||||
ByteBuffer[] receiveBuffers;
|
ByteBuffer[] receiveBuffers;
|
||||||
@@ -119,7 +139,7 @@ public final class WebSocketPacket {
|
|||||||
WebSocketPacket(Convert convert, boolean mapconvable, Object json, boolean fin) {
|
WebSocketPacket(Convert convert, boolean mapconvable, Object json, boolean fin) {
|
||||||
this.type = (convert == null || !convert.isBinary()) ? FrameType.TEXT : FrameType.BINARY;
|
this.type = (convert == null || !convert.isBinary()) ? FrameType.TEXT : FrameType.BINARY;
|
||||||
this.sendConvert = convert;
|
this.sendConvert = convert;
|
||||||
this.mapconvable = mapconvable;
|
this.sendMapconvable = mapconvable;
|
||||||
this.sendJson = json;
|
this.sendJson = json;
|
||||||
this.last = fin;
|
this.last = fin;
|
||||||
if (mapconvable && !(json instanceof Object[])) throw new IllegalArgumentException();
|
if (mapconvable && !(json instanceof Object[])) throw new IllegalArgumentException();
|
||||||
@@ -183,7 +203,7 @@ public final class WebSocketPacket {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return this.getClass().getSimpleName() + "[type=" + type + ", last=" + last + (payload != null ? (", payload=" + payload) : "") + (bytes != null ? (", bytes=[" + bytes.length + ']') : "") + (sendJson != null ? (", json=" + (mapconvable ? Utility.ofMap((Object[]) sendJson) : sendJson)) : "") + "]";
|
return this.getClass().getSimpleName() + "[type=" + type + ", last=" + last + (payload != null ? (", payload=" + payload) : "") + (bytes != null ? (", bytes=[" + bytes.length + ']') : (receiveLength > 0 ? (", receivemsg=" + receiveMessage) : "")) + (sendJson != null ? (", json=" + (sendMapconvable ? Utility.ofMap((Object[]) sendJson) : sendJson)) : "") + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -211,7 +231,7 @@ public final class WebSocketPacket {
|
|||||||
return supplier.get();
|
return supplier.get();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
ByteBuffer[] buffers = this.mapconvable ? this.sendConvert.convertMapTo(newsupplier, (Object[]) sendJson) : this.sendConvert.convertTo(newsupplier, sendJson);
|
ByteBuffer[] buffers = this.sendMapconvable ? this.sendConvert.convertMapTo(newsupplier, (Object[]) sendJson) : this.sendConvert.convertTo(newsupplier, sendJson);
|
||||||
int len = 0;
|
int len = 0;
|
||||||
for (ByteBuffer buf : buffers) {
|
for (ByteBuffer buf : buffers) {
|
||||||
len += buf.remaining();
|
len += buf.remaining();
|
||||||
@@ -291,108 +311,20 @@ public final class WebSocketPacket {
|
|||||||
// System.out.println(rs);
|
// System.out.println(rs);
|
||||||
// }
|
// }
|
||||||
/**
|
/**
|
||||||
* 消息解码 <br>
|
|
||||||
*
|
*
|
||||||
* 0 1 2 3
|
* @param webSocket WebSocket
|
||||||
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
* @param readBuffer ByteBuffer
|
||||||
* +-+-+-+-+-------+-+-------------+-------------------------------+
|
|
||||||
* |F|R|R|R| opcode|M| Payload len | Extended payload length |
|
|
||||||
* |I|S|S|S| (4) |A| (7) | (16/64) |
|
|
||||||
* |N|V|V|V| |S| | (if payload len==126/127) |
|
|
||||||
* | |1|2|3| |K| | |
|
|
||||||
* +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|
|
||||||
* | Extended payload length continued, if payload len == 127 |
|
|
||||||
* + - - - - - - - - - - - - - - - +-------------------------------+
|
|
||||||
* | |Masking-key, if MASK set to 1 |
|
|
||||||
* +-------------------------------+-------------------------------+
|
|
||||||
* | Masking-key (continued) | Payload Data |
|
|
||||||
* +-------------------------------- - - - - - - - - - - - - - - - +
|
|
||||||
* : Payload Data continued :
|
|
||||||
* + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|
|
||||||
* | Payload Data continued |
|
|
||||||
* +-----------------------------------------------------------------------+
|
|
||||||
*
|
*
|
||||||
* @param buffer
|
* @return boolean 已接收完返回true, 需要继续接收body返回false;
|
||||||
* @param exbuffers
|
|
||||||
*
|
|
||||||
* @return 1:表示解析成功且能继续解析;0:表示解析成功; -1:表示解析失败
|
|
||||||
*/
|
*/
|
||||||
/*
|
boolean receiveBody(WebSocket webSocket, ByteBuffer readBuffer) {
|
||||||
static int decode(final Logger logger, final List<WebSocketPacket> packets,
|
int need = receiveLength - receiveCount;
|
||||||
final int wsmaxbody, final ByteBuffer buffer, ByteBuffer... exbuffers) {
|
boolean over = readBuffer.remaining() >= need;
|
||||||
final boolean debug = false; //调试开关
|
this.receiveBuffers = Utility.append(this.receiveBuffers, readBuffer);
|
||||||
if (debug) {
|
if (over) parseReceiveMessage(webSocket, this.receiveBuffers);
|
||||||
int remain = buffer.remaining();
|
return over;
|
||||||
if (exbuffers != null) {
|
|
||||||
for (ByteBuffer b : exbuffers) {
|
|
||||||
remain += b == null ? 0 : b.remaining();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.log(Level.FINEST, "read websocket message's length = " + remain);
|
|
||||||
}
|
|
||||||
if (buffer.remaining() < 2) return -1;
|
|
||||||
final byte opcode = buffer.get();
|
|
||||||
final WebSocketPacket packet = new WebSocketPacket();
|
|
||||||
packet.last = (opcode & 0b1000_0000) != 0;
|
|
||||||
packet.type = FrameType.valueOf(opcode & 0xF);
|
|
||||||
if (packet.type == FrameType.CLOSE) {
|
|
||||||
if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
|
|
||||||
packets.add(packet);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
final boolean checkrsv = false;//暂时不校验
|
|
||||||
if (checkrsv && (opcode & 0b0111_0000) != 0) {
|
|
||||||
if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")");
|
|
||||||
return -1; //rsv1 rsv2 rsv3 must be 0
|
|
||||||
}
|
|
||||||
//0x00 表示一个后续帧
|
|
||||||
//0x01 表示一个文本帧
|
|
||||||
//0x02 表示一个二进制帧
|
|
||||||
//0x03-07 为以后的非控制帧保留
|
|
||||||
//0x8 表示一个连接关闭
|
|
||||||
//0x9 表示一个ping
|
|
||||||
//0xA 表示一个pong
|
|
||||||
//0x0B-0F 为以后的控制帧保留
|
|
||||||
final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧
|
|
||||||
byte lengthCode = buffer.get();
|
|
||||||
final boolean masked = (lengthCode & 0x80) == 0x80;
|
|
||||||
if (masked) lengthCode ^= 0x80; //mask
|
|
||||||
int length;
|
|
||||||
if (lengthCode <= 0x7D) { //125
|
|
||||||
length = lengthCode;
|
|
||||||
} else {
|
|
||||||
if (control) {
|
|
||||||
if (debug) logger.log(Level.FINE, " receive control command from websocket client");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (lengthCode == 0x7E) {//0x7E=126
|
|
||||||
length = (int) buffer.getChar();
|
|
||||||
} else {
|
|
||||||
length = buffer.getInt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (length > wsmaxbody && wsmaxbody > 0) {
|
|
||||||
if (debug) logger.log(Level.FINE, "message body(" + length + ") is too big, but must less " + wsmaxbody);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
ConvertMask masker = null;
|
|
||||||
if (masked) {
|
|
||||||
final byte[] masks = new byte[4];
|
|
||||||
buffer.get(masks);
|
|
||||||
masker = new ConvertMask() {
|
|
||||||
|
|
||||||
private int index = 0;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public byte unmask(byte value) {
|
|
||||||
return (byte) (value ^ masks[index++ % 4]);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
this.receiveBuffers = Utility.append(new ByteBuffer[]{buffer}, exbuffers);
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
/**
|
/**
|
||||||
* 消息解码 <br>
|
* 消息解码 <br>
|
||||||
*
|
*
|
||||||
@@ -418,26 +350,25 @@ public final class WebSocketPacket {
|
|||||||
* @param buffer
|
* @param buffer
|
||||||
* @param exbuffers
|
* @param exbuffers
|
||||||
*
|
*
|
||||||
* @return
|
* @return 返回NONE表示Buffer内容不够; 返回this表示解析完成或部分解析完成;返回null表示解析异常;
|
||||||
*/
|
*/
|
||||||
WebSocketPacket decode(final Logger logger, final ByteBuffer buffer, ByteBuffer... exbuffers) {
|
WebSocketPacket decode(final Logger logger, final WebSocket webSocket, final int wsmaxbody,
|
||||||
|
final AbstractMap.SimpleEntry<String, byte[]> halfBytes, final ByteBuffer buffer) {
|
||||||
|
//开始
|
||||||
final boolean debug = false; //调试开关
|
final boolean debug = false; //调试开关
|
||||||
if (debug) {
|
if (debug) logger.log(Level.FINEST, "read websocket message's length = " + buffer.remaining());
|
||||||
int remain = buffer.remaining();
|
if (!buffer.hasRemaining()) return NONE;
|
||||||
if (exbuffers != null) {
|
if (buffer.remaining() < 2) {
|
||||||
for (ByteBuffer b : exbuffers) {
|
byte[] bs = new byte[buffer.remaining()];
|
||||||
remain += b == null ? 0 : b.remaining();
|
buffer.get(bs);
|
||||||
}
|
halfBytes.setValue(bs);
|
||||||
}
|
return NONE;
|
||||||
logger.log(Level.FINEST, "read websocket message's length = " + remain);
|
|
||||||
}
|
}
|
||||||
if (buffer.remaining() < 2) return null;
|
final byte opcode = buffer.get(); //第一个字节
|
||||||
byte opcode = buffer.get();
|
|
||||||
this.last = (opcode & 0b1000_0000) != 0;
|
this.last = (opcode & 0b1000_0000) != 0;
|
||||||
this.type = FrameType.valueOf(opcode & 0xF);
|
this.type = FrameType.valueOf(opcode & 0xF);
|
||||||
if (type == FrameType.CLOSE) {
|
if (type == FrameType.CLOSE) {
|
||||||
if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
|
if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
final boolean checkrsv = false;//暂时不校验
|
final boolean checkrsv = false;//暂时不校验
|
||||||
if (checkrsv && (opcode & 0b0111_0000) != 0) {
|
if (checkrsv && (opcode & 0b0111_0000) != 0) {
|
||||||
@@ -453,9 +384,23 @@ public final class WebSocketPacket {
|
|||||||
//0xA 表示一个pong
|
//0xA 表示一个pong
|
||||||
//0x0B-0F 为以后的控制帧保留
|
//0x0B-0F 为以后的控制帧保留
|
||||||
final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧
|
final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧
|
||||||
byte lengthCode = buffer.get();
|
final byte crcode = buffer.get(); //第二个字节
|
||||||
|
|
||||||
|
byte lengthCode = crcode;
|
||||||
final boolean masked = (lengthCode & 0x80) == 0x80;
|
final boolean masked = (lengthCode & 0x80) == 0x80;
|
||||||
if (masked) lengthCode ^= 0x80; //mask
|
if (masked) lengthCode ^= 0x80; //mask
|
||||||
|
|
||||||
|
//判断Buffer剩余内容够不够基本信息的创建
|
||||||
|
int minBufferLength = ((lengthCode <= 0x7D) ? 0 : (lengthCode == 0x7E ? 2 : 4)) + (masked ? 4 : 0);
|
||||||
|
if (buffer.remaining() < minBufferLength) {
|
||||||
|
byte[] bs = new byte[2 + buffer.remaining()];
|
||||||
|
bs[0] = opcode;
|
||||||
|
bs[1] = crcode;
|
||||||
|
buffer.get(bs, 2, buffer.remaining());
|
||||||
|
halfBytes.setValue(bs);
|
||||||
|
return NONE;
|
||||||
|
}
|
||||||
|
|
||||||
int length;
|
int length;
|
||||||
if (lengthCode <= 0x7D) { //125
|
if (lengthCode <= 0x7D) { //125
|
||||||
length = lengthCode;
|
length = lengthCode;
|
||||||
@@ -470,6 +415,11 @@ public final class WebSocketPacket {
|
|||||||
length = buffer.getInt();
|
length = buffer.getInt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (length > wsmaxbody && wsmaxbody > 0) {
|
||||||
|
if (debug) logger.log(Level.FINE, "message length (" + length + ") too big, must less " + wsmaxbody + "");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.receiveLength = length;
|
||||||
if (masked) {
|
if (masked) {
|
||||||
final byte[] masks = new byte[4];
|
final byte[] masks = new byte[4];
|
||||||
buffer.get(masks);
|
buffer.get(masks);
|
||||||
@@ -483,25 +433,65 @@ public final class WebSocketPacket {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
this.receiveBuffers = Utility.append(new ByteBuffer[]{buffer}, exbuffers);
|
if (buffer.remaining() >= this.receiveLength) { //内容足够, 可以解析
|
||||||
|
this.parseReceiveMessage(webSocket, buffer);
|
||||||
|
this.receiveCount = this.receiveLength;
|
||||||
|
} else {
|
||||||
|
this.receiveCount = buffer.remaining();
|
||||||
|
this.receiveBuffers = buffer.hasRemaining() ? new ByteBuffer[]{buffer} : null;
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] getReceiveBytes() {
|
void parseReceiveMessage(WebSocket webSocket, ByteBuffer... buffers) {
|
||||||
if (this.receiveBuffers.length == 0) return new byte[0];
|
if (this.type == FrameType.TEXT) {
|
||||||
if (this.receiveBuffers.length == 1 && this.receiveBuffers[0].remaining() == 0) return new byte[0];
|
Convert textConvert = webSocket.getTextConvert();
|
||||||
|
if (textConvert == null) {
|
||||||
int count = 0;
|
this.receiveMessage = new String(this.getReceiveBytes(buffers), UTF_8);
|
||||||
for (ByteBuffer buf : this.receiveBuffers) {
|
this.receiveType = MessageType.STRING;
|
||||||
count += buf.remaining();
|
} else {
|
||||||
|
this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
|
||||||
|
this.receiveCount = this.receiveLength;
|
||||||
|
this.receiveType = MessageType.OBJECT;
|
||||||
|
}
|
||||||
|
} else if (this.type == FrameType.BINARY) {
|
||||||
|
Convert binaryConvert = webSocket.getBinaryConvert();
|
||||||
|
if (binaryConvert == null) {
|
||||||
|
this.receiveMessage = this.getReceiveBytes(buffers);
|
||||||
|
this.receiveType = MessageType.BYTES;
|
||||||
|
} else {
|
||||||
|
this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
|
||||||
|
this.receiveCount = this.receiveLength;
|
||||||
|
this.receiveType = MessageType.OBJECT;
|
||||||
|
}
|
||||||
|
} else if (this.type == FrameType.PING) {
|
||||||
|
this.receiveMessage = this.getReceiveBytes(buffers);
|
||||||
|
this.receiveType = MessageType.BYTES;
|
||||||
|
} else if (this.type == FrameType.PONG) {
|
||||||
|
this.receiveMessage = this.getReceiveBytes(buffers);
|
||||||
|
this.receiveType = MessageType.BYTES;
|
||||||
|
} else if (this.type == FrameType.CLOSE) {
|
||||||
|
this.receiveMessage = this.getReceiveBytes(buffers);
|
||||||
|
this.receiveType = MessageType.BYTES;
|
||||||
}
|
}
|
||||||
byte[] bs = new byte[count];
|
}
|
||||||
|
|
||||||
|
boolean isReceiveFinished() {
|
||||||
|
return this.receiveLength <= this.receiveCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] getReceiveBytes(ByteBuffer... buffers) {
|
||||||
|
final int length = this.receiveLength;
|
||||||
|
if (length == 0) return new byte[0];
|
||||||
|
byte[] bs = new byte[length];
|
||||||
int index = 0;
|
int index = 0;
|
||||||
for (ByteBuffer buf : this.receiveBuffers) {
|
for (ByteBuffer buf : buffers) {
|
||||||
int r = buf.remaining();
|
int r = Math.min(buf.remaining(), length - index);
|
||||||
buf.get(bs, index, r);
|
buf.get(bs, index, r);
|
||||||
index += r;
|
index += r;
|
||||||
|
if (index >= length) break;
|
||||||
}
|
}
|
||||||
|
this.receiveCount = index;
|
||||||
ConvertMask mask = this.receiveMasker;
|
ConvertMask mask = this.receiveMasker;
|
||||||
if (mask != null) {
|
if (mask != null) {
|
||||||
for (int i = 0; i < bs.length; i++) {
|
for (int i = 0; i < bs.length; i++) {
|
||||||
@@ -510,4 +500,5 @@ public final class WebSocketPacket {
|
|||||||
}
|
}
|
||||||
return bs;
|
return bs;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,12 +12,11 @@ import org.redkale.net.http.WebSocketPacket.FrameType;
|
|||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.*;
|
import java.nio.channels.*;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.AbstractMap.SimpleEntry;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
import org.redkale.convert.Convert;
|
|
||||||
import org.redkale.util.Utility;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner
|
* WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner
|
||||||
@@ -62,185 +61,273 @@ class WebSocketRunner implements Runnable {
|
|||||||
public void run() {
|
public void run() {
|
||||||
final boolean debug = true;
|
final boolean debug = true;
|
||||||
try {
|
try {
|
||||||
final int wsmaxbody = webSocket._engine.wsmaxbody;
|
|
||||||
webSocket.onConnected();
|
webSocket.onConnected();
|
||||||
channel.setReadTimeoutSecond(300); //读取超时5分钟
|
channel.setReadTimeoutSecond(300); //读取超时5分钟
|
||||||
if (channel.isOpen()) {
|
if (channel.isOpen()) {
|
||||||
|
final int wsmaxbody = webSocket._engine.wsmaxbody;
|
||||||
channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
|
channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
|
||||||
|
|
||||||
private ByteBuffer recentExBuffer;
|
//尚未解析完的数据包
|
||||||
|
private WebSocketPacket unfinishPacket;
|
||||||
private final List<WebSocketPacket> packets = new ArrayList<>();
|
|
||||||
|
|
||||||
//当接收的数据流长度大于ByteBuffer长度时, 则需要额外的ByteBuffer 辅助;
|
//当接收的数据流长度大于ByteBuffer长度时, 则需要额外的ByteBuffer 辅助;
|
||||||
private final List<ByteBuffer> readBuffers = new ArrayList<>();
|
private final List<ByteBuffer> exBuffers = new ArrayList<>();
|
||||||
|
|
||||||
|
private final SimpleEntry<String, byte[]> halfBytes = new SimpleEntry("", null);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer count, Void attachment1) {
|
public void completed(Integer count, Void attachment1) {
|
||||||
if (count < 1 && readBuffers.isEmpty()) {
|
if (count < 1) {
|
||||||
closeRunner(0);
|
closeRunner(0);
|
||||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
|
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (readBuffer == null) return;
|
if (readBuffer == null) return;
|
||||||
if (!readBuffer.hasRemaining() && (recentExBuffer == null || !recentExBuffer.hasRemaining())) {
|
|
||||||
final ByteBuffer buffer = context.pollBuffer();
|
|
||||||
recentExBuffer = buffer;
|
|
||||||
readBuffers.add(buffer);
|
|
||||||
channel.read(buffer, null, this);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
readBuffer.flip();
|
readBuffer.flip();
|
||||||
|
|
||||||
ByteBuffer[] exBuffers = null;
|
WebSocketPacket onePacket = null;
|
||||||
if (!readBuffers.isEmpty()) {
|
if (unfinishPacket != null) {
|
||||||
exBuffers = readBuffers.toArray(new ByteBuffer[readBuffers.size()]);
|
if (unfinishPacket.receiveBody(webSocket, readBuffer)) { //已经接收完毕
|
||||||
readBuffers.clear();
|
onePacket = unfinishPacket;
|
||||||
recentExBuffer = null;
|
unfinishPacket = null;
|
||||||
for (ByteBuffer b : exBuffers) {
|
for (ByteBuffer b : exBuffers) {
|
||||||
b.flip();
|
context.offerBuffer(b);
|
||||||
|
}
|
||||||
|
exBuffers.clear();
|
||||||
|
} else { //需要继续接收
|
||||||
|
readBuffer = context.pollBuffer();
|
||||||
|
channel.read(readBuffer, null, this);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final List<WebSocketPacket> packets = new ArrayList<>();
|
||||||
|
if (onePacket != null) packets.add(onePacket);
|
||||||
try {
|
try {
|
||||||
WebSocketPacket packet;
|
while (true) {
|
||||||
try {
|
WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuffer);
|
||||||
packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers);
|
if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节
|
||||||
} catch (Exception e) { //接收的消息体解析失败
|
if (packet != null && !packet.isReceiveFinished()) {
|
||||||
webSocket.onOccurException(e, Utility.append(new ByteBuffer[]{readBuffer}, exBuffers == null ? new ByteBuffer[0] : exBuffers));
|
unfinishPacket = packet;
|
||||||
if (readBuffer != null) {
|
if (readBuffer.hasRemaining()) {
|
||||||
readBuffer.clear();
|
exBuffers.add(readBuffer);
|
||||||
channel.read(readBuffer, null, this);
|
readBuffer = context.pollBuffer();
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
return;
|
packets.add(packet);
|
||||||
|
if (packet == null || !readBuffer.hasRemaining()) break;
|
||||||
}
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
context.getLogger().log(Level.SEVERE, "WebSocket parse message error", e);
|
||||||
|
webSocket.onOccurException(e, null);
|
||||||
|
}
|
||||||
|
//继续监听消息
|
||||||
|
readBuffer.clear();
|
||||||
|
if (halfBytes.getValue() != null) {
|
||||||
|
readBuffer.put(halfBytes.getValue());
|
||||||
|
halfBytes.setValue(null);
|
||||||
|
}
|
||||||
|
channel.read(readBuffer, null, this);
|
||||||
|
|
||||||
|
//消息处理
|
||||||
|
for (final WebSocketPacket packet : packets) {
|
||||||
if (packet == null) {
|
if (packet == null) {
|
||||||
failed(null, attachment1);
|
failed(null, attachment1);
|
||||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
|
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (packet.type == FrameType.TEXT) {
|
if (packet.type == FrameType.TEXT) {
|
||||||
Convert textConvert = webSocket.getTextConvert();
|
try {
|
||||||
if (textConvert == null) {
|
if (packet.receiveType == WebSocketPacket.MessageType.STRING) {
|
||||||
byte[] message = packet.getReceiveBytes();
|
webSocket.onMessage((String) packet.receiveMessage, packet.last);
|
||||||
if (readBuffer != null) {
|
} else {
|
||||||
readBuffer.clear();
|
|
||||||
channel.read(readBuffer, null, this);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
webSocket.onMessage(new String(message, "UTF-8"), packet.last);
|
|
||||||
} catch (Exception e) {
|
|
||||||
context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Object message;
|
|
||||||
try {
|
|
||||||
message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
|
|
||||||
} catch (Exception e) { //接收的消息体解析失败
|
|
||||||
webSocket.onOccurException(e, packet.receiveBuffers);
|
|
||||||
if (readBuffer != null) {
|
|
||||||
readBuffer.clear();
|
|
||||||
channel.read(readBuffer, null, this);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (readBuffer != null) {
|
|
||||||
readBuffer.clear();
|
|
||||||
channel.read(readBuffer, null, this);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
if (restMessageConsumer != null) { //主要供RestWebSocket使用
|
if (restMessageConsumer != null) { //主要供RestWebSocket使用
|
||||||
restMessageConsumer.accept(webSocket, message);
|
restMessageConsumer.accept(webSocket, packet.receiveMessage);
|
||||||
} else {
|
} else {
|
||||||
webSocket.onMessage(message, packet.last);
|
webSocket.onMessage(packet.receiveMessage, packet.last);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
|
||||||
context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
|
|
||||||
}
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
|
||||||
}
|
}
|
||||||
} else if (packet.type == FrameType.BINARY) {
|
} else if (packet.type == FrameType.BINARY) {
|
||||||
Convert binaryConvert = webSocket.getBinaryConvert();
|
|
||||||
if (binaryConvert == null) {
|
|
||||||
byte[] message = packet.getReceiveBytes();
|
|
||||||
if (readBuffer != null) {
|
|
||||||
readBuffer.clear();
|
|
||||||
channel.read(readBuffer, null, this);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
webSocket.onMessage(message, packet.last);
|
|
||||||
} catch (Exception e) {
|
|
||||||
context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Object message;
|
|
||||||
try {
|
|
||||||
message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
|
|
||||||
} catch (Exception e) { //接收的消息体解析失败
|
|
||||||
webSocket.onOccurException(e, packet.receiveBuffers);
|
|
||||||
if (readBuffer != null) {
|
|
||||||
readBuffer.clear();
|
|
||||||
channel.read(readBuffer, null, this);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (readBuffer != null) {
|
|
||||||
readBuffer.clear();
|
|
||||||
channel.read(readBuffer, null, this);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
if (restMessageConsumer != null) { //主要供RestWebSocket使用
|
|
||||||
restMessageConsumer.accept(webSocket, message);
|
|
||||||
} else {
|
|
||||||
webSocket.onMessage(message, packet.last);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (packet.type == FrameType.PONG) {
|
|
||||||
byte[] message = packet.getReceiveBytes();
|
|
||||||
if (readBuffer != null) {
|
|
||||||
readBuffer.clear();
|
|
||||||
channel.read(readBuffer, null, this);
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
webSocket.onPong(message);
|
if (packet.receiveType == WebSocketPacket.MessageType.BYTES) {
|
||||||
|
webSocket.onMessage((byte[]) packet.receiveMessage, packet.last);
|
||||||
|
} else {
|
||||||
|
if (restMessageConsumer != null) { //主要供RestWebSocket使用
|
||||||
|
restMessageConsumer.accept(webSocket, packet.receiveMessage);
|
||||||
|
} else {
|
||||||
|
webSocket.onMessage(packet.receiveMessage, packet.last);
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e);
|
context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
|
||||||
}
|
}
|
||||||
} else if (packet.type == FrameType.PING) {
|
} else if (packet.type == FrameType.PING) {
|
||||||
byte[] message = packet.getReceiveBytes();
|
|
||||||
if (readBuffer != null) {
|
|
||||||
readBuffer.clear();
|
|
||||||
channel.read(readBuffer, null, this);
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
webSocket.onPing(message);
|
webSocket.onPing((byte[]) packet.receiveMessage);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e);
|
context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e);
|
||||||
}
|
}
|
||||||
|
} else if (packet.type == FrameType.PONG) {
|
||||||
|
try {
|
||||||
|
webSocket.onPong((byte[]) packet.receiveMessage);
|
||||||
|
} catch (Exception e) {
|
||||||
|
context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e);
|
||||||
|
}
|
||||||
} else if (packet.type == FrameType.CLOSE) {
|
} else if (packet.type == FrameType.CLOSE) {
|
||||||
Logger logger = context.getLogger();
|
Logger logger = context.getLogger();
|
||||||
if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet);
|
if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet);
|
||||||
closeRunner(0);
|
closeRunner(0);
|
||||||
|
return;
|
||||||
} else {
|
} else {
|
||||||
context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet);
|
context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet);
|
||||||
if (readBuffer != null) {
|
closeRunner(0);
|
||||||
readBuffer.clear();
|
return;
|
||||||
channel.read(readBuffer, null, this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
|
||||||
closeRunner(0);
|
|
||||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
|
|
||||||
} finally {
|
|
||||||
if (exBuffers != null) {
|
|
||||||
for (ByteBuffer b : exBuffers) {
|
|
||||||
context.offerBuffer(b);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// if (true) return; //以下代码废弃
|
||||||
|
// try {
|
||||||
|
// WebSocketPacket packet;
|
||||||
|
// try {
|
||||||
|
// packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers);
|
||||||
|
// } catch (Exception e) { //接收的消息体解析失败
|
||||||
|
// webSocket.onOccurException(e, Utility.append(new ByteBuffer[]{readBuffer}, exBuffers == null ? new ByteBuffer[0] : exBuffers));
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
// if (packet == null) {
|
||||||
|
// failed(null, attachment1);
|
||||||
|
// if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if (packet.type == FrameType.TEXT) {
|
||||||
|
// Convert textConvert = webSocket.getTextConvert();
|
||||||
|
// if (textConvert == null) {
|
||||||
|
// byte[] message = packet.getReceiveBytes();
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// try {
|
||||||
|
// webSocket.onMessage(new String(message, "UTF-8"), packet.last);
|
||||||
|
// } catch (Exception e) {
|
||||||
|
// context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
|
||||||
|
// }
|
||||||
|
// } else {
|
||||||
|
// Object message;
|
||||||
|
// try {
|
||||||
|
// message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
|
||||||
|
// } catch (Exception e) { //接收的消息体解析失败
|
||||||
|
// webSocket.onOccurException(e, packet.receiveBuffers);
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// try {
|
||||||
|
// if (restMessageConsumer != null) { //主要供RestWebSocket使用
|
||||||
|
// restMessageConsumer.accept(webSocket, message);
|
||||||
|
// } else {
|
||||||
|
// webSocket.onMessage(message, packet.last);
|
||||||
|
// }
|
||||||
|
// } catch (Exception e) {
|
||||||
|
// context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// } else if (packet.type == FrameType.BINARY) {
|
||||||
|
// Convert binaryConvert = webSocket.getBinaryConvert();
|
||||||
|
// if (binaryConvert == null) {
|
||||||
|
// byte[] message = packet.getReceiveBytes();
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// try {
|
||||||
|
// webSocket.onMessage(message, packet.last);
|
||||||
|
// } catch (Exception e) {
|
||||||
|
// context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
|
||||||
|
// }
|
||||||
|
// } else {
|
||||||
|
// Object message;
|
||||||
|
// try {
|
||||||
|
// message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
|
||||||
|
// } catch (Exception e) { //接收的消息体解析失败
|
||||||
|
// webSocket.onOccurException(e, packet.receiveBuffers);
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// try {
|
||||||
|
// if (restMessageConsumer != null) { //主要供RestWebSocket使用
|
||||||
|
// restMessageConsumer.accept(webSocket, message);
|
||||||
|
// } else {
|
||||||
|
// webSocket.onMessage(message, packet.last);
|
||||||
|
// }
|
||||||
|
// } catch (Exception e) {
|
||||||
|
// context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// } else if (packet.type == FrameType.PONG) {
|
||||||
|
// byte[] message = packet.getReceiveBytes();
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// try {
|
||||||
|
// webSocket.onPong(message);
|
||||||
|
// } catch (Exception e) {
|
||||||
|
// context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e);
|
||||||
|
// }
|
||||||
|
// } else if (packet.type == FrameType.PING) {
|
||||||
|
// byte[] message = packet.getReceiveBytes();
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// try {
|
||||||
|
// webSocket.onPing(message);
|
||||||
|
// } catch (Exception e) {
|
||||||
|
// context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e);
|
||||||
|
// }
|
||||||
|
// } else if (packet.type == FrameType.CLOSE) {
|
||||||
|
// Logger logger = context.getLogger();
|
||||||
|
// if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet);
|
||||||
|
// closeRunner(0);
|
||||||
|
// } else {
|
||||||
|
// context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet);
|
||||||
|
// if (readBuffer != null) {
|
||||||
|
// readBuffer.clear();
|
||||||
|
// channel.read(readBuffer, null, this);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// } catch (Throwable t) {
|
||||||
|
// closeRunner(0);
|
||||||
|
// if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
|
||||||
|
// } finally {
|
||||||
|
// if (exBuffers != null) {
|
||||||
|
// for (ByteBuffer b : exBuffers) {
|
||||||
|
// context.offerBuffer(b);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user