This commit is contained in:
Redkale
2020-07-31 09:50:59 +08:00
parent fca727ecaf
commit e5766e31dc
2 changed files with 9 additions and 7 deletions

View File

@@ -87,7 +87,7 @@ public class HttpMessageProcessor implements MessageProcessor {
} }
HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer); HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer);
servlet.execute(request, response); servlet.execute(request, response);
} catch (Exception ex) { } catch (Throwable ex) {
if (message.getResptopic() != null && !message.getResptopic().isEmpty()) { if (message.getResptopic() != null && !message.getResptopic().isEmpty()) {
HttpMessageResponse.finishHttpResult(finest, message, callback, producer, message.getResptopic(), new HttpResult().status(500)); HttpMessageResponse.finishHttpResult(finest, message, callback, producer, message.getResptopic(), new HttpResult().status(500));
} }

View File

@@ -36,6 +36,10 @@ public class SncpMessageProcessor implements MessageProcessor {
protected CountDownLatch cdl; protected CountDownLatch cdl;
protected final Runnable innerCallback = () -> {
if (cdl != null) cdl.countDown();
};
public SncpMessageProcessor(Logger logger, MessageProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) { public SncpMessageProcessor(Logger logger, MessageProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger; this.logger = logger;
this.producer = producer; this.producer = producer;
@@ -53,9 +57,9 @@ public class SncpMessageProcessor implements MessageProcessor {
@Override @Override
public void process(final MessageRecord message, final Runnable callback) { public void process(final MessageRecord message, final Runnable callback) {
if (this.workExecutor == null) { if (this.workExecutor == null) {
execute(message, callback); execute(message, innerCallback);
} else { } else {
this.workExecutor.execute(() -> execute(message, callback)); this.workExecutor.execute(() -> execute(message, innerCallback));
} }
} }
@@ -66,11 +70,9 @@ public class SncpMessageProcessor implements MessageProcessor {
SncpMessageRequest request = new SncpMessageRequest(context, message); SncpMessageRequest request = new SncpMessageRequest(context, message);
response = new SncpMessageResponse(context, request, callback, null, producer); response = new SncpMessageResponse(context, request, callback, null, producer);
servlet.execute(request, response); servlet.execute(request, response);
} catch (Exception ex) { } catch (Throwable ex) {
if (response != null) response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); if (response != null) response.finish(SncpResponse.RETCODE_ILLSERVICEID, null);
logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);
} finally {
if (cdl != null) cdl.countDown();
} }
} }
@@ -78,7 +80,7 @@ public class SncpMessageProcessor implements MessageProcessor {
public void commit() { public void commit() {
if (this.cdl != null) { if (this.cdl != null) {
try { try {
this.cdl.await(); this.cdl.await(30, TimeUnit.SECONDS);
} catch (Exception ex) { } catch (Exception ex) {
} }
} }