From 4d79f17a3ab0b85fea25c079fb1d8a20c5bd0782 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 12 Nov 2020 21:38:21 +0800 Subject: [PATCH] --- src/org/redkale/mq/MessageAgent.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 3ab046793..15af987f8 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -55,6 +55,8 @@ public abstract class MessageAgent { protected ThreadHashExecutor workExecutor; + protected int producerCount = 1; + //本地Service消息接收处理器, key:consumer protected HashMap messageNodes = new LinkedHashMap<>(); @@ -62,6 +64,7 @@ public abstract class MessageAgent { this.name = checkName(config.getValue("name", "")); this.httpMessageClient = new HttpMessageClient(this); this.sncpMessageClient = new SncpMessageClient(this); + this.producerCount = config.getIntValue("producers", 1); if ("hash".equalsIgnoreCase(config.getValue("pool", "hash"))) { this.workExecutor = new ThreadHashExecutor(Math.max(4, config.getIntValue("threads", Runtime.getRuntime().availableProcessors()))); } @@ -169,7 +172,7 @@ public abstract class MessageAgent { if (this.sncpProducer == null) { synchronized (sncpProducerLock) { if (this.sncpProducer == null) { - MessageProducer[] producers = new MessageProducer[2]; + MessageProducer[] producers = new MessageProducer[producerCount]; for (int i = 0; i < producers.length; i++) { MessageProducer producer = createProducer("SncpProducer"); producer.startup().join(); @@ -186,7 +189,7 @@ public abstract class MessageAgent { if (this.httpProducer == null) { synchronized (httpProducerLock) { if (this.httpProducer == null) { - MessageProducer[] producers = new MessageProducer[2]; + MessageProducer[] producers = new MessageProducer[producerCount]; for (int i = 0; i < producers.length; i++) { MessageProducer producer = createProducer("HttpProducer"); producer.startup().join();