This commit is contained in:
Redkale
2020-06-11 19:24:16 +08:00
parent 92bb0a561b
commit 98f29d6a6e
3 changed files with 16 additions and 16 deletions

View File

@@ -150,14 +150,14 @@ public abstract class MessageAgent {
public abstract boolean match(AnyValue config);
//创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, String group, MessageProcessor processor);
public abstract MessageConsumer createConsumer(String[] topics, String group, MessageProcessor processor);
public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
String topic = generateHttpReqTopic(service);
String consumerid = generateHttpConsumerid(topic, service);
String[] topics = generateHttpReqTopics(service);
String consumerid = generateHttpConsumerid(topics, service);
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, consumerid, processor)));
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor)));
}
public final synchronized void putService(NodeSncpServer ns, Service service, SncpServlet servlet) {
@@ -165,7 +165,7 @@ public abstract class MessageAgent {
String consumerid = generateSncpConsumerid(topic, service);
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topic, consumerid, processor)));
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor)));
}
//格式: sncp.req.user
@@ -199,16 +199,16 @@ public abstract class MessageAgent {
}
//格式: http.req.user
protected String generateHttpReqTopic(Service service) {
protected String[] generateHttpReqTopics(Service service) {
String resname = Sncp.getResourceName(service);
String module = Rest.getRestModule(service).toLowerCase();
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
if (mmc != null) return generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname));
return "http.req." + module + (resname.isEmpty() ? "" : ("-" + resname));
if (mmc != null) return new String[]{generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname))};
return new String[]{"http.req." + module + (resname.isEmpty() ? "" : ("-" + resname))};
}
//格式: consumer-http.req.user
protected String generateHttpConsumerid(String topic, Service service) {
protected String generateHttpConsumerid(String[] topics, Service service) {
String resname = Sncp.getResourceName(service);
String key = Rest.getRestModule(service).toLowerCase();
return "consumer-http.req." + key + (resname.isEmpty() ? "" : ("-" + resname));

View File

@@ -74,7 +74,7 @@ public class MessageClient {
if (node.getCounter() != null) node.getCounter().decrementAndGet();
node.future.complete(msg);
};
this.consumer = messageAgent.createConsumer(respTopic, respConsumerid, processor);
this.consumer = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor);
this.consumer.startup().join();
}
}

View File

@@ -21,7 +21,7 @@ import java.util.logging.Logger;
*/
public abstract class MessageConsumer {
protected final String topic;
protected final String[] topics;
protected final String consumerid;
@@ -33,13 +33,13 @@ public abstract class MessageConsumer {
protected volatile boolean closed;
protected MessageConsumer(MessageAgent messageAgent, String topic,final String consumerid, MessageProcessor processor) {
protected MessageConsumer(MessageAgent messageAgent, String[] topics,final String consumerid, MessageProcessor processor) {
Objects.requireNonNull(messageAgent);
Objects.requireNonNull(topic);
Objects.requireNonNull(topics);
Objects.requireNonNull(consumerid);
Objects.requireNonNull(processor);
this.messageAgent = messageAgent;
this.topic = topic;
this.topics = topics;
this.consumerid = consumerid;
this.processor = processor;
}
@@ -48,8 +48,8 @@ public abstract class MessageConsumer {
return processor;
}
public String getTopic() {
return topic;
public String[] getTopics() {
return topics;
}
public abstract CompletableFuture<Void> startup();