This commit is contained in:
Redkale
2020-06-09 21:22:17 +08:00
parent cf23ecc12c
commit a58c13cd9f
8 changed files with 303 additions and 63 deletions

View File

@@ -25,7 +25,7 @@ import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.convert.Convert;
import org.redkale.convert.bson.BsonFactory;
import org.redkale.convert.json.*;
import org.redkale.mq.MessageAgent;
import org.redkale.mq.*;
import org.redkale.net.*;
import org.redkale.net.http.MimeType;
import org.redkale.net.sncp.*;
@@ -441,6 +441,8 @@ public final class Application {
this.resourceFactory.inject(agent);
agent.init(agent.getConfig());
this.resourceFactory.register(agent.getName(), MessageAgent.class, agent);
this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient());
this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient());
}
}
this.messageAgents = mqs;
@@ -833,7 +835,7 @@ public final class Application {
map.keySet().forEach(str -> {
if (str.length() > maxlen.get()) maxlen.set(str.length());
});
new TreeMap<String, Long>(map).forEach((topic, ms) -> sb.append("MessageConsumer(topic=").append(alignString(topic, maxlen.get())).append(") init and start in ").append(ms).append(" ms\r\n")
new TreeMap<>(map).forEach((topic, ms) -> sb.append("MessageConsumer(topic=").append(alignString(topic, maxlen.get())).append(") init and start in ").append(ms).append(" ms\r\n")
);
}
if (sb.length() > 0) logger.info(sb.toString().trim());

View File

@@ -533,8 +533,8 @@ public abstract class NodeServer {
}
if (isSNCP() && !sncpRemoteAgents.isEmpty()) {
sncpRemoteAgents.values().forEach(agent -> {
agent.putSncpResp((NodeSncpServer) this);
agent.startSncpRespConsumer();
// agent.putSncpResp((NodeSncpServer) this);
// agent.startSncpRespConsumer();
});
}
//----------------- init -----------------

View File

@@ -0,0 +1,107 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.convert.ConvertType;
import org.redkale.net.http.*;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class HttpMessageClient extends MessageClient {
protected HttpMessageClient(MessageAgent messageAgent) {
super(messageAgent);
this.respTopic = messageAgent.generateHttpRespTopic();
}
//格式: http.req.user
public String generateHttpReqTopic(String module) {
return messageAgent.generateHttpReqTopic(module);
}
public String generateHttpReqTopic(HttpSimpleRequest request, String path) {
String module = request.getRequestURI();
if (path != null && !path.isEmpty() && module.startsWith(path)) module = module.substring(path.length());
module = module.substring(1); //去掉/
module = module.substring(0, module.indexOf('/'));
return messageAgent.generateHttpReqTopic(module);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request) {
return sendMessage(topic, ConvertType.JSON, 0, null, request, true, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request, AtomicLong counter) {
return sendMessage(topic, ConvertType.JSON, 0, null, request, true, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request, boolean needresp) {
return sendMessage(topic, ConvertType.JSON, 0, null, request, needresp, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request, boolean needresp, AtomicLong counter) {
return sendMessage(topic, ConvertType.JSON, 0, null, request, needresp, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request) {
return sendMessage(topic, convertType, 0, null, request, true, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, AtomicLong counter) {
return sendMessage(topic, convertType, 0, null, request, true, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, boolean needresp) {
return sendMessage(topic, convertType, 0, null, request, needresp, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, HttpSimpleRequest request, boolean needresp, AtomicLong counter) {
return sendMessage(topic, convertType, 0, null, request, needresp, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request) {
return sendMessage(topic, ConvertType.JSON, userid, groupid, request, true, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
return sendMessage(topic, ConvertType.JSON, userid, groupid, request, true, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, boolean needresp) {
return sendMessage(topic, ConvertType.JSON, userid, groupid, request, needresp, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, boolean needresp, AtomicLong counter) {
return sendMessage(topic, ConvertType.JSON, userid, groupid, request, needresp, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request) {
return sendMessage(topic, convertType, userid, groupid, request, true, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
return sendMessage(topic, convertType, userid, groupid, request, true, counter);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, boolean needresp) {
return sendMessage(topic, convertType, userid, groupid, request, needresp, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, ConvertType convertType, int userid, String groupid, HttpSimpleRequest request, boolean needresp, AtomicLong counter) {
MessageRecord message = new MessageRecord(convertType, topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
return sendMessage(message, needresp, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
}
}

View File

@@ -7,12 +7,10 @@ package org.redkale.mq;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.Logger;
import javax.annotation.Resource;
import org.redkale.boot.*;
import static org.redkale.boot.Application.RESNAME_APP_NODEID;
import org.redkale.convert.ConvertType;
import org.redkale.net.Servlet;
import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
@@ -42,41 +40,41 @@ public abstract class MessageAgent {
protected MessageProducer producer;
protected String sncpRespTopic;
protected HttpMessageClient httpMessageClient;
protected MessageConsumer sncpRespConsumer;
protected SncpRespProcessor sncpRespProcessor;
protected SncpMessageClient sncpMessageClient;
//protected MessageConsumer sncpRespConsumer;
//protected SncpRespProcessor sncpRespProcessor;
//sncpRespConsumer启动耗时 小于0表示未启动
protected long sncpRespStartms = -1;
//protected long sncpRespStartms = -1;
//本地Service消息接收处理器 key:topic
protected HashMap<String, MessageNode> messageNodes = new LinkedHashMap<>();
protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>();
public void init(AnyValue config) {
this.name = checkName(config.getValue("name", ""));
this.httpMessageClient = new HttpMessageClient(this);
this.sncpMessageClient = new SncpMessageClient(this);
}
//ServiceLoader时判断配置是否符合当前实现类
public abstract boolean match(AnyValue config);
public final CompletableFuture<MessageRecord> createSncpRespFuture(AtomicLong counter, MessageRecord message) {
return this.sncpRespProcessor.createFuture(message.getSeqid(), counter);
}
public final synchronized void startSncpRespConsumer() {
if (this.sncpRespStartms >= 0) return;
long s = System.currentTimeMillis();
if (this.sncpRespConsumer != null) {
this.sncpRespConsumer.startup().join();
}
this.sncpRespStartms = System.currentTimeMillis() - s;
}
// public final CompletableFuture<MessageRecord> createSncpRespFuture2(AtomicLong counter, MessageRecord message) {
// return this.sncpRespProcessor.createFuture2(message.getSeqid(), counter);
// }
//
// public final synchronized void startSncpRespConsumer() {
// if (this.sncpRespStartms >= 0) return;
// long s = System.currentTimeMillis();
// if (this.sncpRespConsumer != null) {
// this.sncpRespConsumer.startup().join();
// }
// this.sncpRespStartms = System.currentTimeMillis() - s;
// }
public CompletableFuture<Map<String, Long>> start() {
final LinkedHashMap<String, Long> map = new LinkedHashMap<>();
if (this.sncpRespStartms >= 0) map.put(this.sncpRespConsumer.topic, this.sncpRespStartms);
//if (this.sncpRespStartms >= 0) map.put(this.sncpRespConsumer.topic, this.sncpRespStartms);
final List<CompletableFuture> futures = new ArrayList<>();
this.messageNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
@@ -96,7 +94,9 @@ public abstract class MessageAgent {
//Application.shutdown 在所有server.shutdown执行后执行
public void destroy(AnyValue config) {
if (this.sncpRespConsumer != null) this.sncpRespConsumer.shutdown().join();
//if (this.sncpRespConsumer != null) this.sncpRespConsumer.shutdown().join();
this.httpMessageClient.close().join();
this.sncpMessageClient.close().join();
if (this.producer != null) this.producer.shutdown().join();
}
@@ -112,6 +112,14 @@ public abstract class MessageAgent {
this.config = config;
}
public HttpMessageClient getHttpMessageClient() {
return httpMessageClient;
}
public SncpMessageClient getSncpMessageClient() {
return sncpMessageClient;
}
protected String checkName(String name) { //不能含特殊字符
if (name.isEmpty()) return name;
if (name.charAt(0) >= '0' && name.charAt(0) <= '9') throw new RuntimeException("name only 0-9 a-z A-Z _ cannot begin 0-9");
@@ -124,10 +132,14 @@ public abstract class MessageAgent {
}
//获取指定topic的生产处理器
public synchronized MessageProducer getProducer() {
public MessageProducer getProducer() {
if (this.producer == null) {
this.producer = createProducer();
this.producer.startup().join();
synchronized (this) {
if (this.producer == null) {
this.producer = createProducer();
this.producer.startup().join();
}
}
}
return this.producer;
}
@@ -147,36 +159,35 @@ public abstract class MessageAgent {
//创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor);
public final synchronized void putSncpResp(NodeSncpServer ns) {
if (this.sncpRespConsumer != null) return;
this.sncpRespProcessor = new SncpRespProcessor(this.logger, this);
this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor);
}
public CompletableFuture<MessageRecord> sendRemoteSncp(AtomicLong counter, MessageRecord message) {
if (this.sncpRespConsumer == null) {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(new RuntimeException("Not open sncp consumer"));
return future;
}
message.setFormat(ConvertType.BSON);
message.setResptopic(generateSncpRespTopic());
getProducer().apply(message);
return this.sncpRespProcessor.createFuture(message.getSeqid(), counter);
}
// public final synchronized void putSncpResp(NodeSncpServer ns) {
// if (this.sncpRespConsumer != null) return;
// this.sncpRespProcessor = new SncpRespProcessor(this.logger, this);
// this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor);
// }
//
// public CompletableFuture<MessageRecord> sendRemoteSncp(AtomicLong counter, MessageRecord message) {
// if (this.sncpRespConsumer == null) {
// CompletableFuture future = new CompletableFuture();
// future.completeExceptionally(new RuntimeException("Not open sncp consumer"));
// return future;
// }
// message.setFormat(ConvertType.BSON);
// message.setResptopic(generateSncpRespTopic());
// getProducer().apply(message);
// return this.sncpRespProcessor.createFuture(message.getSeqid(), counter);
// }
public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
String topic = generateHttpReqTopic(service);
if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet);
this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor)));
this.messageNodes.put(topic, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, processor)));
}
public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) {
String topic = generateSncpReqTopic(service);
if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet);
this.messageNodes.put(topic, new MessageNode(ns, service, servlet, processor, createConsumer(topic, processor)));
this.messageNodes.put(topic, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, processor)));
}
//格式: sncp.req.user
@@ -186,20 +197,18 @@ public abstract class MessageAgent {
return "sncp.req." + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: sncp.resp.node10
private String generateSncpRespTopic() {
if (this.sncpRespTopic != null) return this.sncpRespTopic;
this.sncpRespTopic = "sncp.resp.node" + nodeid;
return this.sncpRespTopic;
}
//格式: http.req.user
public String generateHttpReqTopic(String module) {
return "http.req." + module.toLowerCase();
}
//格式: sncp.resp.node10
protected String generateSncpRespTopic() {
return "sncp.resp.node" + nodeid;
}
//格式: http.resp.node10
public String generateHttpRespTopic() {
protected String generateHttpRespTopic() {
return "http.resp.node" + nodeid;
}
@@ -219,7 +228,7 @@ public abstract class MessageAgent {
return protocol + ".resp.node" + nodeid;
}
protected static class MessageNode {
protected static class MessageConsumerNode {
public final NodeServer server;
@@ -231,7 +240,7 @@ public abstract class MessageAgent {
public final MessageConsumer consumer;
public MessageNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageConsumer consumer) {
public MessageConsumerNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageConsumer consumer) {
this.server = server;
this.service = service;
this.servlet = servlet;

View File

@@ -0,0 +1,93 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import org.redkale.convert.ConvertType;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class MessageClient {
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>();
protected final MessageAgent messageAgent;
protected MessageConsumer consumer;
protected String respTopic;
protected ConvertType convertType;
protected MessageClient(MessageAgent messageAgent) {
this.messageAgent = messageAgent;
}
protected CompletableFuture<Void> close() {
if (this.consumer == null) return CompletableFuture.completedFuture(null);
return this.consumer.shutdown();
}
public String getRespTopic() {
return this.respTopic;
}
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message) {
return sendMessage(message, true, null);
}
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message, AtomicLong counter) {
return sendMessage(message, true, counter);
}
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message, boolean needresp) {
return sendMessage(message, needresp, null);
}
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
try {
if (this.consumer == null) {
synchronized (this) {
if (this.consumer == null) {
MessageProcessor processor = msg -> {
MessageRespFutureNode node = respNodes.get(msg.getSeqid());
if (node == null) {
messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error");
return;
}
if (node.getCounter() != null) node.getCounter().decrementAndGet();
node.future.complete(msg);
};
this.consumer = messageAgent.createConsumer(respTopic, processor);
this.consumer.startup().join();
}
}
}
if (convertType != null) message.setFormat(convertType);
if (needresp && message.getResptopic() == null) message.setResptopic(respTopic);
messageAgent.getProducer().apply(message);
if (counter != null) counter.incrementAndGet();
if (needresp) {
MessageRespFutureNode node = new MessageRespFutureNode(message.getSeqid(), counter, future);
respNodes.put(message.getSeqid(), node);
}
} catch (Exception ex) {
future.completeExceptionally(ex);
} finally {
return future;
}
}
}

View File

@@ -0,0 +1,26 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import org.redkale.convert.ConvertType;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class SncpMessageClient extends MessageClient {
protected SncpMessageClient(MessageAgent messageAgent) {
super(messageAgent);
this.respTopic = messageAgent.generateSncpRespTopic();
this.convertType = ConvertType.BSON;
}
}

View File

@@ -43,7 +43,7 @@ public class SncpRespProcessor implements MessageProcessor {
node.future.complete(message);
}
public CompletableFuture<MessageRecord> createFuture(long seqid, AtomicLong counter) {
public CompletableFuture<MessageRecord> createFuture2(long seqid, AtomicLong counter) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
MessageRespFutureNode node = new MessageRespFutureNode(seqid, counter, future);
respNodes.put(seqid, node);

View File

@@ -60,6 +60,8 @@ public final class SncpClient {
protected final MessageAgent messageAgent;
protected final SncpMessageClient messageClient;
protected final String topic;
protected final Supplier<ByteBuffer> bufferSupplier;
@@ -82,6 +84,7 @@ public final class SncpClient {
this.executor = factory.getExecutor();
this.bufferSupplier = factory.getBufferSupplier();
this.messageAgent = messageAgent;
this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient();
this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(service);
Class<?> tn = serviceTypeOrImplClass;
Version ver = tn.getAnnotation(Version.class);
@@ -284,7 +287,7 @@ public final class SncpClient {
String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic;
if (targetTopic == null) targetTopic = this.topic;
MessageRecord message = new MessageRecord(ConvertType.BSON, targetTopic, null, reqbytes);
return messageAgent.sendRemoteSncp(null, message).thenApply(msg -> {
return messageClient.sendMessage(message).thenApply(msg -> {
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
checkResult(seqid, action, buffer);