diff --git a/src/org/redkale/mq/HttpMessageClient.java b/src/org/redkale/mq/HttpMessageClient.java index 3e7907822..334b795d8 100644 --- a/src/org/redkale/mq/HttpMessageClient.java +++ b/src/org/redkale/mq/HttpMessageClient.java @@ -232,7 +232,7 @@ public class HttpMessageClient extends MessageClient { } @Override - protected MessageProducer getProducer() { + protected MessageProducers getProducer() { return messageAgent.getHttpProducer(); - } + } } diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index 860e23612..8c20ca99d 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -26,7 +26,7 @@ public class HttpMessageProcessor implements MessageProcessor { protected final Logger logger; - protected final MessageProducer producer; + protected final MessageProducers producer; protected final NodeHttpServer server; @@ -48,7 +48,7 @@ public class HttpMessageProcessor implements MessageProcessor { if (cdl != null) cdl.countDown(); }; - public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) { + public HttpMessageProcessor(Logger logger, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) { this.logger = logger; this.finest = logger.isLoggable(Level.FINEST); this.producer = producer; @@ -85,11 +85,11 @@ public class HttpMessageProcessor implements MessageProcessor { if (multiconsumer) { request.setRequestURI(request.getRequestURI().replaceFirst(this.multimodule, this.restmodule)); } - HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer); + HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer.getProducer(message)); servlet.execute(request, response); } catch (Throwable ex) { if (message.getResptopic() != null && !message.getResptopic().isEmpty()) { - HttpMessageResponse.finishHttpResult(finest, message, callback, producer, message.getResptopic(), new HttpResult().status(500)); + HttpMessageResponse.finishHttpResult(finest, message, callback, producer.getProducer(message), message.getResptopic(), new HttpResult().status(500)); } logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); } @@ -105,7 +105,7 @@ public class HttpMessageProcessor implements MessageProcessor { } } - public MessageProducer getProducer() { + public MessageProducers getProducer() { return producer; } diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 4cb79ccb2..6de5fc967 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -39,9 +39,9 @@ public abstract class MessageAgent { protected AnyValue config; - protected MessageProducer httpProducer; + protected MessageProducers httpProducer; - protected MessageProducer sncpProducer; + protected MessageProducers sncpProducer; protected final Object httpProducerLock = new Object(); @@ -110,12 +110,12 @@ public abstract class MessageAgent { protected List getAllMessageProducer() { List producers = new ArrayList<>(); - if (this.httpProducer != null) producers.add(this.httpProducer); - if (this.sncpProducer != null) producers.add(this.sncpProducer); - MessageProducer one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); - if (one != null) producers.add(one); + if (this.httpProducer != null) producers.addAll(List.of(this.httpProducer.producers)); + if (this.sncpProducer != null) producers.addAll(List.of(this.sncpProducer.producers)); + MessageProducers one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); + if (one != null) producers.addAll(List.of(one.producers)); one = this.sncpMessageClient == null ? null : this.sncpMessageClient.getProducer(); - if (one != null) producers.add(one); + if (one != null) producers.addAll(List.of(one.producers)); return producers; } @@ -155,26 +155,34 @@ public abstract class MessageAgent { } //获取指定topic的生产处理器 - public MessageProducer getSncpProducer() { + public MessageProducers getSncpProducer() { if (this.sncpProducer == null) { synchronized (sncpProducerLock) { if (this.sncpProducer == null) { - MessageProducer producer = createProducer("SncpProducer"); - producer.startup().join(); - this.sncpProducer = producer; + MessageProducer[] producers = new MessageProducer[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < producers.length; i++) { + MessageProducer producer = createProducer("SncpProducer"); + producer.startup().join(); + producers[i] = producer; + } + this.sncpProducer = new MessageProducers(producers); } } } return this.sncpProducer; } - public MessageProducer getHttpProducer() { + public MessageProducers getHttpProducer() { if (this.httpProducer == null) { synchronized (httpProducerLock) { if (this.httpProducer == null) { - MessageProducer producer = createProducer("HttpProducer"); - producer.startup().join(); - this.httpProducer = producer; + MessageProducer[] producers = new MessageProducer[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < producers.length; i++) { + MessageProducer producer = createProducer("HttpProducer"); + producer.startup().join(); + producers[i] = producer; + } + this.httpProducer = new MessageProducers(producers); } } } diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index c4bd7c427..f805a3486 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -85,5 +85,5 @@ public abstract class MessageClient { } } - protected abstract MessageProducer getProducer(); + protected abstract MessageProducers getProducer(); } diff --git a/src/org/redkale/mq/MessageProducers.java b/src/org/redkale/mq/MessageProducers.java new file mode 100644 index 000000000..15fddc44a --- /dev/null +++ b/src/org/redkale/mq/MessageProducers.java @@ -0,0 +1,68 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class MessageProducers { + + protected final MessageProducer[] producers; + + protected final AtomicInteger index = new AtomicInteger(); + + public MessageProducers(MessageProducer[] producers) { + this.producers = producers; + } + + public MessageProducer getProducer(MessageRecord message) { + int hash; + if (message.getGroupid() != null && !message.getGroupid().isEmpty()) { + hash = message.getGroupid().hashCode(); + } else if (message.getUserid() > 0) { + hash = message.getUserid(); + } else { + hash = index.incrementAndGet(); + if (index.get() > 1000 * producers.length) { + synchronized (index) { + if (index.get() > 1000 * producers.length) { + index.addAndGet(-1000 * producers.length); + } + } + } + } + return producers[hash % producers.length]; + } + + public CompletableFuture apply(MessageRecord message) { + return getProducer(message).apply(message); + } + + public CompletableFuture startup() { + CompletableFuture[] futures = new CompletableFuture[producers.length]; + for (int i = 0; i < producers.length; i++) { + futures[i] = producers[i].startup(); + } + return CompletableFuture.allOf(futures); + } + + public CompletableFuture shutdown() { + CompletableFuture[] futures = new CompletableFuture[producers.length]; + for (int i = 0; i < producers.length; i++) { + futures[i] = producers[i].shutdown(); + } + return CompletableFuture.allOf(futures); + } +} diff --git a/src/org/redkale/mq/SncpMessageClient.java b/src/org/redkale/mq/SncpMessageClient.java index f12e047e0..6b97ea703 100644 --- a/src/org/redkale/mq/SncpMessageClient.java +++ b/src/org/redkale/mq/SncpMessageClient.java @@ -27,7 +27,7 @@ public class SncpMessageClient extends MessageClient { } @Override - protected MessageProducer getProducer() { + protected MessageProducers getProducer() { return messageAgent.getSncpProducer(); } diff --git a/src/org/redkale/mq/SncpMessageProcessor.java b/src/org/redkale/mq/SncpMessageProcessor.java index 12441aebf..82478a51e 100644 --- a/src/org/redkale/mq/SncpMessageProcessor.java +++ b/src/org/redkale/mq/SncpMessageProcessor.java @@ -24,7 +24,7 @@ public class SncpMessageProcessor implements MessageProcessor { protected final Logger logger; - protected final MessageProducer producer; + protected final MessageProducers producer; protected final NodeSncpServer server; @@ -40,7 +40,7 @@ public class SncpMessageProcessor implements MessageProcessor { if (cdl != null) cdl.countDown(); }; - public SncpMessageProcessor(Logger logger, MessageProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) { + public SncpMessageProcessor(Logger logger, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; this.producer = producer; this.server = server; @@ -68,8 +68,8 @@ public class SncpMessageProcessor implements MessageProcessor { try { SncpContext context = server.getSncpServer().getContext(); SncpMessageRequest request = new SncpMessageRequest(context, message); - response = new SncpMessageResponse(context, request, callback, null, producer); - servlet.execute(request, response); + response = new SncpMessageResponse(context, request, callback, null, producer.getProducer(message)); + servlet.execute(request, response); } catch (Throwable ex) { if (response != null) response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex); @@ -86,7 +86,7 @@ public class SncpMessageProcessor implements MessageProcessor { } } - public MessageProducer getProducer() { + public MessageProducers getProducer() { return producer; }