diff --git a/src/main/java/META-INF/application-template.xml b/src/main/java/META-INF/application-template.xml index 28e0e16ae..a260c57bc 100644 --- a/src/main/java/META-INF/application-template.xml +++ b/src/main/java/META-INF/application-template.xml @@ -133,6 +133,7 @@ System.setProperty("redkale.convert.tiny", "true"); System.setProperty("redkale.convert.pool.size", "128"); System.setProperty("redkale.convert.writer.buffer.defsize", "4096"); + System.setProperty("redkale.trace.enable", "false"); 节点下也可包含非节点. 非其节点可以通过@Resource(name="properties.xxxxxx")进行注入, 被注解的字段类型只能是AnyValue、AnyValue[] diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 9c8a6f3e8..67359a25a 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -677,6 +677,7 @@ public final class Application { System.setProperty("redkale.convert.tiny", "true"); System.setProperty("redkale.convert.pool.size", "128"); System.setProperty("redkale.convert.writer.buffer.defsize", "4096"); + System.setProperty("redkale.trace.enable", "false"); final String confDir = this.confPath.toString(); // String pidstr = ""; diff --git a/src/main/java/org/redkale/mq/HttpMessageClient.java b/src/main/java/org/redkale/mq/HttpMessageClient.java index 81a7c4b19..bf29d82fb 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClient.java @@ -174,20 +174,20 @@ public class HttpMessageClient extends MessageClient { } public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); //if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message); return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } diff --git a/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java b/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java index 661c4ac3e..d8d718ca7 100644 --- a/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java +++ b/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java @@ -30,6 +30,7 @@ public class HttpSimpleRequestCoder implements MessageCoder { @Override public byte[] encode(HttpSimpleRequest data) { + byte[] traceid = MessageCoder.getBytes(data.getTraceid());//short-string byte[] requestURI = MessageCoder.getBytes(data.getRequestURI()); //long-string byte[] path = MessageCoder.getBytes(data.getPath()); //short-string byte[] remoteAddr = MessageCoder.getBytes(data.getRemoteAddr());//short-string @@ -39,11 +40,11 @@ public class HttpSimpleRequestCoder implements MessageCoder { byte[] params = MessageCoder.getBytes(data.getParams()); byte[] body = MessageCoder.getBytes(data.getBody()); byte[] userid = MessageCoder.encodeUserid(data.getCurrentUserid()); - int count = 1 //rpc - + 1 //frombody + int count = 1 //rpc + frombody + 4 //hashid + 4 //reqConvertType + 4 //respConvertType + + 2 + traceid.length + 4 + requestURI.length + 2 + path.length + 2 + remoteAddr.length @@ -54,11 +55,12 @@ public class HttpSimpleRequestCoder implements MessageCoder { + 4 + body.length; byte[] bs = new byte[count]; ByteBuffer buffer = ByteBuffer.wrap(bs); - buffer.put((byte) (data.isRpc() ? 'T' : 'F')); - buffer.put((byte) (data.isFrombody() ? 'T' : 'F')); + buffer.put((byte) ((data.isRpc() ? 0b01 : 0) | (data.isFrombody() ? 0b10 : 0))); buffer.putInt(data.getHashid()); buffer.putInt(data.getReqConvertType() == null ? 0 : data.getReqConvertType().getValue()); buffer.putInt(data.getRespConvertType() == null ? 0 : data.getRespConvertType().getValue()); + buffer.putChar((char) traceid.length); + if (traceid.length > 0) buffer.put(traceid); buffer.putInt(requestURI.length); if (requestURI.length > 0) buffer.put(requestURI); buffer.putChar((char) path.length); @@ -83,13 +85,15 @@ public class HttpSimpleRequestCoder implements MessageCoder { if (data == null) return null; ByteBuffer buffer = ByteBuffer.wrap(data); HttpSimpleRequest req = new HttpSimpleRequest(); - req.setRpc(buffer.get() == 'T'); - req.setFrombody(buffer.get() == 'T'); + byte opt = buffer.get(); + req.setRpc((opt & 0b01) > 0); + req.setFrombody((opt & 0b10) > 0); req.setHashid(buffer.getInt()); int reqformat = buffer.getInt(); int respformat = buffer.getInt(); if (reqformat != 0) req.setReqConvertType(ConvertType.find(reqformat)); if (respformat != 0) req.setRespConvertType(ConvertType.find(respformat)); + req.setTraceid(MessageCoder.getShortString(buffer)); req.setRequestURI(MessageCoder.getLongString(buffer)); req.setPath(MessageCoder.getShortString(buffer)); req.setRemoteAddr(MessageCoder.getShortString(buffer)); diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index e1eaabcf2..10d635c55 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -178,10 +178,18 @@ public abstract class MessageClient { return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.createTraceid(), content); } + protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) { + return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, traceid, content); + } + protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) { return new MessageRecord(seqid, ctype, topic, resptopic, Traces.createTraceid(), content); } + protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) { + return new MessageRecord(seqid, ctype, topic, resptopic, traceid, content); + } + private byte ctype(Convert convert, Object bean) { byte ctype = 0; if (convert instanceof JsonConvert) { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 157013e23..8233f8f11 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -188,16 +188,24 @@ public abstract class ClientConnection implements Co if (rs.exc != null) { if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() || workThread.getState() != Thread.State.RUNNABLE) { + Traces.currTraceid(request.traceid); respFuture.completeExceptionally(rs.exc); } else { - workThread.execute(() -> respFuture.completeExceptionally(rs.exc)); + workThread.execute(() -> { + Traces.currTraceid(request.traceid); + respFuture.completeExceptionally(rs.exc); + }); } } else { if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() || workThread.getState() != Thread.State.RUNNABLE) { + Traces.currTraceid(request.traceid); respFuture.complete(rs.result); } else { - workThread.execute(() -> respFuture.complete(rs.result)); + workThread.execute(() -> { + Traces.currTraceid(request.traceid); + respFuture.complete(rs.result); + }); } } } catch (Throwable t) { diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index 2ac8508cb..316c3786f 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -7,7 +7,7 @@ package org.redkale.net.client; import java.util.function.*; import org.redkale.net.WorkThread; -import org.redkale.util.ByteArray; +import org.redkale.util.*; /** * @@ -24,12 +24,18 @@ public abstract class ClientRequest implements BiConsumer T currThread(WorkThread thread) { this.workThread = thread; return (T) this; @@ -56,10 +62,12 @@ public abstract class ClientRequest implements BiConsumer { protected boolean rpc; + protected String traceid; + protected int readState = READ_STATE_ROUTE; // @since 2.1.0 @@ -183,6 +185,7 @@ public class HttpRequest extends Request { protected HttpRequest initSimpleRequest(HttpSimpleRequest req, boolean needPath) { if (req != null) { this.rpc = req.rpc; + this.traceid = req.traceid; if (req.getBody() != null) this.array.put(req.getBody()); if (req.getHeaders() != null) this.headers.putAll(req.getHeaders()); this.frombody = req.isFrombody(); @@ -236,6 +239,7 @@ public class HttpRequest extends Request { req.setRequestURI(uri); req.setSessionid(getSessionid(false)); req.setRpc(this.rpc); + req.setTraceid(this.traceid); return req; } @@ -303,6 +307,7 @@ public class HttpRequest extends Request { this.keepAlive = httplast.keepAlive; this.maybews = httplast.maybews; this.rpc = httplast.rpc; + this.traceid = httplast.traceid; this.hashid = httplast.hashid; this.currentUserid = httplast.currentUserid; this.frombody = httplast.frombody; @@ -760,6 +765,7 @@ public class HttpRequest extends Request { req.keepAlive = this.keepAlive; req.maybews = this.maybews; req.rpc = this.rpc; + req.traceid = this.traceid; req.hashid = this.hashid; req.currentUserid = this.currentUserid; req.currentUserSupplier = this.currentUserSupplier; @@ -791,6 +797,7 @@ public class HttpRequest extends Request { this.cookies = null; this.maybews = false; this.rpc = false; + this.traceid = null; this.readState = READ_STATE_ROUTE; this.currentUserid = CURRUSERID_NIL; this.currentUserSupplier = null; diff --git a/src/main/java/org/redkale/net/http/HttpSimpleRequest.java b/src/main/java/org/redkale/net/http/HttpSimpleRequest.java index fda27b0d2..3ae4b15c6 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleRequest.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleRequest.java @@ -35,61 +35,65 @@ public class HttpSimpleRequest implements java.io.Serializable { protected boolean frombody; @ConvertColumn(index = 3) + @Comment("链路ID") + protected String traceid; + + @ConvertColumn(index = 4) @Comment("请求参数的ConvertType") protected ConvertType reqConvertType; - @ConvertColumn(index = 4) + @ConvertColumn(index = 5) @Comment("输出结果的ConvertType") protected ConvertType respConvertType; - @ConvertColumn(index = 5) + @ConvertColumn(index = 6) @Comment("请求的URI") protected String requestURI; - @ConvertColumn(index = 6) + @ConvertColumn(index = 7) @Comment("请求的前缀") protected String path; - @ConvertColumn(index = 7) + @ConvertColumn(index = 8) @Comment("客户端IP") protected String remoteAddr; - @ConvertColumn(index = 8) + @ConvertColumn(index = 9) @Comment("Locale国际化") protected String locale; - @ConvertColumn(index = 9) + @ConvertColumn(index = 10) @Comment("会话ID") protected String sessionid; - @ConvertColumn(index = 10) + @ConvertColumn(index = 11) @Comment("Content-Type") protected String contentType; - @ConvertColumn(index = 11) + @ConvertColumn(index = 12) protected int hashid; - @ConvertColumn(index = 12) //@since 2.5.0 由int改成Serializable, 具体数据类型只能是int、long、String + @ConvertColumn(index = 13) //@since 2.5.0 由int改成Serializable, 具体数据类型只能是int、long、String protected Serializable currentUserid; - @ConvertColumn(index = 13) + @ConvertColumn(index = 14) @Comment("http header信息") protected Map headers; - @ConvertColumn(index = 14) + @ConvertColumn(index = 15) @Comment("参数信息") protected Map params; - @ConvertColumn(index = 15) + @ConvertColumn(index = 16) @Comment("http body信息") protected byte[] body; //对应HttpRequest.array public static HttpSimpleRequest create(String requestURI) { - return new HttpSimpleRequest().requestURI(requestURI); + return new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currTraceid()); } public static HttpSimpleRequest create(String requestURI, Object... params) { - HttpSimpleRequest req = new HttpSimpleRequest().requestURI(requestURI); + HttpSimpleRequest req = new HttpSimpleRequest().requestURI(requestURI).traceid(Traces.currTraceid()); int len = params.length / 2; for (int i = 0; i < len; i++) { req.param(params[i * 2].toString(), params[i * 2 + 1]); @@ -115,6 +119,11 @@ public class HttpSimpleRequest implements java.io.Serializable { return this; } + public HttpSimpleRequest traceid(String traceid) { + this.traceid = traceid; + return this; + } + public HttpSimpleRequest requestURI(String requestURI) { this.requestURI = requestURI; return this; @@ -298,6 +307,14 @@ public class HttpSimpleRequest implements java.io.Serializable { this.rpc = rpc; } + public String getTraceid() { + return traceid; + } + + public void setTraceid(String traceid) { + this.traceid = traceid; + } + public String getRequestURI() { return requestURI; } diff --git a/src/main/java/org/redkale/util/Traces.java b/src/main/java/org/redkale/util/Traces.java index 88712d09c..c9b10f085 100644 --- a/src/main/java/org/redkale/util/Traces.java +++ b/src/main/java/org/redkale/util/Traces.java @@ -2,6 +2,8 @@ */ package org.redkale.util; +import java.util.function.Supplier; + /** * 创建traceid工具类 * @@ -15,21 +17,23 @@ public class Traces { private static final boolean disabled = !Boolean.getBoolean("redkale.trace.enable"); - private static ThreadLocal localTrace = new ThreadLocal<>(); + private static final ThreadLocal localTrace = new ThreadLocal<>(); + + private static final Supplier tidSupplier = () -> Utility.uuid(); public static boolean enable() { return !disabled; } public static String onceTraceid() { - return disabled ? null : Utility.uuid(); + return disabled ? null : tidSupplier.get(); } public static String createTraceid() { if (disabled) return null; String traceid = localTrace.get(); if (traceid == null) { - traceid = Utility.uuid(); + traceid = tidSupplier.get(); localTrace.set(traceid); } return traceid;