This commit is contained in:
@@ -55,6 +55,8 @@ public abstract class MessageAgent {
|
|||||||
|
|
||||||
protected ThreadHashExecutor workExecutor;
|
protected ThreadHashExecutor workExecutor;
|
||||||
|
|
||||||
|
protected int producerCount = 1;
|
||||||
|
|
||||||
//本地Service消息接收处理器, key:consumer
|
//本地Service消息接收处理器, key:consumer
|
||||||
protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>();
|
protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>();
|
||||||
|
|
||||||
@@ -62,6 +64,7 @@ public abstract class MessageAgent {
|
|||||||
this.name = checkName(config.getValue("name", ""));
|
this.name = checkName(config.getValue("name", ""));
|
||||||
this.httpMessageClient = new HttpMessageClient(this);
|
this.httpMessageClient = new HttpMessageClient(this);
|
||||||
this.sncpMessageClient = new SncpMessageClient(this);
|
this.sncpMessageClient = new SncpMessageClient(this);
|
||||||
|
this.producerCount = config.getIntValue("producers", 1);
|
||||||
if ("hash".equalsIgnoreCase(config.getValue("pool", "hash"))) {
|
if ("hash".equalsIgnoreCase(config.getValue("pool", "hash"))) {
|
||||||
this.workExecutor = new ThreadHashExecutor(Math.max(4, config.getIntValue("threads", Runtime.getRuntime().availableProcessors())));
|
this.workExecutor = new ThreadHashExecutor(Math.max(4, config.getIntValue("threads", Runtime.getRuntime().availableProcessors())));
|
||||||
}
|
}
|
||||||
@@ -169,7 +172,7 @@ public abstract class MessageAgent {
|
|||||||
if (this.sncpProducer == null) {
|
if (this.sncpProducer == null) {
|
||||||
synchronized (sncpProducerLock) {
|
synchronized (sncpProducerLock) {
|
||||||
if (this.sncpProducer == null) {
|
if (this.sncpProducer == null) {
|
||||||
MessageProducer[] producers = new MessageProducer[2];
|
MessageProducer[] producers = new MessageProducer[producerCount];
|
||||||
for (int i = 0; i < producers.length; i++) {
|
for (int i = 0; i < producers.length; i++) {
|
||||||
MessageProducer producer = createProducer("SncpProducer");
|
MessageProducer producer = createProducer("SncpProducer");
|
||||||
producer.startup().join();
|
producer.startup().join();
|
||||||
@@ -186,7 +189,7 @@ public abstract class MessageAgent {
|
|||||||
if (this.httpProducer == null) {
|
if (this.httpProducer == null) {
|
||||||
synchronized (httpProducerLock) {
|
synchronized (httpProducerLock) {
|
||||||
if (this.httpProducer == null) {
|
if (this.httpProducer == null) {
|
||||||
MessageProducer[] producers = new MessageProducer[2];
|
MessageProducer[] producers = new MessageProducer[producerCount];
|
||||||
for (int i = 0; i < producers.length; i++) {
|
for (int i = 0; i < producers.length; i++) {
|
||||||
MessageProducer producer = createProducer("HttpProducer");
|
MessageProducer producer = createProducer("HttpProducer");
|
||||||
producer.startup().join();
|
producer.startup().join();
|
||||||
|
|||||||
Reference in New Issue
Block a user