This commit is contained in:
redkale
2023-05-04 14:06:29 +08:00
parent eb5142a125
commit 26d0cce404
10 changed files with 77 additions and 173 deletions

View File

@@ -62,10 +62,6 @@ public class HttpMessageClient extends MessageClient {
produceMessage(generateHttpReqTopic(request, null), 0, null, request, null); produceMessage(generateHttpReqTopic(request, null), 0, null, request, null);
} }
public final void produceMessage(HttpSimpleRequest request, LongAdder counter) {
produceMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
}
public final void produceMessage(Serializable userid, HttpSimpleRequest request) { public final void produceMessage(Serializable userid, HttpSimpleRequest request) {
produceMessage(generateHttpReqTopic(request, null), userid, null, request, null); produceMessage(generateHttpReqTopic(request, null), userid, null, request, null);
} }
@@ -74,18 +70,10 @@ public class HttpMessageClient extends MessageClient {
produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
} }
public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
}
public final void produceMessage(String topic, HttpSimpleRequest request) { public final void produceMessage(String topic, HttpSimpleRequest request) {
produceMessage(topic, 0, null, request, null); produceMessage(topic, 0, null, request, null);
} }
public final void produceMessage(String topic, HttpSimpleRequest request, LongAdder counter) {
produceMessage(topic, 0, null, request, counter);
}
public final void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { public final void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
produceMessage(topic, userid, groupid, request, null); produceMessage(topic, userid, groupid, request, null);
} }
@@ -94,10 +82,6 @@ public class HttpMessageClient extends MessageClient {
broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null); broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null);
} }
public final void broadcastMessage(HttpSimpleRequest request, LongAdder counter) {
broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
}
public final void broadcastMessage(Serializable userid, HttpSimpleRequest request) { public final void broadcastMessage(Serializable userid, HttpSimpleRequest request) {
broadcastMessage(generateHttpReqTopic(request, null), userid, null, request, null); broadcastMessage(generateHttpReqTopic(request, null), userid, null, request, null);
} }
@@ -106,18 +90,10 @@ public class HttpMessageClient extends MessageClient {
broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
} }
public final void broadcastMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
}
public final void broadcastMessage(String topic, HttpSimpleRequest request) { public final void broadcastMessage(String topic, HttpSimpleRequest request) {
broadcastMessage(topic, 0, null, request, null); broadcastMessage(topic, 0, null, request, null);
} }
public final void broadcastMessage(String topic, HttpSimpleRequest request, LongAdder counter) {
broadcastMessage(topic, 0, null, request, counter);
}
public final void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { public final void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
broadcastMessage(topic, userid, groupid, request, null); broadcastMessage(topic, userid, groupid, request, null);
} }
@@ -153,10 +129,6 @@ public class HttpMessageClient extends MessageClient {
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null); return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(HttpSimpleRequest request, LongAdder counter) {
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(Serializable userid, HttpSimpleRequest request) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(Serializable userid, HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null); return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null);
} }
@@ -165,43 +137,35 @@ public class HttpMessageClient extends MessageClient {
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request) {
return sendMessage(topic, 0, null, request, null); return sendMessage(topic, 0, null, request, null);
} }
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request, LongAdder counter) {
return sendMessage(topic, 0, null, request, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
return sendMessage(topic, userid, null, request, (LongAdder) null); return sendMessage(topic, userid, null, request, (LongAdder) null);
} }
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { protected CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid); message.userid(userid).groupid(groupid);
//if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message); //if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message);
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
} }
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid); message.userid(userid).groupid(groupid);
sendMessage(message, false, counter); sendMessage(message, false, counter);
} }
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid); message.userid(userid).groupid(groupid);
sendMessage(message, false, counter); sendMessage(message, false, counter);
} }
@Override @Override
protected MessageClientProducers getProducer() { protected MessageClientProducer getProducer() {
return messageAgent.getHttpMessageClientProducer(); return messageAgent.getHttpMessageClientProducer();
} }
} }

View File

@@ -29,7 +29,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor {
protected HttpMessageClient messageClient; protected HttpMessageClient messageClient;
protected final MessageClientProducers producers; protected final MessageClientProducer producer;
protected final NodeHttpServer server; protected final NodeHttpServer server;
@@ -59,10 +59,10 @@ public class HttpMessageClientProcessor implements MessageClientProcessor {
} }
}; };
public HttpMessageClientProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) { public HttpMessageClientProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) {
this.logger = logger; this.logger = logger;
this.messageClient = messageClient; this.messageClient = messageClient;
this.producers = producers; this.producer = producer;
this.server = server; this.server = server;
this.service = service; this.service = service;
this.servlet = servlet; this.servlet = servlet;
@@ -99,7 +99,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor {
} }
HttpMessageResponse response = respSupplier.get(); HttpMessageResponse response = respSupplier.get();
request = response.request(); request = response.request();
response.prepare(message, callback, producers.getProducer(message)); response.prepare(message, callback, producer);
if (multiConsumer) { if (multiConsumer) {
request.setRequestURI(request.getRequestURI().replaceFirst(this.multiModule, this.restModule)); request.setRequestURI(request.getRequestURI().replaceFirst(this.multiModule, this.restModule));
} }
@@ -116,7 +116,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor {
} catch (Throwable ex) { } catch (Throwable ex) {
if (message.getRespTopic() != null && !message.getRespTopic().isEmpty()) { if (message.getRespTopic() != null && !message.getRespTopic().isEmpty()) {
HttpMessageResponse.finishHttpResult(logger.isLoggable(Level.FINEST), request == null ? null : request.getRespConvert(), HttpMessageResponse.finishHttpResult(logger.isLoggable(Level.FINEST), request == null ? null : request.getRespConvert(),
null, message, callback, messageClient, producers.getProducer(message), message.getRespTopic(), new HttpResult().status(500)); null, message, callback, messageClient, producer, message.getRespTopic(), new HttpResult().status(500));
} }
logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
} }
@@ -134,8 +134,8 @@ public class HttpMessageClientProcessor implements MessageClientProcessor {
} }
} }
public MessageClientProducers getProducer() { public MessageClientProducer getProducer() {
return producers; return producer;
} }
public NodeHttpServer getServer() { public NodeHttpServer getServer() {

View File

@@ -55,7 +55,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
} }
@Override @Override
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { protected CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
return localClient.sendMessage(topic, userid, groupid, request, counter); return localClient.sendMessage(topic, userid, groupid, request, counter);
} else { } else {
@@ -64,7 +64,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
} }
@Override @Override
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
localClient.produceMessage(topic, userid, groupid, request, counter); localClient.produceMessage(topic, userid, groupid, request, counter);
} else { } else {
@@ -73,7 +73,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
} }
@Override @Override
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
mqtpAsync(userid, request); mqtpAsync(userid, request);
} }
@@ -218,11 +218,14 @@ public class HttpMessageClusterClient extends HttpMessageClient {
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture"); logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture");
} }
return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), clientHeaders, clientBody, addrs.iterator()); return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req,
(req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(),
clientHeaders, clientBody, addrs.iterator());
}); });
} }
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(boolean finest, Serializable userid, HttpSimpleRequest req, String requesturi, final Map<String, String> clientHeaders, byte[] clientBody, Iterator<InetSocketAddress> it) { private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(boolean finest, Serializable userid,
HttpSimpleRequest req, String requesturi, final Map<String, String> clientHeaders, byte[] clientBody, Iterator<InetSocketAddress> it) {
if (!it.hasNext()) { if (!it.hasNext()) {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }

View File

@@ -114,7 +114,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
@Override @Override
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { protected CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
HttpServlet servlet = findHttpServlet(topic); HttpServlet servlet = findHttpServlet(topic);
if (servlet == null) { if (servlet == null) {
if (logger.isLoggable(Level.FINE)) { if (logger.isLoggable(Level.FINE)) {
@@ -146,7 +146,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
@Override @Override
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
HttpDispatcherServlet ps = dispatcherServlet(); HttpDispatcherServlet ps = dispatcherServlet();
HttpServlet servlet = ps.findServletByTopic(topic); HttpServlet servlet = ps.findServletByTopic(topic);
if (servlet == null) { if (servlet == null) {
@@ -165,7 +165,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
@Override @Override
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
HttpDispatcherServlet ps = dispatcherServlet(); HttpDispatcherServlet ps = dispatcherServlet();
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
HttpResponse resp = new HttpMessageLocalResponse(req, null); HttpResponse resp = new HttpMessageLocalResponse(req, null);

View File

@@ -38,7 +38,7 @@ public class HttpMessageResponse extends HttpResponse {
protected Runnable callback; protected Runnable callback;
public HttpMessageResponse(HttpContext context, HttpMessageClient messageClient, final Supplier<HttpMessageResponse> respSupplier, final Consumer<HttpMessageResponse> respConsumer) { public HttpMessageResponse(HttpContext context, HttpMessageClient messageClient, Supplier<HttpMessageResponse> respSupplier, Consumer<HttpMessageResponse> respConsumer) {
super(context, new HttpMessageRequest(context), null); super(context, new HttpMessageRequest(context), null);
this.responseSupplier = (Supplier) respSupplier; this.responseSupplier = (Supplier) respSupplier;
this.responseConsumer = (Consumer) respConsumer; this.responseConsumer = (Consumer) respConsumer;
@@ -57,14 +57,18 @@ public class HttpMessageResponse extends HttpResponse {
} }
public void finishHttpResult(Type type, HttpResult result) { public void finishHttpResult(Type type, HttpResult result) {
finishHttpResult(producer.logger.isLoggable(Level.FINEST), ((HttpMessageRequest) this.request).getRespConvert(), type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); finishHttpResult(producer.logger.isLoggable(Level.FINEST), ((HttpMessageRequest) this.request).getRespConvert(),
type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result);
} }
public void finishHttpResult(Type type, Convert respConvert, HttpResult result) { public void finishHttpResult(Type type, Convert respConvert, HttpResult result) {
finishHttpResult(producer.logger.isLoggable(Level.FINEST), respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); finishHttpResult(producer.logger.isLoggable(Level.FINEST),
respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert,
type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result);
} }
public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageClientProducer producer, String resptopic, HttpResult result) { public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg,
Runnable callback, MessageClient messageClient, MessageClientProducer producer, String resptopic, HttpResult result) {
if (callback != null) { if (callback != null) {
callback.run(); callback.run();
} }
@@ -86,7 +90,8 @@ public class HttpMessageResponse extends HttpResponse {
if (innerrs instanceof byte[]) { if (innerrs instanceof byte[]) {
innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8); innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8);
} }
producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid()
+ ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders());
} }
byte[] content = HttpResultCoder.getInstance().encode(result); byte[] content = HttpResultCoder.getInstance().encode(result);
producer.apply(messageClient.createMessageRecord(msg.getSeqid(), CTYPE_HTTP_RESULT, resptopic, null, content)); producer.apply(messageClient.createMessageRecord(msg.getSeqid(), CTYPE_HTTP_RESULT, resptopic, null, content));

View File

@@ -46,9 +46,9 @@ public abstract class MessageAgent implements Resourcable {
protected AnyValue config; protected AnyValue config;
protected MessageClientProducers httpProducer; protected MessageClientProducer httpProducer;
protected MessageClientProducers sncpProducer; protected MessageClientProducer sncpProducer;
protected final ReentrantLock httpProducerLock = new ReentrantLock(); protected final ReentrantLock httpProducerLock = new ReentrantLock();
@@ -68,8 +68,6 @@ public abstract class MessageAgent implements Resourcable {
protected ScheduledThreadPoolExecutor timeoutExecutor; protected ScheduledThreadPoolExecutor timeoutExecutor;
protected int producerCount = 1;
protected MessageCoder<MessageRecord> messageCoder = MessageRecordCoder.getInstance(); protected MessageCoder<MessageRecord> messageCoder = MessageRecordCoder.getInstance();
//本地Service消息接收处理器 key:consumerid //本地Service消息接收处理器 key:consumerid
@@ -79,7 +77,6 @@ public abstract class MessageAgent implements Resourcable {
this.name = checkName(config.getValue("name", "")); this.name = checkName(config.getValue("name", ""));
this.httpMessageClient = new HttpMessageClient(this); this.httpMessageClient = new HttpMessageClient(this);
this.sncpMessageClient = new SncpMessageClient(this); this.sncpMessageClient = new SncpMessageClient(this);
this.producerCount = config.getIntValue("producers", Utility.cpus());
String coderType = config.getValue("coder", ""); String coderType = config.getValue("coder", "");
if (!coderType.trim().isEmpty()) { if (!coderType.trim().isEmpty()) {
try { try {
@@ -162,18 +159,18 @@ public abstract class MessageAgent implements Resourcable {
protected List<MessageClientProducer> getMessageClientProducers() { protected List<MessageClientProducer> getMessageClientProducers() {
List<MessageClientProducer> producers = new ArrayList<>(); List<MessageClientProducer> producers = new ArrayList<>();
if (this.httpProducer != null) { if (this.httpProducer != null) {
producers.addAll(Utility.ofList(this.httpProducer.producers)); producers.add(this.httpProducer);
} }
if (this.sncpProducer != null) { if (this.sncpProducer != null) {
producers.addAll(Utility.ofList(this.sncpProducer.producers)); producers.add(this.sncpProducer);
} }
MessageClientProducers one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); MessageClientProducer one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer();
if (one != null) { if (one != null) {
producers.addAll(Utility.ofList(one.producers)); producers.add(one);
} }
one = this.sncpMessageClient == null ? null : this.sncpMessageClient.getProducer(); one = this.sncpMessageClient == null ? null : this.sncpMessageClient.getProducer();
if (one != null) { if (one != null) {
producers.addAll(Utility.ofList(one.producers)); producers.add(one);
} }
return producers; return producers;
} }
@@ -227,23 +224,17 @@ public abstract class MessageAgent implements Resourcable {
} }
//获取指定topic的生产处理器 //获取指定topic的生产处理器
public MessageClientProducers getSncpMessageClientProducer() { public MessageClientProducer getSncpMessageClientProducer() {
if (this.sncpProducer == null) { if (this.sncpProducer == null) {
sncpProducerLock.lock(); sncpProducerLock.lock();
try { try {
if (this.sncpProducer == null) { if (this.sncpProducer == null) {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
MessageClientProducer[] producers = new MessageClientProducer[producerCount]; this.sncpProducer = createMessageClientProducer("SncpProducer");
for (int i = 0; i < producers.length; i++) {
MessageClientProducer producer = createMessageClientProducer("SncpProducer");
producer.startup().join();
producers[i] = producer;
}
long e = System.currentTimeMillis() - s; long e = System.currentTimeMillis() - s;
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms"); logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms");
} }
this.sncpProducer = new MessageClientProducers(producers);
} }
} finally { } finally {
sncpProducerLock.unlock(); sncpProducerLock.unlock();
@@ -252,23 +243,17 @@ public abstract class MessageAgent implements Resourcable {
return this.sncpProducer; return this.sncpProducer;
} }
public MessageClientProducers getHttpMessageClientProducer() { public MessageClientProducer getHttpMessageClientProducer() {
if (this.httpProducer == null) { if (this.httpProducer == null) {
httpProducerLock.lock(); httpProducerLock.lock();
try { try {
if (this.httpProducer == null) { if (this.httpProducer == null) {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
MessageClientProducer[] producers = new MessageClientProducer[producerCount]; this.httpProducer = createMessageClientProducer("HttpProducer");
for (int i = 0; i < producers.length; i++) {
MessageClientProducer producer = createMessageClientProducer("HttpProducer");
producer.startup().join();
producers[i] = producer;
}
long e = System.currentTimeMillis() - s; long e = System.currentTimeMillis() - s;
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms"); logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms");
} }
this.httpProducer = new MessageClientProducers(producers);
} }
} finally { } finally {
httpProducerLock.unlock(); httpProducerLock.unlock();

View File

@@ -56,6 +56,10 @@ public abstract class MessageClient {
return this.respConsumer.shutdown(); return this.respConsumer.shutdown();
} }
protected CompletableFuture<MessageRecord> sendMessage(final MessageRecord message, boolean needresp) {
return sendMessage(message, needresp, null);
}
protected CompletableFuture<MessageRecord> sendMessage(final MessageRecord message, boolean needresp, LongAdder counter) { protected CompletableFuture<MessageRecord> sendMessage(final MessageRecord message, boolean needresp, LongAdder counter) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>(); CompletableFuture<MessageRecord> future = new CompletableFuture<>();
boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST); boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST);
@@ -136,46 +140,56 @@ public abstract class MessageClient {
return message; return message;
} }
protected abstract MessageClientProducers getProducer(); protected abstract MessageClientProducer getProducer();
public MessageRecord createMessageRecord(String resptopic, String content) { public MessageRecord createMessageRecord(String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, String content) { public MessageRecord createMessageRecord(String topic, String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) { public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid,
null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) { public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid,
null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) { public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, traceid, convert.convertToBytes(bean)); return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, traceid, convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid,
null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid,
groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid,
groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) {

View File

@@ -1,56 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class MessageClientProducers {
protected final MessageClientProducer[] producers;
protected final AtomicInteger index = new AtomicInteger();
public MessageClientProducers(MessageClientProducer[] producers) {
this.producers = producers;
}
public MessageClientProducer getProducer(MessageRecord message) {
if (this.producers.length == 1) {
return this.producers[0];
}
return producers[Math.abs(index.incrementAndGet()) % producers.length];
}
public CompletableFuture<Void> apply(MessageRecord message) {
return getProducer(message).apply(message);
}
public CompletableFuture<Void> startup() {
CompletableFuture[] futures = new CompletableFuture[producers.length];
for (int i = 0; i < producers.length; i++) {
futures[i] = producers[i].startup();
}
return CompletableFuture.allOf(futures);
}
public CompletableFuture<Void> shutdown() {
CompletableFuture[] futures = new CompletableFuture[producers.length];
for (int i = 0; i < producers.length; i++) {
futures[i] = producers[i].shutdown();
}
return CompletableFuture.allOf(futures);
}
}

View File

@@ -6,7 +6,6 @@
package org.redkale.mq; package org.redkale.mq;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;
/** /**
* *
@@ -25,7 +24,7 @@ public class SncpMessageClient extends MessageClient {
} }
@Override @Override
protected MessageClientProducers getProducer() { protected MessageClientProducer getProducer() {
return messageAgent.getSncpMessageClientProducer(); return messageAgent.getSncpMessageClientProducer();
} }
@@ -35,22 +34,12 @@ public class SncpMessageClient extends MessageClient {
//只发送消息,不需要响应 //只发送消息,不需要响应
public final void produceMessage(MessageRecord message) { public final void produceMessage(MessageRecord message) {
produceMessage(message, null); sendMessage(message, false);
}
//只发送消息,不需要响应
public final void produceMessage(MessageRecord message, LongAdder counter) {
sendMessage(message, false, counter);
} }
//发送消息,需要响应 //发送消息,需要响应
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message) { public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message) {
return sendMessage(message, null); return sendMessage(message, true);
}
//发送消息,需要响应
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message, LongAdder counter) {
return sendMessage(message, true, counter);
} }
@Override @Override

View File

@@ -28,7 +28,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor {
protected MessageClient messageClient; protected MessageClient messageClient;
protected final MessageClientProducers producer; protected final MessageClientProducer producer;
protected final NodeSncpServer server; protected final NodeSncpServer server;
@@ -46,7 +46,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor {
} }
}; };
public SncpMessageClientProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { public SncpMessageClientProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger; this.logger = logger;
this.messageClient = messageClient; this.messageClient = messageClient;
this.producer = producer; this.producer = producer;
@@ -75,7 +75,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor {
long e = now - starttime; long e = now - starttime;
SncpContext context = server.getSncpServer().getContext(); SncpContext context = server.getSncpServer().getContext();
SncpMessageRequest request = new SncpMessageRequest(context, message); SncpMessageRequest request = new SncpMessageRequest(context, message);
response = new SncpMessageResponse(context, request, callback, messageClient, producer.getProducer(message)); response = new SncpMessageResponse(context, request, callback, messageClient, producer);
context.execute(servlet, request, response); context.execute(servlet, request, response);
long o = System.currentTimeMillis() - now; long o = System.currentTimeMillis() - now;
@@ -105,7 +105,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor {
} }
} }
public MessageClientProducers getProducer() { public MessageClientProducer getProducer() {
return producer; return producer;
} }