This commit is contained in:
redkale
2023-12-07 14:00:07 +08:00
parent 989153815f
commit 875d335cc9
4 changed files with 116 additions and 32 deletions

View File

@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.logging.*;
import org.redkale.annotation.Nonnull;
import org.redkale.annotation.Resource;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.cluster.*;
@@ -140,6 +141,7 @@ public final class Application {
//业务逻辑线程池
//@since 2.3.0
@Nonnull
final ExecutorService workExecutor;
//日志配置资源
@@ -590,37 +592,29 @@ public final class Application {
final AnyValue executorConf = config.getAnyValue("executor", true);
StringBuilder executorLog = new StringBuilder();
ExecutorService workExecutor0 = null;
final int workThreads = executorConf.getIntValue("threads", Utility.cpus() * 4);
if (workThreads > 0) {
//指定threads则不使用虚拟线程池
workExecutor0 = executorConf.getValue("threads") != null ? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s") : WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s");
String executorName = workExecutor0.getClass().getSimpleName();
executorLog.append("defaultWorkExecutor: {type=" + executorName);
if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) {
executorLog.append(", threads=[virtual]}");
} else {
executorLog.append(", threads=" + workThreads + "}");
}
final int workThreads = Math.max(Utility.cpus(), executorConf.getIntValue("threads", Utility.cpus() * 4));
//指定threads则不使用虚拟线程池
this.workExecutor = executorConf.getValue("threads") != null
? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s")
: WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s");
String executorName = this.workExecutor.getClass().getSimpleName();
executorLog.append("defaultWorkExecutor: {type=" + executorName);
if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) {
executorLog.append(", threads=[virtual]}");
} else {
executorLog.append(", threads=" + workThreads + "}");
}
this.workExecutor = workExecutor0;
this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor);
this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor);
ExecutorService clientWorkExecutor = workExecutor0;
if (clientWorkExecutor == null) {
//给所有client给一个默认的ExecutorService
int clients = executorConf.getIntValue("clients", Utility.cpus());
clientWorkExecutor = WorkThread.createWorkExecutor(clients, "Redkale-DefaultClient-WorkThread-%s");
String executorName = clientWorkExecutor.getClass().getSimpleName();
executorLog.append("clientWorkExecutor: {type=" + executorName);
if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) {
executorLog.append(", threads=[virtual]}");
} else {
executorLog.append(", threads=" + workThreads + "}");
}
} else {
ExecutorService clientWorkExecutor = this.workExecutor;
if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) {
executorLog.append(", clientWorkExecutor: [workExecutor]");
} else {
//给所有client给一个新的默认ExecutorService
int clientThreads = executorConf.getIntValue("clients", Utility.cpus() * 4);
clientWorkExecutor = WorkThread.createWorkExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s");
executorLog.append(", threads=" + clientThreads + "}");
}
this.clientAsyncGroup = new AsyncIOGroup("Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize).skipClose(true);
this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup);

View File

@@ -54,6 +54,11 @@ public class AsyncIOThread extends WorkThread {
return closed.get();
}
/**
* 当前IOThread线程不是IOThread返回null
*
* @return IOThread线程
*/
public static AsyncIOThread currentAsyncIOThread() {
Thread t = Thread.currentThread();
return t instanceof AsyncIOThread ? (AsyncIOThread) t : null;
@@ -136,6 +141,9 @@ public class AsyncIOThread extends WorkThread {
return bufferConsumer;
}
/**
* 运行线程
*/
@Override
public void run() {
final Queue<Runnable> commands = this.commandQueue;
@@ -208,6 +216,9 @@ public class AsyncIOThread extends WorkThread {
}
}
/**
* 关闭线程
*/
public void close() {
if (this.closed.compareAndSet(false, true)) {
try {

View File

@@ -38,16 +38,37 @@ public class WorkThread extends Thread implements Executor {
this.setDaemon(true);
}
/**
* 当前WorkThread线程不是WorkThread返回null
*
* @return WorkThread线程
*/
public static WorkThread currentWorkThread() {
Thread t = Thread.currentThread();
return t instanceof WorkThread ? (WorkThread) t : null;
}
/**
* 创建线程池当前JDK若支持虚拟线程池返回虚拟线程池
*
* @param threads 线程数
* @param threadNameFormat 格式化线程名
*
* @return 线程池
*/
public static ExecutorService createWorkExecutor(final int threads, final String threadNameFormat) {
final Function<String, ExecutorService> func = Utility.virtualExecutorFunction();
return func == null ? createExecutor(threads, threadNameFormat) : func.apply(threadNameFormat);
}
/**
* 创建线程池
*
* @param threads 线程数
* @param threadNameFormat 格式化线程名
*
* @return 线程池
*/
public static ExecutorService createExecutor(final int threads, final String threadNameFormat) {
final AtomicReference<ExecutorService> ref = new AtomicReference<>();
final AtomicInteger counter = new AtomicInteger();
@@ -61,7 +82,15 @@ public class WorkThread extends Thread implements Executor {
});
}
public static String formatIndex(int threads, int index) {
/**
* 根据线程池大小补位序号
*
* @param threads 线程池大小
* @param index 序号
*
* @return 返回固定长度的序号
*/
static String formatIndex(int threads, int index) {
String v = String.valueOf(index);
if (threads >= 100) {
if (index < 10) {
@@ -77,6 +106,14 @@ public class WorkThread extends Thread implements Executor {
return v;
}
/**
* 按以下优先级顺序的线程池执行给定的任务: <br>
* 1、work线程池
* 2、虚拟线程
* 3、当前线程
*
* @param command 任务
*/
@Override
public void execute(Runnable command) {
if (workExecutor == null) {
@@ -86,6 +123,14 @@ public class WorkThread extends Thread implements Executor {
}
}
/**
* 按以下优先级顺序的线程池执行给定的任务集合: <br>
* 1、work线程池
* 2、虚拟线程
* 3、当前线程
*
* @param commands 任务集合
*/
public void execute(Runnable... commands) {
if (workExecutor == null) {
for (Runnable command : commands) {
@@ -98,6 +143,14 @@ public class WorkThread extends Thread implements Executor {
}
}
/**
* 按以下优先级顺序的线程池执行给定的任务集合: <br>
* 1、work线程池
* 2、虚拟线程
* 3、当前线程
*
* @param commands 任务集合
*/
public void execute(Collection<Runnable> commands) {
if (commands == null) {
return;
@@ -113,7 +166,16 @@ public class WorkThread extends Thread implements Executor {
}
}
//与execute的区别在于子类AsyncIOThread中execute会被重载确保在IO线程中执行
/**
* 按以下优先级顺序的线程池执行给定的任务: <br>
* 1、work线程池
* 2、虚拟线程
* 3、当前线程
* <bt>
* <b>与execute的区别子类AsyncIOThread中execute会被重载确保优先在IO线程中执行</b>
*
* @param command 任务
*/
public final void runWork(Runnable command) {
if (workExecutor == null) {
Utility.execute(command);
@@ -122,6 +184,11 @@ public class WorkThread extends Thread implements Executor {
}
}
/**
* 获取work线程池
*
* @return work线程池
*/
public ExecutorService getWorkExecutor() {
return workExecutor;
}
@@ -136,10 +203,22 @@ public class WorkThread extends Thread implements Executor {
return false;
}
/**
* 判断当前线程是否为当前对象
*
* @return 是否一致
*/
public boolean inCurrThread() {
return this == Thread.currentThread();
}
/**
* 判断当前线程是否为指定线程
*
* @param thread 线程
*
* @return 是否一致
*/
public boolean inCurrThread(Thread thread) {
return this == thread;
}

View File

@@ -56,13 +56,13 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
//不可protected、public
private final ClientAddress address; //连接的地址
protected ScheduledFuture timeoutFuture;
//连随机地址模式
final int connLimit; //最大连接数
private final int connLimit; //最大连接数
//连指定地址模式
final ConcurrentHashMap<SocketAddress, AddressConnEntry[]> connAddrEntrys = new ConcurrentHashMap<>();
private final ConcurrentHashMap<SocketAddress, AddressConnEntry[]> connAddrEntrys = new ConcurrentHashMap<>();
protected ScheduledFuture timeoutFuture;
protected int maxPipelines = DEFAULT_MAX_PIPELINES; //单个连接最大并行处理数