diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index a0b00cae4..48a49fb3e 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -10,6 +10,7 @@ import java.util.logging.*; import org.redkale.boot.NodeHttpServer; import org.redkale.net.http.*; import org.redkale.service.Service; +import org.redkale.util.ThreadHashExecutor; /** * @@ -30,7 +31,7 @@ public class HttpMessageProcessor implements MessageProcessor { protected final NodeHttpServer server; - protected final ThreadPoolExecutor workExecutor; + protected final ThreadHashExecutor workExecutor; protected final Service service; @@ -48,7 +49,7 @@ public class HttpMessageProcessor implements MessageProcessor { if (cdl != null) cdl.countDown(); }; - public HttpMessageProcessor(Logger logger, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) { + public HttpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) { this.logger = logger; this.finest = logger.isLoggable(Level.FINEST); this.producer = producer; @@ -59,7 +60,7 @@ public class HttpMessageProcessor implements MessageProcessor { this.multiconsumer = mmc != null; this.restmodule = "/" + Rest.getRestModule(service) + "/"; this.multimodule = mmc != null ? ("/" + mmc.module() + "/") : null; - this.workExecutor = server.getServer().getWorkExecutor(); + this.workExecutor = workExecutor; } @Override @@ -72,7 +73,7 @@ public class HttpMessageProcessor implements MessageProcessor { if (this.workExecutor == null) { execute(message, innerCallback); } else { - this.workExecutor.execute(() -> execute(message, innerCallback)); + this.workExecutor.execute(message.hash(), () -> execute(message, innerCallback)); } } diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 6de5fc967..e128a2f08 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -53,6 +53,8 @@ public abstract class MessageAgent { protected ScheduledThreadPoolExecutor timeoutExecutor; + protected ThreadHashExecutor workExecutor; + //本地Service消息接收处理器, key:consumer protected HashMap messageNodes = new LinkedHashMap<>(); @@ -60,6 +62,7 @@ public abstract class MessageAgent { this.name = checkName(config.getValue("name", "")); this.httpMessageClient = new HttpMessageClient(this); this.sncpMessageClient = new SncpMessageClient(this); + this.workExecutor = new ThreadHashExecutor(Math.max(4, Runtime.getRuntime().availableProcessors())); // application (it doesn't execute completion handlers). this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { Thread t = new Thread(r); @@ -93,6 +96,7 @@ public abstract class MessageAgent { public void destroy(AnyValue config) { this.httpMessageClient.close().join(); this.sncpMessageClient.close().join(); + this.workExecutor.shutdown(); if (this.timeoutExecutor != null) this.timeoutExecutor.shutdown(); if (this.sncpProducer != null) this.sncpProducer.shutdown().join(); if (this.httpProducer != null) this.httpProducer.shutdown().join(); @@ -211,7 +215,7 @@ public abstract class MessageAgent { String[] topics = generateHttpReqTopics(service); String consumerid = generateHttpConsumerid(topics, service); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); - HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getHttpProducer(), ns, service, servlet); + HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.workExecutor, getHttpProducer(), ns, service, servlet); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor))); } @@ -219,7 +223,7 @@ public abstract class MessageAgent { String topic = generateSncpReqTopic(service); String consumerid = generateSncpConsumerid(topic, service); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); - SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getSncpProducer(), ns, service, servlet); + SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this.workExecutor, getSncpProducer(), ns, service, servlet); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor))); } diff --git a/src/org/redkale/mq/SncpMessageProcessor.java b/src/org/redkale/mq/SncpMessageProcessor.java index 9a1125638..937f5ce52 100644 --- a/src/org/redkale/mq/SncpMessageProcessor.java +++ b/src/org/redkale/mq/SncpMessageProcessor.java @@ -10,6 +10,7 @@ import java.util.logging.*; import org.redkale.boot.NodeSncpServer; import org.redkale.net.sncp.*; import org.redkale.service.Service; +import org.redkale.util.ThreadHashExecutor; /** * @@ -28,7 +29,7 @@ public class SncpMessageProcessor implements MessageProcessor { protected final NodeSncpServer server; - protected final ThreadPoolExecutor workExecutor; + protected final ThreadHashExecutor workExecutor; protected final Service service; @@ -40,13 +41,13 @@ public class SncpMessageProcessor implements MessageProcessor { if (cdl != null) cdl.countDown(); }; - public SncpMessageProcessor(Logger logger, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { + public SncpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; this.producer = producer; this.server = server; this.service = service; this.servlet = servlet; - this.workExecutor = server.getServer().getWorkExecutor(); + this.workExecutor = workExecutor; } @Override @@ -59,7 +60,7 @@ public class SncpMessageProcessor implements MessageProcessor { if (this.workExecutor == null) { execute(message, innerCallback); } else { - this.workExecutor.execute(() -> execute(message, innerCallback)); + this.workExecutor.execute(message.hash(), () -> execute(message, innerCallback)); } } @@ -69,7 +70,7 @@ public class SncpMessageProcessor implements MessageProcessor { SncpContext context = server.getSncpServer().getContext(); SncpMessageRequest request = new SncpMessageRequest(context, message); response = new SncpMessageResponse(context, request, callback, null, producer.getProducer(message)); - servlet.execute(request, response); + servlet.execute(request, response); } catch (Throwable ex) { if (response != null) response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); diff --git a/src/org/redkale/util/ThreadHashExecutor.java b/src/org/redkale/util/ThreadHashExecutor.java new file mode 100644 index 000000000..9c34741e8 --- /dev/null +++ b/src/org/redkale/util/ThreadHashExecutor.java @@ -0,0 +1,61 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.util; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 线程池 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class ThreadHashExecutor { + + private final AtomicInteger index = new AtomicInteger(); + + private final ExecutorService[] executors; + + public ThreadHashExecutor(int size) { + ExecutorService[] array = new ExecutorService[size]; + for (int i = 0; i < array.length; i++) { + array[i] = Executors.newSingleThreadExecutor(); + } + this.executors = array; + } + + public void execute(int hash, Runnable command) { + if (hash < 1) { + this.executors[index.incrementAndGet() % this.executors.length].execute(command); + } else { + this.executors[hash % this.executors.length].execute(command); + } + } + + public void shutdown() { + for (ExecutorService executor : this.executors) { + executor.shutdown(); + } + } + + public List shutdownNow() { + List list = new ArrayList<>(); + for (ExecutorService executor : this.executors) { + list.addAll(executor.shutdownNow()); + } + return list; + } + + public boolean isShutdown() { + return this.executors[0].isShutdown(); + } +}