浼樺寲ClientCodec
This commit is contained in:
@@ -70,7 +70,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
Serializable reqid = cr.getRequestid();
|
Serializable reqid = cr.getRequestid();
|
||||||
ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
|
ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid);
|
||||||
if (respFuture != null) {
|
if (respFuture != null) {
|
||||||
completeResponse(respFuture, cr.message, cr.exc);
|
responseComplete(respFuture, cr.message, cr.exc);
|
||||||
}
|
}
|
||||||
respPool.accept(cr);
|
respPool.accept(cr);
|
||||||
}
|
}
|
||||||
@@ -90,9 +90,10 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void completeResponse(ClientFuture respFuture, P message, Throwable exc) {
|
private void responseComplete(ClientFuture respFuture, P message, Throwable exc) {
|
||||||
if (respFuture != null) {
|
if (respFuture != null) {
|
||||||
ClientRequest request = respFuture.request;
|
ClientRequest request = respFuture.request;
|
||||||
|
WorkThread workThread = null;
|
||||||
try {
|
try {
|
||||||
if (!request.isCompleted()) {
|
if (!request.isCompleted()) {
|
||||||
if (exc == null) {
|
if (exc == null) {
|
||||||
@@ -103,6 +104,8 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
connection.sendHalfWrite(exc);
|
connection.sendHalfWrite(exc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
workThread = request.workThread;
|
||||||
|
request.workThread = null;
|
||||||
connection.respWaitingCounter.decrement();
|
connection.respWaitingCounter.decrement();
|
||||||
if (connection.isAuthenticated()) {
|
if (connection.isAuthenticated()) {
|
||||||
connection.client.incrRespDoneCounter();
|
connection.client.incrRespDoneCounter();
|
||||||
@@ -110,8 +113,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
respFuture.cancelTimeout();
|
respFuture.cancelTimeout();
|
||||||
//if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message);
|
//if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message);
|
||||||
connection.preComplete(message, (R) request, exc);
|
connection.preComplete(message, (R) request, exc);
|
||||||
WorkThread workThread = request.workThread;
|
|
||||||
request.workThread = null;
|
|
||||||
if (workThread == null || workThread.getWorkExecutor() == null) {
|
if (workThread == null || workThread.getWorkExecutor() == null) {
|
||||||
workThread = connection.channel.getReadIOThread();
|
workThread = connection.channel.getReadIOThread();
|
||||||
}
|
}
|
||||||
@@ -128,6 +130,15 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
if (workThread == null) {
|
||||||
|
Traces.currTraceid(request.traceid);
|
||||||
|
respFuture.completeExceptionally(t);
|
||||||
|
} else {
|
||||||
|
workThread.runWork(() -> {
|
||||||
|
Traces.currTraceid(request.traceid);
|
||||||
|
respFuture.completeExceptionally(t);
|
||||||
|
});
|
||||||
|
}
|
||||||
connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);
|
connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user