diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 481476ce5..6cdfaf266 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -100,7 +100,7 @@ public abstract class ClientCodec implements Complet if (respFuture != null) { R request = respFuture.request; AsyncIOThread readThread = connection.channel.getReadIOThread(); - final WorkThread workThread = request.removeWorkThread(readThread); + final WorkThread workThread = request.workThread; try { if (!halfCompleted && !request.isCompleted()) { if (exc == null) { @@ -122,26 +122,14 @@ public abstract class ClientCodec implements Complet // } connection.preComplete(message, (R) request, exc); - if (exc != null) { - if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE - if (workThread.inIO()) { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - } else { - workThread.execute(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - }); - } - } else { - workThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); - respFuture.completeExceptionally(exc); - }); - } - } else { + if (exc == null) { final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); - if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + if (workThread == null) { + readThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.complete(rs); + }); + } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE if (workThread.inIO()) { Traces.computeIfAbsent(request.traceid); respFuture.complete(rs); @@ -157,9 +145,36 @@ public abstract class ClientCodec implements Complet respFuture.complete(rs); }); } + } else { //异常 + if (workThread == null) { + readThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + }); + } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + if (workThread.inIO()) { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + } else { + workThread.execute(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + }); + } + } else { + workThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(exc); + }); + } } } catch (Throwable t) { - if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE + if (workThread == null) { + readThread.runWork(() -> { + Traces.computeIfAbsent(request.traceid); + respFuture.completeExceptionally(t); + }); + } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE if (workThread.inIO()) { Traces.computeIfAbsent(request.traceid); respFuture.completeExceptionally(t); diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index a4c128807..bd6d7c001 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -35,12 +35,6 @@ public abstract class ClientRequest { public abstract void writeTo(ClientConnection conn, ByteArray array); - WorkThread removeWorkThread(WorkThread defaultValue) { - WorkThread t = this.workThread; - this.workThread = null; - return t == null ? defaultValue : t; - } - public Serializable getRequestid() { return null; }