diff --git a/src/main/java/org/redkale/net/http/HttpContext.java b/src/main/java/org/redkale/net/http/HttpContext.java index 3bc11f16f..d906002f8 100644 --- a/src/main/java/org/redkale/net/http/HttpContext.java +++ b/src/main/java/org/redkale/net/http/HttpContext.java @@ -10,7 +10,6 @@ import java.nio.charset.Charset; import java.security.SecureRandom; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; import org.redkale.annotation.ConstructorParameters; import org.redkale.asm.*; import static org.redkale.asm.Opcodes.*; @@ -49,8 +48,6 @@ public class HttpContext extends Context { //不带通配符的mapping url的缓存对象 final Map[] uriPathCaches = new Map[100]; - Function webSocketWriterIOThreadFunc; - public HttpContext(HttpContextConfig config) { super(config); this.remoteAddrHeader = config.remoteAddrHeader; @@ -94,14 +91,6 @@ public class HttpContext extends Context { super.updateWriteIOThread(conn, ioWriteThread); } - protected void updateWebSocketWriteIOThread(WebSocket webSocket) { - if (webSocketWriterIOThreadFunc != null) { - WebSocketWriteIOThread writeIOThread = webSocketWriterIOThreadFunc.apply(webSocket); - updateWriteIOThread(webSocket._channel, writeIOThread); - webSocket._writeIOThread = writeIOThread; - } - } - protected String createSessionid() { byte[] bytes = new byte[16]; random.nextBytes(bytes); diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index 08c466c88..f083b3c64 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -45,8 +45,6 @@ public class HttpServer extends Server APP_EXECUTOR资源为null //RESNAME_APP_EXECUTOR protected ExecutorService workExecutor; @@ -84,9 +82,6 @@ public class HttpServer extends Server { - if (asyncGroup == null) { - groupLock.lock(); - try { - if (asyncGroup == null) { - WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, safeBufferPool); - g.start(); - asyncGroup = g; - } - } finally { - groupLock.unlock(); - } - } - return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread(); - }; - } - return rs; + return new HttpContext(contextConfig); } @Override diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index 3ef1b71b1..dd683558b 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -12,7 +12,6 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.*; import java.util.logging.*; import java.util.stream.Stream; import java.util.zip.*; @@ -88,8 +87,6 @@ public abstract class WebSocket { WebSocketWriteHandler _writeHandler; - WebSocketWriteIOThread _writeIOThread; - InetSocketAddress _sncpAddress; //分布式下不可为空 AsyncConnection _channel;//不可能为空 @@ -721,42 +718,6 @@ public abstract class WebSocket { return _engine.getLocalWebSockets(); } - /** - * 获取ByteBuffer生成器 - * - * @return Supplier - */ - protected Supplier getReadBufferSupplier() { - return this._channel.getReadBufferSupplier(); - } - - /** - * 获取ByteBuffer回收器 - * - * @return Consumer - */ - protected Consumer getReadBufferConsumer() { - return this._channel.getReadBufferConsumer(); - } - - /** - * 获取ByteBuffer生成器 - * - * @return Supplier - */ - protected Supplier getWriteBufferSupplier() { - return this._channel.getWriteBufferSupplier(); - } - - /** - * 获取ByteBuffer回收器 - * - * @return Consumer - */ - protected Consumer getWriteBufferConsumer() { - return this._channel.getWriteBufferConsumer(); - } - //------------------------------------------------------------------- /** * 返回sessionid, null表示连接不合法或异常,默认实现是request.sessionid(true),通常需要重写该方法 @@ -964,6 +925,12 @@ public abstract class WebSocket { } CompletableFuture future = _engine.removeLocalThenDisconnect(this); _channel.dispose(); + if (_readHandler != null) { + _readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes); + } + if (_writeHandler != null) { + _writeHandler.byteArrayPool.accept(_writeHandler.writeArray); + } CompletableFuture closeFuture = onClose(code, reason); if (closeFuture == null) { return future; diff --git a/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java b/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java deleted file mode 100644 index beef66204..000000000 --- a/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - */ -package org.redkale.net.http; - -import java.io.IOException; -import java.util.concurrent.ExecutorService; -import org.redkale.net.*; -import org.redkale.util.ByteBufferPool; - -/** - * WebSocket只写版的AsyncIOGroup
- * 只会用到ioWriteThread - * - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.8.0 - */ -@Deprecated(since = "2.8.0") -class WebSocketAsyncGroup extends AsyncIOGroup { - - public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { - super(threadNameFormat, workExecutor, safeBufferPool); - } - - @Override - protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { - return new WebSocketWriteIOThread(this.timeoutExecutor, g, name, index, threads, workExecutor, safeBufferPool); - } - -} diff --git a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java index 283cf2400..fb8fa05aa 100644 --- a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java @@ -16,7 +16,7 @@ import org.redkale.convert.Convert; import org.redkale.net.AsyncIOThread; import static org.redkale.net.http.WebSocket.*; import org.redkale.net.http.WebSocketPacket.FrameType; -import org.redkale.util.ByteArray; +import org.redkale.util.*; /** * @@ -38,7 +38,9 @@ public class WebSocketReadHandler implements CompletionHandler byteArrayPool; + + protected final ByteArray halfFrameBytes; protected byte halfFrameOpcode; @@ -52,10 +54,12 @@ public class WebSocketReadHandler implements CompletionHandler messageConsumer) { + public WebSocketReadHandler(HttpContext context, WebSocket webSocket, ObjectPool byteArrayPool, BiConsumer messageConsumer) { this.context = context; - this.restMessageConsumer = messageConsumer; this.webSocket = webSocket; + this.byteArrayPool = byteArrayPool; + this.restMessageConsumer = messageConsumer; + this.halfFrameBytes = byteArrayPool.get(); this.ioReadThread = webSocket._channel.getReadIOThread(); this.logger = context.getLogger(); } diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index b42a40a23..50bcc2eae 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -70,6 +70,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl private final BiConsumer restMessageConsumer = createRestOnMessageConsumer(); + private final ObjectPool byteArrayPool = ObjectPool.createSafePool(1000, () -> new ByteArray(), null, ByteArray::recycle); + protected Type messageRestType; //RestWebSocket时会被修改 //同RestWebSocket.single @@ -284,9 +286,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public void completed(Integer result, Void attachment) { - webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, restMessageConsumer); - webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket); - //response.getContext().updateWebSocketWriteIOThread(webSocket); + webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, byteArrayPool, restMessageConsumer); + webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket, byteArrayPool); Runnable createUseridHandler = () -> { CompletableFuture userFuture = webSocket.createUserid(); diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java index 58586083a..6eb316ce6 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java @@ -11,7 +11,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import static org.redkale.net.http.WebSocket.*; -import org.redkale.util.ByteArray; +import org.redkale.util.*; /** * @@ -25,27 +25,32 @@ public class WebSocketWriteHandler implements CompletionHandler { protected final AtomicBoolean writePending = new AtomicBoolean(); - protected final ByteArray writeArray = new ByteArray(); + protected final ObjectPool byteArrayPool; - protected final List> respList = new ArrayList(); + protected final ByteArray writeArray; - protected final ConcurrentLinkedDeque> requestQueue = new ConcurrentLinkedDeque(); + protected final List> respList = new ArrayList(); - public WebSocketWriteHandler(HttpContext context, WebSocket webSocket) { + protected final ConcurrentLinkedDeque> requestQueue = new ConcurrentLinkedDeque(); + + public WebSocketWriteHandler(HttpContext context, WebSocket webSocket, ObjectPool byteArrayPool) { this.context = context; this.webSocket = webSocket; + this.byteArrayPool = byteArrayPool; + this.writeArray = byteArrayPool.get(); } public CompletableFuture send(WebSocketPacket... packets) { - InnerWebSocketFuture future = new InnerWebSocketFuture<>(packets); + WebSocketFuture future = new WebSocketFuture<>(packets); if (writePending.compareAndSet(false, true)) { respList.clear(); respList.add(future); - writeArray.clear(); + ByteArray array = this.writeArray; + array.clear(); for (WebSocketPacket p : packets) { - writeEncode(p); + writeEncode(array, p); } - webSocket._channel.write(writeArray, this); + webSocket._channel.write(array, this); } else { requestQueue.offer(future); } @@ -55,35 +60,36 @@ public class WebSocketWriteHandler implements CompletionHandler { @Override public void completed(Integer result, Void attachment) { webSocket.lastSendTime = System.currentTimeMillis(); - for (InnerWebSocketFuture future : respList) { + for (WebSocketFuture future : respList) { future.complete(0); } respList.clear(); - writeArray.clear(); - InnerWebSocketFuture req; + ByteArray array = this.writeArray; + array.clear(); + WebSocketFuture req; while ((req = requestQueue.poll()) != null) { respList.add(req); for (WebSocketPacket p : req.packets) { - writeEncode(p); + writeEncode(array, p); } } - if (writeArray.isEmpty()) { + if (array.isEmpty()) { if (!writePending.compareAndSet(true, false)) { completed(0, attachment); } } else { - webSocket._channel.write(writeArray, this); + webSocket._channel.write(array, this); } } @Override public void failed(Throwable exc, Void attachment) { - InnerWebSocketFuture req; + WebSocketFuture req; try { while ((req = requestQueue.poll()) != null) { req.completeExceptionally(exc); } - for (InnerWebSocketFuture future : respList) { + for (WebSocketFuture future : respList) { future.completeExceptionally(exc); } respList.clear(); @@ -96,8 +102,7 @@ public class WebSocketWriteHandler implements CompletionHandler { } //消息编码 - protected void writeEncode(final WebSocketPacket packet) { - final ByteArray array = writeArray; + protected void writeEncode(final ByteArray array, final WebSocketPacket packet) { final byte opcode = (byte) (packet.type.getValue() | 0x80); final byte[] content = packet.getPayload(); final int len = content.length; @@ -116,15 +121,15 @@ public class WebSocketWriteHandler implements CompletionHandler { array.put(content); } - protected static class InnerWebSocketFuture extends CompletableFuture { + protected static class WebSocketFuture extends CompletableFuture { protected WebSocketPacket[] packets; - public InnerWebSocketFuture() { + public WebSocketFuture() { super(); } - public InnerWebSocketFuture(WebSocketPacket... packets) { + public WebSocketFuture(WebSocketPacket... packets) { super(); this.packets = packets; } diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java deleted file mode 100644 index 265750ce9..000000000 --- a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - */ -package org.redkale.net.http; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; -import java.util.Objects; -import java.util.concurrent.*; -import org.redkale.net.AsyncIOThread; -import org.redkale.util.*; - -/** - * WebSocket连接的IO写线程 - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.8.0 - */ -@Deprecated(since = "2.8.0") -public class WebSocketWriteIOThread extends AsyncIOThread { - - private final ScheduledExecutorService timeoutExecutor; - - private final BlockingDeque requestQueue = new LinkedBlockingDeque<>(); - - public WebSocketWriteIOThread(ScheduledExecutorService timeoutExecutor, ThreadGroup g, String name, int index, int threads, - ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { - super(g, name, index, threads, workExecutor, safeBufferPool); - Objects.requireNonNull(timeoutExecutor); - this.timeoutExecutor = timeoutExecutor; - } - - public CompletableFuture send(WebSocket websocket, WebSocketPacket... packets) { - Objects.requireNonNull(websocket); - Objects.requireNonNull(packets); - WebSocketFuture future = new WebSocketFuture(this, websocket, packets); - int wts = websocket._channel.getWriteTimeoutSeconds(); - if (wts > 0) { - future.timeout = timeoutExecutor.schedule(future, wts, TimeUnit.SECONDS); - } - requestQueue.offer(future); - return future; - } - - @Override - public void run() { - final ByteBuffer buffer = getBufferSupplier().get(); - final int capacity = buffer.capacity(); - final ByteArray writeArray = new ByteArray(); - while (!isClosed()) { - WebSocketFuture entry; - try { - while ((entry = requestQueue.take()) != null) { - if (!entry.isDone()) { - writeArray.clear(); - for (WebSocketPacket packet : entry.packets) { - packet.writeEncode(writeArray); - } - if (writeArray.length() > 0) { - if (writeArray.length() <= capacity) { - buffer.clear(); - buffer.put(writeArray.content(), 0, writeArray.length()); - buffer.flip(); - entry.websocket._channel.write(buffer, entry, writeHandler); - } else { - entry.websocket._channel.write(writeArray, entry, writeHandler); - } - } - } - } - } catch (InterruptedException e) { - } - } - } - - protected final CompletionHandler writeHandler = new CompletionHandler() { - - @Override - public void completed(Integer result, WebSocketFuture attachment) { - attachment.cancelTimeout(); - attachment.workThread = null; - attachment.websocket = null; - attachment.packets = null; - runWork(() -> { - attachment.complete(0); - }); - } - - @Override - public void failed(Throwable exc, WebSocketFuture attachment) { - attachment.cancelTimeout(); - attachment.websocket.close(); - attachment.workThread = null; - attachment.websocket = null; - attachment.packets = null; - runWork(() -> { - attachment.completeExceptionally(exc); - }); - } - }; - -} diff --git a/src/main/java/org/redkale/util/ByteArray.java b/src/main/java/org/redkale/util/ByteArray.java index 2e6ae3638..48ca9b483 100644 --- a/src/main/java/org/redkale/util/ByteArray.java +++ b/src/main/java/org/redkale/util/ByteArray.java @@ -52,6 +52,14 @@ public final class ByteArray implements ByteTuple { return this; } + public boolean recycle() { + this.count = 0; + if (this.content != null && this.content.length > 1024 * 32) { + this.content = new byte[1024]; + } + return true; + } + /** * 设置count的新位置 *