Application.shareAsyncGroup

This commit is contained in:
redkale
2024-09-08 00:17:45 +08:00
parent 9da964657d
commit 2f37dd9c03
10 changed files with 87 additions and 37 deletions

View File

@@ -180,6 +180,9 @@ public final class Application {
// 给客户端使用包含SNCP客户端、自定义数据库客户端连接池
private AsyncIOGroup clientAsyncGroup;
// 给单一服务使用有且仅有一个Server配置且buffer相关配置都是默认值的情况下才有值
private AsyncIOGroup shareAsyncGroup;
// 服务配置项
final AnyValue config;
@@ -838,11 +841,33 @@ public final class Application {
clientWorkExecutor = WorkThread.createWorkExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s");
executorLog.append(", threads=").append(clientThreads).append("}");
}
AsyncIOGroup ioGroup = new AsyncIOGroup(
"Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize)
.skipClose(true);
this.clientAsyncGroup = ioGroup.start();
if (config.getAnyValues("server").length == 1) {
AnyValue servConf = config.getAnyValues("server")[0];
if ("true".equals(servConf.getValue("shareio"))) {
String servNetprotocol = Server.getConfNetprotocol(servConf);
int servBufferCapacity = Server.getConfBufferCapacity(servConf, servNetprotocol);
int serverBufferPoolSize = Server.getConfBufferPoolSize(servConf);
int defBufferCapacity = "UDP".equals(servNetprotocol)
? ByteBufferPool.DEFAULT_BUFFER_UDP_CAPACITY
: ByteBufferPool.DEFAULT_BUFFER_TCP_CAPACITY;
if (serverBufferPoolSize == ByteBufferPool.DEFAULT_BUFFER_POOL_SIZE
&& servBufferCapacity == defBufferCapacity) {
AsyncIOGroup ioGroup = new AsyncIOGroup(
"Redkale-DefaultServlet-IOThread-%s",
workExecutor, servBufferCapacity, serverBufferPoolSize)
.skipClose(true);
this.shareAsyncGroup = ioGroup.start();
}
}
}
if (this.shareAsyncGroup != null) {
this.clientAsyncGroup = this.shareAsyncGroup;
} else {
AsyncIOGroup ioGroup = new AsyncIOGroup(
"Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize)
.skipClose(true);
this.clientAsyncGroup = ioGroup.start();
}
if (executorLog.length() > 0) {
logger.log(Level.INFO, executorLog.toString());
}
@@ -1506,10 +1531,15 @@ public final class Application {
stopServers();
this.propertiesModule.destroy();
this.workExecutor.shutdownNow();
if (this.clientAsyncGroup != null) {
if (this.shareAsyncGroup != null) {
long s = System.currentTimeMillis();
this.shareAsyncGroup.dispose();
logger.info("default.share.AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms");
}
if (this.clientAsyncGroup != null && this.clientAsyncGroup != this.shareAsyncGroup) {
long s = System.currentTimeMillis();
this.clientAsyncGroup.dispose();
logger.info("AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms");
logger.info("default.client.AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms");
}
this.onAppPostShutdown();
@@ -1726,6 +1756,10 @@ public final class Application {
return clientAsyncGroup;
}
public AsyncIOGroup getShareAsyncGroup() {
return shareAsyncGroup;
}
public ResourceFactory getResourceFactory() {
return resourceFactory;
}

View File

@@ -19,8 +19,6 @@ import org.redkale.util.ByteBufferPool;
*/
public abstract class AsyncGroup {
public static final int UDP_BUFFER_CAPACITY = Integer.getInteger("redkale.udp.buffer.apacity", 1350);
public static AsyncGroup create(
String threadNameFormat,
final ExecutorService workExecutor,

View File

@@ -87,8 +87,6 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
LongAdder createResponseCounter = new LongAdder();
LongAdder cycleResponseCounter = new LongAdder();
ByteBufferPool safeBufferPool =
server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
ObjectPool<Response> safeResponsePool =
server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
final int respPoolMax = server.getResponsePoolSize();
@@ -125,8 +123,14 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
? "Redkale-IOServletThread-%s"
: ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
if (this.ioGroup == null) {
this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool);
this.ioGroup.start();
if (application != null && application.getShareAsyncGroup() != null) {
this.ioGroup = application.getShareAsyncGroup();
} else {
ByteBufferPool safeBufferPool =
server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool);
this.ioGroup.start();
}
}
Thread acceptThread = new Thread() {

View File

@@ -202,7 +202,7 @@ public class Context {
public int getMaxHeader() {
return maxHeader;
}
public int getMaxBody() {
return maxBody;
}

View File

@@ -15,7 +15,6 @@ import java.util.logging.*;
import javax.net.ssl.SSLContext;
import org.redkale.boot.Application;
import org.redkale.inject.ResourceFactory;
import static org.redkale.net.AsyncGroup.UDP_BUFFER_CAPACITY;
import org.redkale.net.Filter;
import org.redkale.util.*;
@@ -143,12 +142,10 @@ public abstract class Server<
this.maxHeader = parseLenth(config.getValue("maxHeader"), 16 * 1024);
this.maxBody =
parseLenth(config.getValue("maxBody"), "UDP".equalsIgnoreCase(netprotocol) ? 16 * 1024 : 256 * 1024);
int bufCapacity = parseLenth(
config.getValue("bufferCapacity"),
"UDP".equalsIgnoreCase(netprotocol) ? UDP_BUFFER_CAPACITY : 32 * 1024);
int bufCapacity = getConfBufferCapacity(config, netprotocol);
this.bufferCapacity =
"UDP".equalsIgnoreCase(netprotocol) ? bufCapacity : (bufCapacity < 1024 ? 1024 : bufCapacity);
this.bufferPoolSize = config.getIntValue("bufferPoolSize", ByteBufferPool.DEFAULT_BUFFER_POOL_SIZE);
this.bufferPoolSize = getConfBufferPoolSize(config);
this.responsePoolSize = config.getIntValue("responsePoolSize", 1024);
this.name = config.getValue(
"name",
@@ -194,6 +191,28 @@ public abstract class Server<
this.context = this.createContext();
}
public static int getConfBufferCapacity(AnyValue config, String netprotocol) {
return parseLenth(
config.getValue("bufferCapacity"),
"UDP".equalsIgnoreCase(netprotocol)
? ByteBufferPool.DEFAULT_BUFFER_UDP_CAPACITY
: ByteBufferPool.DEFAULT_BUFFER_TCP_CAPACITY);
}
public static int getConfBufferPoolSize(AnyValue config) {
return config.getIntValue("bufferPoolSize", ByteBufferPool.DEFAULT_BUFFER_POOL_SIZE);
}
public static String getConfNetprotocol(AnyValue config) {
if (config != null) {
String protocol = config.getValue("protocol", "").toUpperCase();
if ("UDP".equals(protocol) || protocol.endsWith(".UDP")) {
return "UDP";
}
}
return "TCP";
}
protected static int parseLenth(String value, int defValue) {
return (int) parseLenth(value, defValue + 0L);
}

View File

@@ -19,9 +19,9 @@ import org.redkale.util.Utility;
* @author zhangjx
*/
public class WorkThread extends Thread implements Executor {
public static final int DEFAULT_WORK_POOL_SIZE = Utility.cpus() * 8;
protected final ExecutorService workExecutor;
// WorkThread下标从0开始

View File

@@ -34,18 +34,7 @@ public class SncpServer extends Server<Uint128, SncpContext, SncpRequest, SncpRe
public SncpServer(
Application application, long serverStartTime, AnyValue serconf, ResourceFactory resourceFactory) {
super(application, serverStartTime, netprotocol(serconf), resourceFactory, new SncpDispatcherServlet());
}
private static String netprotocol(AnyValue serconf) {
if (serconf == null) {
return "TCP";
}
String protocol = serconf.getValue("protocol", "").toUpperCase();
if (protocol.endsWith(".UDP")) {
return "UDP";
}
return "TCP";
super(application, serverStartTime, getConfNetprotocol(serconf), resourceFactory, new SncpDispatcherServlet());
}
@Override

View File

@@ -18,9 +18,13 @@ import java.util.concurrent.atomic.LongAdder;
*/
public class ByteBufferPool extends ObjectPool<ByteBuffer> {
public static final int DEFAULT_BUFFER_POOL_SIZE = Utility.cpus() * 4;
public static final int DEFAULT_BUFFER_CAPACITY = 16 * 1024;
public static final int DEFAULT_BUFFER_POOL_SIZE =
Integer.getInteger("redkale.bytebuffer.pool.size", Utility.cpus() * 4);
public static final int DEFAULT_BUFFER_TCP_CAPACITY =
Integer.getInteger("redkale.bytebuffer.tcp.apacity", 32 * 1024);
public static final int DEFAULT_BUFFER_UDP_CAPACITY = Integer.getInteger("redkale.bytebuffer.udp.apacity", 1350);
private final int bufferCapacity;

View File

@@ -36,6 +36,7 @@ public class SncpRequestParseTest {
SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig();
config.logger = Logger.getLogger(SncpRequestParseTest.class.getSimpleName());
config.serverAddress = sncpAddress;
config.maxHeader = 16 * 1024;
config.maxBody = 1024 * 1024 * 1024;
SncpContext context = new SncpContext(config);

View File

@@ -29,7 +29,8 @@ public class SncpTest {
private static final String protocol = "SNCP.TCP"; // TCP UDP
private static final int clientCapacity = protocol.endsWith(".UDP") ? AsyncGroup.UDP_BUFFER_CAPACITY : 8192;
private static final int clientCapacity =
protocol.endsWith(".UDP") ? ByteBufferPool.DEFAULT_BUFFER_UDP_CAPACITY : 8192;
private static ResourceFactory factory;