优化DataSqlSource

This commit is contained in:
Redkale
2023-01-02 11:37:48 +08:00
parent 496b165d42
commit 21f66d13ee
4 changed files with 63 additions and 11 deletions

View File

@@ -618,8 +618,8 @@ public final class Application {
clientExecutor = Executors.newFixedThreadPool(clientThreads, (Runnable r) -> {
int i = wclientCounter.get();
int c = wclientCounter.incrementAndGet();
String threadname = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, clientThreads, clientref.get(), r);
String threadName = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadName, i, clientThreads, clientref.get(), r);
return t;
});
clientref.set(clientExecutor);

View File

@@ -38,8 +38,9 @@ public abstract class AbstractService implements Service {
* @param command 任务
*/
protected void runAsync(Runnable command) {
if (workExecutor != null) {
workExecutor.execute(command);
ExecutorService executor = this.workExecutor;
if (executor != null) {
executor.execute(command);
} else {
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
@@ -57,15 +58,16 @@ public abstract class AbstractService implements Service {
* @param command 任务
*/
protected void runAsync(int hash, Runnable command) {
if (workExecutor != null) {
if (workExecutor instanceof ThreadHashExecutor) {
((ThreadHashExecutor) workExecutor).execute(hash, command);
ExecutorService executor = this.workExecutor;
if (executor != null) {
if (executor instanceof ThreadHashExecutor) {
((ThreadHashExecutor) executor).execute(hash, command);
} else {
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
((WorkThread) thread).runAsync(hash, command);
} else {
workExecutor.execute(command);
executor.execute(command);
}
}
} else {
@@ -84,11 +86,16 @@ public abstract class AbstractService implements Service {
* @return ExecutorService
*/
protected ExecutorService getExecutor() {
if (workExecutor != null) return workExecutor;
ExecutorService executor = this.workExecutor;
if (executor != null) {
return executor;
}
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
ExecutorService e = ((WorkThread) thread).getWorkExecutor();
if (e != null) return e;
if (e != null) {
return e;
}
}
return ForkJoinPool.commonPool();
}

View File

@@ -8,7 +8,8 @@ package org.redkale.source;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.stream.Stream;
import org.redkale.annotation.AutoLoad;
@@ -16,6 +17,7 @@ import org.redkale.annotation.Comment;
import org.redkale.annotation.ResourceListener;
import org.redkale.annotation.ResourceType;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.WorkThread;
import org.redkale.persistence.Entity;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -75,6 +77,9 @@ public abstract class AbstractDataSource extends AbstractService implements Data
//@since 2.8.0 //超过多少毫秒视为较慢, 会打印警告级别的日志, 默认值: 3000
public static final String DATA_SOURCE_SLOWMS_ERROR = "errorslowms";
//@since 2.8.0 //sourceExecutor线程数, 默认值: 内核数
public static final String DATA_SOURCE_THREADS = "threads";
//@since 2.7.0
public static final String DATA_SOURCE_AUTOMAPPING = "auto-mapping";
@@ -102,6 +107,22 @@ public abstract class AbstractDataSource extends AbstractService implements Data
//@since 2.7.0
public static final String DATA_SOURCE_TABLECOPY_SQLTEMPLATE = "tablecopy-sqltemplate";
private final Object executorLock = new Object();
private int sourceThreads = Utility.cpus();
private ExecutorService sourceExecutor;
@Override
public void init(AnyValue conf) {
super.init(conf);
if (conf.getAnyValue("read") == null) {
this.sourceThreads = conf.getIntValue(DATA_SOURCE_THREADS, Utility.cpus());
} else {
this.sourceThreads = conf.getAnyValue("read").getIntValue(DATA_SOURCE_THREADS, Utility.cpus());
}
}
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
@@ -255,6 +276,29 @@ public abstract class AbstractDataSource extends AbstractService implements Data
}
}
@Override
protected ExecutorService getExecutor() {
ExecutorService executor = this.sourceExecutor;
if (executor == null) {
synchronized (executorLock) {
if (this.sourceExecutor == null) {
final AtomicReference<ExecutorService> ref = new AtomicReference<>();
final AtomicInteger counter = new AtomicInteger();
final int threads = sourceThreads;
executor = Executors.newFixedThreadPool(threads, (Runnable r) -> {
int i = counter.get();
int c = counter.incrementAndGet();
String threadName = "Redkale-DataSource-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadName, i, threads, ref.get(), r);
return t;
});
this.sourceExecutor = executor;
}
}
}
return executor;
}
/**
* 是否虚拟化的持久对象
*

View File

@@ -100,6 +100,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
@Override
public void init(AnyValue conf) {
super.init(conf);
this.config = conf;
if (conf.getAnyValue("read") == null) { //没有读写分离
Properties rwConf = new Properties();