diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 86b151e27..b12e317bd 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -150,14 +150,14 @@ public abstract class MessageAgent { public abstract boolean match(AnyValue config); //创建指定topic的消费处理器 - public abstract MessageConsumer createConsumer(String topic, String group, MessageProcessor processor); + public abstract MessageConsumer createConsumer(String[] topics, String group, MessageProcessor processor); public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { - String topic = generateHttpReqTopic(service); - String consumerid = generateHttpConsumerid(topic, service); + String[] topics = generateHttpReqTopics(service); + String consumerid = generateHttpConsumerid(topics, service); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet); - this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, consumerid, processor))); + this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor))); } public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { @@ -165,7 +165,7 @@ public abstract class MessageAgent { String consumerid = generateSncpConsumerid(topic, service); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet); - this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, consumerid, processor))); + this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor))); } //格式: sncp.req.user @@ -199,16 +199,16 @@ public abstract class MessageAgent { } //格式: http.req.user - protected String generateHttpReqTopic(Service service) { + protected String[] generateHttpReqTopics(Service service) { String resname = Sncp.getResourceName(service); String module = Rest.getRestModule(service).toLowerCase(); MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class); - if (mmc != null) return generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname)); - return "http.req." + module + (resname.isEmpty() ? "" : ("-" + resname)); + if (mmc != null) return new String[]{generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname))}; + return new String[]{"http.req." + module + (resname.isEmpty() ? "" : ("-" + resname))}; } //格式: consumer-http.req.user - protected String generateHttpConsumerid(String topic, Service service) { + protected String generateHttpConsumerid(String[] topics, Service service) { String resname = Sncp.getResourceName(service); String key = Rest.getRestModule(service).toLowerCase(); return "consumer-http.req." + key + (resname.isEmpty() ? "" : ("-" + resname)); diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index 2524e7103..0993a9473 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -74,7 +74,7 @@ public class MessageClient { if (node.getCounter() != null) node.getCounter().decrementAndGet(); node.future.complete(msg); }; - this.consumer = messageAgent.createConsumer(respTopic, respConsumerid, processor); + this.consumer = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor); this.consumer.startup().join(); } } diff --git a/src/org/redkale/mq/MessageConsumer.java b/src/org/redkale/mq/MessageConsumer.java index 6ac102729..70a3433b9 100644 --- a/src/org/redkale/mq/MessageConsumer.java +++ b/src/org/redkale/mq/MessageConsumer.java @@ -21,7 +21,7 @@ import java.util.logging.Logger; */ public abstract class MessageConsumer { - protected final String topic; + protected final String[] topics; protected final String consumerid; @@ -33,13 +33,13 @@ public abstract class MessageConsumer { protected volatile boolean closed; - protected MessageConsumer(MessageAgent messageAgent, String topic,final String consumerid, MessageProcessor processor) { + protected MessageConsumer(MessageAgent messageAgent, String[] topics,final String consumerid, MessageProcessor processor) { Objects.requireNonNull(messageAgent); - Objects.requireNonNull(topic); + Objects.requireNonNull(topics); Objects.requireNonNull(consumerid); Objects.requireNonNull(processor); this.messageAgent = messageAgent; - this.topic = topic; + this.topics = topics; this.consumerid = consumerid; this.processor = processor; } @@ -48,8 +48,8 @@ public abstract class MessageConsumer { return processor; } - public String getTopic() { - return topic; + public String[] getTopics() { + return topics; } public abstract CompletableFuture startup();