diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index 17b858f68..835cc5807 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -31,14 +31,6 @@ public abstract class AsyncGroup { return new AsyncIOGroup(threadNameFormat, workExecutor, safeBufferPool); } - public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - return new AsyncIOGroup(threadNameFormat, threads, workExecutor, bufferCapacity, bufferPoolSize); - } - - public static AsyncGroup create(String threadNameFormat, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) { - return new AsyncIOGroup(threadNameFormat, threads, workExecutor, safeBufferPool); - } - public CompletableFuture createTCPClient(final SocketAddress address) { return createTCPClient(address, 0, 0); } diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index cf9f82681..9e90a71f6 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -57,41 +57,33 @@ public class AsyncIOGroup extends AsyncGroup { //关闭数 protected final LongAdder connClosedCounter = new LongAdder(); - protected final ScheduledThreadPoolExecutor timeoutExecutor; + //超时器 + protected final ScheduledExecutorService timeoutExecutor; public AsyncIOGroup(final int bufferCapacity, final int bufferPoolSize) { - this("Redkale-AnonymousClient-IOThread-%s", Utility.cpus(), null, bufferCapacity, bufferPoolSize); + this("Redkale-AnonymousClient-IOThread-%s", null, bufferCapacity, bufferPoolSize); } public AsyncIOGroup(String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - this(threadNameFormat, Utility.cpus(), workExecutor, bufferCapacity, bufferPoolSize); - } - - public AsyncIOGroup(String threadNameFormat, int threads, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - this(threadNameFormat, threads, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity)); + this(threadNameFormat, workExecutor, ByteBufferPool.createSafePool(bufferPoolSize, bufferCapacity)); } @SuppressWarnings("OverridableMethodCallInConstructor") public AsyncIOGroup(String threadNameFormat, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { - this(threadNameFormat, Utility.cpus(), workExecutor, safeBufferPool); - } - - @SuppressWarnings("OverridableMethodCallInConstructor") - public AsyncIOGroup(String threadNameFormat, int threads, ExecutorService workExecutor, final ByteBufferPool safeBufferPool) { + final int threads = Utility.cpus(); //固定值,不可改 this.bufferCapacity = safeBufferPool.getBufferCapacity(); this.ioReadThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads]; final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group")); - - this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { + this.timeoutExecutor = Executors.newScheduledThreadPool(1, (Runnable r) -> { Thread t = new Thread(r, String.format(threadNameFormat, "Timeout")); t.setDaemon(true); return t; }); try { for (int i = 0; i < threads; i++) { - String indexfix = WorkThread.formatIndex(threads, i + 1); - this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool); + String indexFix = WorkThread.formatIndex(threads, i + 1); + this.ioReadThreads[i] = createAsyncIOThread(g, String.format(threadNameFormat, indexFix), i, threads, workExecutor, safeBufferPool); this.ioWriteThreads[i] = this.ioReadThreads[i]; } this.connectThread = createConnectIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool); diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index b2f5c0df0..948b1e6d7 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -30,6 +30,8 @@ public abstract class Client, R extends ClientR public static final int DEFAULT_MAX_PIPELINES = 128; + protected boolean debug; + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final String name; diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index ae3cc6491..5d0f94ebf 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -99,39 +99,37 @@ public abstract class ClientCodec implements Complet void responseComplete(boolean halfCompleted, ClientFuture respFuture, P message, Throwable exc) { if (respFuture != null) { R request = respFuture.request; - WorkThread workThread = null; + AsyncIOThread readThread = connection.channel.getReadIOThread(); + final WorkThread workThread = request == null ? readThread : request.removeWorkThread(readThread); try { if (!halfCompleted && request != null && !request.isCompleted()) { if (exc == null) { connection.sendHalfWrite(request, exc); //request没有发送完,respFuture需要再次接收 return; - } else { //异常了需要清掉半包 + } else { connection.sendHalfWrite(request, exc); + //异常了需要清掉半包 } } - if (request != null) { - workThread = request.workThread; - request.workThread = null; - } connection.respWaitingCounter.decrement(); if (connection.isAuthenticated()) { connection.client.incrRespDoneCounter(); } respFuture.cancelTimeout(); - //if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message); +// if (connection.client.debug) { +// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, exc); +// } connection.preComplete(message, (R) request, exc); - boolean reqInIO = workThread != null && workThread.inIO(); - if (workThread == null || workThread.getWorkExecutor() == null) { - workThread = connection.channel.getReadIOThread(); - } if (exc != null) { - if (reqInIO) { //request在IO线程中发送请求,说明request是在异步模式中 - if (request != null) { - Traces.currTraceid(request.traceid); - } - respFuture.completeExceptionally(exc); + if (workThread.inIO()) { + workThread.execute(() -> { + if (request != null) { + Traces.currTraceid(request.traceid); + } + respFuture.completeExceptionally(exc); + }); } else { workThread.runWork(() -> { if (request != null) { @@ -142,11 +140,13 @@ public abstract class ClientCodec implements Complet } } else { final Object rs = request == null || request.respTransfer == null ? message : request.respTransfer.apply(message); - if (reqInIO) { //request在IO线程中发送请求,说明request是在异步模式中 - if (request != null) { - Traces.currTraceid(request.traceid); - } - ((ClientFuture) respFuture).complete(rs); + if (workThread.inIO()) { + workThread.execute(() -> { + if (request != null) { + Traces.currTraceid(request.traceid); + } + ((ClientFuture) respFuture).complete(rs); + }); } else { workThread.runWork(() -> { if (request != null) { @@ -157,11 +157,13 @@ public abstract class ClientCodec implements Complet } } } catch (Throwable t) { - if (workThread == null || workThread.inIO()) { - if (request != null) { - Traces.currTraceid(request.traceid); - } - respFuture.completeExceptionally(t); + if (workThread.inIO()) { + workThread.execute(() -> { + if (request != null) { + Traces.currTraceid(request.traceid); + } + respFuture.completeExceptionally(t); + }); } else { workThread.runWork(() -> { if (request != null) { diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index 46e118de2..5dd40a0ae 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -32,6 +32,12 @@ public abstract class ClientRequest { public abstract void writeTo(ClientConnection conn, ByteArray array); + WorkThread removeWorkThread(WorkThread defaultValue) { + WorkThread t = this.workThread; + this.workThread = null; + return t == null ? defaultValue : t; + } + public Serializable getRequestid() { return null; } diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java index 76077061c..265750ce9 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteIOThread.java @@ -24,11 +24,11 @@ import org.redkale.util.*; @Deprecated(since = "2.8.0") public class WebSocketWriteIOThread extends AsyncIOThread { - private final ScheduledThreadPoolExecutor timeoutExecutor; + private final ScheduledExecutorService timeoutExecutor; private final BlockingDeque requestQueue = new LinkedBlockingDeque<>(); - public WebSocketWriteIOThread(ScheduledThreadPoolExecutor timeoutExecutor, ThreadGroup g, String name, int index, int threads, + public WebSocketWriteIOThread(ScheduledExecutorService timeoutExecutor, ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ByteBufferPool safeBufferPool) throws IOException { super(g, name, index, threads, workExecutor, safeBufferPool); Objects.requireNonNull(timeoutExecutor);