客户端组AsyncGroup不再使用WorkExecutor线程池而是单独新建一个

This commit is contained in:
Redkale
2022-11-24 10:09:44 +08:00
parent 4e675d007e
commit 16ebdcde85
2 changed files with 28 additions and 19 deletions

View File

@@ -109,7 +109,7 @@ public final class Application {
* *
* @since 2.3.0 * @since 2.3.0
*/ */
public static final String RESNAME_APP_ASYNCGROUP = "APP_ASYNCGROUP"; public static final String RESNAME_APP_CLIENT_ASYNCGROUP = "APP_CLIENT_ASYNCGROUP";
/** /**
* 环境变量, 类型Environment * 环境变量, 类型Environment
@@ -178,7 +178,7 @@ public final class Application {
final TransportFactory sncpTransportFactory; final TransportFactory sncpTransportFactory;
//给客户端使用包含SNCP客户端、自定义数据库客户端连接池 //给客户端使用包含SNCP客户端、自定义数据库客户端连接池
final AsyncGroup asyncGroup; final AsyncGroup clientAsyncGroup;
//配置源管理接口 //配置源管理接口
//@since 2.7.0 //@since 2.7.0
@@ -579,25 +579,26 @@ public final class Application {
} }
ExecutorService workExecutor0 = null; ExecutorService workExecutor0 = null;
ExecutorService clientExecutor;
{ {
if (executorConf == null) executorConf = DefaultAnyValue.create(); if (executorConf == null) executorConf = DefaultAnyValue.create();
final AtomicReference<ExecutorService> workref = new AtomicReference<>(); final AtomicReference<ExecutorService> workref = new AtomicReference<>();
final int executorThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus())); final int executorThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus()));
boolean executorHash = executorConf.getBoolValue("hash"); boolean executorHash = executorConf.getBoolValue("hash");
if (executorThreads > 0) { if (executorThreads > 0) {
final AtomicInteger workcounter = new AtomicInteger(); final AtomicInteger workCounter = new AtomicInteger();
if (executorHash) { if (executorHash) {
workExecutor0 = new ThreadHashExecutor(executorThreads, (Runnable r) -> { workExecutor0 = new ThreadHashExecutor(executorThreads, (Runnable r) -> {
int i = workcounter.get(); int i = workCounter.get();
int c = workcounter.incrementAndGet(); int c = workCounter.incrementAndGet();
String threadname = "Redkale-HashWorkThread-" + (c > 9 ? c : ("0" + c)); String threadname = "Redkale-HashWorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r);
return t; return t;
}); });
} else { } else {
workExecutor0 = Executors.newFixedThreadPool(executorThreads, (Runnable r) -> { workExecutor0 = Executors.newFixedThreadPool(executorThreads, (Runnable r) -> {
int i = workcounter.get(); int i = workCounter.get();
int c = workcounter.incrementAndGet(); int c = workCounter.incrementAndGet();
String threadname = "Redkale-WorkThread-" + (c > 9 ? c : ("0" + c)); String threadname = "Redkale-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r);
return t; return t;
@@ -605,16 +606,24 @@ public final class Application {
} }
workref.set(workExecutor0); workref.set(workExecutor0);
} }
final AtomicInteger wclientCounter = new AtomicInteger();
clientExecutor = Executors.newFixedThreadPool(Math.max(2, executorThreads / 2), (Runnable r) -> {
int i = wclientCounter.get();
int c = wclientCounter.incrementAndGet();
String threadname = "Redkale-ClientThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r);
return t;
});
} }
this.workExecutor = workExecutor0; this.workExecutor = workExecutor0;
this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor);
this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor);
this.asyncGroup = new AsyncIOGroup(true, null, this.workExecutor, bufferCapacity, bufferPoolSize); this.clientAsyncGroup = new AsyncIOGroup(true, null, clientExecutor, bufferCapacity, bufferPoolSize);
this.resourceFactory.register(RESNAME_APP_ASYNCGROUP, AsyncGroup.class, this.asyncGroup); this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup);
this.excludelibs = excludelib0; this.excludelibs = excludelib0;
this.sncpTransportFactory = TransportFactory.create(this.asyncGroup, (SSLContext) null, Transport.DEFAULT_NETPROTOCOL, readTimeoutSeconds, writeTimeoutSeconds, strategy); this.sncpTransportFactory = TransportFactory.create(this.clientAsyncGroup, (SSLContext) null, Transport.DEFAULT_NETPROTOCOL, readTimeoutSeconds, writeTimeoutSeconds, strategy);
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("redkale.net.transport.pool.maxconns", "100")) DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("redkale.net.transport.pool.maxconns", "100"))
.addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("redkale.net.transport.ping.interval", "30")) .addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("redkale.net.transport.ping.interval", "30"))
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("redkale.net.transport.check.interval", "30")); .addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("redkale.net.transport.check.interval", "30"));
@@ -923,7 +932,7 @@ public final class Application {
resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> { resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> {
try { try {
if (field.getAnnotation(Resource.class) == null) return; if (field.getAnnotation(Resource.class) == null) return;
HttpSimpleClient httpClient = HttpSimpleClient.create(asyncGroup); HttpSimpleClient httpClient = HttpSimpleClient.create(clientAsyncGroup);
field.set(srcObj, httpClient); field.set(srcObj, httpClient);
rf.inject(resourceName, httpClient, null); // 给其可能包含@Resource的字段赋值; rf.inject(resourceName, httpClient, null); // 给其可能包含@Resource的字段赋值;
rf.register(resourceName, HttpSimpleClient.class, httpClient); rf.register(resourceName, HttpSimpleClient.class, httpClient);
@@ -932,8 +941,8 @@ public final class Application {
} }
}, HttpSimpleClient.class); }, HttpSimpleClient.class);
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
if (this.asyncGroup != null) { if (this.clientAsyncGroup != null) {
((AsyncIOGroup) this.asyncGroup).start(); ((AsyncIOGroup) this.clientAsyncGroup).start();
} }
if (this.clusterAgent != null) { if (this.clusterAgent != null) {
if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "ClusterAgent (type = " + this.clusterAgent.getClass().getSimpleName() + ") initing"); if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "ClusterAgent (type = " + this.clusterAgent.getClass().getSimpleName() + ") initing");
@@ -1813,9 +1822,9 @@ public final class Application {
this.propertiesAgent.destroy(config.getAnyValue("resources").getAnyValue("properties")); this.propertiesAgent.destroy(config.getAnyValue("resources").getAnyValue("properties"));
logger.info(this.propertiesAgent.getClass().getSimpleName() + " destroy in " + (System.currentTimeMillis() - s) + " ms"); logger.info(this.propertiesAgent.getClass().getSimpleName() + " destroy in " + (System.currentTimeMillis() - s) + " ms");
} }
if (this.asyncGroup != null) { if (this.clientAsyncGroup != null) {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
((AsyncIOGroup) this.asyncGroup).close(); ((AsyncIOGroup) this.clientAsyncGroup).close();
logger.info("AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms"); logger.info("AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms");
} }
this.sncpTransportFactory.shutdownNow(); this.sncpTransportFactory.shutdownNow();
@@ -1825,8 +1834,8 @@ public final class Application {
return workExecutor; return workExecutor;
} }
public AsyncGroup getAsyncGroup() { public AsyncGroup getClientAsyncGroup() {
return asyncGroup; return clientAsyncGroup;
} }
public ResourceFactory getResourceFactory() { public ResourceFactory getResourceFactory() {

View File

@@ -54,8 +54,8 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
protected Properties writeConfProps; protected Properties writeConfProps;
@Resource(name = RESNAME_APP_ASYNCGROUP) @Resource(name = RESNAME_APP_CLIENT_ASYNCGROUP)
protected AsyncGroup asyncGroup; protected AsyncGroup clientAsyncGroup;
@Resource(name = RESNAME_APP_EXECUTOR) @Resource(name = RESNAME_APP_EXECUTOR)
protected ExecutorService workExecutor; protected ExecutorService workExecutor;