ClientCodec优化responseComplete方法

This commit is contained in:
redkale
2023-06-25 09:58:19 +08:00
parent 94774d917f
commit 27f54ebd15
16 changed files with 70 additions and 46 deletions

View File

@@ -100,7 +100,7 @@ public abstract class LoggingBaseHandler extends Handler {
protected static void fillLogRecord(LogRecord log) {
String traceid = null;
if (traceFlag && Traces.enable()) {
traceid = Traces.currTraceid();
traceid = Traces.currentTraceid();
if (traceid == null || traceid.isEmpty()) {
traceid = "[TID:N/A] ";
} else {

View File

@@ -298,7 +298,7 @@ public class LoggingSearchHandler extends LoggingBaseHandler {
this.rawLog = log;
this.rawTag = tag;
this.threadName = Thread.currentThread().getName();
this.traceid = LoggingBaseHandler.traceFlag ? Traces.currTraceid() : null;
this.traceid = LoggingBaseHandler.traceFlag ? Traces.currentTraceid() : null;
String msg = log.getMessage();
if (log.getThrown() != null) {
StringWriter sw = new StringWriter();

View File

@@ -90,7 +90,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor {
private void execute(final MessageRecord message, final Runnable callback) {
HttpMessageRequest request = null;
try {
Traces.currTraceid(message.getTraceid());
Traces.currentTraceid(message.getTraceid());
long now = System.currentTimeMillis();
long cha = now - message.createTime;
long e = now - startTime;

View File

@@ -143,12 +143,12 @@ public abstract class MessageClient {
public MessageRecord createMessageRecord(String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
null, null, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(String topic, String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
null, topic, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) {
@@ -158,7 +158,7 @@ public abstract class MessageClient {
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid,
null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
null, topic, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) {
@@ -168,7 +168,7 @@ public abstract class MessageClient {
public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
null, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) {
@@ -178,29 +178,29 @@ public abstract class MessageClient {
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid,
null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
null, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid,
groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
groupid, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid,
groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
groupid, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.currTraceid(), content);
return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.currentTraceid(), content);
}
public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) {
return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.currTraceid(), content);
return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.currentTraceid(), content);
}
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.currTraceid(), content);
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.currentTraceid(), content);
}
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) {
@@ -208,7 +208,7 @@ public abstract class MessageClient {
}
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) {
return new MessageRecord(seqid, ctype, topic, resptopic, Traces.currTraceid(), content);
return new MessageRecord(seqid, ctype, topic, resptopic, Traces.currentTraceid(), content);
}
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) {

View File

@@ -69,7 +69,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor {
private void execute(final MessageRecord message, final Runnable callback) {
SncpMessageResponse response = null;
try {
Traces.currTraceid(message.getTraceid());
Traces.currentTraceid(message.getTraceid());
long now = System.currentTimeMillis();
long cha = now - message.createTime;
long e = now - starttime;

View File

@@ -200,7 +200,7 @@ public class AsyncIOGroup extends AsyncGroup {
AsyncIOThread readThread = null;
AsyncIOThread writeThread = null;
AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread();
AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread();
if (currThread != null) {
if (this.ioReadThreads[0].getThreadGroup() == currThread.getThreadGroup()) {
for (int i = 0; i < this.ioReadThreads.length; i++) {
@@ -284,7 +284,7 @@ public class AsyncIOGroup extends AsyncGroup {
channel.configureBlocking(false);
AsyncIOThread readThread = null;
AsyncIOThread writeThread = null;
AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread();
AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread();
if (currThread != null) {
for (int i = 0; i < this.ioReadThreads.length; i++) {
if (this.ioReadThreads[i].index() == currThread.index()) {

View File

@@ -54,7 +54,7 @@ public class AsyncIOThread extends WorkThread {
return closed.get();
}
public static AsyncIOThread currAsyncIOThread() {
public static AsyncIOThread currentAsyncIOThread() {
Thread t = Thread.currentThread();
return t instanceof AsyncIOThread ? (AsyncIOThread) t : null;
}

View File

@@ -122,7 +122,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected Response(C context, final R request) {
this.context = context;
this.request = request;
this.thread = WorkThread.currWorkThread();
this.thread = WorkThread.currentWorkThread();
this.writeBuffer = context != null ? ByteBuffer.allocateDirect(context.getBufferCapacity()) : null;
this.workExecutor = context == null || context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor;
}

View File

@@ -38,7 +38,7 @@ public class WorkThread extends Thread implements Executor {
this.setDaemon(true);
}
public static WorkThread currWorkThread() {
public static WorkThread currentWorkThread() {
Thread t = Thread.currentThread();
return t instanceof WorkThread ? (WorkThread) t : null;
}

View File

@@ -220,28 +220,28 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
public final CompletableFuture<P> sendAsync(R request) {
if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread();
request.workThread = WorkThread.currentWorkThread();
}
return connect().thenCompose(conn -> writeChannel(conn, request));
}
public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) {
if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread();
request.workThread = WorkThread.currentWorkThread();
}
return connect().thenCompose(conn -> writeChannel(conn, request, respTransfer));
}
public final CompletableFuture<P> sendAsync(SocketAddress addr, R request) {
if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread();
request.workThread = WorkThread.currentWorkThread();
}
return connect(addr).thenCompose(conn -> writeChannel(conn, request));
}
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, Function<P, T> respTransfer) {
if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread();
request.workThread = WorkThread.currentWorkThread();
}
return connect(addr).thenCompose(conn -> writeChannel(conn, request, respTransfer));
}
@@ -264,7 +264,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
public final CompletableFuture<C> connect() {
final int size = this.connArray.length;
WorkThread workThread = WorkThread.currWorkThread();
WorkThread workThread = WorkThread.currentWorkThread();
final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size;
C cc = (C) this.connArray[connIndex];
if (cc != null && cc.isOpen()) {

View File

@@ -123,40 +123,64 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
connection.preComplete(message, (R) request, exc);
if (exc != null) {
if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) {
if (workThread == readThread) {
workThread.runWork(() -> {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
});
} else if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
} else if (workThread.getState() == Thread.State.RUNNABLE) {
workThread.execute(() -> {
Traces.currTraceid(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
});
} else {
workThread.runWork(() -> {
Traces.currTraceid(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} else {
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message);
if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) {
final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
if (workThread == readThread) {
workThread.runWork(() -> {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
});
} else if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
} else if (workThread.getState() == Thread.State.RUNNABLE) {
workThread.execute(() -> {
Traces.currTraceid(request.traceid);
((ClientFuture) respFuture).complete(rs);
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
});
} else {
workThread.runWork(() -> {
Traces.currTraceid(request.traceid);
((ClientFuture) respFuture).complete(rs);
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
});
}
}
} catch (Throwable t) {
if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) {
if (workThread == readThread) {
workThread.runWork(() -> {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
});
} else if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
} else if (workThread.getState() == Thread.State.RUNNABLE) {
workThread.execute(() -> {
Traces.currTraceid(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
});
} else {
workThread.runWork(() -> {
Traces.currTraceid(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
});
}

View File

@@ -60,12 +60,12 @@ public abstract class ClientRequest {
return traceid;
}
public <T extends ClientRequest> T currThread(WorkThread thread) {
public <T extends ClientRequest> T workThread(WorkThread thread) {
this.workThread = thread;
return (T) this;
}
public <T extends ClientRequest> T currTraceid(String traceid) {
public <T extends ClientRequest> T traceid(String traceid) {
this.traceid = traceid;
return (T) this;
}
@@ -77,7 +77,7 @@ public abstract class ClientRequest {
protected void prepare() {
this.createTime = System.currentTimeMillis();
this.traceid = Traces.currTraceid();
this.traceid = Traces.currentTraceid();
this.respTransfer = null;
}

View File

@@ -90,11 +90,11 @@ public class HttpSimpleRequest implements java.io.Serializable {
protected byte[] body; //对应HttpRequest.array
public static HttpSimpleRequest create(String requestURI) {
return new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currTraceid());
return new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currentTraceid());
}
public static HttpSimpleRequest create(String requestURI, Object... params) {
HttpSimpleRequest req = new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currTraceid());
HttpSimpleRequest req = new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currentTraceid());
int len = params.length / 2;
for (int i = 0; i < len; i++) {
req.param(params[i * 2].toString(), params[i * 2 + 1]);

View File

@@ -394,7 +394,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}
});
};
WorkThread workThread = WorkThread.currWorkThread();
WorkThread workThread = WorkThread.currentWorkThread();
sessionFuture.whenComplete((sessionid, ex) -> {
if (workThread == null || workThread == Thread.currentThread()) {
sessionConsumer.accept(sessionid, ex);

View File

@@ -105,7 +105,7 @@ public class SncpRemoteInfo<T extends Service> {
params[action.paramHandlerAttachIndex] = null;
}
}
final CompletableFuture<byte[]> future = remote(action, Traces.currTraceid(), params);
final CompletableFuture<byte[]> future = remote(action, Traces.currentTraceid(), params);
if (action.paramHandlerIndex >= 0) { //参数中存在CompletionHandler
final CompletionHandler handler = callbackHandler;
final Object attach = callbackHandlerAttach;

View File

@@ -49,13 +49,13 @@ public class Traces {
// }
// return traceid;
// }
public static void currTraceid(String traceid) {
public static void currentTraceid(String traceid) {
if (enable) {
localTrace.set(traceid);
}
}
public static String currTraceid() {
public static String currentTraceid() {
return enable ? localTrace.get() : null;
}