From 9cef4b4f7951eafb646f079884fe37fa156a4eaf Mon Sep 17 00:00:00 2001 From: Redkale Date: Fri, 6 Jan 2023 22:08:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BA=BF=E7=A8=8B=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 33 +++------------ .../org/redkale/boot/LoggingFileHandler.java | 2 +- .../redkale/cluster/CacheClusterAgent.java | 5 ++- .../java/org/redkale/mq/MessageAgent.java | 3 +- src/main/java/org/redkale/net/AsyncGroup.java | 16 +++---- .../java/org/redkale/net/AsyncIOGroup.java | 27 ++++++------ .../net/AsyncNioTcpProtocolServer.java | 6 +-- .../net/AsyncNioUdpProtocolServer.java | 6 +-- src/main/java/org/redkale/net/WorkThread.java | 41 ++++++++++++++++++ .../java/org/redkale/net/client/Client.java | 42 ++++++++++++------- .../org/redkale/net/http/MultiContext.java | 8 ++-- .../java/org/redkale/net/http/MultiPart.java | 13 ++---- .../org/redkale/net/http/WebSocketEngine.java | 2 +- .../redkale/source/AbstractDataSource.java | 14 +------ .../org/redkale/source/CacheMemorySource.java | 2 +- .../java/org/redkale/source/EntityCache.java | 2 +- src/main/java/org/redkale/util/Utility.java | 38 +++++++++++++++++ 17 files changed, 155 insertions(+), 105 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 0e36d6c8b..f81d64197 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -590,45 +590,22 @@ public final class Application { final int workThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus())); boolean workHash = executorConf.getBoolValue("hash", false); if (workThreads > 0) { - final AtomicInteger workCounter = new AtomicInteger(); if (workHash) { - workExecutor0 = new ThreadHashExecutor(workThreads, (Runnable r) -> { - int i = workCounter.get(); - int c = workCounter.incrementAndGet(); - String threadname = "Redkale-HashWorkThread-" + (c > 9 ? c : ("0" + c)); - Thread t = new WorkThread(threadname, i, workThreads, workref.get(), r); - return t; - }); + workExecutor0 = WorkThread.createHashExecutor(workThreads, "Redkale-HashWorkThread-%s"); } else { - workExecutor0 = Executors.newFixedThreadPool(workThreads, (Runnable r) -> { - int i = workCounter.get(); - int c = workCounter.incrementAndGet(); - String threadname = "Redkale-WorkThread-" + (c > 9 ? c : ("0" + c)); - Thread t = new WorkThread(threadname, i, workThreads, workref.get(), r); - return t; - }); + workExecutor0 = WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s"); } workref.set(workExecutor0); } - - //给所有client给一个默认的AsyncGroup - final AtomicReference clientref = new AtomicReference<>(); - final AtomicInteger wclientCounter = new AtomicInteger(); + //给所有client给一个默认的ExecutorService final int clientThreads = Math.max(Math.max(2, Utility.cpus()), workThreads / 2); - clientExecutor = Executors.newFixedThreadPool(clientThreads, (Runnable r) -> { - int i = wclientCounter.get(); - int c = wclientCounter.incrementAndGet(); - String threadName = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c)); - Thread t = new WorkThread(threadName, i, clientThreads, clientref.get(), r); - return t; - }); - clientref.set(clientExecutor); + clientExecutor = WorkThread.createExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s"); } this.workExecutor = workExecutor0; this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor); - this.clientAsyncGroup = new AsyncIOGroup(true, null, clientExecutor, bufferCapacity, bufferPoolSize).skipClose(true); + this.clientAsyncGroup = new AsyncIOGroup(true, "Redkale-DefaultClient-IOThread-%s", clientExecutor, bufferCapacity, bufferPoolSize).skipClose(true); this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup); this.excludelibs = excludelib0; diff --git a/src/main/java/org/redkale/boot/LoggingFileHandler.java b/src/main/java/org/redkale/boot/LoggingFileHandler.java index c54075e09..077d24cc2 100644 --- a/src/main/java/org/redkale/boot/LoggingFileHandler.java +++ b/src/main/java/org/redkale/boot/LoggingFileHandler.java @@ -127,7 +127,7 @@ public class LoggingFileHandler extends LoggingBaseHandler { } private void open() { - final String name = "Redkale-Logging-" + getClass().getSimpleName() + "-Thread"; + final String name = "Redkale-Logging-" + getClass().getSimpleName().replace("Logging", "") + "-Thread"; new Thread() { { setName(name); diff --git a/src/main/java/org/redkale/cluster/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/CacheClusterAgent.java index 2b423f48f..6d7273341 100644 --- a/src/main/java/org/redkale/cluster/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/CacheClusterAgent.java @@ -8,6 +8,7 @@ package org.redkale.cluster; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import org.redkale.annotation.*; import org.redkale.annotation.ResourceListener; @@ -115,12 +116,12 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { @Override public void start() { if (this.scheduler == null) { + AtomicInteger counter = new AtomicInteger(); this.scheduler = new ScheduledThreadPoolExecutor(4, (Runnable r) -> { - final Thread t = new Thread(r, "Redkale-" + CacheClusterAgent.class.getSimpleName() + "-Task-Thread"); + final Thread t = new Thread(r, "Redkale-" + CacheClusterAgent.class.getSimpleName() + "-Task-Thread-" + counter.incrementAndGet()); t.setDaemon(true); return t; }); - } if (this.taskFuture != null) { this.taskFuture.cancel(true); diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index a004ea624..ce851a884 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -91,8 +91,7 @@ public abstract class MessageAgent implements Resourcable { } // application (it doesn't execute completion handlers). this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { - Thread t = new Thread(r); - t.setName("Redkale-MessageAgent-Timeout-Thread"); + Thread t = new Thread(r, "Redkale-MessageAgent-Timeout-Thread"); t.setDaemon(true); return t; }); diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index f9c951b2a..cfc24b7d9 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -22,20 +22,20 @@ import org.redkale.util.ObjectPool; */ public abstract class AsyncGroup { - public static AsyncGroup create(String threadPrefixName, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - return new AsyncIOGroup(true, threadPrefixName, workExecutor, bufferCapacity, bufferPoolSize); + public static AsyncGroup create(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(String threadPrefixName, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { - return new AsyncIOGroup(true, threadPrefixName, workExecutor, bufferCapacity, safeBufferPool); + public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { + return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); } - public static AsyncGroup create(boolean client, String threadPrefixName, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - return new AsyncIOGroup(client, threadPrefixName, workExecutor, bufferCapacity, bufferPoolSize); + public static AsyncGroup create(boolean client, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + return new AsyncIOGroup(client, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(boolean client, String threadPrefixName, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { - return new AsyncIOGroup(client, threadPrefixName, workExecutor, bufferCapacity, safeBufferPool); + public static AsyncGroup create(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { + return new AsyncIOGroup(client, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); } public CompletableFuture createTCPClient(final SocketAddress address) { diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 8325eacaf..9b7ccf276 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -64,8 +64,8 @@ public class AsyncIOGroup extends AsyncGroup { this(true, null, null, bufferCapacity, bufferPoolSize); } - public AsyncIOGroup(boolean client, String threadPrefixName, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - this(client, threadPrefixName, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, + public AsyncIOGroup(boolean client, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + this(client, threadNameFormat, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) { return false; @@ -75,44 +75,41 @@ public class AsyncIOGroup extends AsyncGroup { })); } - public AsyncIOGroup(boolean client, String threadPrefixName0, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { + public AsyncIOGroup(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { this.bufferCapacity = bufferCapacity; - final String threadPrefixName = threadPrefixName0 == null ? "Redkale-Client-IOThread" : threadPrefixName0; final int threads = Utility.cpus(); this.ioReadThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads]; try { for (int i = 0; i < threads; i++) { - String postfix = "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); + String indexfix = WorkThread.formatIndex(threads, i + 1); ObjectPool unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); if (client) { - this.ioReadThreads[i] = new ClientIOThread(threadPrefixName + postfix, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + this.ioReadThreads[i] = new ClientIOThread(String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioWriteThreads[i] = this.ioReadThreads[i]; - if (false) { - this.ioReadThreads[i].setName(threadPrefixName + "-Read" + postfix); + if (System.currentTimeMillis() < 1) { //暂时不使用 + this.ioReadThreads[i].setName(String.format(threadNameFormat, "Read-" + indexfix)); ObjectPool unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - this.ioWriteThreads[i] = new ClientWriteIOThread(threadPrefixName + "-Write" + postfix, i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool); + this.ioWriteThreads[i] = new ClientWriteIOThread(String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool); } } else { - this.ioReadThreads[i] = new AsyncIOThread(threadPrefixName + postfix, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + this.ioReadThreads[i] = new AsyncIOThread(String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioWriteThreads[i] = this.ioReadThreads[i]; } } if (client) { ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - String name = threadPrefixName.replace("ServletThread", "ConnectThread").replace("IOThread", "IOConnectThread"); - this.connectThread = client ? new ClientIOThread(name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) - : new AsyncIOThread(name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); + this.connectThread = client ? new ClientIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) + : new AsyncIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); } } catch (IOException e) { throw new RuntimeException(e); } this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { - Thread t = new Thread(r); - t.setName(threadPrefixName + "-Timeout"); + Thread t = new Thread(r, String.format(threadNameFormat, "Timeout")); t.setDaemon(true); return t; }); diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index 0a478e0e5..55731e4c5 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -118,13 +118,13 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { ObjectPool pool = localResponsePool.get(); (pool == null ? safeResponsePool : pool).accept(v); }; - final String threadPrefixName = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread"); - this.ioGroup = new AsyncIOGroup(false, threadPrefixName, null, server.bufferCapacity, bufferPool); + final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); + this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, bufferPool); this.ioGroup.start(); this.acceptThread = new Thread() { { - setName(threadPrefixName.replace("ServletThread", "AcceptThread")); + setName(String.format(threadNameFormat, "Accept")); } @Override diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index 778aa5d73..fdee2b11c 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -106,8 +106,8 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { ObjectPool pool = localResponsePool.get(); (pool == null ? safeResponsePool : pool).accept(v); }; - final String threadPrefixName = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread"); - this.ioGroup = new AsyncIOGroup(false, threadPrefixName, null, server.bufferCapacity, safeBufferPool); + final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s"); + this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool); this.ioGroup.start(); this.serverChannel.register(this.selector, SelectionKey.OP_READ); @@ -116,7 +116,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); { - setName(threadPrefixName.replace("ServletThread", "AcceptThread")); + setName(String.format(threadNameFormat, "Accept")); } @Override diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index b0c6a6112..f5c2d90a6 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -7,6 +7,7 @@ package org.redkale.net; import java.util.Collection; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import org.redkale.util.ThreadHashExecutor; /** @@ -44,6 +45,46 @@ public class WorkThread extends Thread implements Executor { return t instanceof WorkThread ? (WorkThread) t : null; } + public static ExecutorService createHashExecutor(final int threads, final String threadNameFormat) { + final AtomicReference ref = new AtomicReference<>(); + final AtomicInteger counter = new AtomicInteger(); + return new ThreadHashExecutor(threads, (Runnable r) -> { + int i = counter.get(); + int c = counter.incrementAndGet(); + String threadName = String.format(threadNameFormat, formatIndex(threads, c)); + Thread t = new WorkThread(threadName, i, threads, ref.get(), r); + return t; + }); + } + + public static ExecutorService createExecutor(final int threads, final String threadNameFormat) { + final AtomicReference ref = new AtomicReference<>(); + final AtomicInteger counter = new AtomicInteger(); + return Executors.newFixedThreadPool(threads, (Runnable r) -> { + int i = counter.get(); + int c = counter.incrementAndGet(); + String threadName = String.format(threadNameFormat, formatIndex(threads, c)); + Thread t = new WorkThread(threadName, i, threads, ref.get(), r); + return t; + }); + } + + public static String formatIndex(int threads, int index) { + String v = String.valueOf(index); + if (threads >= 100) { + if (index < 10) { + v = "00" + v; + } else if (index < 100) { + v = "0" + v; + } + } else if (threads >= 10) { + if (index < 10) { + v = "0" + v; + } + } + return v; + } + @Override public void execute(Runnable command) { if (workExecutor == null) { diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 3b6e06bf6..01788c5cf 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -24,12 +24,14 @@ import org.redkale.util.*; * @param 请求对象 * @param

响应对象 */ -public abstract class Client { +public abstract class Client implements Resourcable { public static final int DEFAULT_MAX_PIPELINES = 128; protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + protected final String name; + protected final AsyncGroup group; //连接构造器 protected final boolean tcp; //是否TCP协议 @@ -76,39 +78,40 @@ public abstract class Client { //创建连接后进行的登录鉴权操作 protected Function, CompletableFuture> authenticate; - protected Client(AsyncGroup group, ClientAddress address) { - this(group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); + protected Client(String name, AsyncGroup group, ClientAddress address) { + this(name, group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); } - protected Client(AsyncGroup group, boolean tcp, ClientAddress address) { - this(group, tcp, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); + protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address) { + this(name, group, tcp, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); } - protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns) { - this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, null); + protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns) { + this(name, group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, null); } - protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, int maxPipelines) { - this(group, tcp, address, maxconns, maxPipelines, null, null, null); + protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, int maxPipelines) { + this(name, group, tcp, address, maxconns, maxPipelines, null, null, null); } - protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, + protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, Function, CompletableFuture> authenticate) { - this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, authenticate); + this(name, group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, authenticate); } - protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, + protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, Supplier closeRequestSupplier, Function, CompletableFuture> authenticate) { - this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate); + this(name, group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate); } @SuppressWarnings("OverridableMethodCallInConstructor") - protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, + protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, int maxPipelines, Supplier pingRequestSupplier, Supplier closeRequestSupplier, Function, CompletableFuture> authenticate) { if (maxPipelines < 1) { throw new IllegalArgumentException("maxPipelines must bigger 0"); } address.checkValid(); + this.name = name; this.group = group; this.tcp = tcp; this.address = address; @@ -128,7 +131,7 @@ public abstract class Client { } //timeoutScheduler 不仅仅给超时用, 还给write用 this.timeoutScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { - final Thread t = new Thread(r, "Redkale-" + Client.this.getClass().getSimpleName() + "-Interval-Thread"); + final Thread t = new Thread(r, "Redkale-" + Client.this.getClass().getSimpleName() + "-" + resourceName() + "-Timeout-Thread"); t.setDaemon(true); return t; }); @@ -305,6 +308,15 @@ public abstract class Client { return s; } + @Override + public String resourceName() { + return name; + } + + public String getName() { + return name; + } + public int getReadTimeoutSeconds() { return readTimeoutSeconds; } diff --git a/src/main/java/org/redkale/net/http/MultiContext.java b/src/main/java/org/redkale/net/http/MultiContext.java index aa03cd908..149fb22e3 100644 --- a/src/main/java/org/redkale/net/http/MultiContext.java +++ b/src/main/java/org/redkale/net/http/MultiContext.java @@ -157,13 +157,13 @@ public final class MultiContext { continue; //不遍历完后面getParameter可能获取不到值 } has = true; - if (fileNameRegx != null && !fileNameRegx.isEmpty() && !part.getFileName().matches(fileNameRegx)) { + if (fileNameRegx != null && !fileNameRegx.isEmpty() && !part.getFilename().matches(fileNameRegx)) { continue; } if (contentTypeRegx != null && !contentTypeRegx.isEmpty() && !part.getContentType().matches(contentTypeRegx)) { continue; } - File file = new File(home, "tmp/redkale-" + System.nanoTime() + "_" + part.getFileName()); + File file = new File(home, "tmp/redkale-" + System.nanoTime() + "_" + part.getFilename()); File parent = file.getParentFile(); if (!parent.isDirectory()) { parent.mkdirs(); @@ -197,13 +197,13 @@ public final class MultiContext { } List files = null; for (MultiPart part : parts()) { - if (fileNameRegx != null && !fileNameRegx.isEmpty() && !part.getFileName().matches(fileNameRegx)) { + if (fileNameRegx != null && !fileNameRegx.isEmpty() && !part.getFilename().matches(fileNameRegx)) { continue; } if (contentTypeRegx != null && !contentTypeRegx.isEmpty() && !part.getContentType().matches(contentTypeRegx)) { continue; } - File file = new File(home, "tmp/redkale-" + System.nanoTime() + "_" + part.getFileName()); + File file = new File(home, "tmp/redkale-" + System.nanoTime() + "_" + part.getFilename()); File parent = file.getParentFile(); if (!parent.isDirectory()) { parent.mkdirs(); diff --git a/src/main/java/org/redkale/net/http/MultiPart.java b/src/main/java/org/redkale/net/http/MultiPart.java index bc500a452..61d7bf977 100644 --- a/src/main/java/org/redkale/net/http/MultiPart.java +++ b/src/main/java/org/redkale/net/http/MultiPart.java @@ -17,7 +17,7 @@ import java.util.concurrent.atomic.LongAdder; */ public final class MultiPart { - private final String fileName; + private final String filename; private final String name; @@ -28,7 +28,7 @@ public final class MultiPart { private final LongAdder received; MultiPart(String fileName, String name, String contentType, LongAdder received, InputStream in) { - this.fileName = fileName; + this.filename = fileName; this.name = name; this.in = in; this.contentType = contentType; @@ -37,7 +37,7 @@ public final class MultiPart { @Override public String toString() { - return this.getClass().getSimpleName() + "{" + "name=" + name + ", fileName=" + fileName + ", contentType=" + contentType + ", received=" + received + '}'; + return this.getClass().getSimpleName() + "{" + "name=" + name + ", filename=" + filename + ", contentType=" + contentType + ", received=" + received + '}'; } public boolean save(File file) throws IOException { @@ -99,13 +99,8 @@ public final class MultiPart { return contentType; } - @Deprecated(since = "2.8.0") public String getFilename() { - return fileName; - } - - public String getFileName() { - return fileName; + return filename; } public String getName() { diff --git a/src/main/java/org/redkale/net/http/WebSocketEngine.java b/src/main/java/org/redkale/net/http/WebSocketEngine.java index f2458100e..bd7cf6da8 100644 --- a/src/main/java/org/redkale/net/http/WebSocketEngine.java +++ b/src/main/java/org/redkale/net/http/WebSocketEngine.java @@ -121,7 +121,7 @@ public class WebSocketEngine { return; } this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { - final Thread t = new Thread(r, "Redkale-" + engineid + "-WebSocket-LiveInterval-Thread"); + final Thread t = new Thread(r, "Redkale-WebSocket-" + engineid + "-LiveInterval-Thread"); t.setDaemon(true); return t; }); diff --git a/src/main/java/org/redkale/source/AbstractDataSource.java b/src/main/java/org/redkale/source/AbstractDataSource.java index 4ecab4ab0..f9a05e542 100644 --- a/src/main/java/org/redkale/source/AbstractDataSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSource.java @@ -9,7 +9,6 @@ import java.io.Serializable; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; import java.util.function.*; import java.util.stream.Stream; import org.redkale.annotation.AutoLoad; @@ -282,19 +281,10 @@ public abstract class AbstractDataSource extends AbstractService implements Data if (executor == null) { synchronized (executorLock) { if (this.sourceExecutor == null) { - final AtomicReference ref = new AtomicReference<>(); - final AtomicInteger counter = new AtomicInteger(); - final int threads = sourceThreads; - executor = Executors.newFixedThreadPool(threads, (Runnable r) -> { - int i = counter.get(); - int c = counter.incrementAndGet(); - String threadName = "Redkale-DataSource-WorkThread-" + (c > 9 ? c : ("0" + c)); - Thread t = new WorkThread(threadName, i, threads, ref.get(), r); - return t; - }); - this.sourceExecutor = executor; + this.sourceExecutor = WorkThread.createExecutor(sourceThreads, "Redkale-DataSource-WorkThread-" + resourceName() + "-%s"); } } + executor = this.sourceExecutor; } return executor; } diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 35253ba2a..000a22f65 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -106,7 +106,7 @@ public final class CacheMemorySource extends AbstractCacheSource { } if (scheduler == null) { this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { - final Thread t = new Thread(r, "Redkale-" + self.getClass().getSimpleName() + "-Expirer-Thread"); + final Thread t = new Thread(r, "Redkale-" + self.getClass().getSimpleName() + "-" + resourceName() + "-Expirer-Thread"); t.setDaemon(true); return t; }); diff --git a/src/main/java/org/redkale/source/EntityCache.java b/src/main/java/org/redkale/source/EntityCache.java index e8f6379dc..78ea586c4 100644 --- a/src/main/java/org/redkale/source/EntityCache.java +++ b/src/main/java/org/redkale/source/EntityCache.java @@ -152,7 +152,7 @@ public final class EntityCache { } if (this.interval > 0 && this.scheduler == null && info.fullloader != null) { this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { - final Thread t = new Thread(r, "Redkale-EntityCache-" + type + "-Thread"); + final Thread t = new Thread(r, "Redkale-EntityCache-" + type.getSimpleName() + "-Thread"); t.setDaemon(true); return t; }); diff --git a/src/main/java/org/redkale/util/Utility.java b/src/main/java/org/redkale/util/Utility.java index 1f46d4830..8657d2b92 100644 --- a/src/main/java/org/redkale/util/Utility.java +++ b/src/main/java/org/redkale/util/Utility.java @@ -426,6 +426,44 @@ public final class Utility { }); } + /** + * 将字符串首字母大写 + * + * @param str 字符串 + * + * @return 首字母大写 + */ + public static String firstCharUpperCase(String str) { + if (str == null || str.isEmpty()) { + return str; + } + if (Character.isUpperCase(str.charAt(0))) { + return str; + } + char[] chs = str.toCharArray(); + chs[0] = Character.toUpperCase(chs[0]); + return new String(chs); + } + + /** + * 将字符串首字母小写 + * + * @param str 字符串 + * + * @return 首字母小写 + */ + public static String firstCharLowerCase(String str) { + if (str == null || str.isEmpty()) { + return str; + } + if (Character.isLowerCase(str.charAt(0))) { + return str; + } + char[] chs = str.toCharArray(); + chs[0] = Character.toLowerCase(chs[0]); + return new String(chs); + } + /** * 将多个key:value的字符串键值对组合成一个Map,items长度必须是偶数, 参数个数若是奇数的话,最后一个会被忽略 * 类似 JDK9中的 Map.of 方法