mq加入一些debug日志

This commit is contained in:
Redkale
2020-12-25 18:03:19 +08:00
parent c551c157d1
commit bdd9a82682
3 changed files with 29 additions and 7 deletions

View File

@@ -47,6 +47,8 @@ public class HttpMessageProcessor implements MessageProcessor {
protected CountDownLatch cdl; protected CountDownLatch cdl;
protected long starttime;
protected final Runnable innerCallback = () -> { protected final Runnable innerCallback = () -> {
if (cdl != null) cdl.countDown(); if (cdl != null) cdl.countDown();
}; };
@@ -67,7 +69,8 @@ public class HttpMessageProcessor implements MessageProcessor {
} }
@Override @Override
public void begin(final int size) { public void begin(final int size, long starttime) {
this.starttime = starttime;
if (this.workExecutor != null) this.cdl = new CountDownLatch(size); if (this.workExecutor != null) this.cdl = new CountDownLatch(size);
} }
@@ -83,11 +86,13 @@ public class HttpMessageProcessor implements MessageProcessor {
private void execute(final MessageRecord message, final Runnable callback) { private void execute(final MessageRecord message, final Runnable callback) {
HttpMessageRequest request = null; HttpMessageRequest request = null;
try { try {
long cha = System.currentTimeMillis() - message.createtime; long now = System.currentTimeMillis();
if (cha > 50 || finer) { long cha = now - message.createtime;
logger.log(Level.FINER, "HttpMessageProcessor.process (mq.delays = " + cha + " ms) message: " + message); long e = now - starttime;
if (cha > 50 || e > 10 || finer) {
logger.log(Level.FINER, "HttpMessageProcessor.process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms) message: " + message);
} else if (finest) { } else if (finest) {
logger.log(Level.FINEST, "HttpMessageProcessor.process (mq.delay = " + cha + " ms) message: " + message); logger.log(Level.FINEST, "HttpMessageProcessor.process (mq.delay = " + cha + " ms, mq.blocks = " + e + " ms) message: " + message);
} }
if (multiconsumer) message.setResptopic(null); //不容许有响应 if (multiconsumer) message.setResptopic(null); //不容许有响应
HttpContext context = server.getHttpServer().getContext(); HttpContext context = server.getHttpServer().getContext();

View File

@@ -16,7 +16,7 @@ package org.redkale.mq;
*/ */
public interface MessageProcessor { public interface MessageProcessor {
default void begin(int size) { default void begin(int size, long starttime) {
} }
public void process(MessageRecord message, Runnable callback); public void process(MessageRecord message, Runnable callback);

View File

@@ -23,6 +23,10 @@ import org.redkale.util.ThreadHashExecutor;
*/ */
public class SncpMessageProcessor implements MessageProcessor { public class SncpMessageProcessor implements MessageProcessor {
protected final boolean finest;
protected final boolean finer;
protected final Logger logger; protected final Logger logger;
protected final MessageProducers producer; protected final MessageProducers producer;
@@ -37,12 +41,16 @@ public class SncpMessageProcessor implements MessageProcessor {
protected CountDownLatch cdl; protected CountDownLatch cdl;
protected long starttime;
protected final Runnable innerCallback = () -> { protected final Runnable innerCallback = () -> {
if (cdl != null) cdl.countDown(); if (cdl != null) cdl.countDown();
}; };
public SncpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { public SncpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger; this.logger = logger;
this.finest = logger.isLoggable(Level.FINEST);
this.finer = logger.isLoggable(Level.FINER);
this.producer = producer; this.producer = producer;
this.server = server; this.server = server;
this.service = service; this.service = service;
@@ -51,7 +59,8 @@ public class SncpMessageProcessor implements MessageProcessor {
} }
@Override @Override
public void begin(final int size) { public void begin(final int size, long starttime) {
this.starttime = starttime;
if (this.workExecutor != null) this.cdl = new CountDownLatch(size); if (this.workExecutor != null) this.cdl = new CountDownLatch(size);
} }
@@ -67,6 +76,14 @@ public class SncpMessageProcessor implements MessageProcessor {
private void execute(final MessageRecord message, final Runnable callback) { private void execute(final MessageRecord message, final Runnable callback) {
SncpMessageResponse response = null; SncpMessageResponse response = null;
try { try {
long now = System.currentTimeMillis();
long cha = now - message.createtime;
long e = now - starttime;
if (cha > 50 || e > 10 || finer) {
logger.log(Level.FINER, "SncpMessageProcessor.process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms) message: " + message);
} else if (finest) {
logger.log(Level.FINEST, "SncpMessageProcessor.process (mq.delay = " + cha + " ms, mq.blocks = " + e + " ms) message: " + message);
}
SncpContext context = server.getSncpServer().getContext(); SncpContext context = server.getSncpServer().getContext();
SncpMessageRequest request = new SncpMessageRequest(context, message); SncpMessageRequest request = new SncpMessageRequest(context, message);
response = new SncpMessageResponse(context, request, callback, null, producer.getProducer(message)); response = new SncpMessageResponse(context, request, callback, null, producer.getProducer(message));