From 875d335cc928c923ab4fae280d84994813b4f354 Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 7 Dec 2023 14:00:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 46 +++++----- .../java/org/redkale/net/AsyncIOThread.java | 11 +++ src/main/java/org/redkale/net/WorkThread.java | 83 ++++++++++++++++++- .../java/org/redkale/net/client/Client.java | 8 +- 4 files changed, 116 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 7ba97955b..5a4a80c00 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.*; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.logging.*; +import org.redkale.annotation.Nonnull; import org.redkale.annotation.Resource; import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.cluster.*; @@ -140,6 +141,7 @@ public final class Application { //业务逻辑线程池 //@since 2.3.0 + @Nonnull final ExecutorService workExecutor; //日志配置资源 @@ -590,37 +592,29 @@ public final class Application { final AnyValue executorConf = config.getAnyValue("executor", true); StringBuilder executorLog = new StringBuilder(); - ExecutorService workExecutor0 = null; - final int workThreads = executorConf.getIntValue("threads", Utility.cpus() * 4); - if (workThreads > 0) { - //指定threads则不使用虚拟线程池 - workExecutor0 = executorConf.getValue("threads") != null ? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s") : WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s"); - String executorName = workExecutor0.getClass().getSimpleName(); - executorLog.append("defaultWorkExecutor: {type=" + executorName); - if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) { - executorLog.append(", threads=[virtual]}"); - } else { - executorLog.append(", threads=" + workThreads + "}"); - } + final int workThreads = Math.max(Utility.cpus(), executorConf.getIntValue("threads", Utility.cpus() * 4)); + //指定threads则不使用虚拟线程池 + this.workExecutor = executorConf.getValue("threads") != null + ? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s") + : WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s"); + String executorName = this.workExecutor.getClass().getSimpleName(); + executorLog.append("defaultWorkExecutor: {type=" + executorName); + if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) { + executorLog.append(", threads=[virtual]}"); + } else { + executorLog.append(", threads=" + workThreads + "}"); } - this.workExecutor = workExecutor0; this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor); - ExecutorService clientWorkExecutor = workExecutor0; - if (clientWorkExecutor == null) { - //给所有client给一个默认的ExecutorService - int clients = executorConf.getIntValue("clients", Utility.cpus()); - clientWorkExecutor = WorkThread.createWorkExecutor(clients, "Redkale-DefaultClient-WorkThread-%s"); - String executorName = clientWorkExecutor.getClass().getSimpleName(); - executorLog.append("clientWorkExecutor: {type=" + executorName); - if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) { - executorLog.append(", threads=[virtual]}"); - } else { - executorLog.append(", threads=" + workThreads + "}"); - } - } else { + ExecutorService clientWorkExecutor = this.workExecutor; + if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) { executorLog.append(", clientWorkExecutor: [workExecutor]"); + } else { + //给所有client给一个新的默认ExecutorService + int clientThreads = executorConf.getIntValue("clients", Utility.cpus() * 4); + clientWorkExecutor = WorkThread.createWorkExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s"); + executorLog.append(", threads=" + clientThreads + "}"); } this.clientAsyncGroup = new AsyncIOGroup("Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize).skipClose(true); this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup); diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 97ca10ba4..88e39892a 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -54,6 +54,11 @@ public class AsyncIOThread extends WorkThread { return closed.get(); } + /** + * 当前IOThread线程,不是IOThread返回null + * + * @return IOThread线程 + */ public static AsyncIOThread currentAsyncIOThread() { Thread t = Thread.currentThread(); return t instanceof AsyncIOThread ? (AsyncIOThread) t : null; @@ -136,6 +141,9 @@ public class AsyncIOThread extends WorkThread { return bufferConsumer; } + /** + * 运行线程 + */ @Override public void run() { final Queue commands = this.commandQueue; @@ -208,6 +216,9 @@ public class AsyncIOThread extends WorkThread { } } + /** + * 关闭线程 + */ public void close() { if (this.closed.compareAndSet(false, true)) { try { diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index 64cc7abd8..5580a0dad 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -38,16 +38,37 @@ public class WorkThread extends Thread implements Executor { this.setDaemon(true); } + /** + * 当前WorkThread线程,不是WorkThread返回null + * + * @return WorkThread线程 + */ public static WorkThread currentWorkThread() { Thread t = Thread.currentThread(); return t instanceof WorkThread ? (WorkThread) t : null; } + /** + * 创建线程池,当前JDK若支持虚拟线程池,返回虚拟线程池 + * + * @param threads 线程数 + * @param threadNameFormat 格式化线程名 + * + * @return 线程池 + */ public static ExecutorService createWorkExecutor(final int threads, final String threadNameFormat) { final Function func = Utility.virtualExecutorFunction(); return func == null ? createExecutor(threads, threadNameFormat) : func.apply(threadNameFormat); } + /** + * 创建线程池 + * + * @param threads 线程数 + * @param threadNameFormat 格式化线程名 + * + * @return 线程池 + */ public static ExecutorService createExecutor(final int threads, final String threadNameFormat) { final AtomicReference ref = new AtomicReference<>(); final AtomicInteger counter = new AtomicInteger(); @@ -61,7 +82,15 @@ public class WorkThread extends Thread implements Executor { }); } - public static String formatIndex(int threads, int index) { + /** + * 根据线程池大小补位序号 + * + * @param threads 线程池大小 + * @param index 序号 + * + * @return 返回固定长度的序号 + */ + static String formatIndex(int threads, int index) { String v = String.valueOf(index); if (threads >= 100) { if (index < 10) { @@ -77,6 +106,14 @@ public class WorkThread extends Thread implements Executor { return v; } + /** + * 按以下优先级顺序的线程池执行给定的任务:
+ * 1、work线程池 + * 2、虚拟线程 + * 3、当前线程 + * + * @param command 任务 + */ @Override public void execute(Runnable command) { if (workExecutor == null) { @@ -86,6 +123,14 @@ public class WorkThread extends Thread implements Executor { } } + /** + * 按以下优先级顺序的线程池执行给定的任务集合:
+ * 1、work线程池 + * 2、虚拟线程 + * 3、当前线程 + * + * @param commands 任务集合 + */ public void execute(Runnable... commands) { if (workExecutor == null) { for (Runnable command : commands) { @@ -98,6 +143,14 @@ public class WorkThread extends Thread implements Executor { } } + /** + * 按以下优先级顺序的线程池执行给定的任务集合:
+ * 1、work线程池 + * 2、虚拟线程 + * 3、当前线程 + * + * @param commands 任务集合 + */ public void execute(Collection commands) { if (commands == null) { return; @@ -113,7 +166,16 @@ public class WorkThread extends Thread implements Executor { } } - //与execute的区别在于子类AsyncIOThread中execute会被重载,确保在IO线程中执行 + /** + * 按以下优先级顺序的线程池执行给定的任务:
+ * 1、work线程池 + * 2、虚拟线程 + * 3、当前线程 + * + * 与execute的区别:子类AsyncIOThread中execute会被重载,确保优先在IO线程中执行 + * + * @param command 任务 + */ public final void runWork(Runnable command) { if (workExecutor == null) { Utility.execute(command); @@ -122,6 +184,11 @@ public class WorkThread extends Thread implements Executor { } } + /** + * 获取work线程池 + * + * @return work线程池 + */ public ExecutorService getWorkExecutor() { return workExecutor; } @@ -136,10 +203,22 @@ public class WorkThread extends Thread implements Executor { return false; } + /** + * 判断当前线程是否为当前对象 + * + * @return 是否一致 + */ public boolean inCurrThread() { return this == Thread.currentThread(); } + /** + * 判断当前线程是否为指定线程 + * + * @param thread 线程 + * + * @return 是否一致 + */ public boolean inCurrThread(Thread thread) { return this == thread; } diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index ddfed5baf..84168e94a 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -56,13 +56,13 @@ public abstract class Client, R extends ClientR //不可protected、public private final ClientAddress address; //连接的地址 - protected ScheduledFuture timeoutFuture; - //连随机地址模式 - final int connLimit; //最大连接数 + private final int connLimit; //最大连接数 //连指定地址模式 - final ConcurrentHashMap connAddrEntrys = new ConcurrentHashMap<>(); + private final ConcurrentHashMap connAddrEntrys = new ConcurrentHashMap<>(); + + protected ScheduledFuture timeoutFuture; protected int maxPipelines = DEFAULT_MAX_PIPELINES; //单个连接最大并行处理数