From 26d0cce40462a1bd8d164da1ec34389e499a83ae Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 4 May 2023 14:06:29 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/mq/HttpMessageClient.java | 44 ++------------- .../mq/HttpMessageClientProcessor.java | 14 ++--- .../redkale/mq/HttpMessageClusterClient.java | 13 +++-- .../redkale/mq/HttpMessageLocalClient.java | 6 +- .../org/redkale/mq/HttpMessageResponse.java | 15 +++-- .../java/org/redkale/mq/MessageAgent.java | 41 +++++--------- .../java/org/redkale/mq/MessageClient.java | 36 ++++++++---- .../redkale/mq/MessageClientProducers.java | 56 ------------------- .../org/redkale/mq/SncpMessageClient.java | 17 +----- .../mq/SncpMessageClientProcessor.java | 8 +-- 10 files changed, 77 insertions(+), 173 deletions(-) delete mode 100644 src/main/java/org/redkale/mq/MessageClientProducers.java diff --git a/src/main/java/org/redkale/mq/HttpMessageClient.java b/src/main/java/org/redkale/mq/HttpMessageClient.java index 55e76a2c5..5cab40740 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClient.java @@ -62,10 +62,6 @@ public class HttpMessageClient extends MessageClient { produceMessage(generateHttpReqTopic(request, null), 0, null, request, null); } - public final void produceMessage(HttpSimpleRequest request, LongAdder counter) { - produceMessage(generateHttpReqTopic(request, null), 0, null, request, counter); - } - public final void produceMessage(Serializable userid, HttpSimpleRequest request) { produceMessage(generateHttpReqTopic(request, null), userid, null, request, null); } @@ -74,18 +70,10 @@ public class HttpMessageClient extends MessageClient { produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); } - public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter); - } - public final void produceMessage(String topic, HttpSimpleRequest request) { produceMessage(topic, 0, null, request, null); } - public final void produceMessage(String topic, HttpSimpleRequest request, LongAdder counter) { - produceMessage(topic, 0, null, request, counter); - } - public final void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { produceMessage(topic, userid, groupid, request, null); } @@ -94,10 +82,6 @@ public class HttpMessageClient extends MessageClient { broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null); } - public final void broadcastMessage(HttpSimpleRequest request, LongAdder counter) { - broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, counter); - } - public final void broadcastMessage(Serializable userid, HttpSimpleRequest request) { broadcastMessage(generateHttpReqTopic(request, null), userid, null, request, null); } @@ -106,18 +90,10 @@ public class HttpMessageClient extends MessageClient { broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); } - public final void broadcastMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter); - } - public final void broadcastMessage(String topic, HttpSimpleRequest request) { broadcastMessage(topic, 0, null, request, null); } - public final void broadcastMessage(String topic, HttpSimpleRequest request, LongAdder counter) { - broadcastMessage(topic, 0, null, request, counter); - } - public final void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { broadcastMessage(topic, userid, groupid, request, null); } @@ -153,10 +129,6 @@ public class HttpMessageClient extends MessageClient { return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null); } - public final CompletableFuture> sendMessage(HttpSimpleRequest request, LongAdder counter) { - return sendMessage(generateHttpReqTopic(request, null), 0, null, request, counter); - } - public final CompletableFuture> sendMessage(Serializable userid, HttpSimpleRequest request) { return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null); } @@ -165,43 +137,35 @@ public class HttpMessageClient extends MessageClient { return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null); } - public final CompletableFuture> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { - return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter); - } - public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request) { return sendMessage(topic, 0, null, request, null); } - public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, LongAdder counter) { - return sendMessage(topic, 0, null, request, counter); - } - public final CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { return sendMessage(topic, userid, null, request, (LongAdder) null); } - public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + protected CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); //if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message); return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } - public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } - public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } @Override - protected MessageClientProducers getProducer() { + protected MessageClientProducer getProducer() { return messageAgent.getHttpMessageClientProducer(); } } diff --git a/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java b/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java index 0467d06a4..161a27e2a 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java +++ b/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java @@ -29,7 +29,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor { protected HttpMessageClient messageClient; - protected final MessageClientProducers producers; + protected final MessageClientProducer producer; protected final NodeHttpServer server; @@ -59,10 +59,10 @@ public class HttpMessageClientProcessor implements MessageClientProcessor { } }; - public HttpMessageClientProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) { + public HttpMessageClientProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) { this.logger = logger; this.messageClient = messageClient; - this.producers = producers; + this.producer = producer; this.server = server; this.service = service; this.servlet = servlet; @@ -99,7 +99,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor { } HttpMessageResponse response = respSupplier.get(); request = response.request(); - response.prepare(message, callback, producers.getProducer(message)); + response.prepare(message, callback, producer); if (multiConsumer) { request.setRequestURI(request.getRequestURI().replaceFirst(this.multiModule, this.restModule)); } @@ -116,7 +116,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor { } catch (Throwable ex) { if (message.getRespTopic() != null && !message.getRespTopic().isEmpty()) { HttpMessageResponse.finishHttpResult(logger.isLoggable(Level.FINEST), request == null ? null : request.getRespConvert(), - null, message, callback, messageClient, producers.getProducer(message), message.getRespTopic(), new HttpResult().status(500)); + null, message, callback, messageClient, producer, message.getRespTopic(), new HttpResult().status(500)); } logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); } @@ -134,8 +134,8 @@ public class HttpMessageClientProcessor implements MessageClientProcessor { } } - public MessageClientProducers getProducer() { - return producers; + public MessageClientProducer getProducer() { + return producer; } public NodeHttpServer getServer() { diff --git a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java index 89c47f734..22f710976 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClusterClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClusterClient.java @@ -55,7 +55,7 @@ public class HttpMessageClusterClient extends HttpMessageClient { } @Override - public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + protected CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { return localClient.sendMessage(topic, userid, groupid, request, counter); } else { @@ -64,7 +64,7 @@ public class HttpMessageClusterClient extends HttpMessageClient { } @Override - public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { localClient.produceMessage(topic, userid, groupid, request, counter); } else { @@ -73,7 +73,7 @@ public class HttpMessageClusterClient extends HttpMessageClient { } @Override - public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { mqtpAsync(userid, request); } @@ -218,11 +218,14 @@ public class HttpMessageClusterClient extends HttpMessageClient { if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture"); } - return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), clientHeaders, clientBody, addrs.iterator()); + return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req, + (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), + clientHeaders, clientBody, addrs.iterator()); }); } - private CompletableFuture> forEachCollectionFuture(boolean finest, Serializable userid, HttpSimpleRequest req, String requesturi, final Map clientHeaders, byte[] clientBody, Iterator it) { + private CompletableFuture> forEachCollectionFuture(boolean finest, Serializable userid, + HttpSimpleRequest req, String requesturi, final Map clientHeaders, byte[] clientBody, Iterator it) { if (!it.hasNext()) { return CompletableFuture.completedFuture(null); } diff --git a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java index fdbad7df1..dc50b11e3 100644 --- a/src/main/java/org/redkale/mq/HttpMessageLocalClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageLocalClient.java @@ -114,7 +114,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + protected CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { HttpServlet servlet = findHttpServlet(topic); if (servlet == null) { if (logger.isLoggable(Level.FINE)) { @@ -146,7 +146,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { HttpDispatcherServlet ps = dispatcherServlet(); HttpServlet servlet = ps.findServletByTopic(topic); if (servlet == null) { @@ -165,7 +165,7 @@ public class HttpMessageLocalClient extends HttpMessageClient { } @Override - public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { + protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { HttpDispatcherServlet ps = dispatcherServlet(); HttpRequest req = new HttpMessageLocalRequest(context(), request, userid); HttpResponse resp = new HttpMessageLocalResponse(req, null); diff --git a/src/main/java/org/redkale/mq/HttpMessageResponse.java b/src/main/java/org/redkale/mq/HttpMessageResponse.java index f2dbad651..35ed7e741 100644 --- a/src/main/java/org/redkale/mq/HttpMessageResponse.java +++ b/src/main/java/org/redkale/mq/HttpMessageResponse.java @@ -38,7 +38,7 @@ public class HttpMessageResponse extends HttpResponse { protected Runnable callback; - public HttpMessageResponse(HttpContext context, HttpMessageClient messageClient, final Supplier respSupplier, final Consumer respConsumer) { + public HttpMessageResponse(HttpContext context, HttpMessageClient messageClient, Supplier respSupplier, Consumer respConsumer) { super(context, new HttpMessageRequest(context), null); this.responseSupplier = (Supplier) respSupplier; this.responseConsumer = (Consumer) respConsumer; @@ -57,14 +57,18 @@ public class HttpMessageResponse extends HttpResponse { } public void finishHttpResult(Type type, HttpResult result) { - finishHttpResult(producer.logger.isLoggable(Level.FINEST), ((HttpMessageRequest) this.request).getRespConvert(), type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); + finishHttpResult(producer.logger.isLoggable(Level.FINEST), ((HttpMessageRequest) this.request).getRespConvert(), + type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); } public void finishHttpResult(Type type, Convert respConvert, HttpResult result) { - finishHttpResult(producer.logger.isLoggable(Level.FINEST), respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); + finishHttpResult(producer.logger.isLoggable(Level.FINEST), + respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, + type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); } - public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageClientProducer producer, String resptopic, HttpResult result) { + public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, + Runnable callback, MessageClient messageClient, MessageClientProducer producer, String resptopic, HttpResult result) { if (callback != null) { callback.run(); } @@ -86,7 +90,8 @@ public class HttpMessageResponse extends HttpResponse { if (innerrs instanceof byte[]) { innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8); } - producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); + producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() + + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); } byte[] content = HttpResultCoder.getInstance().encode(result); producer.apply(messageClient.createMessageRecord(msg.getSeqid(), CTYPE_HTTP_RESULT, resptopic, null, content)); diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index a388de23a..514c0566d 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -46,9 +46,9 @@ public abstract class MessageAgent implements Resourcable { protected AnyValue config; - protected MessageClientProducers httpProducer; + protected MessageClientProducer httpProducer; - protected MessageClientProducers sncpProducer; + protected MessageClientProducer sncpProducer; protected final ReentrantLock httpProducerLock = new ReentrantLock(); @@ -68,8 +68,6 @@ public abstract class MessageAgent implements Resourcable { protected ScheduledThreadPoolExecutor timeoutExecutor; - protected int producerCount = 1; - protected MessageCoder messageCoder = MessageRecordCoder.getInstance(); //本地Service消息接收处理器, key:consumerid @@ -79,7 +77,6 @@ public abstract class MessageAgent implements Resourcable { this.name = checkName(config.getValue("name", "")); this.httpMessageClient = new HttpMessageClient(this); this.sncpMessageClient = new SncpMessageClient(this); - this.producerCount = config.getIntValue("producers", Utility.cpus()); String coderType = config.getValue("coder", ""); if (!coderType.trim().isEmpty()) { try { @@ -162,18 +159,18 @@ public abstract class MessageAgent implements Resourcable { protected List getMessageClientProducers() { List producers = new ArrayList<>(); if (this.httpProducer != null) { - producers.addAll(Utility.ofList(this.httpProducer.producers)); + producers.add(this.httpProducer); } if (this.sncpProducer != null) { - producers.addAll(Utility.ofList(this.sncpProducer.producers)); + producers.add(this.sncpProducer); } - MessageClientProducers one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); + MessageClientProducer one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); if (one != null) { - producers.addAll(Utility.ofList(one.producers)); + producers.add(one); } one = this.sncpMessageClient == null ? null : this.sncpMessageClient.getProducer(); if (one != null) { - producers.addAll(Utility.ofList(one.producers)); + producers.add(one); } return producers; } @@ -227,23 +224,17 @@ public abstract class MessageAgent implements Resourcable { } //获取指定topic的生产处理器 - public MessageClientProducers getSncpMessageClientProducer() { + public MessageClientProducer getSncpMessageClientProducer() { if (this.sncpProducer == null) { sncpProducerLock.lock(); try { if (this.sncpProducer == null) { - long s = System.currentTimeMillis(); - MessageClientProducer[] producers = new MessageClientProducer[producerCount]; - for (int i = 0; i < producers.length; i++) { - MessageClientProducer producer = createMessageClientProducer("SncpProducer"); - producer.startup().join(); - producers[i] = producer; - } + long s = System.currentTimeMillis(); + this.sncpProducer = createMessageClientProducer("SncpProducer"); long e = System.currentTimeMillis() - s; if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms"); } - this.sncpProducer = new MessageClientProducers(producers); } } finally { sncpProducerLock.unlock(); @@ -252,23 +243,17 @@ public abstract class MessageAgent implements Resourcable { return this.sncpProducer; } - public MessageClientProducers getHttpMessageClientProducer() { + public MessageClientProducer getHttpMessageClientProducer() { if (this.httpProducer == null) { httpProducerLock.lock(); try { if (this.httpProducer == null) { long s = System.currentTimeMillis(); - MessageClientProducer[] producers = new MessageClientProducer[producerCount]; - for (int i = 0; i < producers.length; i++) { - MessageClientProducer producer = createMessageClientProducer("HttpProducer"); - producer.startup().join(); - producers[i] = producer; - } + this.httpProducer = createMessageClientProducer("HttpProducer"); long e = System.currentTimeMillis() - s; if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms"); - } - this.httpProducer = new MessageClientProducers(producers); + } } } finally { httpProducerLock.unlock(); diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index da328408d..47a0dd82a 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -56,6 +56,10 @@ public abstract class MessageClient { return this.respConsumer.shutdown(); } + protected CompletableFuture sendMessage(final MessageRecord message, boolean needresp) { + return sendMessage(message, needresp, null); + } + protected CompletableFuture sendMessage(final MessageRecord message, boolean needresp, LongAdder counter) { CompletableFuture future = new CompletableFuture<>(); boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST); @@ -136,46 +140,56 @@ public abstract class MessageClient { return message; } - protected abstract MessageClientProducers getProducer(); + protected abstract MessageClientProducer getProducer(); public MessageRecord createMessageRecord(String resptopic, String content) { - return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, + null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, String content) { - return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, + null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) { - return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, + null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { - return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, + null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) { - return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, + null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); } public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, + null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, traceid, convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, + null, topic, resptopic, traceid, convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, + null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, + groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); + return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, + groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean)); } public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { diff --git a/src/main/java/org/redkale/mq/MessageClientProducers.java b/src/main/java/org/redkale/mq/MessageClientProducers.java deleted file mode 100644 index a460a0513..000000000 --- a/src/main/java/org/redkale/mq/MessageClientProducers.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.mq; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - * - * @since 2.1.0 - */ -public class MessageClientProducers { - - protected final MessageClientProducer[] producers; - - protected final AtomicInteger index = new AtomicInteger(); - - public MessageClientProducers(MessageClientProducer[] producers) { - this.producers = producers; - } - - public MessageClientProducer getProducer(MessageRecord message) { - if (this.producers.length == 1) { - return this.producers[0]; - } - return producers[Math.abs(index.incrementAndGet()) % producers.length]; - } - - public CompletableFuture apply(MessageRecord message) { - return getProducer(message).apply(message); - } - - public CompletableFuture startup() { - CompletableFuture[] futures = new CompletableFuture[producers.length]; - for (int i = 0; i < producers.length; i++) { - futures[i] = producers[i].startup(); - } - return CompletableFuture.allOf(futures); - } - - public CompletableFuture shutdown() { - CompletableFuture[] futures = new CompletableFuture[producers.length]; - for (int i = 0; i < producers.length; i++) { - futures[i] = producers[i].shutdown(); - } - return CompletableFuture.allOf(futures); - } -} diff --git a/src/main/java/org/redkale/mq/SncpMessageClient.java b/src/main/java/org/redkale/mq/SncpMessageClient.java index e3b0dce2b..ce4b07deb 100644 --- a/src/main/java/org/redkale/mq/SncpMessageClient.java +++ b/src/main/java/org/redkale/mq/SncpMessageClient.java @@ -6,7 +6,6 @@ package org.redkale.mq; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.LongAdder; /** * @@ -25,7 +24,7 @@ public class SncpMessageClient extends MessageClient { } @Override - protected MessageClientProducers getProducer() { + protected MessageClientProducer getProducer() { return messageAgent.getSncpMessageClientProducer(); } @@ -35,22 +34,12 @@ public class SncpMessageClient extends MessageClient { //只发送消息,不需要响应 public final void produceMessage(MessageRecord message) { - produceMessage(message, null); - } - - //只发送消息,不需要响应 - public final void produceMessage(MessageRecord message, LongAdder counter) { - sendMessage(message, false, counter); + sendMessage(message, false); } //发送消息,需要响应 public final CompletableFuture sendMessage(MessageRecord message) { - return sendMessage(message, null); - } - - //发送消息,需要响应 - public final CompletableFuture sendMessage(MessageRecord message, LongAdder counter) { - return sendMessage(message, true, counter); + return sendMessage(message, true); } @Override diff --git a/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java b/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java index 4972ad775..cd0912c63 100644 --- a/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java +++ b/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java @@ -28,7 +28,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor { protected MessageClient messageClient; - protected final MessageClientProducers producer; + protected final MessageClientProducer producer; protected final NodeSncpServer server; @@ -46,7 +46,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor { } }; - public SncpMessageClientProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { + public SncpMessageClientProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; this.messageClient = messageClient; this.producer = producer; @@ -75,7 +75,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor { long e = now - starttime; SncpContext context = server.getSncpServer().getContext(); SncpMessageRequest request = new SncpMessageRequest(context, message); - response = new SncpMessageResponse(context, request, callback, messageClient, producer.getProducer(message)); + response = new SncpMessageResponse(context, request, callback, messageClient, producer); context.execute(servlet, request, response); long o = System.currentTimeMillis() - now; @@ -105,7 +105,7 @@ public class SncpMessageClientProcessor implements MessageClientProcessor { } } - public MessageClientProducers getProducer() { + public MessageClientProducer getProducer() { return producer; }