This commit is contained in:
@@ -44,6 +44,10 @@ public class HttpMessageProcessor implements MessageProcessor {
|
||||
|
||||
protected CountDownLatch cdl;
|
||||
|
||||
protected final Runnable innerCallback = () -> {
|
||||
if (cdl != null) cdl.countDown();
|
||||
};
|
||||
|
||||
public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) {
|
||||
this.logger = logger;
|
||||
this.finest = logger.isLoggable(Level.FINEST);
|
||||
@@ -66,9 +70,9 @@ public class HttpMessageProcessor implements MessageProcessor {
|
||||
@Override
|
||||
public void process(final MessageRecord message, final Runnable callback) {
|
||||
if (this.workExecutor == null) {
|
||||
execute(message, callback);
|
||||
execute(message, innerCallback);
|
||||
} else {
|
||||
this.workExecutor.execute(() -> execute(message, callback));
|
||||
this.workExecutor.execute(() -> execute(message, innerCallback));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,8 +92,6 @@ public class HttpMessageProcessor implements MessageProcessor {
|
||||
HttpMessageResponse.finishHttpResult(finest, message, callback, producer, message.getResptopic(), new HttpResult().status(500));
|
||||
}
|
||||
logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);
|
||||
} finally {
|
||||
if (cdl != null) cdl.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,7 +99,7 @@ public class HttpMessageProcessor implements MessageProcessor {
|
||||
public void commit() {
|
||||
if (this.cdl != null) {
|
||||
try {
|
||||
this.cdl.await();
|
||||
this.cdl.await(30, TimeUnit.SECONDS);
|
||||
} catch (Exception ex) {
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user