diff --git a/src/main/java/org/redkale/boot/LoggingBaseHandler.java b/src/main/java/org/redkale/boot/LoggingBaseHandler.java index da88cf39a..fa8d56c6a 100644 --- a/src/main/java/org/redkale/boot/LoggingBaseHandler.java +++ b/src/main/java/org/redkale/boot/LoggingBaseHandler.java @@ -100,7 +100,7 @@ public abstract class LoggingBaseHandler extends Handler { protected static void fillLogRecord(LogRecord log) { String traceid = null; if (traceFlag && Traces.enable()) { - traceid = Traces.currTraceid(); + traceid = Traces.currentTraceid(); if (traceid == null || traceid.isEmpty()) { traceid = "[TID:N/A] "; } else { diff --git a/src/main/java/org/redkale/boot/LoggingSearchHandler.java b/src/main/java/org/redkale/boot/LoggingSearchHandler.java index f2cd6ddd1..a609ec5da 100644 --- a/src/main/java/org/redkale/boot/LoggingSearchHandler.java +++ b/src/main/java/org/redkale/boot/LoggingSearchHandler.java @@ -298,7 +298,7 @@ public class LoggingSearchHandler extends LoggingBaseHandler { this.rawLog = log; this.rawTag = tag; this.threadName = Thread.currentThread().getName(); - this.traceid = LoggingBaseHandler.traceFlag ? Traces.currTraceid() : null; + this.traceid = LoggingBaseHandler.traceFlag ? Traces.currentTraceid() : null; String msg = log.getMessage(); if (log.getThrown() != null) { StringWriter sw = new StringWriter(); diff --git a/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java b/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java index 161a27e2a..7ef21b8c5 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java +++ b/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java @@ -90,7 +90,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor { private void execute(final MessageRecord message, final Runnable callback) { HttpMessageRequest request = null; try { - Traces.currTraceid(message.getTraceid()); + Traces.currentTraceid(message.getTraceid()); long now = System.currentTimeMillis(); long cha = now - message.createTime; long e = now - startTime; diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index 75af8de7a..0918f4fdd 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -143,12 +143,12 @@ public abstract class MessageClient { public MessageRecord createMessageRecord(String resptopic, String content) { return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, - null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + null, null, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, String content) { return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, - null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + null, topic, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) { @@ -158,7 +158,7 @@ public abstract class MessageClient { public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, - null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + null, topic, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) { @@ -168,7 +168,7 @@ public abstract class MessageClient { public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, - null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); + null, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) { @@ -178,29 +178,29 @@ public abstract class MessageClient { public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, - null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); + null, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, - groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); + groupid, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, - groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); + groupid, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { - return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.currTraceid(), content); + return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.currentTraceid(), content); } public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) { - return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.currTraceid(), content); + return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.currentTraceid(), content); } protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.currTraceid(), content); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.currentTraceid(), content); } protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) { @@ -208,7 +208,7 @@ public abstract class MessageClient { } protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) { - return new MessageRecord(seqid, ctype, topic, resptopic, Traces.currTraceid(), content); + return new MessageRecord(seqid, ctype, topic, resptopic, Traces.currentTraceid(), content); } protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) { diff --git a/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java b/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java index cd0912c63..502607b2a 100644 --- a/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java +++ b/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java @@ -69,7 +69,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor { private void execute(final MessageRecord message, final Runnable callback) { SncpMessageResponse response = null; try { - Traces.currTraceid(message.getTraceid()); + Traces.currentTraceid(message.getTraceid()); long now = System.currentTimeMillis(); long cha = now - message.createTime; long e = now - starttime; diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 2a363d73c..87bc3fdc6 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -200,7 +200,7 @@ public class AsyncIOGroup extends AsyncGroup { AsyncIOThread readThread = null; AsyncIOThread writeThread = null; - AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread(); + AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread(); if (currThread != null) { if (this.ioReadThreads[0].getThreadGroup() == currThread.getThreadGroup()) { for (int i = 0; i < this.ioReadThreads.length; i++) { @@ -284,7 +284,7 @@ public class AsyncIOGroup extends AsyncGroup { channel.configureBlocking(false); AsyncIOThread readThread = null; AsyncIOThread writeThread = null; - AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread(); + AsyncIOThread currThread = AsyncIOThread.currentAsyncIOThread(); if (currThread != null) { for (int i = 0; i < this.ioReadThreads.length; i++) { if (this.ioReadThreads[i].index() == currThread.index()) { diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index dbae9b0ec..92bb92aa2 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -54,7 +54,7 @@ public class AsyncIOThread extends WorkThread { return closed.get(); } - public static AsyncIOThread currAsyncIOThread() { + public static AsyncIOThread currentAsyncIOThread() { Thread t = Thread.currentThread(); return t instanceof AsyncIOThread ? (AsyncIOThread) t : null; } diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index dc9f50dad..033d3a6dd 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -122,7 +122,7 @@ public abstract class Response> { protected Response(C context, final R request) { this.context = context; this.request = request; - this.thread = WorkThread.currWorkThread(); + this.thread = WorkThread.currentWorkThread(); this.writeBuffer = context != null ? ByteBuffer.allocateDirect(context.getBufferCapacity()) : null; this.workExecutor = context == null || context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor; } diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index b0b3426cb..5a2189068 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -38,7 +38,7 @@ public class WorkThread extends Thread implements Executor { this.setDaemon(true); } - public static WorkThread currWorkThread() { + public static WorkThread currentWorkThread() { Thread t = Thread.currentThread(); return t instanceof WorkThread ? (WorkThread) t : null; } diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 26b333fdf..90098d69a 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -220,28 +220,28 @@ public abstract class Client, R extends ClientR public final CompletableFuture

sendAsync(R request) { if (request.workThread == null) { - request.workThread = WorkThread.currWorkThread(); + request.workThread = WorkThread.currentWorkThread(); } return connect().thenCompose(conn -> writeChannel(conn, request)); } public final CompletableFuture sendAsync(R request, Function respTransfer) { if (request.workThread == null) { - request.workThread = WorkThread.currWorkThread(); + request.workThread = WorkThread.currentWorkThread(); } return connect().thenCompose(conn -> writeChannel(conn, request, respTransfer)); } public final CompletableFuture

sendAsync(SocketAddress addr, R request) { if (request.workThread == null) { - request.workThread = WorkThread.currWorkThread(); + request.workThread = WorkThread.currentWorkThread(); } return connect(addr).thenCompose(conn -> writeChannel(conn, request)); } public final CompletableFuture sendAsync(SocketAddress addr, R request, Function respTransfer) { if (request.workThread == null) { - request.workThread = WorkThread.currWorkThread(); + request.workThread = WorkThread.currentWorkThread(); } return connect(addr).thenCompose(conn -> writeChannel(conn, request, respTransfer)); } @@ -264,7 +264,7 @@ public abstract class Client, R extends ClientR public final CompletableFuture connect() { final int size = this.connArray.length; - WorkThread workThread = WorkThread.currWorkThread(); + WorkThread workThread = WorkThread.currentWorkThread(); final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size; C cc = (C) this.connArray[connIndex]; if (cc != null && cc.isOpen()) { diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 333bf2af1..73e3260e1 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -123,40 +123,64 @@ public abstract class ClientCodec implements Complet connection.preComplete(message, (R) request, exc); if (exc != null) { - if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { + if (workThread == readThread) { + workThread.runWork(() -> { + Traces.currentTraceid(request.traceid); + respFuture.completeExceptionally(exc); + }); + } else if (workThread.inIO()) { + Traces.currentTraceid(request.traceid); + respFuture.completeExceptionally(exc); + } else if (workThread.getState() == Thread.State.RUNNABLE) { workThread.execute(() -> { - Traces.currTraceid(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(exc); }); } else { workThread.runWork(() -> { - Traces.currTraceid(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(exc); }); } } else { - final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message); - if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { + final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); + if (workThread == readThread) { + workThread.runWork(() -> { + Traces.currentTraceid(request.traceid); + respFuture.complete(rs); + }); + } else if (workThread.inIO()) { + Traces.currentTraceid(request.traceid); + respFuture.complete(rs); + } else if (workThread.getState() == Thread.State.RUNNABLE) { workThread.execute(() -> { - Traces.currTraceid(request.traceid); - ((ClientFuture) respFuture).complete(rs); + Traces.currentTraceid(request.traceid); + respFuture.complete(rs); }); } else { workThread.runWork(() -> { - Traces.currTraceid(request.traceid); - ((ClientFuture) respFuture).complete(rs); + Traces.currentTraceid(request.traceid); + respFuture.complete(rs); }); } } } catch (Throwable t) { - if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { + if (workThread == readThread) { + workThread.runWork(() -> { + Traces.currentTraceid(request.traceid); + respFuture.completeExceptionally(t); + }); + } else if (workThread.inIO()) { + Traces.currentTraceid(request.traceid); + respFuture.completeExceptionally(t); + } else if (workThread.getState() == Thread.State.RUNNABLE) { workThread.execute(() -> { - Traces.currTraceid(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(t); }); } else { workThread.runWork(() -> { - Traces.currTraceid(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(t); }); } diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index 5dd40a0ae..cce76e68a 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -60,12 +60,12 @@ public abstract class ClientRequest { return traceid; } - public T currThread(WorkThread thread) { + public T workThread(WorkThread thread) { this.workThread = thread; return (T) this; } - public T currTraceid(String traceid) { + public T traceid(String traceid) { this.traceid = traceid; return (T) this; } @@ -77,7 +77,7 @@ public abstract class ClientRequest { protected void prepare() { this.createTime = System.currentTimeMillis(); - this.traceid = Traces.currTraceid(); + this.traceid = Traces.currentTraceid(); this.respTransfer = null; } diff --git a/src/main/java/org/redkale/net/http/HttpSimpleRequest.java b/src/main/java/org/redkale/net/http/HttpSimpleRequest.java index 7268f39ae..01b8c1696 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleRequest.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleRequest.java @@ -90,11 +90,11 @@ public class HttpSimpleRequest implements java.io.Serializable { protected byte[] body; //对应HttpRequest.array public static HttpSimpleRequest create(String requestURI) { - return new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currTraceid()); + return new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currentTraceid()); } public static HttpSimpleRequest create(String requestURI, Object... params) { - HttpSimpleRequest req = new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currTraceid()); + HttpSimpleRequest req = new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currentTraceid()); int len = params.length / 2; for (int i = 0; i < len; i++) { req.param(params[i * 2].toString(), params[i * 2 + 1]); diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index 50bcc2eae..2351028b5 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -394,7 +394,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } }); }; - WorkThread workThread = WorkThread.currWorkThread(); + WorkThread workThread = WorkThread.currentWorkThread(); sessionFuture.whenComplete((sessionid, ex) -> { if (workThread == null || workThread == Thread.currentThread()) { sessionConsumer.accept(sessionid, ex); diff --git a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java index 2a044e353..6f4684242 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java @@ -105,7 +105,7 @@ public class SncpRemoteInfo { params[action.paramHandlerAttachIndex] = null; } } - final CompletableFuture future = remote(action, Traces.currTraceid(), params); + final CompletableFuture future = remote(action, Traces.currentTraceid(), params); if (action.paramHandlerIndex >= 0) { //参数中存在CompletionHandler final CompletionHandler handler = callbackHandler; final Object attach = callbackHandlerAttach; diff --git a/src/main/java/org/redkale/util/Traces.java b/src/main/java/org/redkale/util/Traces.java index c3be5e082..8a7d2d20a 100644 --- a/src/main/java/org/redkale/util/Traces.java +++ b/src/main/java/org/redkale/util/Traces.java @@ -49,13 +49,13 @@ public class Traces { // } // return traceid; // } - public static void currTraceid(String traceid) { + public static void currentTraceid(String traceid) { if (enable) { localTrace.set(traceid); } } - public static String currTraceid() { + public static String currentTraceid() { return enable ? localTrace.get() : null; }