diff --git a/src/org/redkale/mq/HttpMessageClient.java b/src/org/redkale/mq/HttpMessageClient.java index 9fefb0622..6f9804b6e 100644 --- a/src/org/redkale/mq/HttpMessageClient.java +++ b/src/org/redkale/mq/HttpMessageClient.java @@ -14,6 +14,7 @@ import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.*; /** + * 不依赖MessageRecord则可兼容RPC方式 * *

* 详情见: https://redkale.org @@ -31,15 +32,18 @@ public class HttpMessageClient extends MessageClient { //格式: http.req.user public String generateHttpReqTopic(String module) { + if (messageAgent == null) return null; //RPC方式下无messageAgent return messageAgent.generateHttpReqTopic(module); } //格式: http.req.user-n10 public String generateHttpReqTopic(String module, String resname) { + if (messageAgent == null) return null; //RPC方式下无messageAgent return messageAgent.generateHttpReqTopic(module, resname); } public String generateHttpReqTopic(HttpSimpleRequest request, String path) { + if (messageAgent == null) return null; //RPC方式下无messageAgent String module = request.getRequestURI(); if (path != null && !path.isEmpty() && module.startsWith(path)) module = module.substring(path.length()); module = module.substring(1); //去掉/ @@ -162,13 +166,13 @@ public class HttpMessageClient extends MessageClient { public final CompletableFuture> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); - return sendMessage(message, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); + return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } public final void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); - produceMessage(message, counter); + sendMessage(message, false, counter); } @Override diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index d2840f905..ef9964948 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -42,31 +42,7 @@ public abstract class MessageClient { return this.consumer.shutdown(); } - public String getRespTopic() { - return this.respTopic; - } - - //只发送消息,不需要响应 - public final void produceMessage(MessageRecord message) { - produceMessage(message, null); - } - - //只发送消息,不需要响应 - public final void produceMessage(MessageRecord message, AtomicLong counter) { - sendMessage(message, false, counter); - } - - //发送消息,需要响应 - public final CompletableFuture sendMessage(MessageRecord message) { - return sendMessage(message, null); - } - - //发送消息,需要响应 - public final CompletableFuture sendMessage(MessageRecord message, AtomicLong counter) { - return sendMessage(message, true, counter); - } - - private CompletableFuture sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) { + protected CompletableFuture sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) { CompletableFuture future = new CompletableFuture<>(); try { if (this.consumer == null) { diff --git a/src/org/redkale/mq/SncpMessageClient.java b/src/org/redkale/mq/SncpMessageClient.java index f151bf67a..f12e047e0 100644 --- a/src/org/redkale/mq/SncpMessageClient.java +++ b/src/org/redkale/mq/SncpMessageClient.java @@ -5,6 +5,8 @@ */ package org.redkale.mq; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; import org.redkale.convert.ConvertType; /** @@ -28,4 +30,29 @@ public class SncpMessageClient extends MessageClient { protected MessageProducer getProducer() { return messageAgent.getSncpProducer(); } + + public String getRespTopic() { + return this.respTopic; + } + + //只发送消息,不需要响应 + public final void produceMessage(MessageRecord message) { + produceMessage(message, null); + } + + //只发送消息,不需要响应 + public final void produceMessage(MessageRecord message, AtomicLong counter) { + sendMessage(message, false, counter); + } + + //发送消息,需要响应 + public final CompletableFuture sendMessage(MessageRecord message) { + return sendMessage(message, null); + } + + //发送消息,需要响应 + public final CompletableFuture sendMessage(MessageRecord message, AtomicLong counter) { + return sendMessage(message, true, counter); + } + }