diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 5f8b11a5e..145cff4a5 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -524,19 +524,19 @@ public abstract class MessageAgent implements Resourcable { consumer.init(config); } - public Future onMessage(MessageConext context, List messages) { + public Future onMessage(MessageConext context, String traceid, byte[] message) { return messageAgent.submit(() -> { + Traces.computeIfAbsent(traceid); Convert c = this.convert; MessageConsumer m = this.consumer; - for (byte[] bs : messages) { - try { - m.onMessage(context, (T) c.convertFrom(messageType, bs)); - } catch (Throwable t) { - messageAgent.getLogger().log(Level.SEVERE, m.getClass().getSimpleName() - + " onMessage error, topic: " + context.getTopic() - + ", messages: " + new String(bs, StandardCharsets.UTF_8)); - } + try { + m.onMessage(context, (T) c.convertFrom(messageType, message)); + } catch (Throwable t) { + messageAgent.getLogger().log(Level.SEVERE, m.getClass().getSimpleName() + + " onMessage error, topic: " + context.getTopic() + + ", messages: " + new String(message, StandardCharsets.UTF_8)); } + Traces.removeTraceid(); }); } diff --git a/src/main/java/org/redkale/mq/MessageRespProcessor.java b/src/main/java/org/redkale/mq/MessageRespProcessor.java index abe981cd1..f543c5391 100644 --- a/src/main/java/org/redkale/mq/MessageRespProcessor.java +++ b/src/main/java/org/redkale/mq/MessageRespProcessor.java @@ -5,6 +5,7 @@ package org.redkale.mq; import java.util.logging.Level; import java.util.logging.Logger; +import org.redkale.util.Traces; /** * ε“εΊ”η»“ζžœ @@ -26,6 +27,7 @@ public class MessageRespProcessor implements MessageProcessor { @Override public void process(final MessageRecord msg, long time) { + String traceid = msg.getTraceid(); long now = System.currentTimeMillis(); Logger logger = messageClient.logger; final boolean finest = logger.isLoggable(Level.FINEST); @@ -42,6 +44,7 @@ public class MessageRespProcessor implements MessageProcessor { logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + deplay + "ms, mq.seqid = " + msg.getSeqid() + ")"); } messageClient.getMessageAgent().execute(() -> { + Traces.computeIfAbsent(traceid); resp.future.complete(msg); long comems = System.currentTimeMillis() - now; if ((deplay > 1000 || comems > 1000) && logger.isLoggable(Level.FINE)) { @@ -51,6 +54,7 @@ public class MessageRespProcessor implements MessageProcessor { } else if (finest) { logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay-normal = " + deplay + "ms, mq.complete-normal = " + comems + "ms) mqresp.msg: " + msg); } + Traces.removeTraceid(); }); } } diff --git a/src/main/java/org/redkale/util/Traces.java b/src/main/java/org/redkale/util/Traces.java index c43b5dd2b..101ad10e0 100644 --- a/src/main/java/org/redkale/util/Traces.java +++ b/src/main/java/org/redkale/util/Traces.java @@ -23,7 +23,7 @@ public class Traces { private static final AtomicLong sequence = new AtomicLong(System.currentTimeMillis()); - private static final Supplier tidSupplier = () -> PROCESS_ID + sequence.incrementAndGet(); + private static final Supplier tidSupplier = () -> PROCESS_ID + Long.toHexString(sequence.incrementAndGet()); private static final ThreadLocal localTrace = new ThreadLocal<>();