diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 90473987b..a825a2d6c 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -44,7 +44,7 @@ public abstract class MessageAgent { protected SncpMessageClient sncpMessageClient; - //本地Service消息接收处理器, key:topic + //本地Service消息接收处理器, key:consumer protected HashMap messageNodes = new LinkedHashMap<>(); public void init(AnyValue config) { @@ -58,7 +58,7 @@ public abstract class MessageAgent { final List futures = new ArrayList<>(); this.messageNodes.values().forEach(node -> { long s = System.currentTimeMillis(); - futures.add(node.consumer.startup().whenComplete((r, t) -> map.put(node.consumer.topic, System.currentTimeMillis() - s))); + futures.add(node.consumer.startup().whenComplete((r, t) -> map.put(node.consumer.consumerid, System.currentTimeMillis() - s))); }); return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(r -> map); } @@ -139,29 +139,36 @@ public abstract class MessageAgent { public abstract boolean match(AnyValue config); //创建指定topic的消费处理器 - public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor); + public abstract MessageConsumer createConsumer(String topic, String group, MessageProcessor processor); public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { String topic = generateHttpReqTopic(service); - if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); + String consumerid = generateHttpConsumerid(topic, service); + if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet); - this.messageNodes.put(topic, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, processor))); + this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, consumerid, processor))); } public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { String topic = generateSncpReqTopic(service); - if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); + String consumerid = generateSncpConsumerid(topic, service); + if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet); - this.messageNodes.put(topic, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, processor))); + this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, consumerid, processor))); } //格式: sncp.req.user - public String generateSncpReqTopic(Service service) { + public final String generateSncpReqTopic(Service service) { String resname = Sncp.getResourceName(service); if (service instanceof WebSocketNode) return "sncp.req.ws" + (resname.isEmpty() ? "" : ("-" + resname)); return "sncp.req." + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname)); } + //格式: consumer-sncp.req.user 不提供外部使用 + protected final String generateSncpConsumerid(String topic, Service service) { + return "consumer-" + topic; + } + //格式: http.req.user public String generateHttpReqTopic(String module) { return "http.req." + module.toLowerCase(); @@ -183,6 +190,14 @@ public abstract class MessageAgent { return "http.req." + Rest.getRestName(service).toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname)); } + //格式: consumer-http.req.user + protected String generateHttpConsumerid(String topic, Service service) { + String resname = Sncp.getResourceName(service); + String key = Rest.getRestName(service).toLowerCase(); + return "consumer-http.req." + key + (resname.isEmpty() ? "" : ("-" + resname)); + + } + //格式: ws.resp.wsgame.node100 public String generateWebSocketRespTopic(WebSocketNode node) { return "ws.resp." + node.getName() + ".node" + nodeid; diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index 38a4056de..8b7cd15aa 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -29,6 +29,8 @@ public class MessageClient { protected String respTopic; + protected String respConsumerid; + protected ConvertType convertType; protected MessageClient(MessageAgent messageAgent) { @@ -61,6 +63,7 @@ public class MessageClient { try { if (this.consumer == null) { synchronized (this) { + if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic; if (this.consumer == null) { MessageProcessor processor = msg -> { MessageRespFutureNode node = respNodes.get(msg.getSeqid()); @@ -71,7 +74,7 @@ public class MessageClient { if (node.getCounter() != null) node.getCounter().decrementAndGet(); node.future.complete(msg); }; - this.consumer = messageAgent.createConsumer(respTopic, processor); + this.consumer = messageAgent.createConsumer(respTopic, respConsumerid, processor); this.consumer.startup().join(); } } diff --git a/src/org/redkale/mq/MessageConsumer.java b/src/org/redkale/mq/MessageConsumer.java index 31cac56de..6ac102729 100644 --- a/src/org/redkale/mq/MessageConsumer.java +++ b/src/org/redkale/mq/MessageConsumer.java @@ -23,6 +23,8 @@ public abstract class MessageConsumer { protected final String topic; + protected final String consumerid; + protected MessageAgent messageAgent; protected final MessageProcessor processor; @@ -31,12 +33,14 @@ public abstract class MessageConsumer { protected volatile boolean closed; - protected MessageConsumer(MessageAgent messageAgent, String topic, MessageProcessor processor) { + protected MessageConsumer(MessageAgent messageAgent, String topic,final String consumerid, MessageProcessor processor) { Objects.requireNonNull(messageAgent); Objects.requireNonNull(topic); + Objects.requireNonNull(consumerid); Objects.requireNonNull(processor); this.messageAgent = messageAgent; this.topic = topic; + this.consumerid = consumerid; this.processor = processor; }