diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java index e899dc4f8..c284e0998 100644 --- a/src/org/redkale/net/http/WebSocketPacket.java +++ b/src/org/redkale/net/http/WebSocketPacket.java @@ -8,6 +8,8 @@ package org.redkale.net.http; import org.redkale.util.Utility; import java.io.*; import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.AbstractMap; import java.util.function.Supplier; import java.util.logging.*; import org.redkale.convert.*; @@ -21,8 +23,16 @@ import org.redkale.convert.*; */ public final class WebSocketPacket { + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + static final WebSocketPacket NONE = new WebSocketPacket(); + public static final WebSocketPacket DEFAULT_PING_PACKET = new WebSocketPacket(FrameType.PING, new byte[0]); + public static enum MessageType { + STRING, BYTES, OBJECT; + } + public static enum FrameType { TEXT(0x01), BINARY(0x02), CLOSE(0x08), PING(0x09), PONG(0x0A); @@ -57,14 +67,24 @@ public final class WebSocketPacket { protected boolean last = true; + //---------------发送------------------------ Object sendJson; Convert sendConvert; - boolean mapconvable; + boolean sendMapconvable; ByteBuffer[] sendBuffers; + //---------------接收------------------------ + MessageType receiveType; + + int receiveCount; + + int receiveLength; + + Object receiveMessage; + ConvertMask receiveMasker; ByteBuffer[] receiveBuffers; @@ -119,7 +139,7 @@ public final class WebSocketPacket { WebSocketPacket(Convert convert, boolean mapconvable, Object json, boolean fin) { this.type = (convert == null || !convert.isBinary()) ? FrameType.TEXT : FrameType.BINARY; this.sendConvert = convert; - this.mapconvable = mapconvable; + this.sendMapconvable = mapconvable; this.sendJson = json; this.last = fin; if (mapconvable && !(json instanceof Object[])) throw new IllegalArgumentException(); @@ -183,7 +203,7 @@ public final class WebSocketPacket { @Override public String toString() { - return this.getClass().getSimpleName() + "[type=" + type + ", last=" + last + (payload != null ? (", payload=" + payload) : "") + (bytes != null ? (", bytes=[" + bytes.length + ']') : "") + (sendJson != null ? (", json=" + (mapconvable ? Utility.ofMap((Object[]) sendJson) : sendJson)) : "") + "]"; + return this.getClass().getSimpleName() + "[type=" + type + ", last=" + last + (payload != null ? (", payload=" + payload) : "") + (bytes != null ? (", bytes=[" + bytes.length + ']') : (receiveLength > 0 ? (", receivemsg=" + receiveMessage) : "")) + (sendJson != null ? (", json=" + (sendMapconvable ? Utility.ofMap((Object[]) sendJson) : sendJson)) : "") + "]"; } /** @@ -211,7 +231,7 @@ public final class WebSocketPacket { return supplier.get(); } }; - ByteBuffer[] buffers = this.mapconvable ? this.sendConvert.convertMapTo(newsupplier, (Object[]) sendJson) : this.sendConvert.convertTo(newsupplier, sendJson); + ByteBuffer[] buffers = this.sendMapconvable ? this.sendConvert.convertMapTo(newsupplier, (Object[]) sendJson) : this.sendConvert.convertTo(newsupplier, sendJson); int len = 0; for (ByteBuffer buf : buffers) { len += buf.remaining(); @@ -291,108 +311,20 @@ public final class WebSocketPacket { // System.out.println(rs); // } /** - * 消息解码
* - * 0 1 2 3 - * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - * +-+-+-+-+-------+-+-------------+-------------------------------+ - * |F|R|R|R| opcode|M| Payload len | Extended payload length | - * |I|S|S|S| (4) |A| (7) | (16/64) | - * |N|V|V|V| |S| | (if payload len==126/127) | - * | |1|2|3| |K| | | - * +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + - * | Extended payload length continued, if payload len == 127 | - * + - - - - - - - - - - - - - - - +-------------------------------+ - * | |Masking-key, if MASK set to 1 | - * +-------------------------------+-------------------------------+ - * | Masking-key (continued) | Payload Data | - * +-------------------------------- - - - - - - - - - - - - - - - + - * : Payload Data continued : - * + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - * | Payload Data continued | - * +-----------------------------------------------------------------------+ + * @param webSocket WebSocket + * @param readBuffer ByteBuffer * - * @param buffer - * @param exbuffers - * - * @return 1:表示解析成功且能继续解析;0:表示解析成功; -1:表示解析失败 + * @return boolean 已接收完返回true, 需要继续接收body返回false; */ - /* - static int decode(final Logger logger, final List packets, - final int wsmaxbody, final ByteBuffer buffer, ByteBuffer... exbuffers) { - final boolean debug = false; //调试开关 - if (debug) { - int remain = buffer.remaining(); - if (exbuffers != null) { - for (ByteBuffer b : exbuffers) { - remain += b == null ? 0 : b.remaining(); - } - } - logger.log(Level.FINEST, "read websocket message's length = " + remain); - } - if (buffer.remaining() < 2) return -1; - final byte opcode = buffer.get(); - final WebSocketPacket packet = new WebSocketPacket(); - packet.last = (opcode & 0b1000_0000) != 0; - packet.type = FrameType.valueOf(opcode & 0xF); - if (packet.type == FrameType.CLOSE) { - if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); - packets.add(packet); - return 0; - } - final boolean checkrsv = false;//暂时不校验 - if (checkrsv && (opcode & 0b0111_0000) != 0) { - if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")"); - return -1; //rsv1 rsv2 rsv3 must be 0 - } - //0x00 表示一个后续帧 - //0x01 表示一个文本帧 - //0x02 表示一个二进制帧 - //0x03-07 为以后的非控制帧保留 - //0x8 表示一个连接关闭 - //0x9 表示一个ping - //0xA 表示一个pong - //0x0B-0F 为以后的控制帧保留 - final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧 - byte lengthCode = buffer.get(); - final boolean masked = (lengthCode & 0x80) == 0x80; - if (masked) lengthCode ^= 0x80; //mask - int length; - if (lengthCode <= 0x7D) { //125 - length = lengthCode; - } else { - if (control) { - if (debug) logger.log(Level.FINE, " receive control command from websocket client"); - return -1; - } - if (lengthCode == 0x7E) {//0x7E=126 - length = (int) buffer.getChar(); - } else { - length = buffer.getInt(); - } - } - if (length > wsmaxbody && wsmaxbody > 0) { - if (debug) logger.log(Level.FINE, "message body(" + length + ") is too big, but must less " + wsmaxbody); - return -1; - } - ConvertMask masker = null; - if (masked) { - final byte[] masks = new byte[4]; - buffer.get(masks); - masker = new ConvertMask() { - - private int index = 0; - - @Override - public byte unmask(byte value) { - return (byte) (value ^ masks[index++ % 4]); - } - }; - } - this.receiveBuffers = Utility.append(new ByteBuffer[]{buffer}, exbuffers); - return this; + boolean receiveBody(WebSocket webSocket, ByteBuffer readBuffer) { + int need = receiveLength - receiveCount; + boolean over = readBuffer.remaining() >= need; + this.receiveBuffers = Utility.append(this.receiveBuffers, readBuffer); + if (over) parseReceiveMessage(webSocket, this.receiveBuffers); + return over; } -*/ + /** * 消息解码
* @@ -418,26 +350,25 @@ public final class WebSocketPacket { * @param buffer * @param exbuffers * - * @return + * @return 返回NONE表示Buffer内容不够; 返回this表示解析完成或部分解析完成;返回null表示解析异常; */ - WebSocketPacket decode(final Logger logger, final ByteBuffer buffer, ByteBuffer... exbuffers) { + WebSocketPacket decode(final Logger logger, final WebSocket webSocket, final int wsmaxbody, + final AbstractMap.SimpleEntry halfBytes, final ByteBuffer buffer) { + //开始 final boolean debug = false; //调试开关 - if (debug) { - int remain = buffer.remaining(); - if (exbuffers != null) { - for (ByteBuffer b : exbuffers) { - remain += b == null ? 0 : b.remaining(); - } - } - logger.log(Level.FINEST, "read websocket message's length = " + remain); + if (debug) logger.log(Level.FINEST, "read websocket message's length = " + buffer.remaining()); + if (!buffer.hasRemaining()) return NONE; + if (buffer.remaining() < 2) { + byte[] bs = new byte[buffer.remaining()]; + buffer.get(bs); + halfBytes.setValue(bs); + return NONE; } - if (buffer.remaining() < 2) return null; - byte opcode = buffer.get(); + final byte opcode = buffer.get(); //第一个字节 this.last = (opcode & 0b1000_0000) != 0; this.type = FrameType.valueOf(opcode & 0xF); if (type == FrameType.CLOSE) { if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); - return this; } final boolean checkrsv = false;//暂时不校验 if (checkrsv && (opcode & 0b0111_0000) != 0) { @@ -453,9 +384,23 @@ public final class WebSocketPacket { //0xA 表示一个pong //0x0B-0F 为以后的控制帧保留 final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧 - byte lengthCode = buffer.get(); + final byte crcode = buffer.get(); //第二个字节 + + byte lengthCode = crcode; final boolean masked = (lengthCode & 0x80) == 0x80; if (masked) lengthCode ^= 0x80; //mask + + //判断Buffer剩余内容够不够基本信息的创建 + int minBufferLength = ((lengthCode <= 0x7D) ? 0 : (lengthCode == 0x7E ? 2 : 4)) + (masked ? 4 : 0); + if (buffer.remaining() < minBufferLength) { + byte[] bs = new byte[2 + buffer.remaining()]; + bs[0] = opcode; + bs[1] = crcode; + buffer.get(bs, 2, buffer.remaining()); + halfBytes.setValue(bs); + return NONE; + } + int length; if (lengthCode <= 0x7D) { //125 length = lengthCode; @@ -470,6 +415,11 @@ public final class WebSocketPacket { length = buffer.getInt(); } } + if (length > wsmaxbody && wsmaxbody > 0) { + if (debug) logger.log(Level.FINE, "message length (" + length + ") too big, must less " + wsmaxbody + ""); + return null; + } + this.receiveLength = length; if (masked) { final byte[] masks = new byte[4]; buffer.get(masks); @@ -483,25 +433,65 @@ public final class WebSocketPacket { } }; } - this.receiveBuffers = Utility.append(new ByteBuffer[]{buffer}, exbuffers); + if (buffer.remaining() >= this.receiveLength) { //内容足够, 可以解析 + this.parseReceiveMessage(webSocket, buffer); + this.receiveCount = this.receiveLength; + } else { + this.receiveCount = buffer.remaining(); + this.receiveBuffers = buffer.hasRemaining() ? new ByteBuffer[]{buffer} : null; + } return this; } - byte[] getReceiveBytes() { - if (this.receiveBuffers.length == 0) return new byte[0]; - if (this.receiveBuffers.length == 1 && this.receiveBuffers[0].remaining() == 0) return new byte[0]; - - int count = 0; - for (ByteBuffer buf : this.receiveBuffers) { - count += buf.remaining(); + void parseReceiveMessage(WebSocket webSocket, ByteBuffer... buffers) { + if (this.type == FrameType.TEXT) { + Convert textConvert = webSocket.getTextConvert(); + if (textConvert == null) { + this.receiveMessage = new String(this.getReceiveBytes(buffers), UTF_8); + this.receiveType = MessageType.STRING; + } else { + this.receiveMessage = textConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + this.receiveCount = this.receiveLength; + this.receiveType = MessageType.OBJECT; + } + } else if (this.type == FrameType.BINARY) { + Convert binaryConvert = webSocket.getBinaryConvert(); + if (binaryConvert == null) { + this.receiveMessage = this.getReceiveBytes(buffers); + this.receiveType = MessageType.BYTES; + } else { + this.receiveMessage = binaryConvert.convertFrom(webSocket._messageTextType, this.receiveMasker, buffers); + this.receiveCount = this.receiveLength; + this.receiveType = MessageType.OBJECT; + } + } else if (this.type == FrameType.PING) { + this.receiveMessage = this.getReceiveBytes(buffers); + this.receiveType = MessageType.BYTES; + } else if (this.type == FrameType.PONG) { + this.receiveMessage = this.getReceiveBytes(buffers); + this.receiveType = MessageType.BYTES; + } else if (this.type == FrameType.CLOSE) { + this.receiveMessage = this.getReceiveBytes(buffers); + this.receiveType = MessageType.BYTES; } - byte[] bs = new byte[count]; + } + + boolean isReceiveFinished() { + return this.receiveLength <= this.receiveCount; + } + + byte[] getReceiveBytes(ByteBuffer... buffers) { + final int length = this.receiveLength; + if (length == 0) return new byte[0]; + byte[] bs = new byte[length]; int index = 0; - for (ByteBuffer buf : this.receiveBuffers) { - int r = buf.remaining(); + for (ByteBuffer buf : buffers) { + int r = Math.min(buf.remaining(), length - index); buf.get(bs, index, r); index += r; + if (index >= length) break; } + this.receiveCount = index; ConvertMask mask = this.receiveMasker; if (mask != null) { for (int i = 0; i < bs.length; i++) { @@ -510,4 +500,5 @@ public final class WebSocketPacket { } return bs; } + } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index bbdf35111..203db80a9 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -12,12 +12,11 @@ import org.redkale.net.http.WebSocketPacket.FrameType; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; +import java.util.AbstractMap.SimpleEntry; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.logging.*; -import org.redkale.convert.Convert; -import org.redkale.util.Utility; /** * WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner @@ -62,185 +61,273 @@ class WebSocketRunner implements Runnable { public void run() { final boolean debug = true; try { - final int wsmaxbody = webSocket._engine.wsmaxbody; webSocket.onConnected(); channel.setReadTimeoutSecond(300); //读取超时5分钟 if (channel.isOpen()) { + final int wsmaxbody = webSocket._engine.wsmaxbody; channel.read(readBuffer, null, new CompletionHandler() { - private ByteBuffer recentExBuffer; - - private final List packets = new ArrayList<>(); + //尚未解析完的数据包 + private WebSocketPacket unfinishPacket; //当接收的数据流长度大于ByteBuffer长度时, 则需要额外的ByteBuffer 辅助; - private final List readBuffers = new ArrayList<>(); + private final List exBuffers = new ArrayList<>(); + + private final SimpleEntry halfBytes = new SimpleEntry("", null); @Override public void completed(Integer count, Void attachment1) { - if (count < 1 && readBuffers.isEmpty()) { + if (count < 1) { closeRunner(0); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); return; } if (readBuffer == null) return; - if (!readBuffer.hasRemaining() && (recentExBuffer == null || !recentExBuffer.hasRemaining())) { - final ByteBuffer buffer = context.pollBuffer(); - recentExBuffer = buffer; - readBuffers.add(buffer); - channel.read(buffer, null, this); - return; - } readBuffer.flip(); - ByteBuffer[] exBuffers = null; - if (!readBuffers.isEmpty()) { - exBuffers = readBuffers.toArray(new ByteBuffer[readBuffers.size()]); - readBuffers.clear(); - recentExBuffer = null; - for (ByteBuffer b : exBuffers) { - b.flip(); + WebSocketPacket onePacket = null; + if (unfinishPacket != null) { + if (unfinishPacket.receiveBody(webSocket, readBuffer)) { //已经接收完毕 + onePacket = unfinishPacket; + unfinishPacket = null; + for (ByteBuffer b : exBuffers) { + context.offerBuffer(b); + } + exBuffers.clear(); + } else { //需要继续接收 + readBuffer = context.pollBuffer(); + channel.read(readBuffer, null, this); + return; } } + final List packets = new ArrayList<>(); + if (onePacket != null) packets.add(onePacket); try { - WebSocketPacket packet; - try { - packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers); - } catch (Exception e) { //接收的消息体解析失败 - webSocket.onOccurException(e, Utility.append(new ByteBuffer[]{readBuffer}, exBuffers == null ? new ByteBuffer[0] : exBuffers)); - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); + while (true) { + WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), webSocket, wsmaxbody, halfBytes, readBuffer); + if (packet == WebSocketPacket.NONE) break; //解析完毕但是buffer有多余字节 + if (packet != null && !packet.isReceiveFinished()) { + unfinishPacket = packet; + if (readBuffer.hasRemaining()) { + exBuffers.add(readBuffer); + readBuffer = context.pollBuffer(); + } + break; } - return; + packets.add(packet); + if (packet == null || !readBuffer.hasRemaining()) break; } + } catch (Exception e) { + context.getLogger().log(Level.SEVERE, "WebSocket parse message error", e); + webSocket.onOccurException(e, null); + } + //继续监听消息 + readBuffer.clear(); + if (halfBytes.getValue() != null) { + readBuffer.put(halfBytes.getValue()); + halfBytes.setValue(null); + } + channel.read(readBuffer, null, this); + + //消息处理 + for (final WebSocketPacket packet : packets) { if (packet == null) { failed(null, attachment1); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); return; } - if (packet.type == FrameType.TEXT) { - Convert textConvert = webSocket.getTextConvert(); - if (textConvert == null) { - byte[] message = packet.getReceiveBytes(); - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); - } - try { - webSocket.onMessage(new String(message, "UTF-8"), packet.last); - } catch (Exception e) { - context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); - } - } else { - Object message; - try { - message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); - } catch (Exception e) { //接收的消息体解析失败 - webSocket.onOccurException(e, packet.receiveBuffers); - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); - } - return; - } - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); - } - try { + try { + if (packet.receiveType == WebSocketPacket.MessageType.STRING) { + webSocket.onMessage((String) packet.receiveMessage, packet.last); + } else { if (restMessageConsumer != null) { //主要供RestWebSocket使用 - restMessageConsumer.accept(webSocket, message); + restMessageConsumer.accept(webSocket, packet.receiveMessage); } else { - webSocket.onMessage(message, packet.last); + webSocket.onMessage(packet.receiveMessage, packet.last); } - } catch (Exception e) { - context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); } + } catch (Exception e) { + context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); } } else if (packet.type == FrameType.BINARY) { - Convert binaryConvert = webSocket.getBinaryConvert(); - if (binaryConvert == null) { - byte[] message = packet.getReceiveBytes(); - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); - } - try { - webSocket.onMessage(message, packet.last); - } catch (Exception e) { - context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); - } - } else { - Object message; - try { - message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); - } catch (Exception e) { //接收的消息体解析失败 - webSocket.onOccurException(e, packet.receiveBuffers); - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); - } - return; - } - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); - } - try { - if (restMessageConsumer != null) { //主要供RestWebSocket使用 - restMessageConsumer.accept(webSocket, message); - } else { - webSocket.onMessage(message, packet.last); - } - } catch (Exception e) { - context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); - } - } - } else if (packet.type == FrameType.PONG) { - byte[] message = packet.getReceiveBytes(); - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); - } try { - webSocket.onPong(message); + if (packet.receiveType == WebSocketPacket.MessageType.BYTES) { + webSocket.onMessage((byte[]) packet.receiveMessage, packet.last); + } else { + if (restMessageConsumer != null) { //主要供RestWebSocket使用 + restMessageConsumer.accept(webSocket, packet.receiveMessage); + } else { + webSocket.onMessage(packet.receiveMessage, packet.last); + } + } } catch (Exception e) { - context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e); + context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); } } else if (packet.type == FrameType.PING) { - byte[] message = packet.getReceiveBytes(); - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); - } try { - webSocket.onPing(message); + webSocket.onPing((byte[]) packet.receiveMessage); } catch (Exception e) { context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e); } + } else if (packet.type == FrameType.PONG) { + try { + webSocket.onPong((byte[]) packet.receiveMessage); + } catch (Exception e) { + context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e); + } } else if (packet.type == FrameType.CLOSE) { Logger logger = context.getLogger(); if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet); closeRunner(0); + return; } else { context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet); - if (readBuffer != null) { - readBuffer.clear(); - channel.read(readBuffer, null, this); - } - } - } catch (Throwable t) { - closeRunner(0); - if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t); - } finally { - if (exBuffers != null) { - for (ByteBuffer b : exBuffers) { - context.offerBuffer(b); - } + closeRunner(0); + return; } } +// if (true) return; //以下代码废弃 +// try { +// WebSocketPacket packet; +// try { +// packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers); +// } catch (Exception e) { //接收的消息体解析失败 +// webSocket.onOccurException(e, Utility.append(new ByteBuffer[]{readBuffer}, exBuffers == null ? new ByteBuffer[0] : exBuffers)); +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// return; +// } +// if (packet == null) { +// failed(null, attachment1); +// if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); +// return; +// } +// +// if (packet.type == FrameType.TEXT) { +// Convert textConvert = webSocket.getTextConvert(); +// if (textConvert == null) { +// byte[] message = packet.getReceiveBytes(); +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// try { +// webSocket.onMessage(new String(message, "UTF-8"), packet.last); +// } catch (Exception e) { +// context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); +// } +// } else { +// Object message; +// try { +// message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); +// } catch (Exception e) { //接收的消息体解析失败 +// webSocket.onOccurException(e, packet.receiveBuffers); +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// return; +// } +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// try { +// if (restMessageConsumer != null) { //主要供RestWebSocket使用 +// restMessageConsumer.accept(webSocket, message); +// } else { +// webSocket.onMessage(message, packet.last); +// } +// } catch (Exception e) { +// context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); +// } +// } +// } else if (packet.type == FrameType.BINARY) { +// Convert binaryConvert = webSocket.getBinaryConvert(); +// if (binaryConvert == null) { +// byte[] message = packet.getReceiveBytes(); +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// try { +// webSocket.onMessage(message, packet.last); +// } catch (Exception e) { +// context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); +// } +// } else { +// Object message; +// try { +// message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); +// } catch (Exception e) { //接收的消息体解析失败 +// webSocket.onOccurException(e, packet.receiveBuffers); +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// return; +// } +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// try { +// if (restMessageConsumer != null) { //主要供RestWebSocket使用 +// restMessageConsumer.accept(webSocket, message); +// } else { +// webSocket.onMessage(message, packet.last); +// } +// } catch (Exception e) { +// context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); +// } +// } +// } else if (packet.type == FrameType.PONG) { +// byte[] message = packet.getReceiveBytes(); +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// try { +// webSocket.onPong(message); +// } catch (Exception e) { +// context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e); +// } +// } else if (packet.type == FrameType.PING) { +// byte[] message = packet.getReceiveBytes(); +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// try { +// webSocket.onPing(message); +// } catch (Exception e) { +// context.getLogger().log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e); +// } +// } else if (packet.type == FrameType.CLOSE) { +// Logger logger = context.getLogger(); +// if (logger.isLoggable(Level.FINEST)) logger.log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet); +// closeRunner(0); +// } else { +// context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet); +// if (readBuffer != null) { +// readBuffer.clear(); +// channel.read(readBuffer, null, this); +// } +// } +// } catch (Throwable t) { +// closeRunner(0); +// if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t); +// } finally { +// if (exBuffers != null) { +// for (ByteBuffer b : exBuffers) { +// context.offerBuffer(b); +// } +// } +// } } @Override