From f68686114d56ca037450cdf35c17a71b69152de0 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sun, 21 May 2017 12:29:58 +0800 Subject: [PATCH] --- src/org/redkale/net/AsyncConnection.java | 6 +- src/org/redkale/net/Server.java | 3 +- src/org/redkale/net/http/WebSocketPacket.java | 162 +++++++++++++++++- src/org/redkale/net/http/WebSocketRunner.java | 114 ++++++------ 4 files changed, 221 insertions(+), 64 deletions(-) diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 6eac416ca..17607ae34 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -43,7 +43,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl write(srcs, 0, srcs.length, attachment, handler); } - protected abstract void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + public abstract void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); public void dispose() {//同close, 只是去掉throws IOException try { @@ -187,7 +187,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } @Override - protected void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { try { int rs = 0; for (int i = offset; i < offset + length; i++) { @@ -338,7 +338,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } @Override - protected void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { try { int rs = 0; for (int i = offset; i < offset + length; i++) { diff --git a/src/org/redkale/net/Server.java b/src/org/redkale/net/Server.java index 6fd6f4f31..8b460c80e 100644 --- a/src/org/redkale/net/Server.java +++ b/src/org/redkale/net/Server.java @@ -104,7 +104,8 @@ public abstract class Server 详情见: https://redkale.org + *

+ * 详情见: https://redkale.org + * * @author zhangjx */ public final class WebSocketPacket { @@ -120,4 +125,159 @@ public final class WebSocketPacket { public String toString() { return this.getClass().getSimpleName() + "[type=" + type + ", last=" + last + (payload != null ? (", payload=" + payload) : "") + (bytes != null ? (", bytes=[" + bytes.length + ']') : "") + "]"; } + + /** + * 消息编码 + * + * @param supplier Supplier + * + * @return ByteBuffer[] + */ + ByteBuffer[] encode(final Supplier supplier) { + ByteBuffer buffer = supplier.get(); //确保ByteBuffer的capacity不能小于128 + + final byte opcode = (byte) (this.type.getValue() | 0x80); + final byte[] content = getContent(); + final int len = content.length; + if (len <= 0x7D) { //125 + buffer.put(opcode); + buffer.put((byte) len); + buffer.put(content); + buffer.flip(); + return new ByteBuffer[]{buffer}; + } + if (len <= 0xFFFF) { // 65535 + buffer.put(opcode); + buffer.put((byte) 0x7E); //126 + buffer.putChar((char) len); + } else { + buffer.put(opcode); + buffer.put((byte) 0x7F); //127 + buffer.putInt(len); + } + int start = buffer.remaining(); + int pend = len - buffer.remaining(); + if (pend <= 0) { + buffer.put(content); + buffer.flip(); + return new ByteBuffer[]{buffer}; + } + buffer.put(content, 0, buffer.remaining()); + buffer.flip(); + final int capacity = buffer.capacity(); + final ByteBuffer[] buffers = new ByteBuffer[pend / capacity + 1]; + buffers[0] = buffer; + for (int i = 1; i < buffers.length; i++) { + ByteBuffer buf = supplier.get(); + buffer.put(content, start, Math.min(pend, capacity)); + buffer.flip(); + buffers[i] = buf; + start += capacity; + pend -= capacity; + } + return buffers; + } + + /** + * 消息解码
+ * + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-------+-+-------------+-------------------------------+ + * |F|R|R|R| opcode|M| Payload len | Extended payload length | + * |I|S|S|S| (4) |A| (7) | (16/64) | + * |N|V|V|V| |S| | (if payload len==126/127) | + * | |1|2|3| |K| | | + * +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + + * | Extended payload length continued, if payload len == 127 | + * + - - - - - - - - - - - - - - - +-------------------------------+ + * | |Masking-key, if MASK set to 1 | + * +-------------------------------+-------------------------------+ + * | Masking-key (continued) | Payload Data | + * +-------------------------------- - - - - - - - - - - - - - - - + + * : Payload Data continued : + * + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + * | Payload Data continued | + * +-----------------------------------------------------------------------+ + * + * @param buffer + * @param exbuffers + * + * @return + */ + WebSocketPacket decode(final Logger logger, final ByteBuffer buffer, ByteBuffer... exbuffers) { + final boolean debug = true; + if (debug) { + int remain = buffer.remaining(); + if (exbuffers != null) { + for (ByteBuffer b : exbuffers) { + remain += b == null ? 0 : b.remaining(); + } + } + logger.log(Level.FINEST, "read web socket message's length = " + remain); + } + if (buffer.remaining() < 2) return null; + byte opcode = buffer.get(); + this.last = (opcode & 0b1000_0000) != 0; + this.type = FrameType.valueOf(opcode & 0xF); + if (type == FrameType.CLOSE) { + if (debug) logger.log(Level.FINEST, " receive close command from websocket client"); + return null; + } + final boolean checkrsv = false;//暂时不校验 + if (checkrsv && (opcode & 0b0111_0000) != 0) { + if (debug) logger.log(Level.FINE, "rsv1 rsv2 rsv3 must be 0, but not (" + opcode + ")"); + return null; //rsv1 rsv2 rsv3 must be 0 + } + //0x00 表示一个后续帧 + //0x01 表示一个文本帧 + //0x02 表示一个二进制帧 + //0x03-07 为以后的非控制帧保留 + //0x8 表示一个连接关闭 + //0x9 表示一个ping + //0xA 表示一个pong + //0x0B-0F 为以后的控制帧保留 + final boolean control = (opcode & 0b0000_1000) != 0; //是否控制帧 + byte lengthCode = buffer.get(); + final boolean masked = (lengthCode & 0x80) == 0x80; + if (masked) lengthCode ^= 0x80; //mask + int length; + if (lengthCode <= 0x7D) { //125 + length = lengthCode; + } else { + if (control) { + if (debug) logger.log(Level.FINE, " receive control command from websocket client"); + return null; + } + if (lengthCode == 0x7E) {//0x7E=126 + length = (int) buffer.getChar(); + } else { + length = buffer.getInt(); + } + } + byte[] mask = null; + if (masked) { + mask = new byte[4]; + buffer.get(mask); + } + final byte[] data = new byte[length]; + if (buffer.remaining() >= length) { + buffer.get(data); + } else { //必须有 exbuffers + int offset = buffer.remaining(); + buffer.get(data, 0, offset); + for (ByteBuffer b : exbuffers) { + offset += b.remaining(); + b.get(data, offset, b.remaining()); + } + } + if (mask != null) { + for (int i = 0; i < data.length; i++) { + data[i] ^= mask[i % 4]; + } + } + this.bytes = data; + return this; + } + } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index accfc9969..9d89859ca 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -37,14 +37,12 @@ public class WebSocketRunner implements Runnable { private ByteBuffer readBuffer; - private ByteBuffer writeBuffer; + private ByteBuffer[] writeBuffers; protected volatile boolean closed = false; private AtomicBoolean writing = new AtomicBoolean(); - private final Coder coder = new Coder(); - private final BlockingQueue queue = new ArrayBlockingQueue(1024); private final boolean wsbinary; @@ -57,15 +55,12 @@ public class WebSocketRunner implements Runnable { this.webSocket = webSocket; this.channel = channel; this.wsbinary = wsbinary; - this.coder.logger = context.getLogger(); - this.coder.debugable = false;//context.getLogger().isLoggable(Level.FINEST); this.readBuffer = context.pollBuffer(); - this.writeBuffer = context.pollBuffer(); } @Override public void run() { - final boolean debug = this.coder.debugable; + final boolean debug = true; try { webSocket.onConnected(); channel.setReadTimeoutSecond(300); //读取超时5分钟 @@ -107,10 +102,14 @@ public class WebSocketRunner implements Runnable { b.flip(); } } - WebSocketPacket packet = coder.decode(readBuffer, exBuffers); - if (exBuffers != null) { - for (ByteBuffer b : exBuffers) { - context.offerBuffer(b); + WebSocketPacket packet = null; + try { + packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers); + } finally { + if (exBuffers != null) { + for (ByteBuffer b : exBuffers) { + context.offerBuffer(b); + } } } if (packet == null) { @@ -163,73 +162,66 @@ public class WebSocketRunner implements Runnable { public CompletableFuture sendMessage(WebSocketPacket packet) { if (packet == null) return CompletableFuture.completedFuture(RETCODE_SEND_ILLPACKET); if (closed) return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED); - final boolean debug = this.coder.debugable; + boolean debug = true; //System.out.println("推送消息"); - final byte[] bytes = coder.encode(packet); - if (debug) context.getLogger().log(Level.FINEST, "send web socket message's length = " + bytes.length); + if (debug) context.getLogger().log(Level.FINEST, "send web socket message: " + packet); this.lastSendTime = System.currentTimeMillis(); final CompletableFuture futureResult = new CompletableFuture<>(); if (writing.getAndSet(true)) { - queue.add(new QueueEntry(futureResult, bytes)); + queue.add(new QueueEntry(futureResult, packet)); return futureResult; } - ByteBuffer localWriteBuffer = writeBuffer; - if (localWriteBuffer == null) return CompletableFuture.completedFuture(RETCODE_ILLEGALBUFFER); - ByteBuffer sendBuffer; - if (bytes.length <= localWriteBuffer.capacity()) { - localWriteBuffer.clear(); - localWriteBuffer.put(bytes); - localWriteBuffer.flip(); - sendBuffer = localWriteBuffer; - } else { - sendBuffer = ByteBuffer.wrap(bytes); - } + ByteBuffer[] buffers = packet.encode(this.context.getBufferSupplier()); + this.writeBuffers = buffers; try { - channel.write(sendBuffer, sendBuffer, new CompletionHandler() { + channel.write(buffers, buffers, new CompletionHandler() { private CompletableFuture future = futureResult; @Override - public void completed(Integer result, ByteBuffer attachment) { - if (attachment == null || closed) { + public void completed(Integer result, ByteBuffer[] attachments) { + if (attachments == null || closed) { if (future != null) { future.complete(RETCODE_WSOCKET_CLOSED); future = null; + if (writeBuffers != null) { + for (ByteBuffer buf : writeBuffers) { + context.offerBuffer(buf); + } + writeBuffers = null; + } } return; } try { - if (attachment.hasRemaining()) { - if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner write completed reemaining: " + attachment.remaining()); - channel.write(attachment, attachment, this); + int index = -1; + for (int i = 0; i < attachments.length; i++) { + if (attachments[i].hasRemaining()) { + index = i; + break; + } + } + if (index >= 0) { + channel.write(attachments, index, attachments.length - index, attachments, this); return; } if (future != null) { future.complete(0); future = null; + if (writeBuffers != null) { + for (ByteBuffer buf : writeBuffers) { + context.offerBuffer(buf); + } + writeBuffers = null; + } } QueueEntry entry = queue.poll(); - ByteBuffer localWriteBuffer = writeBuffer; - if (entry == null) return; //没有数据了 - future = entry.future; - if (localWriteBuffer == null) { - if (future != null) { - future.complete(RETCODE_WSOCKET_CLOSED); - future = null; - } - return; + if (entry != null) { + future = entry.future; + ByteBuffer[] buffers = packet.encode(context.getBufferSupplier()); + writeBuffers = buffers; + channel.write(buffers, buffers, this); } - byte[] bs = entry.bytes; - ByteBuffer sendBuffer; - if (bs.length <= localWriteBuffer.capacity()) { - localWriteBuffer.clear(); - localWriteBuffer.put(bs); - localWriteBuffer.flip(); - sendBuffer = localWriteBuffer; - } else { - sendBuffer = ByteBuffer.wrap(bs); - } - channel.write(sendBuffer, sendBuffer, this); } catch (Exception e) { closeRunner(); context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e); @@ -238,7 +230,7 @@ public class WebSocketRunner implements Runnable { } @Override - public void failed(Throwable exc, ByteBuffer attachment) { + public void failed(Throwable exc, ByteBuffer[] attachments) { writing.set(false); closeRunner(); if (exc != null) { @@ -265,9 +257,13 @@ public class WebSocketRunner implements Runnable { } catch (Throwable t) { } context.offerBuffer(readBuffer); - context.offerBuffer(writeBuffer); readBuffer = null; - writeBuffer = null; + if (writeBuffers != null) { + for (ByteBuffer buf : writeBuffers) { + context.offerBuffer(buf); + } + writeBuffers = null; + } engine.remove(webSocket); webSocket.onClose(0, null); } @@ -277,11 +273,11 @@ public class WebSocketRunner implements Runnable { public final CompletableFuture future; - public final byte[] bytes; + public final WebSocketPacket packet; - public QueueEntry(CompletableFuture future, byte[] bytes) { + public QueueEntry(CompletableFuture future, WebSocketPacket packet) { this.future = future; - this.bytes = bytes; + this.packet = packet; } } @@ -389,7 +385,7 @@ public class WebSocketRunner implements Runnable { } } - private static final class Coder { + private static final class WebSocketCoder { protected byte inFragmentedType;