From 8e7b27eb16eb05f20e8cdb6ca4c80b180f2614a0 Mon Sep 17 00:00:00 2001 From: Redkale Date: Fri, 13 Jan 2023 22:38:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=BC=E6=A8=BA=E5=AF=B2ClientCodec?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/client/ClientCodec.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index f8a896625..11e26633a 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -70,7 +70,7 @@ public abstract class ClientCodec implements Complet Serializable reqid = cr.getRequestid(); ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); if (respFuture != null) { - completeResponse(respFuture, cr.message, cr.exc); + responseComplete(respFuture, cr.message, cr.exc); } respPool.accept(cr); } @@ -90,9 +90,10 @@ public abstract class ClientCodec implements Complet } } - private void completeResponse(ClientFuture respFuture, P message, Throwable exc) { + private void responseComplete(ClientFuture respFuture, P message, Throwable exc) { if (respFuture != null) { ClientRequest request = respFuture.request; + WorkThread workThread = null; try { if (!request.isCompleted()) { if (exc == null) { @@ -103,6 +104,8 @@ public abstract class ClientCodec implements Complet connection.sendHalfWrite(exc); } } + workThread = request.workThread; + request.workThread = null; connection.respWaitingCounter.decrement(); if (connection.isAuthenticated()) { connection.client.incrRespDoneCounter(); @@ -110,8 +113,7 @@ public abstract class ClientCodec implements Complet respFuture.cancelTimeout(); //if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message); connection.preComplete(message, (R) request, exc); - WorkThread workThread = request.workThread; - request.workThread = null; + if (workThread == null || workThread.getWorkExecutor() == null) { workThread = connection.channel.getReadIOThread(); } @@ -128,6 +130,15 @@ public abstract class ClientCodec implements Complet }); } } catch (Throwable t) { + if (workThread == null) { + Traces.currTraceid(request.traceid); + respFuture.completeExceptionally(t); + } else { + workThread.runWork(() -> { + Traces.currTraceid(request.traceid); + respFuture.completeExceptionally(t); + }); + } connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t); } }