From e5766e31dc0ad0aacf0395047a5bc13356a37494 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Fri, 31 Jul 2020 09:50:59 +0800 Subject: [PATCH] --- src/org/redkale/mq/HttpMessageProcessor.java | 2 +- src/org/redkale/mq/SncpMessageProcessor.java | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index 8648eec9d..860e23612 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -87,7 +87,7 @@ public class HttpMessageProcessor implements MessageProcessor { } HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer); servlet.execute(request, response); - } catch (Exception ex) { + } catch (Throwable ex) { if (message.getResptopic() != null && !message.getResptopic().isEmpty()) { HttpMessageResponse.finishHttpResult(finest, message, callback, producer, message.getResptopic(), new HttpResult().status(500)); } diff --git a/src/org/redkale/mq/SncpMessageProcessor.java b/src/org/redkale/mq/SncpMessageProcessor.java index 2eba9af93..12441aebf 100644 --- a/src/org/redkale/mq/SncpMessageProcessor.java +++ b/src/org/redkale/mq/SncpMessageProcessor.java @@ -36,6 +36,10 @@ public class SncpMessageProcessor implements MessageProcessor { protected CountDownLatch cdl; + protected final Runnable innerCallback = () -> { + if (cdl != null) cdl.countDown(); + }; + public SncpMessageProcessor(Logger logger, MessageProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; this.producer = producer; @@ -53,9 +57,9 @@ public class SncpMessageProcessor implements MessageProcessor { @Override public void process(final MessageRecord message, final Runnable callback) { if (this.workExecutor == null) { - execute(message, callback); + execute(message, innerCallback); } else { - this.workExecutor.execute(() -> execute(message, callback)); + this.workExecutor.execute(() -> execute(message, innerCallback)); } } @@ -66,11 +70,9 @@ public class SncpMessageProcessor implements MessageProcessor { SncpMessageRequest request = new SncpMessageRequest(context, message); response = new SncpMessageResponse(context, request, callback, null, producer); servlet.execute(request, response); - } catch (Exception ex) { + } catch (Throwable ex) { if (response != null) response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); - } finally { - if (cdl != null) cdl.countDown(); } } @@ -78,7 +80,7 @@ public class SncpMessageProcessor implements MessageProcessor { public void commit() { if (this.cdl != null) { try { - this.cdl.await(); + this.cdl.await(30, TimeUnit.SECONDS); } catch (Exception ex) { } }