This commit is contained in:
Redkale
2020-07-31 10:13:53 +08:00
parent e5766e31dc
commit de19861974
7 changed files with 105 additions and 29 deletions

View File

@@ -232,7 +232,7 @@ public class HttpMessageClient extends MessageClient {
}
@Override
protected MessageProducer getProducer() {
protected MessageProducers getProducer() {
return messageAgent.getHttpProducer();
}
}
}

View File

@@ -26,7 +26,7 @@ public class HttpMessageProcessor implements MessageProcessor {
protected final Logger logger;
protected final MessageProducer producer;
protected final MessageProducers producer;
protected final NodeHttpServer server;
@@ -48,7 +48,7 @@ public class HttpMessageProcessor implements MessageProcessor {
if (cdl != null) cdl.countDown();
};
public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) {
public HttpMessageProcessor(Logger logger, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) {
this.logger = logger;
this.finest = logger.isLoggable(Level.FINEST);
this.producer = producer;
@@ -85,11 +85,11 @@ public class HttpMessageProcessor implements MessageProcessor {
if (multiconsumer) {
request.setRequestURI(request.getRequestURI().replaceFirst(this.multimodule, this.restmodule));
}
HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer);
HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer.getProducer(message));
servlet.execute(request, response);
} catch (Throwable ex) {
if (message.getResptopic() != null && !message.getResptopic().isEmpty()) {
HttpMessageResponse.finishHttpResult(finest, message, callback, producer, message.getResptopic(), new HttpResult().status(500));
HttpMessageResponse.finishHttpResult(finest, message, callback, producer.getProducer(message), message.getResptopic(), new HttpResult().status(500));
}
logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);
}
@@ -105,7 +105,7 @@ public class HttpMessageProcessor implements MessageProcessor {
}
}
public MessageProducer getProducer() {
public MessageProducers getProducer() {
return producer;
}

View File

@@ -39,9 +39,9 @@ public abstract class MessageAgent {
protected AnyValue config;
protected MessageProducer httpProducer;
protected MessageProducers httpProducer;
protected MessageProducer sncpProducer;
protected MessageProducers sncpProducer;
protected final Object httpProducerLock = new Object();
@@ -110,12 +110,12 @@ public abstract class MessageAgent {
protected List<MessageProducer> getAllMessageProducer() {
List<MessageProducer> producers = new ArrayList<>();
if (this.httpProducer != null) producers.add(this.httpProducer);
if (this.sncpProducer != null) producers.add(this.sncpProducer);
MessageProducer one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer();
if (one != null) producers.add(one);
if (this.httpProducer != null) producers.addAll(List.of(this.httpProducer.producers));
if (this.sncpProducer != null) producers.addAll(List.of(this.sncpProducer.producers));
MessageProducers one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer();
if (one != null) producers.addAll(List.of(one.producers));
one = this.sncpMessageClient == null ? null : this.sncpMessageClient.getProducer();
if (one != null) producers.add(one);
if (one != null) producers.addAll(List.of(one.producers));
return producers;
}
@@ -155,26 +155,34 @@ public abstract class MessageAgent {
}
//获取指定topic的生产处理器
public MessageProducer getSncpProducer() {
public MessageProducers getSncpProducer() {
if (this.sncpProducer == null) {
synchronized (sncpProducerLock) {
if (this.sncpProducer == null) {
MessageProducer producer = createProducer("SncpProducer");
producer.startup().join();
this.sncpProducer = producer;
MessageProducer[] producers = new MessageProducer[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < producers.length; i++) {
MessageProducer producer = createProducer("SncpProducer");
producer.startup().join();
producers[i] = producer;
}
this.sncpProducer = new MessageProducers(producers);
}
}
}
return this.sncpProducer;
}
public MessageProducer getHttpProducer() {
public MessageProducers getHttpProducer() {
if (this.httpProducer == null) {
synchronized (httpProducerLock) {
if (this.httpProducer == null) {
MessageProducer producer = createProducer("HttpProducer");
producer.startup().join();
this.httpProducer = producer;
MessageProducer[] producers = new MessageProducer[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < producers.length; i++) {
MessageProducer producer = createProducer("HttpProducer");
producer.startup().join();
producers[i] = producer;
}
this.httpProducer = new MessageProducers(producers);
}
}
}

View File

@@ -85,5 +85,5 @@ public abstract class MessageClient {
}
}
protected abstract MessageProducer getProducer();
protected abstract MessageProducers getProducer();
}

View File

@@ -0,0 +1,68 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class MessageProducers {
protected final MessageProducer[] producers;
protected final AtomicInteger index = new AtomicInteger();
public MessageProducers(MessageProducer[] producers) {
this.producers = producers;
}
public MessageProducer getProducer(MessageRecord message) {
int hash;
if (message.getGroupid() != null && !message.getGroupid().isEmpty()) {
hash = message.getGroupid().hashCode();
} else if (message.getUserid() > 0) {
hash = message.getUserid();
} else {
hash = index.incrementAndGet();
if (index.get() > 1000 * producers.length) {
synchronized (index) {
if (index.get() > 1000 * producers.length) {
index.addAndGet(-1000 * producers.length);
}
}
}
}
return producers[hash % producers.length];
}
public CompletableFuture<Void> apply(MessageRecord message) {
return getProducer(message).apply(message);
}
public CompletableFuture<Void> startup() {
CompletableFuture[] futures = new CompletableFuture[producers.length];
for (int i = 0; i < producers.length; i++) {
futures[i] = producers[i].startup();
}
return CompletableFuture.allOf(futures);
}
public CompletableFuture<Void> shutdown() {
CompletableFuture[] futures = new CompletableFuture[producers.length];
for (int i = 0; i < producers.length; i++) {
futures[i] = producers[i].shutdown();
}
return CompletableFuture.allOf(futures);
}
}

View File

@@ -27,7 +27,7 @@ public class SncpMessageClient extends MessageClient {
}
@Override
protected MessageProducer getProducer() {
protected MessageProducers getProducer() {
return messageAgent.getSncpProducer();
}

View File

@@ -24,7 +24,7 @@ public class SncpMessageProcessor implements MessageProcessor {
protected final Logger logger;
protected final MessageProducer producer;
protected final MessageProducers producer;
protected final NodeSncpServer server;
@@ -40,7 +40,7 @@ public class SncpMessageProcessor implements MessageProcessor {
if (cdl != null) cdl.countDown();
};
public SncpMessageProcessor(Logger logger, MessageProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) {
public SncpMessageProcessor(Logger logger, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger;
this.producer = producer;
this.server = server;
@@ -68,8 +68,8 @@ public class SncpMessageProcessor implements MessageProcessor {
try {
SncpContext context = server.getSncpServer().getContext();
SncpMessageRequest request = new SncpMessageRequest(context, message);
response = new SncpMessageResponse(context, request, callback, null, producer);
servlet.execute(request, response);
response = new SncpMessageResponse(context, request, callback, null, producer.getProducer(message));
servlet.execute(request, response);
} catch (Throwable ex) {
if (response != null) response.finish(SncpResponse.RETCODE_ILLSERVICEID, null);
logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);
@@ -86,7 +86,7 @@ public class SncpMessageProcessor implements MessageProcessor {
}
}
public MessageProducer getProducer() {
public MessageProducers getProducer() {
return producer;
}