From 16ebdcde8580edff8e2179df5562998863f5a9a6 Mon Sep 17 00:00:00 2001 From: Redkale Date: Thu, 24 Nov 2022 10:09:44 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E7=BB=84AsyncGroup?= =?UTF-8?q?=E4=B8=8D=E5=86=8D=E4=BD=BF=E7=94=A8WorkExecutor=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=E8=80=8C=E6=98=AF=E5=8D=95=E7=8B=AC=E6=96=B0?= =?UTF-8?q?=E5=BB=BA=E4=B8=80=E4=B8=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 43 +++++++++++-------- .../org/redkale/source/DataSqlSource.java | 4 +- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index ab805f905..6651d0db7 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -109,7 +109,7 @@ public final class Application { * * @since 2.3.0 */ - public static final String RESNAME_APP_ASYNCGROUP = "APP_ASYNCGROUP"; + public static final String RESNAME_APP_CLIENT_ASYNCGROUP = "APP_CLIENT_ASYNCGROUP"; /** * 环境变量, 类型:Environment @@ -178,7 +178,7 @@ public final class Application { final TransportFactory sncpTransportFactory; //给客户端使用,包含SNCP客户端、自定义数据库客户端连接池 - final AsyncGroup asyncGroup; + final AsyncGroup clientAsyncGroup; //配置源管理接口 //@since 2.7.0 @@ -579,25 +579,26 @@ public final class Application { } ExecutorService workExecutor0 = null; + ExecutorService clientExecutor; { if (executorConf == null) executorConf = DefaultAnyValue.create(); final AtomicReference workref = new AtomicReference<>(); final int executorThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus())); boolean executorHash = executorConf.getBoolValue("hash"); if (executorThreads > 0) { - final AtomicInteger workcounter = new AtomicInteger(); + final AtomicInteger workCounter = new AtomicInteger(); if (executorHash) { workExecutor0 = new ThreadHashExecutor(executorThreads, (Runnable r) -> { - int i = workcounter.get(); - int c = workcounter.incrementAndGet(); + int i = workCounter.get(); + int c = workCounter.incrementAndGet(); String threadname = "Redkale-HashWorkThread-" + (c > 9 ? c : ("0" + c)); Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); return t; }); } else { workExecutor0 = Executors.newFixedThreadPool(executorThreads, (Runnable r) -> { - int i = workcounter.get(); - int c = workcounter.incrementAndGet(); + int i = workCounter.get(); + int c = workCounter.incrementAndGet(); String threadname = "Redkale-WorkThread-" + (c > 9 ? c : ("0" + c)); Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); return t; @@ -605,16 +606,24 @@ public final class Application { } workref.set(workExecutor0); } + final AtomicInteger wclientCounter = new AtomicInteger(); + clientExecutor = Executors.newFixedThreadPool(Math.max(2, executorThreads / 2), (Runnable r) -> { + int i = wclientCounter.get(); + int c = wclientCounter.incrementAndGet(); + String threadname = "Redkale-ClientThread-" + (c > 9 ? c : ("0" + c)); + Thread t = new WorkThread(threadname, i, executorThreads, workref.get(), r); + return t; + }); } this.workExecutor = workExecutor0; this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor); - this.asyncGroup = new AsyncIOGroup(true, null, this.workExecutor, bufferCapacity, bufferPoolSize); - this.resourceFactory.register(RESNAME_APP_ASYNCGROUP, AsyncGroup.class, this.asyncGroup); + this.clientAsyncGroup = new AsyncIOGroup(true, null, clientExecutor, bufferCapacity, bufferPoolSize); + this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup); this.excludelibs = excludelib0; - this.sncpTransportFactory = TransportFactory.create(this.asyncGroup, (SSLContext) null, Transport.DEFAULT_NETPROTOCOL, readTimeoutSeconds, writeTimeoutSeconds, strategy); + this.sncpTransportFactory = TransportFactory.create(this.clientAsyncGroup, (SSLContext) null, Transport.DEFAULT_NETPROTOCOL, readTimeoutSeconds, writeTimeoutSeconds, strategy); DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("redkale.net.transport.pool.maxconns", "100")) .addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("redkale.net.transport.ping.interval", "30")) .addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("redkale.net.transport.check.interval", "30")); @@ -923,7 +932,7 @@ public final class Application { resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> { try { if (field.getAnnotation(Resource.class) == null) return; - HttpSimpleClient httpClient = HttpSimpleClient.create(asyncGroup); + HttpSimpleClient httpClient = HttpSimpleClient.create(clientAsyncGroup); field.set(srcObj, httpClient); rf.inject(resourceName, httpClient, null); // 给其可能包含@Resource的字段赋值; rf.register(resourceName, HttpSimpleClient.class, httpClient); @@ -932,8 +941,8 @@ public final class Application { } }, HttpSimpleClient.class); //-------------------------------------------------------------------------- - if (this.asyncGroup != null) { - ((AsyncIOGroup) this.asyncGroup).start(); + if (this.clientAsyncGroup != null) { + ((AsyncIOGroup) this.clientAsyncGroup).start(); } if (this.clusterAgent != null) { if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "ClusterAgent (type = " + this.clusterAgent.getClass().getSimpleName() + ") initing"); @@ -1813,9 +1822,9 @@ public final class Application { this.propertiesAgent.destroy(config.getAnyValue("resources").getAnyValue("properties")); logger.info(this.propertiesAgent.getClass().getSimpleName() + " destroy in " + (System.currentTimeMillis() - s) + " ms"); } - if (this.asyncGroup != null) { + if (this.clientAsyncGroup != null) { long s = System.currentTimeMillis(); - ((AsyncIOGroup) this.asyncGroup).close(); + ((AsyncIOGroup) this.clientAsyncGroup).close(); logger.info("AsyncGroup destroy in " + (System.currentTimeMillis() - s) + " ms"); } this.sncpTransportFactory.shutdownNow(); @@ -1825,8 +1834,8 @@ public final class Application { return workExecutor; } - public AsyncGroup getAsyncGroup() { - return asyncGroup; + public AsyncGroup getClientAsyncGroup() { + return clientAsyncGroup; } public ResourceFactory getResourceFactory() { diff --git a/src/main/java/org/redkale/source/DataSqlSource.java b/src/main/java/org/redkale/source/DataSqlSource.java index d4d08253f..80969310f 100644 --- a/src/main/java/org/redkale/source/DataSqlSource.java +++ b/src/main/java/org/redkale/source/DataSqlSource.java @@ -54,8 +54,8 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi protected Properties writeConfProps; - @Resource(name = RESNAME_APP_ASYNCGROUP) - protected AsyncGroup asyncGroup; + @Resource(name = RESNAME_APP_CLIENT_ASYNCGROUP) + protected AsyncGroup clientAsyncGroup; @Resource(name = RESNAME_APP_EXECUTOR) protected ExecutorService workExecutor;