废弃ThreadHashExecutor

This commit is contained in:
redkale
2023-04-20 19:56:02 +08:00
parent 8db6fd7b73
commit 82fa56d66c
12 changed files with 16 additions and 146 deletions

View File

@@ -38,9 +38,8 @@
【节点全局唯一】 @since 2.3.0
全局Serivce执行的线程池 Application.workExecutor, 没配置该节点将自动创建一个。
threads 线程数为0表示不启用workExecutor只用IO线程。默认: CPU核数, 核数=1的情况下默认值为2
hash: 是否使用ThreadHashExecutor作为线程池默认值为false
-->
<executor threads="4" hash="false"/>
<executor threads="4"/>
<!--
【节点全局唯一】

View File

@@ -560,14 +560,9 @@ public final class Application {
executorConf = DefaultAnyValue.create();
}
final int workThreads = executorConf.getIntValue("threads", Utility.cpus() * 4);
boolean workHash = executorConf.getBoolValue("hash", false);
if (workThreads > 0) {
if (workHash) {
workExecutor0 = WorkThread.createHashExecutor(workThreads, "Redkale-HashWorkThread-%s");
} else {
//指定threads则不使用虚拟线程池
workExecutor0 = executorConf.getValue("threads") != null ? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s") : WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s");
}
//指定threads则不使用虚拟线程池
workExecutor0 = executorConf.getValue("threads") != null ? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s") : WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s");
}
this.workExecutor = workExecutor0;
this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor);

View File

@@ -30,8 +30,6 @@ public class Context {
//Server的线程池
protected final ExecutorService workExecutor;
protected final ThreadHashExecutor workHashExecutor;
//SSL
protected final SSLBuilder sslBuilder;
@@ -103,11 +101,6 @@ public class Context {
this.writeTimeoutSeconds = writeTimeoutSeconds;
this.jsonFactory = JsonFactory.root();
this.bsonFactory = BsonFactory.root();
if (workExecutor instanceof ThreadHashExecutor) {
this.workHashExecutor = (ThreadHashExecutor) workExecutor;
} else {
this.workHashExecutor = null;
}
}
protected void executeDispatch(Request request, Response response) {
@@ -115,24 +108,7 @@ public class Context {
}
public void execute(Servlet servlet, Request request, Response response) {
if (workHashExecutor != null) {
response.updateNonBlocking(false);
workHashExecutor.execute(request.getHashid(), () -> {
try {
long cha = System.currentTimeMillis() - request.getCreateTime();
Traces.computeCurrTraceid(request.getTraceid());
servlet.execute(request, response);
if (cha > 1000 && response.context.logger.isLoggable(Level.WARNING)) {
response.context.logger.log(Level.WARNING, "hash execute servlet delays=" + cha + "ms, request=" + request);
} else if (cha > 100 && response.context.logger.isLoggable(Level.FINE)) {
response.context.logger.log(Level.FINE, "hash execute servlet delay=" + cha + "ms, request=" + request);
}
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "Execute servlet occur exception. request = " + request, t);
response.finishError(t);
}
});
} else if (workExecutor != null && response.inNonBlocking() && !servlet.isNonBlocking()) {
if (workExecutor != null && response.inNonBlocking() && !servlet.isNonBlocking()) {
response.updateNonBlocking(false);
workExecutor.execute(() -> {
try {

View File

@@ -34,8 +34,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected final ExecutorService workExecutor;
protected final ThreadHashExecutor workHashExecutor;
protected final R request;
protected final WorkThread thread;
@@ -124,7 +122,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.thread = WorkThread.currWorkThread();
this.writeBuffer = context != null ? ByteBuffer.allocateDirect(context.getBufferCapacity()) : null;
this.workExecutor = context == null || context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor;
this.workHashExecutor = context == null ? null : context.workHashExecutor;
}
protected AsyncConnection removeChannel() {
@@ -165,10 +162,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
return workExecutor;
}
protected ThreadHashExecutor getWorkHashExecutor() {
return workHashExecutor;
}
protected void updateNonBlocking(boolean nonBlocking) {
this.inNonBlocking = nonBlocking;
}

View File

@@ -318,8 +318,6 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
int workThreads = 0;
if (workExecutor instanceof ThreadPoolExecutor) {
workThreads = ((ThreadPoolExecutor) workExecutor).getCorePoolSize();
} else if (workExecutor instanceof ThreadHashExecutor) {
workThreads = ((ThreadHashExecutor) workExecutor).getCorePoolSize();
} else if (workExecutor != null) { //virtual thread pool
workThreads = -1;
}

View File

@@ -9,7 +9,7 @@ import java.util.Collection;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.Function;
import org.redkale.util.*;
import org.redkale.util.Utility;
/**
* 协议处理的自定义线程类
@@ -23,8 +23,6 @@ public class WorkThread extends Thread implements Executor {
protected final ExecutorService workExecutor;
protected final ThreadHashExecutor hashExecutor;
private final int index; //WorkThread下标从0开始
private final int threads; //WorkThread个数
@@ -37,7 +35,6 @@ public class WorkThread extends Thread implements Executor {
this.index = index;
this.threads = threads;
this.workExecutor = workExecutor;
this.hashExecutor = workExecutor instanceof ThreadHashExecutor ? (ThreadHashExecutor) workExecutor : null;
this.setDaemon(true);
}
@@ -46,19 +43,6 @@ public class WorkThread extends Thread implements Executor {
return t instanceof WorkThread ? (WorkThread) t : null;
}
public static ExecutorService createHashExecutor(final int threads, final String threadNameFormat) {
final AtomicReference<ExecutorService> ref = new AtomicReference<>();
final AtomicInteger counter = new AtomicInteger();
final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group"));
return new ThreadHashExecutor(threads, (Runnable r) -> {
int i = counter.get();
int c = counter.incrementAndGet();
String threadName = String.format(threadNameFormat, formatIndex(threads, c));
Thread t = new WorkThread(g, threadName, i, threads, ref.get(), r);
return t;
});
}
public static ExecutorService createWorkExecutor(final int threads, final String threadNameFormat) {
final Function<String, ExecutorService> func = Utility.virtualExecutorFunction();
return func == null ? createExecutor(threads, threadNameFormat) : func.apply(threadNameFormat);
@@ -137,30 +121,6 @@ public class WorkThread extends Thread implements Executor {
}
}
public void runWork(int hash, Runnable command) {
if (hashExecutor == null) {
if (workExecutor == null) {
command.run();
} else {
workExecutor.execute(command);
}
} else {
hashExecutor.execute(hash, command);
}
}
public void runWork(java.io.Serializable hash, Runnable command) {
if (hashExecutor == null) {
if (workExecutor == null) {
command.run();
} else {
workExecutor.execute(command);
}
} else {
hashExecutor.execute(hash, command);
}
}
public void runAsync(Runnable command) {
if (workExecutor == null) {
ForkJoinPool.commonPool().execute(command);
@@ -169,18 +129,6 @@ public class WorkThread extends Thread implements Executor {
}
}
public void runAsync(int hash, Runnable command) {
if (hashExecutor == null) {
if (workExecutor == null) {
ForkJoinPool.commonPool().execute(command);
} else {
workExecutor.execute(command);
}
} else {
hashExecutor.execute(hash, command);
}
}
public ExecutorService getWorkExecutor() {
return workExecutor;
}

View File

@@ -241,11 +241,6 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return super.getWorkExecutor();
}
@Override
protected ThreadHashExecutor getWorkHashExecutor() {
return super.getWorkHashExecutor();
}
@Override
protected void updateNonBlocking(boolean nonBlocking) {
super.updateNonBlocking(nonBlocking);

View File

@@ -316,7 +316,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
//消息处理
for (final WebSocketPacket packet : currPackets) {
if (packet.type == FrameType.TEXT) {
ioReadThread.runWork(webSocket._userid, () -> {
ioReadThread.runWork(() -> {
try {
Convert convert = webSocket.getTextConvert();
if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用
@@ -329,7 +329,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
}
});
} else if (packet.type == FrameType.BINARY) {
ioReadThread.runWork(webSocket._userid, () -> {
ioReadThread.runWork(() -> {
try {
Convert convert = webSocket.getBinaryConvert();
if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用
@@ -342,7 +342,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
}
});
} else if (packet.type == FrameType.PING) {
ioReadThread.runWork(webSocket._userid, () -> {
ioReadThread.runWork(() -> {
try {
webSocket.onPing(packet.getPayload());
} catch (Exception e) {
@@ -350,7 +350,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
}
});
} else if (packet.type == FrameType.PONG) {
ioReadThread.runWork(webSocket._userid, () -> {
ioReadThread.runWork(() -> {
try {
//if (debug) logger.log(Level.FINEST, "WebSocket onMessage by PONG FrameType : " + packet);
webSocket.onPong(packet.getPayload());

View File

@@ -11,7 +11,7 @@ import java.util.concurrent.*;
import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.util.*;
import org.redkale.util.ByteArray;
/**
*
@@ -111,11 +111,6 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
return super.getWorkExecutor();
}
@Override
protected ThreadHashExecutor getWorkHashExecutor() {
return super.getWorkHashExecutor();
}
@Override
protected void updateNonBlocking(boolean nonBlocking) {
super.updateNonBlocking(nonBlocking);

View File

@@ -6,10 +6,10 @@
package org.redkale.service;
import java.util.concurrent.*;
import org.redkale.annotation.*;
import org.redkale.boot.*;
import org.redkale.net.*;
import org.redkale.util.*;
import org.redkale.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.net.WorkThread;
import org.redkale.util.ResourceFactory;
/**
*
@@ -55,35 +55,6 @@ public abstract class AbstractService implements Service {
}
}
/**
* 异步执行任务
*
* @param hash hash值
* @param command 任务
*/
protected void runAsync(int hash, Runnable command) {
ExecutorService executor = this.workExecutor;
if (executor != null) {
if (executor instanceof ThreadHashExecutor) {
((ThreadHashExecutor) executor).execute(hash, command);
} else {
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
((WorkThread) thread).runAsync(hash, command);
} else {
executor.execute(command);
}
}
} else {
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
((WorkThread) thread).runAsync(hash, command);
} else {
ForkJoinPool.commonPool().execute(command);
}
}
}
/**
* 获取线程池
*

View File

@@ -2710,8 +2710,6 @@ public class DataJdbcSource extends AbstractDataSqlSource {
int defMaxConns = Utility.cpus() * 4;
if (workExecutor instanceof ThreadPoolExecutor) {
defMaxConns = ((ThreadPoolExecutor) workExecutor).getCorePoolSize();
} else if (workExecutor instanceof ThreadHashExecutor) {
defMaxConns = ((ThreadHashExecutor) workExecutor).getCorePoolSize();
} else if (workExecutor != null) { //maybe virtual thread pool
defMaxConns = Math.min(1024, Utility.cpus() * 100);
}

View File

@@ -16,9 +16,11 @@ import java.util.concurrent.atomic.AtomicInteger;
* 详情见: https://redkale.org
*
* @author zhangjx
* @deprecated
*
* @since 2.1.0
*/
@Deprecated(since = "2.8.0")
public class ThreadHashExecutor extends AbstractExecutorService {
private final LinkedBlockingQueue<Runnable>[] queues;