From f38adacbe4560a57a672126d0e3fdabfcd2c69fc Mon Sep 17 00:00:00 2001 From: redkale Date: Fri, 30 Jun 2023 10:18:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96net?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/WorkThread.java | 14 ++++--- .../org/redkale/net/client/ClientCodec.java | 38 +++---------------- 2 files changed, 14 insertions(+), 38 deletions(-) diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index 76fd36849..6713de812 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -86,11 +86,6 @@ public class WorkThread extends Thread implements Executor { } } - //与execute的区别在于子类AsyncIOThread中execute会被重载,确保在IO线程中执行 - public final void runWork(Runnable command) { - execute(command); - } - public void execute(Runnable... commands) { if (workExecutor == null) { for (Runnable command : commands) { @@ -118,6 +113,15 @@ public class WorkThread extends Thread implements Executor { } } + //与execute的区别在于子类AsyncIOThread中execute会被重载,确保在IO线程中执行 + public final void runWork(Runnable command) { + if (workExecutor == null) { + command.run(); + } else { + workExecutor.execute(command); + } + } + public ExecutorService getWorkExecutor() { return workExecutor; } diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index a0a75d72d..1c248e0e4 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -123,21 +123,11 @@ public abstract class ClientCodec implements Complet connection.preComplete(message, (R) request, exc); if (exc != null) { - if (workThread == readThread) { - workThread.runWork(() -> { + if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + workThread.execute(() -> { Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(exc); }); - } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE - if (workThread.inIO()) { - Traces.currentTraceid(request.traceid); - respFuture.completeExceptionally(exc); - } else { - workThread.execute(() -> { - Traces.currentTraceid(request.traceid); - respFuture.completeExceptionally(exc); - }); - } } else { workThread.runWork(() -> { Traces.currentTraceid(request.traceid); @@ -146,21 +136,11 @@ public abstract class ClientCodec implements Complet } } else { final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); - if (workThread == readThread) { - workThread.runWork(() -> { + if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + workThread.execute(() -> { Traces.currentTraceid(request.traceid); respFuture.complete(rs); }); - } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE - if (workThread.inIO()) { - Traces.currentTraceid(request.traceid); - respFuture.complete(rs); - } else { - workThread.execute(() -> { - Traces.currentTraceid(request.traceid); - respFuture.complete(rs); - }); - } } else { workThread.runWork(() -> { Traces.currentTraceid(request.traceid); @@ -169,15 +149,7 @@ public abstract class ClientCodec implements Complet } } } catch (Throwable t) { - if (workThread == readThread) { - workThread.runWork(() -> { - Traces.currentTraceid(request.traceid); - respFuture.completeExceptionally(t); - }); - } else if (workThread.inIO()) { - Traces.currentTraceid(request.traceid); - respFuture.completeExceptionally(t); - } else if (workThread.getState() == Thread.State.RUNNABLE) { + if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE workThread.execute(() -> { Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(t);