优化线程名

This commit is contained in:
Redkale
2023-01-06 22:08:49 +08:00
parent 1898fa7717
commit 9cef4b4f79
17 changed files with 155 additions and 105 deletions

View File

@@ -590,45 +590,22 @@ public final class Application {
final int workThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus())); final int workThreads = executorConf.getIntValue("threads", Math.max(2, Utility.cpus()));
boolean workHash = executorConf.getBoolValue("hash", false); boolean workHash = executorConf.getBoolValue("hash", false);
if (workThreads > 0) { if (workThreads > 0) {
final AtomicInteger workCounter = new AtomicInteger();
if (workHash) { if (workHash) {
workExecutor0 = new ThreadHashExecutor(workThreads, (Runnable r) -> { workExecutor0 = WorkThread.createHashExecutor(workThreads, "Redkale-HashWorkThread-%s");
int i = workCounter.get();
int c = workCounter.incrementAndGet();
String threadname = "Redkale-HashWorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, workThreads, workref.get(), r);
return t;
});
} else { } else {
workExecutor0 = Executors.newFixedThreadPool(workThreads, (Runnable r) -> { workExecutor0 = WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s");
int i = workCounter.get();
int c = workCounter.incrementAndGet();
String threadname = "Redkale-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadname, i, workThreads, workref.get(), r);
return t;
});
} }
workref.set(workExecutor0); workref.set(workExecutor0);
} }
//给所有client给一个默认的ExecutorService
//给所有client给一个默认的AsyncGroup
final AtomicReference<ExecutorService> clientref = new AtomicReference<>();
final AtomicInteger wclientCounter = new AtomicInteger();
final int clientThreads = Math.max(Math.max(2, Utility.cpus()), workThreads / 2); final int clientThreads = Math.max(Math.max(2, Utility.cpus()), workThreads / 2);
clientExecutor = Executors.newFixedThreadPool(clientThreads, (Runnable r) -> { clientExecutor = WorkThread.createExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s");
int i = wclientCounter.get();
int c = wclientCounter.incrementAndGet();
String threadName = "Redkale-Client-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadName, i, clientThreads, clientref.get(), r);
return t;
});
clientref.set(clientExecutor);
} }
this.workExecutor = workExecutor0; this.workExecutor = workExecutor0;
this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor);
this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor); this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor);
this.clientAsyncGroup = new AsyncIOGroup(true, null, clientExecutor, bufferCapacity, bufferPoolSize).skipClose(true); this.clientAsyncGroup = new AsyncIOGroup(true, "Redkale-DefaultClient-IOThread-%s", clientExecutor, bufferCapacity, bufferPoolSize).skipClose(true);
this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup); this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup);
this.excludelibs = excludelib0; this.excludelibs = excludelib0;

View File

@@ -127,7 +127,7 @@ public class LoggingFileHandler extends LoggingBaseHandler {
} }
private void open() { private void open() {
final String name = "Redkale-Logging-" + getClass().getSimpleName() + "-Thread"; final String name = "Redkale-Logging-" + getClass().getSimpleName().replace("Logging", "") + "-Thread";
new Thread() { new Thread() {
{ {
setName(name); setName(name);

View File

@@ -8,6 +8,7 @@ package org.redkale.cluster;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.annotation.*; import org.redkale.annotation.*;
import org.redkale.annotation.ResourceListener; import org.redkale.annotation.ResourceListener;
@@ -115,12 +116,12 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
@Override @Override
public void start() { public void start() {
if (this.scheduler == null) { if (this.scheduler == null) {
AtomicInteger counter = new AtomicInteger();
this.scheduler = new ScheduledThreadPoolExecutor(4, (Runnable r) -> { this.scheduler = new ScheduledThreadPoolExecutor(4, (Runnable r) -> {
final Thread t = new Thread(r, "Redkale-" + CacheClusterAgent.class.getSimpleName() + "-Task-Thread"); final Thread t = new Thread(r, "Redkale-" + CacheClusterAgent.class.getSimpleName() + "-Task-Thread-" + counter.incrementAndGet());
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });
} }
if (this.taskFuture != null) { if (this.taskFuture != null) {
this.taskFuture.cancel(true); this.taskFuture.cancel(true);

View File

@@ -91,8 +91,7 @@ public abstract class MessageAgent implements Resourcable {
} }
// application (it doesn't execute completion handlers). // application (it doesn't execute completion handlers).
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r); Thread t = new Thread(r, "Redkale-MessageAgent-Timeout-Thread");
t.setName("Redkale-MessageAgent-Timeout-Thread");
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });

View File

@@ -22,20 +22,20 @@ import org.redkale.util.ObjectPool;
*/ */
public abstract class AsyncGroup { public abstract class AsyncGroup {
public static AsyncGroup create(String threadPrefixName, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public static AsyncGroup create(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(true, threadPrefixName, workExecutor, bufferCapacity, bufferPoolSize); return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize);
} }
public static AsyncGroup create(String threadPrefixName, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public static AsyncGroup create(String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
return new AsyncIOGroup(true, threadPrefixName, workExecutor, bufferCapacity, safeBufferPool); return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool);
} }
public static AsyncGroup create(boolean client, String threadPrefixName, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public static AsyncGroup create(boolean client, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(client, threadPrefixName, workExecutor, bufferCapacity, bufferPoolSize); return new AsyncIOGroup(client, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize);
} }
public static AsyncGroup create(boolean client, String threadPrefixName, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public static AsyncGroup create(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
return new AsyncIOGroup(client, threadPrefixName, workExecutor, bufferCapacity, safeBufferPool); return new AsyncIOGroup(client, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool);
} }
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) { public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) {

View File

@@ -64,8 +64,8 @@ public class AsyncIOGroup extends AsyncGroup {
this(true, null, null, bufferCapacity, bufferPoolSize); this(true, null, null, bufferCapacity, bufferPoolSize);
} }
public AsyncIOGroup(boolean client, String threadPrefixName, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { public AsyncIOGroup(boolean client, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(client, threadPrefixName, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, this(client, threadNameFormat, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) { if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) {
return false; return false;
@@ -75,44 +75,41 @@ public class AsyncIOGroup extends AsyncGroup {
})); }));
} }
public AsyncIOGroup(boolean client, String threadPrefixName0, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) { public AsyncIOGroup(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
this.bufferCapacity = bufferCapacity; this.bufferCapacity = bufferCapacity;
final String threadPrefixName = threadPrefixName0 == null ? "Redkale-Client-IOThread" : threadPrefixName0;
final int threads = Utility.cpus(); final int threads = Utility.cpus();
this.ioReadThreads = new AsyncIOThread[threads]; this.ioReadThreads = new AsyncIOThread[threads];
this.ioWriteThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads];
try { try {
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
String postfix = "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); String indexfix = WorkThread.formatIndex(threads, i + 1);
ObjectPool<ByteBuffer> unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), ObjectPool<ByteBuffer> unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
if (client) { if (client) {
this.ioReadThreads[i] = new ClientIOThread(threadPrefixName + postfix, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioReadThreads[i] = new ClientIOThread(String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
this.ioWriteThreads[i] = this.ioReadThreads[i]; this.ioWriteThreads[i] = this.ioReadThreads[i];
if (false) { if (System.currentTimeMillis() < 1) { //暂时不使用
this.ioReadThreads[i].setName(threadPrefixName + "-Read" + postfix); this.ioReadThreads[i].setName(String.format(threadNameFormat, "Read-" + indexfix));
ObjectPool<ByteBuffer> unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), ObjectPool<ByteBuffer> unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
this.ioWriteThreads[i] = new ClientWriteIOThread(threadPrefixName + "-Write" + postfix, i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool); this.ioWriteThreads[i] = new ClientWriteIOThread(String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool);
} }
} else { } else {
this.ioReadThreads[i] = new AsyncIOThread(threadPrefixName + postfix, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioReadThreads[i] = new AsyncIOThread(String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
this.ioWriteThreads[i] = this.ioReadThreads[i]; this.ioWriteThreads[i] = this.ioReadThreads[i];
} }
} }
if (client) { if (client) {
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
String name = threadPrefixName.replace("ServletThread", "ConnectThread").replace("IOThread", "IOConnectThread"); this.connectThread = client ? new ClientIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool)
this.connectThread = client ? new ClientIOThread(name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) : new AsyncIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool);
: new AsyncIOThread(name, 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool);
} }
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r); Thread t = new Thread(r, String.format(threadNameFormat, "Timeout"));
t.setName(threadPrefixName + "-Timeout");
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });

View File

@@ -118,13 +118,13 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
ObjectPool<Response> pool = localResponsePool.get(); ObjectPool<Response> pool = localResponsePool.get();
(pool == null ? safeResponsePool : pool).accept(v); (pool == null ? safeResponsePool : pool).accept(v);
}; };
final String threadPrefixName = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread"); final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
this.ioGroup = new AsyncIOGroup(false, threadPrefixName, null, server.bufferCapacity, bufferPool); this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, bufferPool);
this.ioGroup.start(); this.ioGroup.start();
this.acceptThread = new Thread() { this.acceptThread = new Thread() {
{ {
setName(threadPrefixName.replace("ServletThread", "AcceptThread")); setName(String.format(threadNameFormat, "Accept"));
} }
@Override @Override

View File

@@ -106,8 +106,8 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
ObjectPool<Response> pool = localResponsePool.get(); ObjectPool<Response> pool = localResponsePool.get();
(pool == null ? safeResponsePool : pool).accept(v); (pool == null ? safeResponsePool : pool).accept(v);
}; };
final String threadPrefixName = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread"); final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
this.ioGroup = new AsyncIOGroup(false, threadPrefixName, null, server.bufferCapacity, safeBufferPool); this.ioGroup = new AsyncIOGroup(false, threadNameFormat, null, server.bufferCapacity, safeBufferPool);
this.ioGroup.start(); this.ioGroup.start();
this.serverChannel.register(this.selector, SelectionKey.OP_READ); this.serverChannel.register(this.selector, SelectionKey.OP_READ);
@@ -116,7 +116,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
{ {
setName(threadPrefixName.replace("ServletThread", "AcceptThread")); setName(String.format(threadNameFormat, "Accept"));
} }
@Override @Override

View File

@@ -7,6 +7,7 @@ package org.redkale.net;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.redkale.util.ThreadHashExecutor; import org.redkale.util.ThreadHashExecutor;
/** /**
@@ -44,6 +45,46 @@ public class WorkThread extends Thread implements Executor {
return t instanceof WorkThread ? (WorkThread) t : null; return t instanceof WorkThread ? (WorkThread) t : null;
} }
public static ExecutorService createHashExecutor(final int threads, final String threadNameFormat) {
final AtomicReference<ExecutorService> ref = new AtomicReference<>();
final AtomicInteger counter = new AtomicInteger();
return new ThreadHashExecutor(threads, (Runnable r) -> {
int i = counter.get();
int c = counter.incrementAndGet();
String threadName = String.format(threadNameFormat, formatIndex(threads, c));
Thread t = new WorkThread(threadName, i, threads, ref.get(), r);
return t;
});
}
public static ExecutorService createExecutor(final int threads, final String threadNameFormat) {
final AtomicReference<ExecutorService> ref = new AtomicReference<>();
final AtomicInteger counter = new AtomicInteger();
return Executors.newFixedThreadPool(threads, (Runnable r) -> {
int i = counter.get();
int c = counter.incrementAndGet();
String threadName = String.format(threadNameFormat, formatIndex(threads, c));
Thread t = new WorkThread(threadName, i, threads, ref.get(), r);
return t;
});
}
public static String formatIndex(int threads, int index) {
String v = String.valueOf(index);
if (threads >= 100) {
if (index < 10) {
v = "00" + v;
} else if (index < 100) {
v = "0" + v;
}
} else if (threads >= 10) {
if (index < 10) {
v = "0" + v;
}
}
return v;
}
@Override @Override
public void execute(Runnable command) { public void execute(Runnable command) {
if (workExecutor == null) { if (workExecutor == null) {

View File

@@ -24,12 +24,14 @@ import org.redkale.util.*;
* @param <R> 请求对象 * @param <R> 请求对象
* @param <P> 响应对象 * @param <P> 响应对象
*/ */
public abstract class Client<R extends ClientRequest, P> { public abstract class Client<R extends ClientRequest, P> implements Resourcable {
public static final int DEFAULT_MAX_PIPELINES = 128; public static final int DEFAULT_MAX_PIPELINES = 128;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final String name;
protected final AsyncGroup group; //连接构造器 protected final AsyncGroup group; //连接构造器
protected final boolean tcp; //是否TCP协议 protected final boolean tcp; //是否TCP协议
@@ -76,39 +78,40 @@ public abstract class Client<R extends ClientRequest, P> {
//创建连接后进行的登录鉴权操作 //创建连接后进行的登录鉴权操作
protected Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate; protected Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate;
protected Client(AsyncGroup group, ClientAddress address) { protected Client(String name, AsyncGroup group, ClientAddress address) {
this(group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); this(name, group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null);
} }
protected Client(AsyncGroup group, boolean tcp, ClientAddress address) { protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address) {
this(group, tcp, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); this(name, group, tcp, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null);
} }
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns) { protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns) {
this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, null); this(name, group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, null);
} }
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, int maxPipelines) { protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, int maxPipelines) {
this(group, tcp, address, maxconns, maxPipelines, null, null, null); this(name, group, tcp, address, maxconns, maxPipelines, null, null, null);
} }
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns,
Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) { Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) {
this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, authenticate); this(name, group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, null, authenticate);
} }
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns,
Supplier<R> closeRequestSupplier, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) { Supplier<R> closeRequestSupplier, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) {
this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate); this(name, group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate);
} }
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns, protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxconns,
int maxPipelines, Supplier<R> pingRequestSupplier, Supplier<R> closeRequestSupplier, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) { int maxPipelines, Supplier<R> pingRequestSupplier, Supplier<R> closeRequestSupplier, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) {
if (maxPipelines < 1) { if (maxPipelines < 1) {
throw new IllegalArgumentException("maxPipelines must bigger 0"); throw new IllegalArgumentException("maxPipelines must bigger 0");
} }
address.checkValid(); address.checkValid();
this.name = name;
this.group = group; this.group = group;
this.tcp = tcp; this.tcp = tcp;
this.address = address; this.address = address;
@@ -128,7 +131,7 @@ public abstract class Client<R extends ClientRequest, P> {
} }
//timeoutScheduler 不仅仅给超时用, 还给write用 //timeoutScheduler 不仅仅给超时用, 还给write用
this.timeoutScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { this.timeoutScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, "Redkale-" + Client.this.getClass().getSimpleName() + "-Interval-Thread"); final Thread t = new Thread(r, "Redkale-" + Client.this.getClass().getSimpleName() + "-" + resourceName() + "-Timeout-Thread");
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });
@@ -305,6 +308,15 @@ public abstract class Client<R extends ClientRequest, P> {
return s; return s;
} }
@Override
public String resourceName() {
return name;
}
public String getName() {
return name;
}
public int getReadTimeoutSeconds() { public int getReadTimeoutSeconds() {
return readTimeoutSeconds; return readTimeoutSeconds;
} }

View File

@@ -157,13 +157,13 @@ public final class MultiContext {
continue; //不遍历完后面getParameter可能获取不到值 continue; //不遍历完后面getParameter可能获取不到值
} }
has = true; has = true;
if (fileNameRegx != null && !fileNameRegx.isEmpty() && !part.getFileName().matches(fileNameRegx)) { if (fileNameRegx != null && !fileNameRegx.isEmpty() && !part.getFilename().matches(fileNameRegx)) {
continue; continue;
} }
if (contentTypeRegx != null && !contentTypeRegx.isEmpty() && !part.getContentType().matches(contentTypeRegx)) { if (contentTypeRegx != null && !contentTypeRegx.isEmpty() && !part.getContentType().matches(contentTypeRegx)) {
continue; continue;
} }
File file = new File(home, "tmp/redkale-" + System.nanoTime() + "_" + part.getFileName()); File file = new File(home, "tmp/redkale-" + System.nanoTime() + "_" + part.getFilename());
File parent = file.getParentFile(); File parent = file.getParentFile();
if (!parent.isDirectory()) { if (!parent.isDirectory()) {
parent.mkdirs(); parent.mkdirs();
@@ -197,13 +197,13 @@ public final class MultiContext {
} }
List<File> files = null; List<File> files = null;
for (MultiPart part : parts()) { for (MultiPart part : parts()) {
if (fileNameRegx != null && !fileNameRegx.isEmpty() && !part.getFileName().matches(fileNameRegx)) { if (fileNameRegx != null && !fileNameRegx.isEmpty() && !part.getFilename().matches(fileNameRegx)) {
continue; continue;
} }
if (contentTypeRegx != null && !contentTypeRegx.isEmpty() && !part.getContentType().matches(contentTypeRegx)) { if (contentTypeRegx != null && !contentTypeRegx.isEmpty() && !part.getContentType().matches(contentTypeRegx)) {
continue; continue;
} }
File file = new File(home, "tmp/redkale-" + System.nanoTime() + "_" + part.getFileName()); File file = new File(home, "tmp/redkale-" + System.nanoTime() + "_" + part.getFilename());
File parent = file.getParentFile(); File parent = file.getParentFile();
if (!parent.isDirectory()) { if (!parent.isDirectory()) {
parent.mkdirs(); parent.mkdirs();

View File

@@ -17,7 +17,7 @@ import java.util.concurrent.atomic.LongAdder;
*/ */
public final class MultiPart { public final class MultiPart {
private final String fileName; private final String filename;
private final String name; private final String name;
@@ -28,7 +28,7 @@ public final class MultiPart {
private final LongAdder received; private final LongAdder received;
MultiPart(String fileName, String name, String contentType, LongAdder received, InputStream in) { MultiPart(String fileName, String name, String contentType, LongAdder received, InputStream in) {
this.fileName = fileName; this.filename = fileName;
this.name = name; this.name = name;
this.in = in; this.in = in;
this.contentType = contentType; this.contentType = contentType;
@@ -37,7 +37,7 @@ public final class MultiPart {
@Override @Override
public String toString() { public String toString() {
return this.getClass().getSimpleName() + "{" + "name=" + name + ", fileName=" + fileName + ", contentType=" + contentType + ", received=" + received + '}'; return this.getClass().getSimpleName() + "{" + "name=" + name + ", filename=" + filename + ", contentType=" + contentType + ", received=" + received + '}';
} }
public boolean save(File file) throws IOException { public boolean save(File file) throws IOException {
@@ -99,13 +99,8 @@ public final class MultiPart {
return contentType; return contentType;
} }
@Deprecated(since = "2.8.0")
public String getFilename() { public String getFilename() {
return fileName; return filename;
}
public String getFileName() {
return fileName;
} }
public String getName() { public String getName() {

View File

@@ -121,7 +121,7 @@ public class WebSocketEngine {
return; return;
} }
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, "Redkale-" + engineid + "-WebSocket-LiveInterval-Thread"); final Thread t = new Thread(r, "Redkale-WebSocket-" + engineid + "-LiveInterval-Thread");
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });

View File

@@ -9,7 +9,6 @@ import java.io.Serializable;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*; import java.util.function.*;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.redkale.annotation.AutoLoad; import org.redkale.annotation.AutoLoad;
@@ -282,19 +281,10 @@ public abstract class AbstractDataSource extends AbstractService implements Data
if (executor == null) { if (executor == null) {
synchronized (executorLock) { synchronized (executorLock) {
if (this.sourceExecutor == null) { if (this.sourceExecutor == null) {
final AtomicReference<ExecutorService> ref = new AtomicReference<>(); this.sourceExecutor = WorkThread.createExecutor(sourceThreads, "Redkale-DataSource-WorkThread-" + resourceName() + "-%s");
final AtomicInteger counter = new AtomicInteger();
final int threads = sourceThreads;
executor = Executors.newFixedThreadPool(threads, (Runnable r) -> {
int i = counter.get();
int c = counter.incrementAndGet();
String threadName = "Redkale-DataSource-WorkThread-" + (c > 9 ? c : ("0" + c));
Thread t = new WorkThread(threadName, i, threads, ref.get(), r);
return t;
});
this.sourceExecutor = executor;
} }
} }
executor = this.sourceExecutor;
} }
return executor; return executor;
} }

View File

@@ -106,7 +106,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
} }
if (scheduler == null) { if (scheduler == null) {
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, "Redkale-" + self.getClass().getSimpleName() + "-Expirer-Thread"); final Thread t = new Thread(r, "Redkale-" + self.getClass().getSimpleName() + "-" + resourceName() + "-Expirer-Thread");
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });

View File

@@ -152,7 +152,7 @@ public final class EntityCache<T> {
} }
if (this.interval > 0 && this.scheduler == null && info.fullloader != null) { if (this.interval > 0 && this.scheduler == null && info.fullloader != null) {
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, "Redkale-EntityCache-" + type + "-Thread"); final Thread t = new Thread(r, "Redkale-EntityCache-" + type.getSimpleName() + "-Thread");
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });

View File

@@ -426,6 +426,44 @@ public final class Utility {
}); });
} }
/**
* 将字符串首字母大写
*
* @param str 字符串
*
* @return 首字母大写
*/
public static String firstCharUpperCase(String str) {
if (str == null || str.isEmpty()) {
return str;
}
if (Character.isUpperCase(str.charAt(0))) {
return str;
}
char[] chs = str.toCharArray();
chs[0] = Character.toUpperCase(chs[0]);
return new String(chs);
}
/**
* 将字符串首字母小写
*
* @param str 字符串
*
* @return 首字母小写
*/
public static String firstCharLowerCase(String str) {
if (str == null || str.isEmpty()) {
return str;
}
if (Character.isLowerCase(str.charAt(0))) {
return str;
}
char[] chs = str.toCharArray();
chs[0] = Character.toLowerCase(chs[0]);
return new String(chs);
}
/** /**
* 将多个key:value的字符串键值对组合成一个Mapitems长度必须是偶数, 参数个数若是奇数的话,最后一个会被忽略 * 将多个key:value的字符串键值对组合成一个Mapitems长度必须是偶数, 参数个数若是奇数的话,最后一个会被忽略
* 类似 JDK9中的 Map.of 方法 * 类似 JDK9中的 Map.of 方法