This commit is contained in:
Redkale
2020-06-28 22:49:14 +08:00
parent 73de9dfc33
commit fc74dc33c1
3 changed files with 34 additions and 27 deletions

View File

@@ -14,6 +14,7 @@ import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.*;
/**
* 不依赖MessageRecord则可兼容RPC方式
*
* <p>
* 详情见: https://redkale.org
@@ -31,15 +32,18 @@ public class HttpMessageClient extends MessageClient {
//格式: http.req.user
public String generateHttpReqTopic(String module) {
if (messageAgent == null) return null; //RPC方式下无messageAgent
return messageAgent.generateHttpReqTopic(module);
}
//格式: http.req.user-n10
public String generateHttpReqTopic(String module, String resname) {
if (messageAgent == null) return null; //RPC方式下无messageAgent
return messageAgent.generateHttpReqTopic(module, resname);
}
public String generateHttpReqTopic(HttpSimpleRequest request, String path) {
if (messageAgent == null) return null; //RPC方式下无messageAgent
String module = request.getRequestURI();
if (path != null && !path.isEmpty() && module.startsWith(path)) module = module.substring(path.length());
module = module.substring(1); //去掉/
@@ -162,13 +166,13 @@ public class HttpMessageClient extends MessageClient {
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
return sendMessage(message, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
}
public final void produceMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
produceMessage(message, counter);
sendMessage(message, false, counter);
}
@Override

View File

@@ -42,31 +42,7 @@ public abstract class MessageClient {
return this.consumer.shutdown();
}
public String getRespTopic() {
return this.respTopic;
}
//只发送消息,不需要响应
public final void produceMessage(MessageRecord message) {
produceMessage(message, null);
}
//只发送消息,不需要响应
public final void produceMessage(MessageRecord message, AtomicLong counter) {
sendMessage(message, false, counter);
}
//发送消息,需要响应
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message) {
return sendMessage(message, null);
}
//发送消息,需要响应
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message, AtomicLong counter) {
return sendMessage(message, true, counter);
}
private CompletableFuture<MessageRecord> sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) {
protected CompletableFuture<MessageRecord> sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
try {
if (this.consumer == null) {

View File

@@ -5,6 +5,8 @@
*/
package org.redkale.mq;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.convert.ConvertType;
/**
@@ -28,4 +30,29 @@ public class SncpMessageClient extends MessageClient {
protected MessageProducer getProducer() {
return messageAgent.getSncpProducer();
}
public String getRespTopic() {
return this.respTopic;
}
//只发送消息,不需要响应
public final void produceMessage(MessageRecord message) {
produceMessage(message, null);
}
//只发送消息,不需要响应
public final void produceMessage(MessageRecord message, AtomicLong counter) {
sendMessage(message, false, counter);
}
//发送消息,需要响应
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message) {
return sendMessage(message, null);
}
//发送消息,需要响应
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message, AtomicLong counter) {
return sendMessage(message, true, counter);
}
}