diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index f8a896625..11e26633a 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -70,7 +70,7 @@ public abstract class ClientCodec implements Complet Serializable reqid = cr.getRequestid(); ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); if (respFuture != null) { - completeResponse(respFuture, cr.message, cr.exc); + responseComplete(respFuture, cr.message, cr.exc); } respPool.accept(cr); } @@ -90,9 +90,10 @@ public abstract class ClientCodec implements Complet } } - private void completeResponse(ClientFuture respFuture, P message, Throwable exc) { + private void responseComplete(ClientFuture respFuture, P message, Throwable exc) { if (respFuture != null) { ClientRequest request = respFuture.request; + WorkThread workThread = null; try { if (!request.isCompleted()) { if (exc == null) { @@ -103,6 +104,8 @@ public abstract class ClientCodec implements Complet connection.sendHalfWrite(exc); } } + workThread = request.workThread; + request.workThread = null; connection.respWaitingCounter.decrement(); if (connection.isAuthenticated()) { connection.client.incrRespDoneCounter(); @@ -110,8 +113,7 @@ public abstract class ClientCodec implements Complet respFuture.cancelTimeout(); //if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message); connection.preComplete(message, (R) request, exc); - WorkThread workThread = request.workThread; - request.workThread = null; + if (workThread == null || workThread.getWorkExecutor() == null) { workThread = connection.channel.getReadIOThread(); } @@ -128,6 +130,15 @@ public abstract class ClientCodec implements Complet }); } } catch (Throwable t) { + if (workThread == null) { + Traces.currTraceid(request.traceid); + respFuture.completeExceptionally(t); + } else { + workThread.runWork(() -> { + Traces.currTraceid(request.traceid); + respFuture.completeExceptionally(t); + }); + } connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); } }