diff --git a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java index 3c0226987..3d722b244 100644 --- a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java @@ -103,7 +103,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpResponse resp = new HttpMessageLocalResponse(req, future); - Traces.createTraceid(); + Traces.loadTraceid(); try { servlet.execute(req, resp); } catch (Exception e) { @@ -122,7 +122,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); CompletableFuture future = new CompletableFuture(); HttpResponse resp = new HttpMessageLocalResponse(req, future); - Traces.createTraceid(); + Traces.loadTraceid(); try { servlet.execute(req, resp); } catch (Exception e) { @@ -149,7 +149,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpResponse resp = new HttpMessageLocalResponse(req, null); - Traces.createTraceid(); + Traces.loadTraceid(); try { servlet.execute(req, resp); } catch (Exception e) { @@ -162,7 +162,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { HttpDispatcherServlet ps = dispatcherServlet(); HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpResponse resp = new HttpMessageLocalResponse(req, null); - Traces.createTraceid(); + Traces.loadTraceid(); ps.filterServletsByMmcTopic(topic).forEach(s -> { try { s.execute(req, resp); diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index c6639457f..fd8d1fc55 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -57,7 +57,9 @@ public abstract class MessageClient { try { if (this.respConsumer == null) { synchronized (this) { - if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic; + if (this.respConsumerid == null) { + this.respConsumerid = "consumer-" + this.respTopic; + } if (this.respConsumer == null) { MessageProcessor processor = (msg, callback) -> { long now = System.currentTimeMillis(); @@ -119,11 +121,11 @@ public abstract class MessageClient { protected abstract MessageProducers getProducer(); public MessageRecord createMessageRecord(String resptopic, String content) { - return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.createTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.loadTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, String content) { - return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.createTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.loadTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) { @@ -131,7 +133,7 @@ public abstract class MessageClient { } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { - return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.createTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.loadTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) { @@ -139,7 +141,7 @@ public abstract class MessageClient { } public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.createTraceid(), convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) { @@ -147,27 +149,27 @@ public abstract class MessageClient { } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.createTraceid(), convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.createTraceid(), convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.createTraceid(), convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { - return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.createTraceid(), content); + return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.loadTraceid(), content); } public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) { - return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.createTraceid(), content); + return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.loadTraceid(), content); } protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.createTraceid(), content); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.loadTraceid(), content); } protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) { @@ -175,7 +177,7 @@ public abstract class MessageClient { } protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) { - return new MessageRecord(seqid, ctype, topic, resptopic, Traces.createTraceid(), content); + return new MessageRecord(seqid, ctype, topic, resptopic, Traces.loadTraceid(), content); } protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) { diff --git a/src/main/java/org/redkale/mq/MessageRecord.java b/src/main/java/org/redkale/mq/MessageRecord.java index a885ccd31..c6ff59410 100644 --- a/src/main/java/org/redkale/mq/MessageRecord.java +++ b/src/main/java/org/redkale/mq/MessageRecord.java @@ -88,15 +88,15 @@ public class MessageRecord implements Serializable { public MessageRecord() { } - protected MessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) { - this(seqid, ctype, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, traceid, content); + protected MessageRecord(long seqid, byte ctype, String topic, String respTopic, String traceid, byte[] content) { + this(seqid, ctype, 1, 0, System.currentTimeMillis(), 0, null, topic, respTopic, traceid, content); } - protected MessageRecord(long seqid, byte ctype, int flag, Serializable userid, String groupid, String topic, String resptopic, String traceid, byte[] content) { - this(seqid, ctype, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, traceid, content); + protected MessageRecord(long seqid, byte ctype, int flag, Serializable userid, String groupid, String topic, String respTopic, String traceid, byte[] content) { + this(seqid, ctype, 1, flag, System.currentTimeMillis(), userid, groupid, topic, respTopic, traceid, content); } - protected MessageRecord(long seqid, byte ctype, int version, int flag, long createTime, Serializable userid, String groupid, String topic, String resptopic, String traceid, byte[] content) { + protected MessageRecord(long seqid, byte ctype, int version, int flag, long createTime, Serializable userid, String groupid, String topic, String respTopic, String traceid, byte[] content) { this.seqid = seqid; this.ctype = ctype; this.version = version; @@ -105,7 +105,7 @@ public class MessageRecord implements Serializable { this.userid = userid; this.groupid = groupid; this.topic = topic; - this.respTopic = resptopic; + this.respTopic = respTopic; this.traceid = traceid; this.content = content; } diff --git a/src/main/java/org/redkale/mq/MessageRecordCoder.java b/src/main/java/org/redkale/mq/MessageRecordCoder.java index d95e97d32..ab4a02d9c 100644 --- a/src/main/java/org/redkale/mq/MessageRecordCoder.java +++ b/src/main/java/org/redkale/mq/MessageRecordCoder.java @@ -32,7 +32,7 @@ public class MessageRecordCoder implements MessageCoder { byte[] userid = MessageCoder.encodeUserid(data.getUserid()); byte[] groupid = MessageCoder.getBytes(data.getGroupid()); byte[] topic = MessageCoder.getBytes(data.getTopic()); - byte[] resptopic = MessageCoder.getBytes(data.getRespTopic()); + byte[] respTopic = MessageCoder.getBytes(data.getRespTopic()); byte[] traceid = MessageCoder.getBytes(data.getTraceid()); int count = 8 //seqid + 1 //ctype @@ -42,7 +42,7 @@ public class MessageRecordCoder implements MessageCoder { + 2 + userid.length + 2 + groupid.length + 2 + topic.length - + 2 + resptopic.length + + 2 + respTopic.length + 2 + traceid.length + 4 + (data.getContent() == null ? 0 : data.getContent().length); final byte[] bs = new byte[count]; @@ -62,8 +62,8 @@ public class MessageRecordCoder implements MessageCoder { buffer.putChar((char) topic.length); if (topic.length > 0) buffer.put(topic); - buffer.putChar((char) resptopic.length); - if (resptopic.length > 0) buffer.put(resptopic); + buffer.putChar((char) respTopic.length); + if (respTopic.length > 0) buffer.put(respTopic); buffer.putChar((char) traceid.length); if (traceid.length > 0) buffer.put(traceid); diff --git a/src/main/java/org/redkale/net/Context.java b/src/main/java/org/redkale/net/Context.java index 9c7f5bd41..f40ccba93 100644 --- a/src/main/java/org/redkale/net/Context.java +++ b/src/main/java/org/redkale/net/Context.java @@ -125,6 +125,7 @@ public class Context { workHashExecutor.execute(request.getHashid(), () -> { try { long cha = System.currentTimeMillis() - request.getCreateTime(); + Traces.currTraceid(request.getTraceid()); servlet.execute(request, response); if (cha > 1000 && response.context.logger.isLoggable(Level.WARNING)) { response.context.logger.log(Level.WARNING, "hash execute servlet delays=" + cha + "ms, request=" + request); @@ -139,6 +140,7 @@ public class Context { } else if (workExecutor != null) { workExecutor.execute(() -> { try { + Traces.currTraceid(request.getTraceid()); servlet.execute(request, response); } catch (Throwable t) { response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); @@ -147,6 +149,7 @@ public class Context { }); } else { try { + Traces.currTraceid(request.getTraceid()); servlet.execute(request, response); } catch (Throwable t) { response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); diff --git a/src/main/java/org/redkale/net/DispatcherServlet.java b/src/main/java/org/redkale/net/DispatcherServlet.java index 0555da39f..48e554d08 100644 --- a/src/main/java/org/redkale/net/DispatcherServlet.java +++ b/src/main/java/org/redkale/net/DispatcherServlet.java @@ -221,7 +221,7 @@ public abstract class DispatcherServlet { protected int hashid; + protected String traceid; + protected AsyncConnection channel; /** @@ -67,6 +69,7 @@ public abstract class Request { this.pipelineCount = request.pipelineCount; this.pipelineOver = request.pipelineOver; this.hashid = request.hashid; + this.traceid = request.traceid; this.channel = request.channel; } @@ -94,6 +97,7 @@ public abstract class Request { protected void recycle() { hashid = 0; + traceid = null; createTime = 0; pipelineIndex = 0; pipelineCount = 0; @@ -172,4 +176,12 @@ public abstract class Request { return this; } + public String getTraceid() { + return traceid; + } + + public Request traceid(String traceid) { + this.traceid = traceid; + return this; + } } diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java index 2724d3c6c..5dc58b4f4 100644 --- a/src/main/java/org/redkale/net/http/HttpRequest.java +++ b/src/main/java/org/redkale/net/http/HttpRequest.java @@ -89,8 +89,6 @@ public class HttpRequest extends Request { protected boolean rpc; - protected String traceid; - protected int readState = READ_STATE_ROUTE; // @since 2.1.0 @@ -801,7 +799,6 @@ public class HttpRequest extends Request { this.cookies = null; this.maybews = false; this.rpc = false; - this.traceid = null; this.readState = READ_STATE_ROUTE; this.currentUserid = CURRUSERID_NIL; this.currentUserSupplier = null; diff --git a/src/main/java/org/redkale/util/Traces.java b/src/main/java/org/redkale/util/Traces.java index c9b10f085..e1224093b 100644 --- a/src/main/java/org/redkale/util/Traces.java +++ b/src/main/java/org/redkale/util/Traces.java @@ -15,22 +15,22 @@ import java.util.function.Supplier; */ public class Traces { - private static final boolean disabled = !Boolean.getBoolean("redkale.trace.enable"); + private static final boolean enable = !Boolean.getBoolean("redkale.trace.enable"); private static final ThreadLocal localTrace = new ThreadLocal<>(); private static final Supplier tidSupplier = () -> Utility.uuid(); public static boolean enable() { - return !disabled; + return enable; } public static String onceTraceid() { - return disabled ? null : tidSupplier.get(); + return enable ? tidSupplier.get() : null; } - public static String createTraceid() { - if (disabled) return null; + public static String loadTraceid() { + if (!enable) return null; String traceid = localTrace.get(); if (traceid == null) { traceid = tidSupplier.get(); @@ -40,11 +40,13 @@ public class Traces { } public static void currTraceid(String traceid) { - localTrace.set(traceid); + if (enable) { + localTrace.set(traceid); + } } public static String currTraceid() { - return disabled ? null : localTrace.get(); + return enable ? localTrace.get() : null; } }