ClientCodec修复runWork逻辑

This commit is contained in:
redkale
2023-11-17 20:10:08 +08:00
parent 044e6c8b7a
commit ea13e89146

View File

@@ -120,7 +120,7 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
R request = respFuture.request; R request = respFuture.request;
Traces.currentTraceid(request.getTraceid()); Traces.currentTraceid(request.getTraceid());
AsyncIOThread readThread = connection.channel.getReadIOThread(); AsyncIOThread readThread = connection.channel.getReadIOThread();
final WorkThread workThread = request.workThread; final WorkThread workThread = request.workThread == null ? readThread : request.workThread;
try { try {
if (!halfCompleted && !request.isCompleted()) { if (!halfCompleted && !request.isCompleted()) {
if (exc == null) { if (exc == null) {
@@ -145,52 +145,24 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
if (exc == null) { if (exc == null) {
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message); final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message);
if (workThread == null) { if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
readThread.runWork(() -> { Traces.currentTraceid(request.traceid);
Traces.currentTraceid(request.traceid); respFuture.complete(rs);
respFuture.complete(rs); Traces.removeTraceid();
Traces.removeTraceid();
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
Traces.removeTraceid();
} else {
workThread.execute(() -> {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
Traces.removeTraceid();
});
}
} else { } else {
Utility.execute(() -> { workThread.runWork(() -> {
Traces.currentTraceid(request.traceid); Traces.currentTraceid(request.traceid);
respFuture.complete(rs); respFuture.complete(rs);
Traces.removeTraceid(); Traces.removeTraceid();
}); });
} }
} else { //异常 } else { //异常
if (workThread == null) { if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
readThread.runWork(() -> { Traces.currentTraceid(request.traceid);
Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(exc);
respFuture.completeExceptionally(exc); Traces.removeTraceid();
Traces.removeTraceid();
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
Traces.removeTraceid();
} else {
workThread.execute(() -> {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
Traces.removeTraceid();
});
}
} else { } else {
Utility.execute(() -> { workThread.runWork(() -> {
Traces.currentTraceid(request.traceid); Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc); respFuture.completeExceptionally(exc);
Traces.removeTraceid(); Traces.removeTraceid();
@@ -198,26 +170,12 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
if (workThread == null) { if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
readThread.runWork(() -> { Traces.currentTraceid(request.traceid);
Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(t);
respFuture.completeExceptionally(t); Traces.removeTraceid();
Traces.removeTraceid();
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
Traces.removeTraceid();
} else {
workThread.execute(() -> {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
Traces.removeTraceid();
});
}
} else { } else {
Utility.execute(() -> { workThread.runWork(() -> {
Traces.currentTraceid(request.traceid); Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t); respFuture.completeExceptionally(t);
Traces.removeTraceid(); Traces.removeTraceid();