优化ClientCodec回调

This commit is contained in:
redkale
2023-07-11 11:58:52 +08:00
parent f234ed0614
commit ad71343650
2 changed files with 36 additions and 27 deletions

View File

@@ -100,7 +100,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
if (respFuture != null) {
R request = respFuture.request;
AsyncIOThread readThread = connection.channel.getReadIOThread();
final WorkThread workThread = request.removeWorkThread(readThread);
final WorkThread workThread = request.workThread;
try {
if (!halfCompleted && !request.isCompleted()) {
if (exc == null) {
@@ -122,26 +122,14 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
// }
connection.preComplete(message, (R) request, exc);
if (exc != null) {
if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
} else {
workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} else {
if (exc == null) {
final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread == null) {
readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs);
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
respFuture.complete(rs);
@@ -157,9 +145,36 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
respFuture.complete(rs);
});
}
} else { //异常
if (workThread == null) {
readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
} else {
workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(exc);
});
}
}
} catch (Throwable t) {
if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread == null) {
readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t);
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
respFuture.completeExceptionally(t);

View File

@@ -35,12 +35,6 @@ public abstract class ClientRequest {
public abstract void writeTo(ClientConnection conn, ByteArray array);
WorkThread removeWorkThread(WorkThread defaultValue) {
WorkThread t = this.workThread;
this.workThread = null;
return t == null ? defaultValue : t;
}
public Serializable getRequestid() {
return null;
}