From 685667fd69dd367546e6bfd4ae45c45b5d2e3038 Mon Sep 17 00:00:00 2001
From: Redkale <22250530@qq.com>
Date: Thu, 16 Nov 2017 17:11:11 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9WebSocketPacket=E8=A7=A3?=
=?UTF-8?q?=E6=9E=90=E5=8D=8F=E8=AE=AE=E6=96=B9=E5=BC=8F=EF=BC=8C=E5=B9=B6?=
=?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E6=8A=A5=E5=B9=B6=E5=8F=91=E8=AF=B7?=
=?UTF-8?q?=E6=B1=82=E5=AF=BC=E8=87=B4=E5=90=8E=E9=9D=A2=E7=9A=84=E6=B6=88?=
=?UTF-8?q?=E6=81=AF=E4=B8=A2=E5=A4=B1=E7=9A=84BUG?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/org/redkale/net/http/WebSocketPacket.java | 245 ++++++-------
src/org/redkale/net/http/WebSocketRunner.java | 347 +++++++++++-------
2 files changed, 335 insertions(+), 257 deletions(-)
diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java
index e899dc4f8..c284e0998 100644
--- a/src/org/redkale/net/http/WebSocketPacket.java
+++ b/src/org/redkale/net/http/WebSocketPacket.java
@@ -8,6 +8,8 @@ package org.redkale.net.http;
import org.redkale.util.Utility;
import java.io.*;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.AbstractMap;
import java.util.function.Supplier;
import java.util.logging.*;
import org.redkale.convert.*;
@@ -21,8 +23,16 @@ import org.redkale.convert.*;
*/
public final class WebSocketPacket {
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+ static final WebSocketPacket NONE = new WebSocketPacket();
+
public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]);
+ public static enum MessageType {
+ STRING, BYTES, OBJECT;
+ }
+
public static enum FrameType {
TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A);
@@ -57,14 +67,24 @@ public final class WebSocketPacket {
protected boolean last = true;
+ //---------------发送------------------------
Object sendJson;
Convert sendConvert;
- boolean mapconvable;
+ boolean sendMapconvable;
ByteBuffer[] sendBuffers;
+ //---------------接收------------------------
+ MessageType receiveType;
+
+ int receiveCount;
+
+ int receiveLength;
+
+ Object receiveMessage;
+
ConvertMask receiveMasker;
ByteBuffer[] receiveBuffers;
@@ -119,7 +139,7 @@ public final class WebSocketPacket {
WebSocketPacket(Convert convert, boolean mapconvable, Object json, boolean fin) {
this.type = (convert == null || !convert.isBinary()) ? FrameType.TEXT : FrameType.BINARY;
this.sendConvert = convert;
- this.mapconvable = mapconvable;
+ this.sendMapconvable = mapconvable;
this.sendJson = json;
this.last = fin;
if (mapconvable && !(json instanceof Object[])) throw new IllegalArgumentException();
@@ -183,7 +203,7 @@ public final class WebSocketPacket {
@Override
public String toString() {
- return this.getClass().getSimpleName() + "[type=" + type + ", last=" + last + (payload != null ? (", payload=" + payload) : "") + (bytes != null ? (", bytes=[" + bytes.length + ']') : "") + (sendJson != null ? (", json=" + (mapconvable ? Utility.ofMap((Object[]) sendJson) : sendJson)) : "") + "]";
+ return this.getClass().getSimpleName() + "[type=" + type + ", last=" + last + (payload != null ? (", payload=" + payload) : "") + (bytes != null ? (", bytes=[" + bytes.length + ']') : (receiveLength > 0 ? (", receivemsg=" + receiveMessage) : "")) + (sendJson != null ? (", json=" + (sendMapconvable ? Utility.ofMap((Object[]) sendJson) : sendJson)) : "") + "]";
}
/**
@@ -211,7 +231,7 @@ public final class WebSocketPacket {
return supplier.get();
}
};
- ByteBuffer[] buffers = this.mapconvable ? this.sendConvert.convertMapTo(newsupplier, (Object[]) sendJson) : this.sendConvert.convertTo(newsupplier, sendJson);
+ ByteBuffer[] buffers = this.sendMapconvable ? this.sendConvert.convertMapTo(newsupplier, (Object[]) sendJson) : this.sendConvert.convertTo(newsupplier, sendJson);
int len = 0;
for (ByteBuffer buf : buffers) {
len += buf.remaining();
@@ -291,108 +311,20 @@ public final class WebSocketPacket {
// System.out.println(rs);
// }
/**
- * 消息解码
*
- * 0 1 2 3
- * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- * +-+-+-+-+-------+-+-------------+-------------------------------+
- * |F|R|R|R| opcode|M| Payload len | Extended payload length |
- * |I|S|S|S| (4) |A| (7) | (16/64) |
- * |N|V|V|V| |S| | (if payload len==126/127) |
- * | |1|2|3| |K| | |
- * +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
- * | Extended payload length continued, if payload len == 127 |
- * + - - - - - - - - - - - - - - - +-------------------------------+
- * | |Masking-key, if MASK set to 1 |
- * +-------------------------------+-------------------------------+
- * | Masking-key (continued) | Payload Data |
- * +-------------------------------- - - - - - - - - - - - - - - - +
- * : Payload Data continued :
- * + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
- * | Payload Data continued |
- * +-----------------------------------------------------------------------+
+ * @param webSocket WebSocket
+ * @param readBuffer ByteBuffer
*
- * @param buffer
- * @param exbuffers
- *
- * @return 1:表示解析成功且能继续解析;0:表示解析成功; -1:表示解析失败
+ * @return boolean 已接收完返回true, 需要继续接收body返回false;
*/
- /*
- static int decode(final Logger logger, final List packets,
- final int wsmaxbody, final ByteBuffer buffer, ByteBuffer... exbuffers) {
- final boolean debug = false; //调试开关
- if (debug) {
- int remain = buffer.remaining();
- if (exbuffers != null) {
- for (ByteBuffer b : exbuffers) {
- remain += b == null ? 0 : b.remaining();
- }
- }
- logger.log(Level.FINEST, "read websocket message's length = " + remain);
- }
- if (buffer.remaining() < 2) return -1;
- final byte opcode = buffer.get();
- final WebSocketPacket packet = new WebSocketPacket();
- packet.last = (opcode & 0b1000_0000) != 0;
- packet.type = FrameType.valueOf(opcode & 0xF);
- if (packet.type == FrameType.CLOSE) {
- if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
- packets.add(packet);
- return 0;
- }
- final boolean checkrsv = false;//暂时不校验
- if (checkrsv && (opcode & 0b0111_0000) != 0) {
- if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")");
- return -1; //rsv1 rsv2 rsv3 must be 0
- }
- //0x00 表示一个后续帧
- //0x01 表示一个文本帧
- //0x02 表示一个二进制帧
- //0x03-07 为以后的非控制帧保留
- //0x8 表示一个连接关闭
- //0x9 表示一个ping
- //0xA 表示一个pong
- //0x0B-0F 为以后的控制帧保留
- final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧
- byte lengthCode = buffer.get();
- final boolean masked = (lengthCode & 0x80) == 0x80;
- if (masked) lengthCode ^= 0x80; //mask
- int length;
- if (lengthCode <= 0x7D) { //125
- length = lengthCode;
- } else {
- if (control) {
- if (debug) logger.log(Level.FINE, " receive control command from websocket client");
- return -1;
- }
- if (lengthCode == 0x7E) {//0x7E=126
- length = (int) buffer.getChar();
- } else {
- length = buffer.getInt();
- }
- }
- if (length > wsmaxbody && wsmaxbody > 0) {
- if (debug) logger.log(Level.FINE, "message body(" + length + ") is too big, but must less " + wsmaxbody);
- return -1;
- }
- ConvertMask masker = null;
- if (masked) {
- final byte[] masks = new byte[4];
- buffer.get(masks);
- masker = new ConvertMask() {
-
- private int index = 0;
-
- @Override
- public byte unmask(byte value) {
- return (byte) (value ^ masks[index++ % 4]);
- }
- };
- }
- this.receiveBuffers = Utility.append(new ByteBuffer[]{buffer}, exbuffers);
- return this;
+ boolean receiveBody(WebSocket webSocket, ByteBuffer readBuffer) {
+ int need = receiveLength - receiveCount;
+ boolean over = readBuffer.remaining() >= need;
+ this.receiveBuffers = Utility.append(this.receiveBuffers, readBuffer);
+ if (over) parseReceiveMessage(webSocket, this.receiveBuffers);
+ return over;
}
-*/
+
/**
* 消息解码
*
@@ -418,26 +350,25 @@ public final class WebSocketPacket {
* @param buffer
* @param exbuffers
*
- * @return
+ * @return 返回NONE表示Buffer内容不够; 返回this表示解析完成或部分解析完成;返回null表示解析异常;
*/
- WebSocketPacket decode(final Logger logger, final ByteBuffer buffer, ByteBuffer... exbuffers) {
+ WebSocketPacket decode(final Logger logger, final WebSocket webSocket, final int wsmaxbody,
+ final AbstractMap.SimpleEntry halfBytes, final ByteBuffer buffer) {
+ //开始
final boolean debug = false; //调试开关
- if (debug) {
- int remain = buffer.remaining();
- if (exbuffers != null) {
- for (ByteBuffer b : exbuffers) {
- remain += b == null ? 0 : b.remaining();
- }
- }
- logger.log(Level.FINEST, "read websocket message's length = " + remain);
+ if (debug) logger.log(Level.FINEST, "read websocket message's length = " + buffer.remaining());
+ if (!buffer.hasRemaining()) return NONE;
+ if (buffer.remaining() < 2) {
+ byte[] bs = new byte[buffer.remaining()];
+ buffer.get(bs);
+ halfBytes.setValue(bs);
+ return NONE;
}
- if (buffer.remaining() < 2) return null;
- byte opcode = buffer.get();
+ final byte opcode = buffer.get(); //第一个字节
this.last = (opcode & 0b1000_0000) != 0;
this.type = FrameType.valueOf(opcode & 0xF);
if (type == FrameType.CLOSE) {
if (debug) logger.log(Level.FINEST, " receive close command from websocket client");
- return this;
}
final boolean checkrsv = false;//暂时不校验
if (checkrsv && (opcode & 0b0111_0000) != 0) {
@@ -453,9 +384,23 @@ public final class WebSocketPacket {
//0xA 表示一个pong
//0x0B-0F 为以后的控制帧保留
final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧
- byte lengthCode = buffer.get();
+ final byte crcode = buffer.get(); //第二个字节
+
+ byte lengthCode = crcode;
final boolean masked = (lengthCode & 0x80) == 0x80;
if (masked) lengthCode ^= 0x80; //mask
+
+ //判断Buffer剩余内容够不够基本信息的创建
+ int minBufferLength = ((lengthCode <= 0x7D) ? 0 : (lengthCode == 0x7E ? 2 : 4)) + (masked ? 4 : 0);
+ if (buffer.remaining() < minBufferLength) {
+ byte[] bs = new byte[2 + buffer.remaining()];
+ bs[0] = opcode;
+ bs[1] = crcode;
+ buffer.get(bs, 2, buffer.remaining());
+ halfBytes.setValue(bs);
+ return NONE;
+ }
+
int length;
if (lengthCode <= 0x7D) { //125
length = lengthCode;
@@ -470,6 +415,11 @@ public final class WebSocketPacket {
length = buffer.getInt();
}
}
+ if (length > wsmaxbody && wsmaxbody > 0) {
+ if (debug) logger.log(Level.FINE, "message length (" + length + ") too big, must less " + wsmaxbody + "");
+ return null;
+ }
+ this.receiveLength = length;
if (masked) {
final byte[] masks = new byte[4];
buffer.get(masks);
@@ -483,25 +433,65 @@ public final class WebSocketPacket {
}
};
}
- this.receiveBuffers = Utility.append(new ByteBuffer[]{buffer}, exbuffers);
+ if (buffer.remaining() >= this.receiveLength) { //内容足够, 可以解析
+ this.parseReceiveMessage(webSocket, buffer);
+ this.receiveCount = this.receiveLength;
+ } else {
+ this.receiveCount = buffer.remaining();
+ this.receiveBuffers = buffer.hasRemaining() ? new ByteBuffer[]{buffer} : null;
+ }
return this;
}
- byte[] getReceiveBytes() {
- if (this.receiveBuffers.length == 0) return new byte[0];
- if (this.receiveBuffers.length == 1 && this.receiveBuffers[0].remaining() == 0) return new byte[0];
-
- int count = 0;
- for (ByteBuffer buf : this.receiveBuffers) {
- count += buf.remaining();
+ void parseReceiveMessage(WebSocket webSocket, ByteBuffer... buffers) {
+ if (this.type == FrameType.TEXT) {
+ Convert textConvert = webSocket.getTextConvert();
+ if (textConvert == null) {
+ this.receiveMessage = new String(this.getReceiveBytes(buffers), UTF_8);
+ this.receiveType = MessageType.STRING;
+ } else {
+ this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
+ this.receiveCount = this.receiveLength;
+ this.receiveType = MessageType.OBJECT;
+ }
+ } else if (this.type == FrameType.BINARY) {
+ Convert binaryConvert = webSocket.getBinaryConvert();
+ if (binaryConvert == null) {
+ this.receiveMessage = this.getReceiveBytes(buffers);
+ this.receiveType = MessageType.BYTES;
+ } else {
+ this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers);
+ this.receiveCount = this.receiveLength;
+ this.receiveType = MessageType.OBJECT;
+ }
+ } else if (this.type == FrameType.PING) {
+ this.receiveMessage = this.getReceiveBytes(buffers);
+ this.receiveType = MessageType.BYTES;
+ } else if (this.type == FrameType.PONG) {
+ this.receiveMessage = this.getReceiveBytes(buffers);
+ this.receiveType = MessageType.BYTES;
+ } else if (this.type == FrameType.CLOSE) {
+ this.receiveMessage = this.getReceiveBytes(buffers);
+ this.receiveType = MessageType.BYTES;
}
- byte[] bs = new byte[count];
+ }
+
+ boolean isReceiveFinished() {
+ return this.receiveLength <= this.receiveCount;
+ }
+
+ byte[] getReceiveBytes(ByteBuffer... buffers) {
+ final int length = this.receiveLength;
+ if (length == 0) return new byte[0];
+ byte[] bs = new byte[length];
int index = 0;
- for (ByteBuffer buf : this.receiveBuffers) {
- int r = buf.remaining();
+ for (ByteBuffer buf : buffers) {
+ int r = Math.min(buf.remaining(), length - index);
buf.get(bs, index, r);
index += r;
+ if (index >= length) break;
}
+ this.receiveCount = index;
ConvertMask mask = this.receiveMasker;
if (mask != null) {
for (int i = 0; i < bs.length; i++) {
@@ -510,4 +500,5 @@ public final class WebSocketPacket {
}
return bs;
}
+
}
diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java
index bbdf35111..203db80a9 100644
--- a/src/org/redkale/net/http/WebSocketRunner.java
+++ b/src/org/redkale/net/http/WebSocketRunner.java
@@ -12,12 +12,11 @@ import org.redkale.net.http.WebSocketPacket.FrameType;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
+import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.logging.*;
-import org.redkale.convert.Convert;
-import org.redkale.util.Utility;
/**
* WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner
@@ -62,185 +61,273 @@ class WebSocketRunner implements Runnable {
public void run() {
final boolean debug = true;
try {
- final int wsmaxbody = webSocket._engine.wsmaxbody;
webSocket.onConnected();
channel.setReadTimeoutSecond(300); //读取超时5分钟
if (channel.isOpen()) {
+ final int wsmaxbody = webSocket._engine.wsmaxbody;
channel.read(readBuffer, null, new CompletionHandler() {
- private ByteBuffer recentExBuffer;
-
- private final List packets = new ArrayList<>();
+ //尚未解析完的数据包
+ private WebSocketPacket unfinishPacket;
//当接收的数据流长度大于ByteBuffer长度时, 则需要额外的ByteBuffer 辅助;
- private final List readBuffers = new ArrayList<>();
+ private final List exBuffers = new ArrayList<>();
+
+ private final SimpleEntry halfBytes = new SimpleEntry("", null);
@Override
public void completed(Integer count, Void attachment1) {
- if (count < 1 && readBuffers.isEmpty()) {
+ if (count < 1) {
closeRunner(0);
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
return;
}
if (readBuffer == null) return;
- if (!readBuffer.hasRemaining() && (recentExBuffer == null || !recentExBuffer.hasRemaining())) {
- final ByteBuffer buffer = context.pollBuffer();
- recentExBuffer = buffer;
- readBuffers.add(buffer);
- channel.read(buffer, null, this);
- return;
- }
readBuffer.flip();
- ByteBuffer[] exBuffers = null;
- if (!readBuffers.isEmpty()) {
- exBuffers = readBuffers.toArray(new ByteBuffer[readBuffers.size()]);
- readBuffers.clear();
- recentExBuffer = null;
- for (ByteBuffer b : exBuffers) {
- b.flip();
+ WebSocketPacket onePacket = null;
+ if (unfinishPacket != null) {
+ if (unfinishPacket.receiveBody(webSocket, readBuffer)) { //已经接收完毕
+ onePacket = unfinishPacket;
+ unfinishPacket = null;
+ for (ByteBuffer b : exBuffers) {
+ context.offerBuffer(b);
+ }
+ exBuffers.clear();
+ } else { //需要继续接收
+ readBuffer = context.pollBuffer();
+ channel.read(readBuffer, null, this);
+ return;
}
}
+ final List packets = new ArrayList<>();
+ if (onePacket != null) packets.add(onePacket);
try {
- WebSocketPacket packet;
- try {
- packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers);
- } catch (Exception e) { //接收的消息体解析失败
- webSocket.onOccurException(e, Utility.append(new ByteBuffer[]{readBuffer}, exBuffers == null ? new ByteBuffer[0] : exBuffers));
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
+ while (true) {
+ WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuffer);
+ if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节
+ if (packet != null && !packet.isReceiveFinished()) {
+ unfinishPacket = packet;
+ if (readBuffer.hasRemaining()) {
+ exBuffers.add(readBuffer);
+ readBuffer = context.pollBuffer();
+ }
+ break;
}
- return;
+ packets.add(packet);
+ if (packet == null || !readBuffer.hasRemaining()) break;
}
+ } catch (Exception e) {
+ context.getLogger().log(Level.SEVERE, "WebSocket parse message error", e);
+ webSocket.onOccurException(e, null);
+ }
+ //继续监听消息
+ readBuffer.clear();
+ if (halfBytes.getValue() != null) {
+ readBuffer.put(halfBytes.getValue());
+ halfBytes.setValue(null);
+ }
+ channel.read(readBuffer, null, this);
+
+ //消息处理
+ for (final WebSocketPacket packet : packets) {
if (packet == null) {
failed(null, attachment1);
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
return;
}
-
if (packet.type == FrameType.TEXT) {
- Convert textConvert = webSocket.getTextConvert();
- if (textConvert == null) {
- byte[] message = packet.getReceiveBytes();
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
- }
- try {
- webSocket.onMessage(new String(message, "UTF-8"), packet.last);
- } catch (Exception e) {
- context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
- }
- } else {
- Object message;
- try {
- message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
- } catch (Exception e) { //接收的消息体解析失败
- webSocket.onOccurException(e, packet.receiveBuffers);
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
- }
- return;
- }
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
- }
- try {
+ try {
+ if (packet.receiveType == WebSocketPacket.MessageType.STRING) {
+ webSocket.onMessage((String) packet.receiveMessage, packet.last);
+ } else {
if (restMessageConsumer != null) { //主要供RestWebSocket使用
- restMessageConsumer.accept(webSocket, message);
+ restMessageConsumer.accept(webSocket, packet.receiveMessage);
} else {
- webSocket.onMessage(message, packet.last);
+ webSocket.onMessage(packet.receiveMessage, packet.last);
}
- } catch (Exception e) {
- context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
}
+ } catch (Exception e) {
+ context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
}
} else if (packet.type == FrameType.BINARY) {
- Convert binaryConvert = webSocket.getBinaryConvert();
- if (binaryConvert == null) {
- byte[] message = packet.getReceiveBytes();
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
- }
- try {
- webSocket.onMessage(message, packet.last);
- } catch (Exception e) {
- context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
- }
- } else {
- Object message;
- try {
- message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
- } catch (Exception e) { //接收的消息体解析失败
- webSocket.onOccurException(e, packet.receiveBuffers);
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
- }
- return;
- }
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
- }
- try {
- if (restMessageConsumer != null) { //主要供RestWebSocket使用
- restMessageConsumer.accept(webSocket, message);
- } else {
- webSocket.onMessage(message, packet.last);
- }
- } catch (Exception e) {
- context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
- }
- }
- } else if (packet.type == FrameType.PONG) {
- byte[] message = packet.getReceiveBytes();
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
- }
try {
- webSocket.onPong(message);
+ if (packet.receiveType == WebSocketPacket.MessageType.BYTES) {
+ webSocket.onMessage((byte[]) packet.receiveMessage, packet.last);
+ } else {
+ if (restMessageConsumer != null) { //主要供RestWebSocket使用
+ restMessageConsumer.accept(webSocket, packet.receiveMessage);
+ } else {
+ webSocket.onMessage(packet.receiveMessage, packet.last);
+ }
+ }
} catch (Exception e) {
- context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e);
+ context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
}
} else if (packet.type == FrameType.PING) {
- byte[] message = packet.getReceiveBytes();
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
- }
try {
- webSocket.onPing(message);
+ webSocket.onPing((byte[]) packet.receiveMessage);
} catch (Exception e) {
context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e);
}
+ } else if (packet.type == FrameType.PONG) {
+ try {
+ webSocket.onPong((byte[]) packet.receiveMessage);
+ } catch (Exception e) {
+ context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e);
+ }
} else if (packet.type == FrameType.CLOSE) {
Logger logger = context.getLogger();
if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet);
closeRunner(0);
+ return;
} else {
context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet);
- if (readBuffer != null) {
- readBuffer.clear();
- channel.read(readBuffer, null, this);
- }
- }
- } catch (Throwable t) {
- closeRunner(0);
- if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
- } finally {
- if (exBuffers != null) {
- for (ByteBuffer b : exBuffers) {
- context.offerBuffer(b);
- }
+ closeRunner(0);
+ return;
}
}
+// if (true) return; //以下代码废弃
+// try {
+// WebSocketPacket packet;
+// try {
+// packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers);
+// } catch (Exception e) { //接收的消息体解析失败
+// webSocket.onOccurException(e, Utility.append(new ByteBuffer[]{readBuffer}, exBuffers == null ? new ByteBuffer[0] : exBuffers));
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// return;
+// }
+// if (packet == null) {
+// failed(null, attachment1);
+// if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
+// return;
+// }
+//
+// if (packet.type == FrameType.TEXT) {
+// Convert textConvert = webSocket.getTextConvert();
+// if (textConvert == null) {
+// byte[] message = packet.getReceiveBytes();
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// try {
+// webSocket.onMessage(new String(message, "UTF-8"), packet.last);
+// } catch (Exception e) {
+// context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
+// }
+// } else {
+// Object message;
+// try {
+// message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
+// } catch (Exception e) { //接收的消息体解析失败
+// webSocket.onOccurException(e, packet.receiveBuffers);
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// return;
+// }
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// try {
+// if (restMessageConsumer != null) { //主要供RestWebSocket使用
+// restMessageConsumer.accept(webSocket, message);
+// } else {
+// webSocket.onMessage(message, packet.last);
+// }
+// } catch (Exception e) {
+// context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
+// }
+// }
+// } else if (packet.type == FrameType.BINARY) {
+// Convert binaryConvert = webSocket.getBinaryConvert();
+// if (binaryConvert == null) {
+// byte[] message = packet.getReceiveBytes();
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// try {
+// webSocket.onMessage(message, packet.last);
+// } catch (Exception e) {
+// context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
+// }
+// } else {
+// Object message;
+// try {
+// message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
+// } catch (Exception e) { //接收的消息体解析失败
+// webSocket.onOccurException(e, packet.receiveBuffers);
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// return;
+// }
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// try {
+// if (restMessageConsumer != null) { //主要供RestWebSocket使用
+// restMessageConsumer.accept(webSocket, message);
+// } else {
+// webSocket.onMessage(message, packet.last);
+// }
+// } catch (Exception e) {
+// context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
+// }
+// }
+// } else if (packet.type == FrameType.PONG) {
+// byte[] message = packet.getReceiveBytes();
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// try {
+// webSocket.onPong(message);
+// } catch (Exception e) {
+// context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e);
+// }
+// } else if (packet.type == FrameType.PING) {
+// byte[] message = packet.getReceiveBytes();
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// try {
+// webSocket.onPing(message);
+// } catch (Exception e) {
+// context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e);
+// }
+// } else if (packet.type == FrameType.CLOSE) {
+// Logger logger = context.getLogger();
+// if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet);
+// closeRunner(0);
+// } else {
+// context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet);
+// if (readBuffer != null) {
+// readBuffer.clear();
+// channel.read(readBuffer, null, this);
+// }
+// }
+// } catch (Throwable t) {
+// closeRunner(0);
+// if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
+// } finally {
+// if (exBuffers != null) {
+// for (ByteBuffer b : exBuffers) {
+// context.offerBuffer(b);
+// }
+// }
+// }
}
@Override