This commit is contained in:
Redkale
2020-06-10 17:25:52 +08:00
parent b445b99572
commit 03de684940
3 changed files with 32 additions and 10 deletions

View File

@@ -44,7 +44,7 @@ public abstract class MessageAgent {
protected SncpMessageClient sncpMessageClient;
//本地Service消息接收处理器 key:topic
//本地Service消息接收处理器 key:consumer
protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>();
public void init(AnyValue config) {
@@ -58,7 +58,7 @@ public abstract class MessageAgent {
final List<CompletableFuture> futures = new ArrayList<>();
this.messageNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
futures.add(node.consumer.startup().whenComplete((r, t) -> map.put(node.consumer.topic, System.currentTimeMillis() - s)));
futures.add(node.consumer.startup().whenComplete((r, t) -> map.put(node.consumer.consumerid, System.currentTimeMillis() - s)));
});
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(r -> map);
}
@@ -139,29 +139,36 @@ public abstract class MessageAgent {
public abstract boolean match(AnyValue config);
//创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor);
public abstract MessageConsumer createConsumer(String topic, String group, MessageProcessor processor);
public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
String topic = generateHttpReqTopic(service);
if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
String consumerid = generateHttpConsumerid(topic, service);
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet);
this.messageNodes.put(topic, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, processor)));
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, consumerid, processor)));
}
public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) {
String topic = generateSncpReqTopic(service);
if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");
String consumerid = generateSncpConsumerid(topic, service);
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet);
this.messageNodes.put(topic, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, processor)));
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, consumerid, processor)));
}
//格式: sncp.req.user
public String generateSncpReqTopic(Service service) {
public final String generateSncpReqTopic(Service service) {
String resname = Sncp.getResourceName(service);
if (service instanceof WebSocketNode) return "sncp.req.ws" + (resname.isEmpty() ? "" : ("-" + resname));
return "sncp.req." + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: consumer-sncp.req.user 不提供外部使用
protected final String generateSncpConsumerid(String topic, Service service) {
return "consumer-" + topic;
}
//格式: http.req.user
public String generateHttpReqTopic(String module) {
return "http.req." + module.toLowerCase();
@@ -183,6 +190,14 @@ public abstract class MessageAgent {
return "http.req." + Rest.getRestName(service).toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: consumer-http.req.user
protected String generateHttpConsumerid(String topic, Service service) {
String resname = Sncp.getResourceName(service);
String key = Rest.getRestName(service).toLowerCase();
return "consumer-http.req." + key + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: ws.resp.wsgame.node100
public String generateWebSocketRespTopic(WebSocketNode node) {
return "ws.resp." + node.getName() + ".node" + nodeid;

View File

@@ -29,6 +29,8 @@ public class MessageClient {
protected String respTopic;
protected String respConsumerid;
protected ConvertType convertType;
protected MessageClient(MessageAgent messageAgent) {
@@ -61,6 +63,7 @@ public class MessageClient {
try {
if (this.consumer == null) {
synchronized (this) {
if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic;
if (this.consumer == null) {
MessageProcessor processor = msg -> {
MessageRespFutureNode node = respNodes.get(msg.getSeqid());
@@ -71,7 +74,7 @@ public class MessageClient {
if (node.getCounter() != null) node.getCounter().decrementAndGet();
node.future.complete(msg);
};
this.consumer = messageAgent.createConsumer(respTopic, processor);
this.consumer = messageAgent.createConsumer(respTopic, respConsumerid, processor);
this.consumer.startup().join();
}
}

View File

@@ -23,6 +23,8 @@ public abstract class MessageConsumer {
protected final String topic;
protected final String consumerid;
protected MessageAgent messageAgent;
protected final MessageProcessor processor;
@@ -31,12 +33,14 @@ public abstract class MessageConsumer {
protected volatile boolean closed;
protected MessageConsumer(MessageAgent messageAgent, String topic, MessageProcessor processor) {
protected MessageConsumer(MessageAgent messageAgent, String topic,final String consumerid, MessageProcessor processor) {
Objects.requireNonNull(messageAgent);
Objects.requireNonNull(topic);
Objects.requireNonNull(consumerid);
Objects.requireNonNull(processor);
this.messageAgent = messageAgent;
this.topic = topic;
this.consumerid = consumerid;
this.processor = processor;
}