修改Traces方法名

This commit is contained in:
Redkale
2022-12-17 14:22:18 +08:00
parent 543db6f8ab
commit 1334e2439d
9 changed files with 53 additions and 37 deletions

View File

@@ -103,7 +103,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
HttpResponse resp = new HttpMessageLocalResponse(req, future); HttpResponse resp = new HttpMessageLocalResponse(req, future);
Traces.createTraceid(); Traces.loadTraceid();
try { try {
servlet.execute(req, resp); servlet.execute(req, resp);
} catch (Exception e) { } catch (Exception e) {
@@ -122,7 +122,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
CompletableFuture future = new CompletableFuture(); CompletableFuture future = new CompletableFuture();
HttpResponse resp = new HttpMessageLocalResponse(req, future); HttpResponse resp = new HttpMessageLocalResponse(req, future);
Traces.createTraceid(); Traces.loadTraceid();
try { try {
servlet.execute(req, resp); servlet.execute(req, resp);
} catch (Exception e) { } catch (Exception e) {
@@ -149,7 +149,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
HttpResponse resp = new HttpMessageLocalResponse(req, null); HttpResponse resp = new HttpMessageLocalResponse(req, null);
Traces.createTraceid(); Traces.loadTraceid();
try { try {
servlet.execute(req, resp); servlet.execute(req, resp);
} catch (Exception e) { } catch (Exception e) {
@@ -162,7 +162,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
HttpDispatcherServlet ps = dispatcherServlet(); HttpDispatcherServlet ps = dispatcherServlet();
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
HttpResponse resp = new HttpMessageLocalResponse(req, null); HttpResponse resp = new HttpMessageLocalResponse(req, null);
Traces.createTraceid(); Traces.loadTraceid();
ps.filterServletsByMmcTopic(topic).forEach(s -> { ps.filterServletsByMmcTopic(topic).forEach(s -> {
try { try {
s.execute(req, resp); s.execute(req, resp);

View File

@@ -57,7 +57,9 @@ public abstract class MessageClient {
try { try {
if (this.respConsumer == null) { if (this.respConsumer == null) {
synchronized (this) { synchronized (this) {
if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic; if (this.respConsumerid == null) {
this.respConsumerid = "consumer-" + this.respTopic;
}
if (this.respConsumer == null) { if (this.respConsumer == null) {
MessageProcessor processor = (msg, callback) -> { MessageProcessor processor = (msg, callback) -> {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
@@ -119,11 +121,11 @@ public abstract class MessageClient {
protected abstract MessageProducers getProducer(); protected abstract MessageProducers getProducer();
public MessageRecord createMessageRecord(String resptopic, String content) { public MessageRecord createMessageRecord(String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.createTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.loadTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, String content) { public MessageRecord createMessageRecord(String topic, String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.createTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.loadTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) { public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) {
@@ -131,7 +133,7 @@ public abstract class MessageClient {
} }
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.createTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.loadTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) { public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) {
@@ -139,7 +141,7 @@ public abstract class MessageClient {
} }
public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.createTraceid(), convert.convertToBytes(bean)); return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) { public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) {
@@ -147,27 +149,27 @@ public abstract class MessageClient {
} }
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.createTraceid(), convert.convertToBytes(bean)); return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.createTraceid(), convert.convertToBytes(bean)); return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.createTraceid(), convert.convertToBytes(bean)); return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.createTraceid(), content); return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.loadTraceid(), content);
} }
public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) { public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) {
return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.createTraceid(), content); return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.loadTraceid(), content);
} }
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) { protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.createTraceid(), content); return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.loadTraceid(), content);
} }
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) { protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) {
@@ -175,7 +177,7 @@ public abstract class MessageClient {
} }
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) { protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) {
return new MessageRecord(seqid, ctype, topic, resptopic, Traces.createTraceid(), content); return new MessageRecord(seqid, ctype, topic, resptopic, Traces.loadTraceid(), content);
} }
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) { protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) {

View File

@@ -88,15 +88,15 @@ public class MessageRecord implements Serializable {
public MessageRecord() { public MessageRecord() {
} }
protected MessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) { protected MessageRecord(long seqid, byte ctype, String topic, String respTopic, String traceid, byte[] content) {
this(seqid, ctype, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, traceid, content); this(seqid, ctype, 1, 0, System.currentTimeMillis(), 0, null, topic, respTopic, traceid, content);
} }
protected MessageRecord(long seqid, byte ctype, int flag, Serializable userid, String groupid, String topic, String resptopic, String traceid, byte[] content) { protected MessageRecord(long seqid, byte ctype, int flag, Serializable userid, String groupid, String topic, String respTopic, String traceid, byte[] content) {
this(seqid, ctype, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, traceid, content); this(seqid, ctype, 1, flag, System.currentTimeMillis(), userid, groupid, topic, respTopic, traceid, content);
} }
protected MessageRecord(long seqid, byte ctype, int version, int flag, long createTime, Serializable userid, String groupid, String topic, String resptopic, String traceid, byte[] content) { protected MessageRecord(long seqid, byte ctype, int version, int flag, long createTime, Serializable userid, String groupid, String topic, String respTopic, String traceid, byte[] content) {
this.seqid = seqid; this.seqid = seqid;
this.ctype = ctype; this.ctype = ctype;
this.version = version; this.version = version;
@@ -105,7 +105,7 @@ public class MessageRecord implements Serializable {
this.userid = userid; this.userid = userid;
this.groupid = groupid; this.groupid = groupid;
this.topic = topic; this.topic = topic;
this.respTopic = resptopic; this.respTopic = respTopic;
this.traceid = traceid; this.traceid = traceid;
this.content = content; this.content = content;
} }

View File

@@ -32,7 +32,7 @@ public class MessageRecordCoder implements MessageCoder<MessageRecord> {
byte[] userid = MessageCoder.encodeUserid(data.getUserid()); byte[] userid = MessageCoder.encodeUserid(data.getUserid());
byte[] groupid = MessageCoder.getBytes(data.getGroupid()); byte[] groupid = MessageCoder.getBytes(data.getGroupid());
byte[] topic = MessageCoder.getBytes(data.getTopic()); byte[] topic = MessageCoder.getBytes(data.getTopic());
byte[] resptopic = MessageCoder.getBytes(data.getRespTopic()); byte[] respTopic = MessageCoder.getBytes(data.getRespTopic());
byte[] traceid = MessageCoder.getBytes(data.getTraceid()); byte[] traceid = MessageCoder.getBytes(data.getTraceid());
int count = 8 //seqid int count = 8 //seqid
+ 1 //ctype + 1 //ctype
@@ -42,7 +42,7 @@ public class MessageRecordCoder implements MessageCoder<MessageRecord> {
+ 2 + userid.length + 2 + userid.length
+ 2 + groupid.length + 2 + groupid.length
+ 2 + topic.length + 2 + topic.length
+ 2 + resptopic.length + 2 + respTopic.length
+ 2 + traceid.length + 2 + traceid.length
+ 4 + (data.getContent() == null ? 0 : data.getContent().length); + 4 + (data.getContent() == null ? 0 : data.getContent().length);
final byte[] bs = new byte[count]; final byte[] bs = new byte[count];
@@ -62,8 +62,8 @@ public class MessageRecordCoder implements MessageCoder<MessageRecord> {
buffer.putChar((char) topic.length); buffer.putChar((char) topic.length);
if (topic.length > 0) buffer.put(topic); if (topic.length > 0) buffer.put(topic);
buffer.putChar((char) resptopic.length); buffer.putChar((char) respTopic.length);
if (resptopic.length > 0) buffer.put(resptopic); if (respTopic.length > 0) buffer.put(respTopic);
buffer.putChar((char) traceid.length); buffer.putChar((char) traceid.length);
if (traceid.length > 0) buffer.put(traceid); if (traceid.length > 0) buffer.put(traceid);

View File

@@ -125,6 +125,7 @@ public class Context {
workHashExecutor.execute(request.getHashid(), () -> { workHashExecutor.execute(request.getHashid(), () -> {
try { try {
long cha = System.currentTimeMillis() - request.getCreateTime(); long cha = System.currentTimeMillis() - request.getCreateTime();
Traces.currTraceid(request.getTraceid());
servlet.execute(request, response); servlet.execute(request, response);
if (cha > 1000 && response.context.logger.isLoggable(Level.WARNING)) { if (cha > 1000 && response.context.logger.isLoggable(Level.WARNING)) {
response.context.logger.log(Level.WARNING, "hash execute servlet delays=" + cha + "ms, request=" + request); response.context.logger.log(Level.WARNING, "hash execute servlet delays=" + cha + "ms, request=" + request);
@@ -139,6 +140,7 @@ public class Context {
} else if (workExecutor != null) { } else if (workExecutor != null) {
workExecutor.execute(() -> { workExecutor.execute(() -> {
try { try {
Traces.currTraceid(request.getTraceid());
servlet.execute(request, response); servlet.execute(request, response);
} catch (Throwable t) { } catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
@@ -147,6 +149,7 @@ public class Context {
}); });
} else { } else {
try { try {
Traces.currTraceid(request.getTraceid());
servlet.execute(request, response); servlet.execute(request, response);
} catch (Throwable t) { } catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t); response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);

View File

@@ -221,7 +221,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
public final void prepare(final R request, final P response) { public final void prepare(final R request, final P response) {
try { try {
Traces.createTraceid(); Traces.loadTraceid();
request.prepare(); request.prepare();
response.filter = this.headFilter; response.filter = this.headFilter;
response.servlet = this; response.servlet = this;

View File

@@ -41,6 +41,8 @@ public abstract class Request<C extends Context> {
protected int hashid; protected int hashid;
protected String traceid;
protected AsyncConnection channel; protected AsyncConnection channel;
/** /**
@@ -67,6 +69,7 @@ public abstract class Request<C extends Context> {
this.pipelineCount = request.pipelineCount; this.pipelineCount = request.pipelineCount;
this.pipelineOver = request.pipelineOver; this.pipelineOver = request.pipelineOver;
this.hashid = request.hashid; this.hashid = request.hashid;
this.traceid = request.traceid;
this.channel = request.channel; this.channel = request.channel;
} }
@@ -94,6 +97,7 @@ public abstract class Request<C extends Context> {
protected void recycle() { protected void recycle() {
hashid = 0; hashid = 0;
traceid = null;
createTime = 0; createTime = 0;
pipelineIndex = 0; pipelineIndex = 0;
pipelineCount = 0; pipelineCount = 0;
@@ -172,4 +176,12 @@ public abstract class Request<C extends Context> {
return this; return this;
} }
public String getTraceid() {
return traceid;
}
public Request<C> traceid(String traceid) {
this.traceid = traceid;
return this;
}
} }

View File

@@ -89,8 +89,6 @@ public class HttpRequest extends Request<HttpContext> {
protected boolean rpc; protected boolean rpc;
protected String traceid;
protected int readState = READ_STATE_ROUTE; protected int readState = READ_STATE_ROUTE;
// @since 2.1.0 // @since 2.1.0
@@ -801,7 +799,6 @@ public class HttpRequest extends Request<HttpContext> {
this.cookies = null; this.cookies = null;
this.maybews = false; this.maybews = false;
this.rpc = false; this.rpc = false;
this.traceid = null;
this.readState = READ_STATE_ROUTE; this.readState = READ_STATE_ROUTE;
this.currentUserid = CURRUSERID_NIL; this.currentUserid = CURRUSERID_NIL;
this.currentUserSupplier = null; this.currentUserSupplier = null;

View File

@@ -15,22 +15,22 @@ import java.util.function.Supplier;
*/ */
public class Traces { public class Traces {
private static final boolean disabled = !Boolean.getBoolean("redkale.trace.enable"); private static final boolean enable = !Boolean.getBoolean("redkale.trace.enable");
private static final ThreadLocal<String> localTrace = new ThreadLocal<>(); private static final ThreadLocal<String> localTrace = new ThreadLocal<>();
private static final Supplier<String> tidSupplier = () -> Utility.uuid(); private static final Supplier<String> tidSupplier = () -> Utility.uuid();
public static boolean enable() { public static boolean enable() {
return !disabled; return enable;
} }
public static String onceTraceid() { public static String onceTraceid() {
return disabled ? null : tidSupplier.get(); return enable ? tidSupplier.get() : null;
} }
public static String createTraceid() { public static String loadTraceid() {
if (disabled) return null; if (!enable) return null;
String traceid = localTrace.get(); String traceid = localTrace.get();
if (traceid == null) { if (traceid == null) {
traceid = tidSupplier.get(); traceid = tidSupplier.get();
@@ -40,11 +40,13 @@ public class Traces {
} }
public static void currTraceid(String traceid) { public static void currTraceid(String traceid) {
if (enable) {
localTrace.set(traceid); localTrace.set(traceid);
} }
}
public static String currTraceid() { public static String currTraceid() {
return disabled ? null : localTrace.get(); return enable ? localTrace.get() : null;
} }
} }