trace完善

This commit is contained in:
redkale
2023-10-20 16:08:51 +08:00
parent 09596e9b4b
commit 2019c0ce3c
3 changed files with 14 additions and 10 deletions

View File

@@ -524,19 +524,19 @@ public abstract class MessageAgent implements Resourcable {
consumer.init(config);
}
public Future onMessage(MessageConext context, List<byte[]> messages) {
public Future onMessage(MessageConext context, String traceid, byte[] message) {
return messageAgent.submit(() -> {
Traces.computeIfAbsent(traceid);
Convert c = this.convert;
MessageConsumer m = this.consumer;
for (byte[] bs : messages) {
try {
m.onMessage(context, (T) c.convertFrom(messageType, bs));
} catch (Throwable t) {
messageAgent.getLogger().log(Level.SEVERE, m.getClass().getSimpleName()
+ " onMessage error, topic: " + context.getTopic()
+ ", messages: " + new String(bs, StandardCharsets.UTF_8));
}
try {
m.onMessage(context, (T) c.convertFrom(messageType, message));
} catch (Throwable t) {
messageAgent.getLogger().log(Level.SEVERE, m.getClass().getSimpleName()
+ " onMessage error, topic: " + context.getTopic()
+ ", messages: " + new String(message, StandardCharsets.UTF_8));
}
Traces.removeTraceid();
});
}

View File

@@ -5,6 +5,7 @@ package org.redkale.mq;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.redkale.util.Traces;
/**
* 响应结果
@@ -26,6 +27,7 @@ public class MessageRespProcessor implements MessageProcessor {
@Override
public void process(final MessageRecord msg, long time) {
String traceid = msg.getTraceid();
long now = System.currentTimeMillis();
Logger logger = messageClient.logger;
final boolean finest = logger.isLoggable(Level.FINEST);
@@ -42,6 +44,7 @@ public class MessageRespProcessor implements MessageProcessor {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + deplay + "ms, mq.seqid = " + msg.getSeqid() + ")");
}
messageClient.getMessageAgent().execute(() -> {
Traces.computeIfAbsent(traceid);
resp.future.complete(msg);
long comems = System.currentTimeMillis() - now;
if ((deplay > 1000 || comems > 1000) && logger.isLoggable(Level.FINE)) {
@@ -51,6 +54,7 @@ public class MessageRespProcessor implements MessageProcessor {
} else if (finest) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-normal = " + deplay + "ms, mq.complete-normal = " + comems + "ms) mqresp.msg: " + msg);
}
Traces.removeTraceid();
});
}
}

View File

@@ -23,7 +23,7 @@ public class Traces {
private static final AtomicLong sequence = new AtomicLong(System.currentTimeMillis());
private static final Supplier<String> tidSupplier = () -> PROCESS_ID + sequence.incrementAndGet();
private static final Supplier<String> tidSupplier = () -> PROCESS_ID + Long.toHexString(sequence.incrementAndGet());
private static final ThreadLocal<String> localTrace = new ThreadLocal<>();