From bdd9a82682ebb9065922da214607f9a5a20c4d45 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Fri, 25 Dec 2020 18:03:19 +0800 Subject: [PATCH] =?UTF-8?q?mq=E5=8A=A0=E5=85=A5=E4=B8=80=E4=BA=9Bdebug?= =?UTF-8?q?=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/mq/HttpMessageProcessor.java | 15 ++++++++++----- src/org/redkale/mq/MessageProcessor.java | 2 +- src/org/redkale/mq/SncpMessageProcessor.java | 19 ++++++++++++++++++- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index 1b9656be1..7d4798164 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -47,6 +47,8 @@ public class HttpMessageProcessor implements MessageProcessor { protected CountDownLatch cdl; + protected long starttime; + protected final Runnable innerCallback = () -> { if (cdl != null) cdl.countDown(); }; @@ -67,7 +69,8 @@ public class HttpMessageProcessor implements MessageProcessor { } @Override - public void begin(final int size) { + public void begin(final int size, long starttime) { + this.starttime = starttime; if (this.workExecutor != null) this.cdl = new CountDownLatch(size); } @@ -83,11 +86,13 @@ public class HttpMessageProcessor implements MessageProcessor { private void execute(final MessageRecord message, final Runnable callback) { HttpMessageRequest request = null; try { - long cha = System.currentTimeMillis() - message.createtime; - if (cha > 50 || finer) { - logger.log(Level.FINER, "HttpMessageProcessor.process (mq.delays = " + cha + " ms) message: " + message); + long now = System.currentTimeMillis(); + long cha = now - message.createtime; + long e = now - starttime; + if (cha > 50 || e > 10 || finer) { + logger.log(Level.FINER, "HttpMessageProcessor.process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms) message: " + message); } else if (finest) { - logger.log(Level.FINEST, "HttpMessageProcessor.process (mq.delay = " + cha + " ms) message: " + message); + logger.log(Level.FINEST, "HttpMessageProcessor.process (mq.delay = " + cha + " ms, mq.blocks = " + e + " ms) message: " + message); } if (multiconsumer) message.setResptopic(null); //不容许有响应 HttpContext context = server.getHttpServer().getContext(); diff --git a/src/org/redkale/mq/MessageProcessor.java b/src/org/redkale/mq/MessageProcessor.java index d8d024759..6751e38d4 100644 --- a/src/org/redkale/mq/MessageProcessor.java +++ b/src/org/redkale/mq/MessageProcessor.java @@ -16,7 +16,7 @@ package org.redkale.mq; */ public interface MessageProcessor { - default void begin(int size) { + default void begin(int size, long starttime) { } public void process(MessageRecord message, Runnable callback); diff --git a/src/org/redkale/mq/SncpMessageProcessor.java b/src/org/redkale/mq/SncpMessageProcessor.java index f4548b07c..54f94e1eb 100644 --- a/src/org/redkale/mq/SncpMessageProcessor.java +++ b/src/org/redkale/mq/SncpMessageProcessor.java @@ -23,6 +23,10 @@ import org.redkale.util.ThreadHashExecutor; */ public class SncpMessageProcessor implements MessageProcessor { + protected final boolean finest; + + protected final boolean finer; + protected final Logger logger; protected final MessageProducers producer; @@ -37,12 +41,16 @@ public class SncpMessageProcessor implements MessageProcessor { protected CountDownLatch cdl; + protected long starttime; + protected final Runnable innerCallback = () -> { if (cdl != null) cdl.countDown(); }; public SncpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; + this.finest = logger.isLoggable(Level.FINEST); + this.finer = logger.isLoggable(Level.FINER); this.producer = producer; this.server = server; this.service = service; @@ -51,7 +59,8 @@ public class SncpMessageProcessor implements MessageProcessor { } @Override - public void begin(final int size) { + public void begin(final int size, long starttime) { + this.starttime = starttime; if (this.workExecutor != null) this.cdl = new CountDownLatch(size); } @@ -67,6 +76,14 @@ public class SncpMessageProcessor implements MessageProcessor { private void execute(final MessageRecord message, final Runnable callback) { SncpMessageResponse response = null; try { + long now = System.currentTimeMillis(); + long cha = now - message.createtime; + long e = now - starttime; + if (cha > 50 || e > 10 || finer) { + logger.log(Level.FINER, "SncpMessageProcessor.process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms) message: " + message); + } else if (finest) { + logger.log(Level.FINEST, "SncpMessageProcessor.process (mq.delay = " + cha + " ms, mq.blocks = " + e + " ms) message: " + message); + } SncpContext context = server.getSncpServer().getContext(); SncpMessageRequest request = new SncpMessageRequest(context, message); response = new SncpMessageResponse(context, request, callback, null, producer.getProducer(message));