diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 4536d065b..9db399ba7 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -68,7 +68,7 @@ abstract class AsyncNioConnection extends AsyncConnection { protected Object writeAttachment; - protected CompletionHandler writeCompletionHandler; + protected CompletionHandler writeCompletionHandler; protected SelectionKey writeKey; @@ -480,7 +480,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } protected void handleWrite(final int totalCount, Throwable t) { - CompletionHandler handler = this.writeCompletionHandler; + CompletionHandler handler = this.writeCompletionHandler; Object attach = this.writeAttachment; // 清空写参数 this.writeCompletionHandler = null; diff --git a/src/main/java/org/redkale/net/PipelinePacket.java b/src/main/java/org/redkale/net/PipelinePacket.java index d5980e6ba..3751fd81b 100644 --- a/src/main/java/org/redkale/net/PipelinePacket.java +++ b/src/main/java/org/redkale/net/PipelinePacket.java @@ -26,40 +26,35 @@ public class PipelinePacket { protected int tupleLength; @ConvertColumn(index = 4) - protected CompletionHandler handler; + protected CompletionHandler handler; @ConvertColumn(index = 5) protected Object attach; public PipelinePacket() {} - public PipelinePacket(ByteTuple data, CompletionHandler handler) { + public PipelinePacket(ByteTuple data, CompletionHandler handler) { this(data, handler, null); } - public PipelinePacket(ByteTuple data, CompletionHandler handler, Object attach) { + public PipelinePacket(ByteTuple data, CompletionHandler handler, Object attach) { this(data.content(), data.offset(), data.length(), handler, attach); } - public PipelinePacket(byte[] tupleBytes, CompletionHandler handler) { + public PipelinePacket(byte[] tupleBytes, CompletionHandler handler) { this(tupleBytes, 0, tupleBytes.length, handler, null); } - public PipelinePacket(byte[] tupleBytes, CompletionHandler handler, Object attach) { + public PipelinePacket(byte[] tupleBytes, CompletionHandler handler, Object attach) { this(tupleBytes, 0, tupleBytes.length, handler, attach); } - public PipelinePacket( - byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler handler) { + public PipelinePacket(byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler handler) { this(tupleBytes, tupleOffset, tupleLength, handler, null); } public PipelinePacket( - byte[] tupleBytes, - int tupleOffset, - int tupleLength, - CompletionHandler handler, - Object attach) { + byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler handler, Object attach) { this.tupleBytes = tupleBytes; this.tupleOffset = tupleOffset; this.tupleLength = tupleLength; @@ -91,11 +86,11 @@ public class PipelinePacket { this.tupleLength = tupleLength; } - public CompletionHandler getHandler() { + public CompletionHandler getHandler() { return handler; } - public void setHandler(CompletionHandler handler) { + public void setHandler(CompletionHandler handler) { this.handler = handler; } diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index eac087b21..424e9d11f 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -8,6 +8,7 @@ package org.redkale.net.http; import java.io.Serializable; import java.net.*; import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -21,6 +22,7 @@ import org.redkale.annotation.Nonnull; import org.redkale.convert.Convert; import org.redkale.net.AsyncConnection; import org.redkale.net.http.WebSocketPacket.FrameType; +import org.redkale.util.ByteArray; /** * @@ -96,8 +98,6 @@ public abstract class WebSocket { WebSocketReadHandler _readHandler; - WebSocketWriteHandler _writeHandler; - // 分布式下不可为空 InetSocketAddress _sncpAddress; @@ -273,18 +273,54 @@ public abstract class WebSocket { * @return 0表示成功, 非0表示错误码 */ CompletableFuture sendPacket(WebSocketPacket packet) { - if (this._writeHandler == null) { // if (this._writeIOThread == null) { + if (this._readHandler == null) { // if (this._writeIOThread == null) { if (delayPackets == null) { delayPackets = new ArrayList<>(); } delayPackets.add(packet); return CompletableFuture.completedFuture(RETCODE_DELAYSEND); } - CompletableFuture rs = this._writeHandler.send(packet); // this._writeIOThread.send(this, packet); + return _sendToChannel(packet); + } + + /** + * 给自身发送消息体, 包含二进制/文本 + * + * @param packet WebSocketPacket + * @return 0表示成功, 非0表示错误码 + */ + CompletableFuture _sendToChannel(WebSocketPacket packet) { + if (_channel == null || closed.get()) { + return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED); + } + WebSocketFuture future = new WebSocketFuture(); + _channel.writeInIOThread(packet.encodeToBytes(), future); if (_engine.logger.isLoggable(Level.FINER) && packet != WebSocketPacket.DEFAULT_PING_PACKET) { _engine.logger.finer("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this); } - return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs; + return future; + } + + /** + * 给自身发送消息体, 包含二进制/文本 + * + * @param packets WebSocketPacket集合 + * @return 0表示成功, 非0表示错误码 + */ + CompletableFuture _sendToChannel(List packets) { + if (_channel == null || closed.get()) { + return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED); + } + WebSocketFuture future = new WebSocketFuture(); + ByteArray array = new ByteArray(); + for (WebSocketPacket packet : packets) { + array.put(packet.encodeToBytes()); + } + _channel.writeInIOThread(array.toArray(), future); + if (_engine.logger.isLoggable(Level.FINER)) { + _engine.logger.finer("userid:" + getUserid() + " send websocket messages(" + packets + ")" + " on " + this); + } + return future; } // ---------------------------------------------------------------- @@ -952,9 +988,6 @@ public abstract class WebSocket { if (_readHandler != null) { _readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes); } - if (_writeHandler != null) { - _writeHandler.byteArrayPool.accept(_writeHandler.writeArray); - } return onClose(code, reason); }; CompletableFuture future = _engine.removeLocalThenDisconnect(this); @@ -979,4 +1012,22 @@ public abstract class WebSocket { public String toString() { return this.getUserid() + "@" + _remoteAddr + "@" + Objects.hashCode(this); } + + protected class WebSocketFuture extends CompletableFuture implements CompletionHandler { + + public WebSocketFuture() { + super(); + } + + @Override + public void completed(Integer result, Void attachment) { + super.complete(0); + } + + @Override + public void failed(Throwable exc, Void attachment) { + super.completeExceptionally(exc); + kill(RETCODE_SENDEXCEPTION, "websocket send message failed on CompletionHandler"); + } + } } diff --git a/src/main/java/org/redkale/net/http/WebSocketPacket.java b/src/main/java/org/redkale/net/http/WebSocketPacket.java index b3deed192..956ff52c3 100644 --- a/src/main/java/org/redkale/net/http/WebSocketPacket.java +++ b/src/main/java/org/redkale/net/http/WebSocketPacket.java @@ -9,7 +9,6 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import org.redkale.convert.ConvertColumn; import org.redkale.net.http.WebSocketPacket.FrameType; -import org.redkale.util.ByteArray; /** * 详情见: https://redkale.org @@ -94,23 +93,39 @@ public final class WebSocketPacket { } // 消息编码 - public void writeEncode(final ByteArray array) { + public byte[] encodeToBytes() { final byte opcode = (byte) (type.getValue() | 0x80); final byte[] content = getPayload(); final int len = content.length; if (len <= 0x7D) { // 125 - array.put(opcode); - array.put((byte) len); + byte[] data = new byte[2 + len]; + data[0] = opcode; + data[1] = (byte) len; + System.arraycopy(content, 0, data, 2, len); + return data; } else if (len <= 0xFFFF) { // 65535 - array.put(opcode); - array.put((byte) 0x7E); // 126 - array.putChar((char) len); + byte[] data = new byte[4 + len]; + data[0] = opcode; + data[1] = (byte) 0x7E; // 126 + data[2] = (byte) (len >> 8 & 0xFF); + data[3] = (byte) (len & 0xFF); + System.arraycopy(content, 0, data, 4, len); + return data; } else { - array.put(opcode); - array.put((byte) 0x7F); // 127 - array.putLong(len); + byte[] data = new byte[10 + len]; + data[0] = opcode; + data[1] = (byte) 0x7F; // 127 + data[2] = (byte) (len >> 56 & 0xFF); + data[3] = (byte) (len >> 48 & 0xFF); + data[4] = (byte) (len >> 40 & 0xFF); + data[5] = (byte) (len >> 32 & 0xFF); + data[6] = (byte) (len >> 24 & 0xFF); + data[7] = (byte) (len >> 16 & 0xFF); + data[8] = (byte) (len >> 8 & 0xFF); + data[9] = (byte) (len & 0xFF); + System.arraycopy(content, 0, data, 10, len); + return data; } - array.put(content); } public byte[] getPayload() { diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index 4ae56a7e4..e46a83c6b 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -329,8 +329,6 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl Traces.currentTraceid(request.getTraceid()); webSocket._readHandler = new WebSocketReadHandler( response.getContext(), webSocket, byteArrayPool, restMessageConsumer); - webSocket._writeHandler = - new WebSocketWriteHandler(response.getContext(), webSocket, byteArrayPool); Runnable createUseridHandler = () -> { CompletableFuture userFuture = webSocket.createUserid(); @@ -415,8 +413,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket.delayPackets = null; // CompletableFuture cf = webSocket._writeIOThread.send(webSocket, // delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); - CompletableFuture cf = webSocket._writeHandler.send( - delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); + CompletableFuture cf = webSocket._sendToChannel(delayPackets); cf.whenComplete((Integer v, Throwable t) -> { Traces.currentTraceid(request.getTraceid()); if (userid == null || t != null) { @@ -442,8 +439,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket.delayPackets = null; // CompletableFuture cf = webSocket._writeIOThread.send(webSocket, // delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); - CompletableFuture cf = webSocket._writeHandler.send( - delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); + CompletableFuture cf = webSocket._sendToChannel(delayPackets); cf.whenComplete((Integer v, Throwable t) -> { Traces.currentTraceid(request.getTraceid()); if (sessionid == null || t != null) { diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java deleted file mode 100644 index 3109a7a60..000000000 --- a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net.http; - -import static org.redkale.net.http.WebSocket.*; - -import java.nio.channels.CompletionHandler; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import org.redkale.util.*; - -/** @author zhangjx */ -public class WebSocketWriteHandler implements CompletionHandler { - - protected final HttpContext context; - - protected final WebSocket webSocket; - - protected final AtomicBoolean writePending = new AtomicBoolean(); - - protected final ObjectPool byteArrayPool; - - protected final ByteArray writeArray; - - protected final List> respList = new ArrayList(); - - protected final ConcurrentLinkedQueue> requestQueue = new ConcurrentLinkedQueue(); - - public WebSocketWriteHandler(HttpContext context, WebSocket webSocket, ObjectPool byteArrayPool) { - this.context = context; - this.webSocket = webSocket; - this.byteArrayPool = byteArrayPool; - this.writeArray = byteArrayPool.get(); - } - - public CompletableFuture send(WebSocketPacket... packets) { - WebSocketFuture future = new WebSocketFuture<>(packets); - if (writePending.compareAndSet(false, true)) { - respList.clear(); - respList.add(future); - ByteArray array = this.writeArray; - array.clear(); - for (WebSocketPacket p : packets) { - writeEncode(array, p); - } - webSocket._channel.write(array, this); - } else { - requestQueue.offer(future); - } - return future; - } - - @Override - public void completed(Integer result, Void attachment) { - webSocket.lastSendTime = System.currentTimeMillis(); - for (WebSocketFuture future : respList) { - future.complete(0); - } - respList.clear(); - ByteArray array = this.writeArray; - array.clear(); - WebSocketFuture req; - while ((req = requestQueue.poll()) != null) { - respList.add(req); - for (WebSocketPacket p : req.packets) { - writeEncode(array, p); - } - } - if (array.isEmpty()) { - if (!writePending.compareAndSet(true, false)) { - completed(0, attachment); - } - } else { - webSocket._channel.write(array, this); - } - } - - @Override - public void failed(Throwable exc, Void attachment) { - writePending.set(false); - WebSocketFuture req; - try { - while ((req = requestQueue.poll()) != null) { - req.completeExceptionally(exc); - } - for (WebSocketFuture future : respList) { - future.completeExceptionally(exc); - } - respList.clear(); - } catch (Exception e) { - // do nothing - } - webSocket.kill(RETCODE_SENDEXCEPTION, "websocket send message failed on CompletionHandler"); - if (exc != null && context.getLogger().isLoggable(Level.FINER)) { - context.getLogger() - .log( - Level.FINER, - "WebSocket sendMessage on CompletionHandler failed, force to close channel, live " - + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds", - exc); - } - } - - // 消息编码 - protected void writeEncode(final ByteArray array, final WebSocketPacket packet) { - final byte opcode = (byte) (packet.type.getValue() | 0x80); - final byte[] content = packet.getPayload(); - final int len = content.length; - if (len <= 0x7D) { // 125 - array.put(opcode); - array.put((byte) len); - } else if (len <= 0xFFFF) { // 65535 - array.put(opcode); - array.put((byte) 0x7E); // 126 - array.putChar((char) len); - } else { - array.put(opcode); - array.put((byte) 0x7F); // 127 - array.putLong(len); - } - array.put(content); - } - - protected static class WebSocketFuture extends CompletableFuture { - - protected WebSocketPacket[] packets; - - public WebSocketFuture() { - super(); - } - - public WebSocketFuture(WebSocketPacket... packets) { - super(); - this.packets = packets; - } - } -}