From 2f37dd9c03959df422319a3ce8fccb953d621646 Mon Sep 17 00:00:00 2001 From: redkale Date: Sun, 8 Sep 2024 00:17:45 +0800 Subject: [PATCH] Application.shareAsyncGroup --- .../java/org/redkale/boot/Application.java | 48 ++++++++++++++++--- src/main/java/org/redkale/net/AsyncGroup.java | 2 - .../net/AsyncNioTcpProtocolServer.java | 12 +++-- src/main/java/org/redkale/net/Context.java | 2 +- src/main/java/org/redkale/net/Server.java | 29 +++++++++-- src/main/java/org/redkale/net/WorkThread.java | 4 +- .../java/org/redkale/net/sncp/SncpServer.java | 13 +---- .../java/org/redkale/util/ByteBufferPool.java | 10 ++-- .../test/sncp/SncpRequestParseTest.java | 1 + .../java/org/redkale/test/sncp/SncpTest.java | 3 +- 10 files changed, 87 insertions(+), 37 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 2b6839d56..2675ec3b2 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -180,6 +180,9 @@ public final class Application { // 给客户端使用,包含SNCP客户端、自定义数据库客户端连接池 private AsyncIOGroup clientAsyncGroup; + // 给单一服务使用,有且仅有一个Server配置且buffer相关配置都是默认值的情况下才有值 + private AsyncIOGroup shareAsyncGroup; + // 服务配置项 final AnyValue config; @@ -838,11 +841,33 @@ public final class Application { clientWorkExecutor = WorkThread.createWorkExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s"); executorLog.append(", threads=").append(clientThreads).append("}"); } - AsyncIOGroup ioGroup = new AsyncIOGroup( - "Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize) - .skipClose(true); - this.clientAsyncGroup = ioGroup.start(); - + if (config.getAnyValues("server").length == 1) { + AnyValue servConf = config.getAnyValues("server")[0]; + if ("true".equals(servConf.getValue("shareio"))) { + String servNetprotocol = Server.getConfNetprotocol(servConf); + int servBufferCapacity = Server.getConfBufferCapacity(servConf, servNetprotocol); + int serverBufferPoolSize = Server.getConfBufferPoolSize(servConf); + int defBufferCapacity = "UDP".equals(servNetprotocol) + ? ByteBufferPool.DEFAULT_BUFFER_UDP_CAPACITY + : ByteBufferPool.DEFAULT_BUFFER_TCP_CAPACITY; + if (serverBufferPoolSize == ByteBufferPool.DEFAULT_BUFFER_POOL_SIZE + && servBufferCapacity == defBufferCapacity) { + AsyncIOGroup ioGroup = new AsyncIOGroup( + "Redkale-DefaultServlet-IOThread-%s", + workExecutor, servBufferCapacity, serverBufferPoolSize) + .skipClose(true); + this.shareAsyncGroup = ioGroup.start(); + } + } + } + if (this.shareAsyncGroup != null) { + this.clientAsyncGroup = this.shareAsyncGroup; + } else { + AsyncIOGroup ioGroup = new AsyncIOGroup( + "Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize) + .skipClose(true); + this.clientAsyncGroup = ioGroup.start(); + } if (executorLog.length() > 0) { logger.log(Level.INFO, executorLog.toString()); } @@ -1506,10 +1531,15 @@ public final class Application { stopServers(); this.propertiesModule.destroy(); this.workExecutor.shutdownNow(); - if (this.clientAsyncGroup != null) { + if (this.shareAsyncGroup != null) { + long s = System.currentTimeMillis(); + this.shareAsyncGroup.dispose(); + logger.info("default.share.AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms"); + } + if (this.clientAsyncGroup != null && this.clientAsyncGroup != this.shareAsyncGroup) { long s = System.currentTimeMillis(); this.clientAsyncGroup.dispose(); - logger.info("AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms"); + logger.info("default.client.AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms"); } this.onAppPostShutdown(); @@ -1726,6 +1756,10 @@ public final class Application { return clientAsyncGroup; } + public AsyncIOGroup getShareAsyncGroup() { + return shareAsyncGroup; + } + public ResourceFactory getResourceFactory() { return resourceFactory; } diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index 07882eee5..53d6f6231 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -19,8 +19,6 @@ import org.redkale.util.ByteBufferPool; */ public abstract class AsyncGroup { - public static final int UDP_BUFFER_CAPACITY = Integer.getInteger("redkale.udp.buffer.apacity", 1350); - public static AsyncGroup create( String threadNameFormat, final ExecutorService workExecutor, diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index 8c262ce12..b086aceea 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -87,8 +87,6 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { LongAdder createResponseCounter = new LongAdder(); LongAdder cycleResponseCounter = new LongAdder(); - ByteBufferPool safeBufferPool = - server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); ObjectPool safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); final int respPoolMax = server.getResponsePoolSize(); @@ -125,8 +123,14 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); if (this.ioGroup == null) { - this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool); - this.ioGroup.start(); + if (application != null && application.getShareAsyncGroup() != null) { + this.ioGroup = application.getShareAsyncGroup(); + } else { + ByteBufferPool safeBufferPool = + server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); + this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool); + this.ioGroup.start(); + } } Thread acceptThread = new Thread() { diff --git a/src/main/java/org/redkale/net/Context.java b/src/main/java/org/redkale/net/Context.java index aec148749..6c4918e22 100644 --- a/src/main/java/org/redkale/net/Context.java +++ b/src/main/java/org/redkale/net/Context.java @@ -202,7 +202,7 @@ public class Context { public int getMaxHeader() { return maxHeader; } - + public int getMaxBody() { return maxBody; } diff --git a/src/main/java/org/redkale/net/Server.java b/src/main/java/org/redkale/net/Server.java index 925588c8d..1a3e919bd 100644 --- a/src/main/java/org/redkale/net/Server.java +++ b/src/main/java/org/redkale/net/Server.java @@ -15,7 +15,6 @@ import java.util.logging.*; import javax.net.ssl.SSLContext; import org.redkale.boot.Application; import org.redkale.inject.ResourceFactory; -import static org.redkale.net.AsyncGroup.UDP_BUFFER_CAPACITY; import org.redkale.net.Filter; import org.redkale.util.*; @@ -143,12 +142,10 @@ public abstract class Server< this.maxHeader = parseLenth(config.getValue("maxHeader"), 16 * 1024); this.maxBody = parseLenth(config.getValue("maxBody"), "UDP".equalsIgnoreCase(netprotocol) ? 16 * 1024 : 256 * 1024); - int bufCapacity = parseLenth( - config.getValue("bufferCapacity"), - "UDP".equalsIgnoreCase(netprotocol) ? UDP_BUFFER_CAPACITY : 32 * 1024); + int bufCapacity = getConfBufferCapacity(config, netprotocol); this.bufferCapacity = "UDP".equalsIgnoreCase(netprotocol) ? bufCapacity : (bufCapacity < 1024 ? 1024 : bufCapacity); - this.bufferPoolSize = config.getIntValue("bufferPoolSize", ByteBufferPool.DEFAULT_BUFFER_POOL_SIZE); + this.bufferPoolSize = getConfBufferPoolSize(config); this.responsePoolSize = config.getIntValue("responsePoolSize", 1024); this.name = config.getValue( "name", @@ -194,6 +191,28 @@ public abstract class Server< this.context = this.createContext(); } + public static int getConfBufferCapacity(AnyValue config, String netprotocol) { + return parseLenth( + config.getValue("bufferCapacity"), + "UDP".equalsIgnoreCase(netprotocol) + ? ByteBufferPool.DEFAULT_BUFFER_UDP_CAPACITY + : ByteBufferPool.DEFAULT_BUFFER_TCP_CAPACITY); + } + + public static int getConfBufferPoolSize(AnyValue config) { + return config.getIntValue("bufferPoolSize", ByteBufferPool.DEFAULT_BUFFER_POOL_SIZE); + } + + public static String getConfNetprotocol(AnyValue config) { + if (config != null) { + String protocol = config.getValue("protocol", "").toUpperCase(); + if ("UDP".equals(protocol) || protocol.endsWith(".UDP")) { + return "UDP"; + } + } + return "TCP"; + } + protected static int parseLenth(String value, int defValue) { return (int) parseLenth(value, defValue + 0L); } diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index 94bd8c60e..5cf621c7d 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -19,9 +19,9 @@ import org.redkale.util.Utility; * @author zhangjx */ public class WorkThread extends Thread implements Executor { - + public static final int DEFAULT_WORK_POOL_SIZE = Utility.cpus() * 8; - + protected final ExecutorService workExecutor; // WorkThread下标,从0开始 diff --git a/src/main/java/org/redkale/net/sncp/SncpServer.java b/src/main/java/org/redkale/net/sncp/SncpServer.java index bd14e1464..68ff64e23 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServer.java +++ b/src/main/java/org/redkale/net/sncp/SncpServer.java @@ -34,18 +34,7 @@ public class SncpServer extends Server { - public static final int DEFAULT_BUFFER_POOL_SIZE = Utility.cpus() * 4; - - public static final int DEFAULT_BUFFER_CAPACITY = 16 * 1024; + public static final int DEFAULT_BUFFER_POOL_SIZE = + Integer.getInteger("redkale.bytebuffer.pool.size", Utility.cpus() * 4); + + public static final int DEFAULT_BUFFER_TCP_CAPACITY = + Integer.getInteger("redkale.bytebuffer.tcp.apacity", 32 * 1024); + + public static final int DEFAULT_BUFFER_UDP_CAPACITY = Integer.getInteger("redkale.bytebuffer.udp.apacity", 1350); private final int bufferCapacity; diff --git a/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java index e53b82c32..4703d1c44 100644 --- a/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java @@ -36,6 +36,7 @@ public class SncpRequestParseTest { SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig(); config.logger = Logger.getLogger(SncpRequestParseTest.class.getSimpleName()); config.serverAddress = sncpAddress; + config.maxHeader = 16 * 1024; config.maxBody = 1024 * 1024 * 1024; SncpContext context = new SncpContext(config); diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index 001d1c746..56041ef68 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -29,7 +29,8 @@ public class SncpTest { private static final String protocol = "SNCP.TCP"; // TCP UDP - private static final int clientCapacity = protocol.endsWith(".UDP") ? AsyncGroup.UDP_BUFFER_CAPACITY : 8192; + private static final int clientCapacity = + protocol.endsWith(".UDP") ? ByteBufferPool.DEFAULT_BUFFER_UDP_CAPACITY : 8192; private static ResourceFactory factory;