diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 3b9a169fa..70545a2b0 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -16,8 +16,6 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; import static javax.net.ssl.SSLEngineResult.Status.*; import javax.net.ssl.*; -import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; -import static javax.net.ssl.SSLEngineResult.Status.*; import org.redkale.util.*; /** diff --git a/src/main/java/org/redkale/net/ClientIOThread.java b/src/main/java/org/redkale/net/ClientIOThread.java index bf8f0f9c1..34d077cfb 100644 --- a/src/main/java/org/redkale/net/ClientIOThread.java +++ b/src/main/java/org/redkale/net/ClientIOThread.java @@ -25,8 +25,4 @@ public class ClientIOThread extends AsyncIOThread { super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); } - @Override - public final boolean inClient() { - return true; - } } diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index 853b0c19f..b0c6a6112 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -80,6 +80,14 @@ public class WorkThread extends Thread implements Executor { } } + public void runWork(Runnable command) { + if (workExecutor == null) { + command.run(); + } else { + workExecutor.execute(command); + } + } + public void runAsync(Runnable command) { if (workExecutor == null) { ForkJoinPool.commonPool().execute(command); @@ -114,16 +122,6 @@ public class WorkThread extends Thread implements Executor { return false; } - /** - * 是否客户端的IO线程 - * - * @since 2.8.0 - * @return boolean - */ - public boolean inClient() { - return false; - } - public boolean inCurrThread() { return this == Thread.currentThread(); } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index dc18ae821..db02fcac8 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -203,28 +203,54 @@ public abstract class ClientConnection implements Co workThread = request.workThread; request.workThread = null; } +// if (rs.exc != null) { +// if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() +// || workThread.getState() != Thread.State.RUNNABLE) { +// if (request != null) { +// Traces.currTraceid(request.traceid); +// } +// respFuture.completeExceptionally(rs.exc); +// } else { +// workThread.execute(() -> { +// if (request != null) { +// Traces.currTraceid(request.traceid); +// } +// respFuture.completeExceptionally(rs.exc); +// }); +// } +// } else { +// if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() +// || workThread.getState() != Thread.State.RUNNABLE) { +// if (request != null) { +// Traces.currTraceid(request.traceid); +// } +// respFuture.complete(rs.result); +// } else { +// workThread.execute(() -> { +// if (request != null) { +// Traces.currTraceid(request.traceid); +// } +// respFuture.complete(rs.result); +// }); +// } +// } + if (workThread == null || workThread.getWorkExecutor() == null) { + workThread = channel.getAsyncIOThread(); + } if (rs.exc != null) { - if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() - || workThread.getState() != Thread.State.RUNNABLE) { - Traces.currTraceid(request.traceid); + workThread.runWork(() -> { + if (request != null) { + Traces.currTraceid(request.traceid); + } respFuture.completeExceptionally(rs.exc); - } else { - workThread.execute(() -> { - Traces.currTraceid(request.traceid); - respFuture.completeExceptionally(rs.exc); - }); - } + }); } else { - if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() - || workThread.getState() != Thread.State.RUNNABLE) { - Traces.currTraceid(request.traceid); - respFuture.complete(rs.result); - } else { - workThread.execute(() -> { + workThread.runWork(() -> { + if (request != null) { Traces.currTraceid(request.traceid); - respFuture.complete(rs.result); - }); - } + } + respFuture.complete(rs.result); + }); } } catch (Throwable t) { client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 587a19870..94dc89164 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -82,11 +82,15 @@ public class ClientFuture extends CompletableFuture implements Runnable { workThread = request.workThread; request.workThread = null; } - if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() - || workThread.getState() != Thread.State.RUNNABLE) { - this.completeExceptionally(ex); - } else { - workThread.execute(() -> completeExceptionally(ex)); +// if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() +// || workThread.getState() != Thread.State.RUNNABLE) { +// this.completeExceptionally(ex); +// } else { +// workThread.execute(() -> completeExceptionally(ex)); +// } + if (workThread == null || workThread.getWorkExecutor() == null) { + workThread = conn.getChannel().getAsyncIOThread(); } + workThread.runWork(() -> completeExceptionally(ex)); } }