diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 6fded3cc5..2398b2f78 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -618,8 +618,8 @@ public final class Application { clientExecutor = Executors.newFixedThreadPool(clientThreads, (Runnable r) -> { int i = wclientCounter.get(); int c = wclientCounter.incrementAndGet(); - String threadname = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c)); - Thread t = new WorkThread(threadname, i, clientThreads, clientref.get(), r); + String threadName = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c)); + Thread t = new WorkThread(threadName, i, clientThreads, clientref.get(), r); return t; }); clientref.set(clientExecutor); diff --git a/src/main/java/org/redkale/service/AbstractService.java b/src/main/java/org/redkale/service/AbstractService.java index a6a08c64f..7e107a167 100644 --- a/src/main/java/org/redkale/service/AbstractService.java +++ b/src/main/java/org/redkale/service/AbstractService.java @@ -38,8 +38,9 @@ public abstract class AbstractService implements Service { * @param command 任务 */ protected void runAsync(Runnable command) { - if (workExecutor != null) { - workExecutor.execute(command); + ExecutorService executor = this.workExecutor; + if (executor != null) { + executor.execute(command); } else { Thread thread = Thread.currentThread(); if (thread instanceof WorkThread) { @@ -57,15 +58,16 @@ public abstract class AbstractService implements Service { * @param command 任务 */ protected void runAsync(int hash, Runnable command) { - if (workExecutor != null) { - if (workExecutor instanceof ThreadHashExecutor) { - ((ThreadHashExecutor) workExecutor).execute(hash, command); + ExecutorService executor = this.workExecutor; + if (executor != null) { + if (executor instanceof ThreadHashExecutor) { + ((ThreadHashExecutor) executor).execute(hash, command); } else { Thread thread = Thread.currentThread(); if (thread instanceof WorkThread) { ((WorkThread) thread).runAsync(hash, command); } else { - workExecutor.execute(command); + executor.execute(command); } } } else { @@ -84,11 +86,16 @@ public abstract class AbstractService implements Service { * @return ExecutorService */ protected ExecutorService getExecutor() { - if (workExecutor != null) return workExecutor; + ExecutorService executor = this.workExecutor; + if (executor != null) { + return executor; + } Thread thread = Thread.currentThread(); if (thread instanceof WorkThread) { ExecutorService e = ((WorkThread) thread).getWorkExecutor(); - if (e != null) return e; + if (e != null) { + return e; + } } return ForkJoinPool.commonPool(); } diff --git a/src/main/java/org/redkale/source/AbstractDataSource.java b/src/main/java/org/redkale/source/AbstractDataSource.java index 937e01fb1..4ecab4ab0 100644 --- a/src/main/java/org/redkale/source/AbstractDataSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSource.java @@ -8,7 +8,8 @@ package org.redkale.source; import java.io.Serializable; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import java.util.function.*; import java.util.stream.Stream; import org.redkale.annotation.AutoLoad; @@ -16,6 +17,7 @@ import org.redkale.annotation.Comment; import org.redkale.annotation.ResourceListener; import org.redkale.annotation.ResourceType; import org.redkale.convert.json.JsonConvert; +import org.redkale.net.WorkThread; import org.redkale.persistence.Entity; import org.redkale.service.*; import org.redkale.util.*; @@ -75,6 +77,9 @@ public abstract class AbstractDataSource extends AbstractService implements Data //@since 2.8.0 //超过多少毫秒视为较慢, 会打印警告级别的日志, 默认值: 3000 public static final String DATA_SOURCE_SLOWMS_ERROR = "errorslowms"; + //@since 2.8.0 //sourceExecutor线程数, 默认值: 内核数 + public static final String DATA_SOURCE_THREADS = "threads"; + //@since 2.7.0 public static final String DATA_SOURCE_AUTOMAPPING = "auto-mapping"; @@ -102,6 +107,22 @@ public abstract class AbstractDataSource extends AbstractService implements Data //@since 2.7.0 public static final String DATA_SOURCE_TABLECOPY_SQLTEMPLATE = "tablecopy-sqltemplate"; + private final Object executorLock = new Object(); + + private int sourceThreads = Utility.cpus(); + + private ExecutorService sourceExecutor; + + @Override + public void init(AnyValue conf) { + super.init(conf); + if (conf.getAnyValue("read") == null) { + this.sourceThreads = conf.getIntValue(DATA_SOURCE_THREADS, Utility.cpus()); + } else { + this.sourceThreads = conf.getAnyValue("read").getIntValue(DATA_SOURCE_THREADS, Utility.cpus()); + } + } + @ResourceListener public abstract void onResourceChange(ResourceEvent[] events); @@ -255,6 +276,29 @@ public abstract class AbstractDataSource extends AbstractService implements Data } } + @Override + protected ExecutorService getExecutor() { + ExecutorService executor = this.sourceExecutor; + if (executor == null) { + synchronized (executorLock) { + if (this.sourceExecutor == null) { + final AtomicReference ref = new AtomicReference<>(); + final AtomicInteger counter = new AtomicInteger(); + final int threads = sourceThreads; + executor = Executors.newFixedThreadPool(threads, (Runnable r) -> { + int i = counter.get(); + int c = counter.incrementAndGet(); + String threadName = "Redkale-DataSource-WorkThread-" + (c > 9 ? c : ("0" + c)); + Thread t = new WorkThread(threadName, i, threads, ref.get(), r); + return t; + }); + this.sourceExecutor = executor; + } + } + } + return executor; + } + /** * 是否虚拟化的持久对象 * diff --git a/src/main/java/org/redkale/source/DataSqlSource.java b/src/main/java/org/redkale/source/DataSqlSource.java index 3c990f43b..a3be1c2bd 100644 --- a/src/main/java/org/redkale/source/DataSqlSource.java +++ b/src/main/java/org/redkale/source/DataSqlSource.java @@ -100,6 +100,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi @Override public void init(AnyValue conf) { + super.init(conf); this.config = conf; if (conf.getAnyValue("read") == null) { //没有读写分离 Properties rwConf = new Properties();