From c43325e40244a97fa0c4e775e48b448522c3e39b Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Mon, 15 Jun 2020 18:01:49 +0800 Subject: [PATCH] --- src/org/redkale/mq/HttpMessageClient.java | 122 +++++++++++----------- src/org/redkale/mq/MessageClient.java | 22 ++-- 2 files changed, 77 insertions(+), 67 deletions(-) diff --git a/src/org/redkale/mq/HttpMessageClient.java b/src/org/redkale/mq/HttpMessageClient.java index 3efa0573d..4a85a71fc 100644 --- a/src/org/redkale/mq/HttpMessageClient.java +++ b/src/org/redkale/mq/HttpMessageClient.java @@ -39,102 +39,104 @@ public class HttpMessageClient extends MessageClient { return messageAgent.generateHttpReqTopic(module); } + public final void produceMessage(HttpSimpleRequest request) { + produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null); + } + + public final void produceMessage(HttpSimpleRequest request, AtomicLong counter) { + produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter); + } + + public final void produceMessage(int userid, String groupid, HttpSimpleRequest request) { + produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null); + } + + public final void produceMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + produceMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter); + } + + public final void produceMessage(String topic, HttpSimpleRequest request) { + produceMessage(topic, ConvertType.JSON, 0, null, request, null); + } + + public final void produceMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { + produceMessage(topic, ConvertType.JSON, 0, null, request, counter); + } + + public final void produceMessage(String topic, ConvertType convertType, HttpSimpleRequest request) { + produceMessage(topic, convertType, 0, null, request, null); + } + + public final void produceMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) { + produceMessage(topic, convertType, 0, null, request, counter); + } + + public final void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { + produceMessage(topic, ConvertType.JSON, userid, groupid, request, null); + } + + public final void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + produceMessage(topic, ConvertType.JSON, userid, groupid, request, counter); + } + + public final void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) { + produceMessage(topic, convertType, userid, groupid, request, null); + } + public final CompletableFuture> sendMessage(HttpSimpleRequest request) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, true, null); + return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, null); } public final CompletableFuture> sendMessage(HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, true, counter); - } - - public final CompletableFuture> sendMessage(HttpSimpleRequest request, boolean needresp) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, needresp, null); - } - - public final CompletableFuture> sendMessage(HttpSimpleRequest request, boolean needresp, AtomicLong counter) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, needresp, counter); + return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, 0, null, request, counter); } public final CompletableFuture> sendMessage(int userid, String groupid, HttpSimpleRequest request) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, true, null); + return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, null); } public final CompletableFuture> sendMessage(int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, true, counter); - } - - public final CompletableFuture> sendMessage(int userid, String groupid, HttpSimpleRequest request, boolean needresp) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, needresp, null); - } - - public final CompletableFuture> sendMessage(int userid, String groupid, HttpSimpleRequest request, boolean needresp, AtomicLong counter) { - return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, needresp, counter); + return sendMessage(generateHttpReqTopic(request, null), ConvertType.JSON, userid, groupid, request, counter); } public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request) { - return sendMessage(topic, ConvertType.JSON, 0, null, request, true, null); + return sendMessage(topic, ConvertType.JSON, 0, null, request, null); } public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(topic, ConvertType.JSON, 0, null, request, true, counter); - } - - public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, boolean needresp) { - return sendMessage(topic, ConvertType.JSON, 0, null, request, needresp, null); - } - - public final CompletableFuture> sendMessage(String topic, HttpSimpleRequest request, boolean needresp, AtomicLong counter) { - return sendMessage(topic, ConvertType.JSON, 0, null, request, needresp, counter); + return sendMessage(topic, ConvertType.JSON, 0, null, request, counter); } public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request) { - return sendMessage(topic, convertType, 0, null, request, true, null); + return sendMessage(topic, convertType, 0, null, request, null); } public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(topic, convertType, 0, null, request, true, counter); - } - - public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, boolean needresp) { - return sendMessage(topic, convertType, 0, null, request, needresp, null); - } - - public final CompletableFuture> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, boolean needresp, AtomicLong counter) { - return sendMessage(topic, convertType, 0, null, request, needresp, counter); + return sendMessage(topic, convertType, 0, null, request, counter); } public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request) { - return sendMessage(topic, ConvertType.JSON, userid, groupid, request, true, null); + return sendMessage(topic, ConvertType.JSON, userid, groupid, request, null); } public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(topic, ConvertType.JSON, userid, groupid, request, true, counter); - } - - public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, boolean needresp) { - return sendMessage(topic, ConvertType.JSON, userid, groupid, request, needresp, null); - } - - public final CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, boolean needresp, AtomicLong counter) { - return sendMessage(topic, ConvertType.JSON, userid, groupid, request, needresp, counter); + return sendMessage(topic, ConvertType.JSON, userid, groupid, request, counter); } public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) { - return sendMessage(topic, convertType, userid, groupid, request, true, null); + return sendMessage(topic, convertType, userid, groupid, request, null); } public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - return sendMessage(topic, convertType, userid, groupid, request, true, counter); - } - - public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, boolean needresp) { - return sendMessage(topic, convertType, userid, groupid, request, needresp, null); - } - - public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, boolean needresp, AtomicLong counter) { MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); - return sendMessage(message, needresp, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); + return sendMessage(message, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); + } + + public final void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { + MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + message.userid(userid).groupid(groupid); + produceMessage(message, counter); } @Override diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index 4ffbf3382..d2840f905 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -46,19 +46,27 @@ public abstract class MessageClient { return this.respTopic; } - public final CompletableFuture sendMessage(MessageRecord message) { - return sendMessage(message, true, null); + //只发送消息,不需要响应 + public final void produceMessage(MessageRecord message) { + produceMessage(message, null); } + //只发送消息,不需要响应 + public final void produceMessage(MessageRecord message, AtomicLong counter) { + sendMessage(message, false, counter); + } + + //发送消息,需要响应 + public final CompletableFuture sendMessage(MessageRecord message) { + return sendMessage(message, null); + } + + //发送消息,需要响应 public final CompletableFuture sendMessage(MessageRecord message, AtomicLong counter) { return sendMessage(message, true, counter); } - public final CompletableFuture sendMessage(MessageRecord message, boolean needresp) { - return sendMessage(message, needresp, null); - } - - public final CompletableFuture sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) { + private CompletableFuture sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) { CompletableFuture future = new CompletableFuture<>(); try { if (this.consumer == null) {