diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index f128e28bb..1b401746a 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -154,9 +154,9 @@ public abstract class NodeServer { //必须要进行初始化, 构建Service时需要使用Context中的ExecutorService server.init(this.serverConf); //init之后才有Executor - resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, Executor.class, server.getExecutor()); - resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ExecutorService.class, server.getExecutor()); - resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ThreadPoolExecutor.class, server.getExecutor()); + resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, Executor.class, server.getWorkExecutor()); + resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ExecutorService.class, server.getWorkExecutor()); + resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ThreadPoolExecutor.class, server.getWorkExecutor()); initResource(); //给 DataSource、CacheSource 注册依赖注入时的监听回调事件。 String interceptorClass = this.serverConf.getValue("interceptor", ""); diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index 8088d0ad7..fabcc2fd2 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -5,6 +5,7 @@ */ package org.redkale.mq; +import java.util.concurrent.*; import java.util.logging.*; import org.redkale.boot.NodeHttpServer; import org.redkale.net.http.*; @@ -29,6 +30,8 @@ public class HttpMessageProcessor implements MessageProcessor { protected final NodeHttpServer server; + protected final ThreadPoolExecutor workExecutor; + protected final Service service; protected final HttpServlet servlet; @@ -39,6 +42,8 @@ public class HttpMessageProcessor implements MessageProcessor { protected final String multimodule; // 前后有/, 例如: /userstat/ + protected CountDownLatch cdl; + public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) { this.logger = logger; this.finest = logger.isLoggable(Level.FINEST); @@ -50,10 +55,24 @@ public class HttpMessageProcessor implements MessageProcessor { this.multiconsumer = mmc != null; this.restmodule = "/" + Rest.getRestModule(service) + "/"; this.multimodule = mmc != null ? ("/" + mmc.module() + "/") : null; + this.workExecutor = server.getServer().getWorkExecutor(); } @Override - public void process(MessageRecord message, Runnable callback) { + public void begin(final int size) { + if (this.workExecutor != null) this.cdl = new CountDownLatch(size); + } + + @Override + public void process(final MessageRecord message, final Runnable callback) { + if (this.workExecutor == null) { + execute(message, callback); + } else { + this.workExecutor.execute(() -> execute(message, callback)); + } + } + + private void execute(final MessageRecord message, final Runnable callback) { try { if (finest) logger.log(Level.FINEST, "HttpMessageProcessor.process message: " + message); if (multiconsumer) message.setResptopic(null); //不容许有响应 @@ -69,6 +88,18 @@ public class HttpMessageProcessor implements MessageProcessor { HttpMessageResponse.finishHttpResult(message, callback, producer, message.getResptopic(), new HttpResult().status(500)); } logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); + } finally { + if (cdl != null) cdl.countDown(); + } + } + + @Override + public void commit() { + if (this.cdl != null) { + try { + this.cdl.await(); + } catch (Exception ex) { + } } } diff --git a/src/org/redkale/mq/MessageMultiThreadProcessor.java b/src/org/redkale/mq/MessageMultiThreadProcessor.java deleted file mode 100644 index a59b8a38b..000000000 --- a/src/org/redkale/mq/MessageMultiThreadProcessor.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -import java.util.concurrent.CountDownLatch; - -/** - * - * @author zhangjx - */ -public class MessageMultiThreadProcessor implements MessageProcessor { - - protected final MessageAgent messageAgent; - - protected final MessageProcessor processor; - - protected CountDownLatch cdl; - - public MessageMultiThreadProcessor(MessageAgent messageAgent, MessageProcessor processor) { - this.messageAgent = messageAgent; - this.processor = processor; - } - - @Override - public void begin(int size) { - } - - @Override - public void process(MessageRecord message, Runnable callback) { - } - - @Override - public void commit() { - } -} diff --git a/src/org/redkale/mq/SncpMessageProcessor.java b/src/org/redkale/mq/SncpMessageProcessor.java index 3e24736e7..2eba9af93 100644 --- a/src/org/redkale/mq/SncpMessageProcessor.java +++ b/src/org/redkale/mq/SncpMessageProcessor.java @@ -5,6 +5,7 @@ */ package org.redkale.mq; +import java.util.concurrent.*; import java.util.logging.*; import org.redkale.boot.NodeSncpServer; import org.redkale.net.sncp.*; @@ -27,28 +28,59 @@ public class SncpMessageProcessor implements MessageProcessor { protected final NodeSncpServer server; + protected final ThreadPoolExecutor workExecutor; + protected final Service service; protected final SncpServlet servlet; + protected CountDownLatch cdl; + public SncpMessageProcessor(Logger logger, MessageProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; this.producer = producer; this.server = server; this.service = service; this.servlet = servlet; + this.workExecutor = server.getServer().getWorkExecutor(); } @Override - public void process(MessageRecord message, Runnable callback) { - SncpContext context = server.getSncpServer().getContext(); - SncpMessageRequest request = new SncpMessageRequest(context, message); - SncpMessageResponse response = new SncpMessageResponse(context, request, callback, null, producer); + public void begin(final int size) { + if (this.workExecutor != null) this.cdl = new CountDownLatch(size); + } + + @Override + public void process(final MessageRecord message, final Runnable callback) { + if (this.workExecutor == null) { + execute(message, callback); + } else { + this.workExecutor.execute(() -> execute(message, callback)); + } + } + + private void execute(final MessageRecord message, final Runnable callback) { + SncpMessageResponse response = null; try { + SncpContext context = server.getSncpServer().getContext(); + SncpMessageRequest request = new SncpMessageRequest(context, message); + response = new SncpMessageResponse(context, request, callback, null, producer); servlet.execute(request, response); } catch (Exception ex) { - response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); + if (response != null) response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); + } finally { + if (cdl != null) cdl.countDown(); + } + } + + @Override + public void commit() { + if (this.cdl != null) { + try { + this.cdl.await(); + } catch (Exception ex) { + } } } diff --git a/src/org/redkale/net/Server.java b/src/org/redkale/net/Server.java index 2843a3f1e..fb10d660a 100644 --- a/src/org/redkale/net/Server.java +++ b/src/org/redkale/net/Server.java @@ -86,7 +86,7 @@ public abstract class Server { - Thread t = new WorkThread(executor, r); + this.workExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads, (Runnable r) -> { + Thread t = new WorkThread(workExecutor, r); t.setName("Redkale-" + n + "-ServletThread-" + f.format(counter.incrementAndGet())); return t; }); @@ -187,8 +187,8 @@ public abstract class Server