diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index ad0044f5a..8648eec9d 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -44,6 +44,10 @@ public class HttpMessageProcessor implements MessageProcessor { protected CountDownLatch cdl; + protected final Runnable innerCallback = () -> { + if (cdl != null) cdl.countDown(); + }; + public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) { this.logger = logger; this.finest = logger.isLoggable(Level.FINEST); @@ -66,9 +70,9 @@ public class HttpMessageProcessor implements MessageProcessor { @Override public void process(final MessageRecord message, final Runnable callback) { if (this.workExecutor == null) { - execute(message, callback); + execute(message, innerCallback); } else { - this.workExecutor.execute(() -> execute(message, callback)); + this.workExecutor.execute(() -> execute(message, innerCallback)); } } @@ -88,8 +92,6 @@ public class HttpMessageProcessor implements MessageProcessor { HttpMessageResponse.finishHttpResult(finest, message, callback, producer, message.getResptopic(), new HttpResult().status(500)); } logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); - } finally { - if (cdl != null) cdl.countDown(); } } @@ -97,7 +99,7 @@ public class HttpMessageProcessor implements MessageProcessor { public void commit() { if (this.cdl != null) { try { - this.cdl.await(); + this.cdl.await(30, TimeUnit.SECONDS); } catch (Exception ex) { } }