diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index a824ac271..125bed1ff 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -48,15 +48,15 @@ public class AsyncIOGroup extends AsyncGroup { private final AtomicInteger writeIndex = new AtomicInteger(); //创建数 - final LongAdder connCreateCounter = new LongAdder(); + protected final LongAdder connCreateCounter = new LongAdder(); //在线数 - final LongAdder connLivingCounter = new LongAdder(); + protected final LongAdder connLivingCounter = new LongAdder(); //关闭数 - final LongAdder connClosedCounter = new LongAdder(); + protected final LongAdder connClosedCounter = new LongAdder(); - private ScheduledThreadPoolExecutor timeoutExecutor; + protected final ScheduledThreadPoolExecutor timeoutExecutor; public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) { this(true, "Redkale-AnonymousClient-IOThread-%s", null, bufferCapacity, bufferPoolSize); @@ -73,36 +73,50 @@ public class AsyncIOGroup extends AsyncGroup { })); } + @SuppressWarnings("OverridableMethodCallInConstructor") public AsyncIOGroup(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { this.bufferCapacity = bufferCapacity; final int threads = Utility.cpus(); this.ioReadThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads]; final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group")); + + this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { + Thread t = new Thread(r, String.format(threadNameFormat, "Timeout")); + t.setDaemon(true); + return t; + }); try { for (int i = 0; i < threads; i++) { String indexfix = WorkThread.formatIndex(threads, i + 1); if (client) { - this.ioReadThreads[i] = new ClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool); - this.ioWriteThreads[i] = new ClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool); + this.ioReadThreads[i] = createClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool); + this.ioWriteThreads[i] = createClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool); } else { - this.ioReadThreads[i] = new AsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool); + this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool); this.ioWriteThreads[i] = this.ioReadThreads[i]; } } if (client) { - this.connectThread = new ClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool); + this.connectThread = createClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool); } else { this.connectThread = null; } } catch (IOException e) { throw new RuntimeException(e); } - this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { - Thread t = new Thread(r, String.format(threadNameFormat, "Timeout")); - t.setDaemon(true); - return t; - }); + } + + protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + return new AsyncIOThread(g, name, index, threads, workExecutor, safeBufferPool); + } + + protected AsyncIOThread createClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + return new ClientReadIOThread(g, name, index, threads, workExecutor, safeBufferPool); + } + + protected AsyncIOThread createClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + return new ClientWriteIOThread(g, name, index, threads, workExecutor, safeBufferPool); } @Override diff --git a/src/main/java/org/redkale/net/http/HttpContext.java b/src/main/java/org/redkale/net/http/HttpContext.java index 253b45dd7..ab38c4ef6 100644 --- a/src/main/java/org/redkale/net/http/HttpContext.java +++ b/src/main/java/org/redkale/net/http/HttpContext.java @@ -8,6 +8,7 @@ package org.redkale.net.http; import java.nio.channels.CompletionHandler; import java.security.SecureRandom; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import org.redkale.annotation.ConstructorParameters; import org.redkale.asm.*; import static org.redkale.asm.Opcodes.*; @@ -43,6 +44,8 @@ public class HttpContext extends Context { //所有Servlet方法都不需要读取http-header,lazyHeaders=true protected boolean lazyHeaders; //存在动态改值 + Function webSocketWriterIOThreadFunc; + // protected RequestURINode[] uriCacheNodes; public HttpContext(HttpContextConfig config) { super(config); @@ -79,6 +82,12 @@ public class HttpContext extends Context { super.updateWriteIOThread(conn, ioWriteThread); } + protected void updateWebSocketWriteIOThread(WebSocket webSocket) { + WebSocketWriteIOThread writeIOThread = webSocketWriterIOThreadFunc.apply(webSocket); + updateWriteIOThread(webSocket._channel, writeIOThread); + webSocket._writeIOThread = writeIOThread; + } + protected String createSessionid() { byte[] bytes = new byte[16]; random.nextBytes(bytes); diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index 1b4c3b9a2..7ff03d48d 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -43,6 +43,10 @@ public class HttpServer extends Server safeBufferPool; + private final Object groupLock = new Object(); + + private WebSocketAsyncGroup asyncGroup; + //配置 APP_EXECUTOR资源为null //RESNAME_APP_EXECUTOR protected ExecutorService workExecutor; @@ -80,6 +84,9 @@ public class HttpServer extends Server { + if (asyncGroup == null) { + synchronized (groupLock) { + if (asyncGroup == null) { + WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, bufferCapacity, safeBufferPool); + g.start(); + asyncGroup = g; + } + } + } + return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread(); + }; + return rs; } @Override diff --git a/src/main/java/org/redkale/net/http/Rest.java b/src/main/java/org/redkale/net/http/Rest.java index 0c295cce1..82d36aa66 100644 --- a/src/main/java/org/redkale/net/http/Rest.java +++ b/src/main/java/org/redkale/net/http/Rest.java @@ -544,7 +544,7 @@ public final class Rest { { av0 = fv.visitAnnotation(resDesc, true); av0.visit("name", res != null ? res.name() : res2.name()); - av0.visit("required", res != null ? res.required() : false); + av0.visit("required", res != null ? res.required() : true); av0.visitEnd(); } fv.visitEnd(); diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index 86c8c07ba..e413441f5 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -83,10 +83,10 @@ public abstract class WebSocket { WebSocketEngine _engine; //不可能为空 - //WebSocketRunner _runner; //不可能为空 WebSocketReadHandler _readHandler; - WebSocketWriteHandler _writeHandler; + //WebSocketWriteHandler _writeHandler; + WebSocketWriteIOThread _writeIOThread; InetSocketAddress _sncpAddress; //分布式下不可为空 @@ -238,14 +238,14 @@ public abstract class WebSocket { * @return 0表示成功, 非0表示错误码 */ CompletableFuture sendPacket(WebSocketPacket packet) { - if (this._writeHandler == null) { + if (this._writeIOThread == null) { if (delayPackets == null) { delayPackets = new ArrayList<>(); } delayPackets.add(packet); return CompletableFuture.completedFuture(RETCODE_DEAYSEND); } - CompletableFuture rs = this._writeHandler.send(packet); + CompletableFuture rs = this._writeIOThread.send(this, packet); if (_engine.logger.isLoggable(Level.FINER) && packet != WebSocketPacket.DEFAULT_PING_PACKET) { _engine.logger.finer("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this); } diff --git a/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java b/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java new file mode 100644 index 000000000..7172968eb --- /dev/null +++ b/src/main/java/org/redkale/net/http/WebSocketAsyncGroup.java @@ -0,0 +1,35 @@ +/* + * + */ +package org.redkale.net.http; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import org.redkale.net.*; +import org.redkale.util.ObjectPool; + +/** + * WebSocket只写版的AsyncIOGroup
+ * 只会用到ioWriteThread + * + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + */ +class WebSocketAsyncGroup extends AsyncIOGroup { + + public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, int bufferCapacity, ObjectPool safeBufferPool) { + super(false, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); + } + + @Override + protected AsyncIOThread createAsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { + return new WebSocketWriteIOThread(this.timeoutExecutor, g, name, index, threads, workExecutor, safeBufferPool); + } + +} diff --git a/src/main/java/org/redkale/net/http/WebSocketFuture.java b/src/main/java/org/redkale/net/http/WebSocketFuture.java index 561e26a75..cbd53b1b3 100644 --- a/src/main/java/org/redkale/net/http/WebSocketFuture.java +++ b/src/main/java/org/redkale/net/http/WebSocketFuture.java @@ -3,7 +3,7 @@ */ package org.redkale.net.http; -import java.util.Objects; +import java.util.*; import java.util.concurrent.*; import org.redkale.net.WorkThread; @@ -19,7 +19,7 @@ import org.redkale.net.WorkThread; */ public class WebSocketFuture extends CompletableFuture implements Runnable { - WebSocketPacket packet; + WebSocketPacket[] packets; WebSocket websocket; @@ -27,12 +27,11 @@ public class WebSocketFuture extends CompletableFuture implements Runna ScheduledFuture timeout; - WebSocketFuture(WorkThread workThread, WebSocket websocket, WebSocketPacket packet) { + WebSocketFuture(WorkThread workThread, WebSocket websocket, WebSocketPacket... packets) { super(); - Objects.requireNonNull(workThread); this.workThread = workThread; this.websocket = websocket; - this.packet = packet; + this.packets = packets; } void cancelTimeout() { @@ -43,7 +42,7 @@ public class WebSocketFuture extends CompletableFuture implements Runna @Override //JDK9+ public WebSocketFuture newIncompleteFuture() { - WebSocketFuture future = new WebSocketFuture(workThread, websocket, packet); + WebSocketFuture future = new WebSocketFuture(workThread, websocket, packets); future.timeout = timeout; return future; } @@ -56,6 +55,6 @@ public class WebSocketFuture extends CompletableFuture implements Runna @Override public String toString() { - return getClass().getSimpleName() + "_" + Objects.hash(this) + "{websocket = " + websocket + ", packet = " + packet + "}"; + return getClass().getSimpleName() + "_" + Objects.hash(this) + "{websocket = " + websocket + ", packets = " + Arrays.toString(packets) + "}"; } } diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index f80d0a52b..dccd5c967 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -80,17 +80,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //同RestWebSocket.single protected boolean single = true; //是否单用户单连接 - //同RestWebSocket.liveInterval - protected int liveInterval = DEFAILT_LIVEINTERVAL; + //同RestWebSocket.liveinterval + protected int liveinterval = DEFAILT_LIVEINTERVAL; //同RestWebSocket.wsMaxConns - protected int wsMaxConns = 0; + protected int wsmaxconns = 0; //同RestWebSocket.wsThreads - protected int wsThreads = 0; + protected int wsthreads = 0; //同RestWebSocket.wsMaxBody - protected int wsMaxBody = 32 * 1024; + protected int wsmaxbody = 32 * 1024; //同RestWebSocket.anyuser protected boolean anyuser = false; @@ -198,7 +198,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", - this.single, context, liveInterval, wsMaxConns, wsThreads, wsMaxBody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger); + this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger); this.webSocketNode.init(conf); this.webSocketNode.localEngine.init(conf); @@ -293,7 +293,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public void completed(Integer result, Void attachment) { webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, restMessageConsumer); - webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket); + //webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket); + response.getContext().updateWebSocketWriteIOThread(webSocket); + Runnable createUseridHandler = () -> { CompletableFuture userFuture = webSocket.createUserid(); if (userFuture == null) { @@ -355,7 +357,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (webSocket.delayPackets != null) { //存在待发送的消息 List delayPackets = webSocket.delayPackets; webSocket.delayPackets = null; - CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); + CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { if (userid == null || t != null) { if (t != null) { @@ -374,7 +376,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (webSocket.delayPackets != null) { //存在待发送的消息 List delayPackets = webSocket.delayPackets; webSocket.delayPackets = null; - CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); + CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { if (sessionid == null || t != null) { if (t != null) { diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java index 3f09d458d..e08edfa8c 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java @@ -5,18 +5,19 @@ */ package org.redkale.net.http; +import java.nio.channels.CompletionHandler; import java.util.*; -import java.util.logging.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.nio.channels.CompletionHandler; -import org.redkale.util.ByteArray; +import java.util.logging.Level; import static org.redkale.net.http.WebSocket.*; +import org.redkale.util.ByteArray; /** * * @author zhangjx */ +@Deprecated(since = "2.8.0") public class WebSocketWriteHandler implements CompletionHandler { protected final HttpContext context; diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java index b4421460b..6218b9644 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java @@ -6,6 +6,7 @@ package org.redkale.net.http; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; +import java.util.Objects; import java.util.concurrent.*; import org.redkale.net.AsyncIOThread; import org.redkale.util.*; @@ -22,21 +23,24 @@ import org.redkale.util.*; */ public class WebSocketWriteIOThread extends AsyncIOThread { - private final BlockingDeque requestQueue = new LinkedBlockingDeque<>(); - private final ScheduledThreadPoolExecutor timeoutExecutor; + private final BlockingDeque requestQueue = new LinkedBlockingDeque<>(); + public WebSocketWriteIOThread(ScheduledThreadPoolExecutor timeoutExecutor, ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { super(g, name, index, threads, workExecutor, safeBufferPool); + Objects.requireNonNull(timeoutExecutor); this.timeoutExecutor = timeoutExecutor; } - public CompletableFuture offerRequest(WebSocket websocket, WebSocketPacket packet) { - WebSocketFuture future = new WebSocketFuture(this, websocket, packet); - int ts = websocket._channel.getWriteTimeoutSeconds(); - if (ts > 0) { - future.timeout = timeoutExecutor.schedule(future, ts, TimeUnit.SECONDS); + public CompletableFuture send(WebSocket websocket, WebSocketPacket... packets) { + Objects.requireNonNull(websocket); + Objects.requireNonNull(packets); + WebSocketFuture future = new WebSocketFuture(this, websocket, packets); + int wts = websocket._channel.getWriteTimeoutSeconds(); + if (wts > 0) { + future.timeout = timeoutExecutor.schedule(future, wts, TimeUnit.SECONDS); } requestQueue.offer(future); return future; @@ -53,7 +57,9 @@ public class WebSocketWriteIOThread extends AsyncIOThread { while ((entry = requestQueue.take()) != null) { if (!entry.isDone()) { writeArray.clear(); - entry.packet.writeEncode(writeArray); + for (WebSocketPacket packet : entry.packets) { + packet.writeEncode(writeArray); + } if (writeArray.length() > 0) { if (writeArray.length() <= capacity) { buffer.clear(); @@ -78,7 +84,7 @@ public class WebSocketWriteIOThread extends AsyncIOThread { attachment.cancelTimeout(); attachment.workThread = null; attachment.websocket = null; - attachment.packet = null; + attachment.packets = null; runWork(() -> { attachment.complete(0); }); @@ -90,7 +96,7 @@ public class WebSocketWriteIOThread extends AsyncIOThread { attachment.websocket.close(); attachment.workThread = null; attachment.websocket = null; - attachment.packet = null; + attachment.packets = null; runWork(() -> { attachment.completeExceptionally(exc); });