diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 2741050f3..acaf0427f 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -8,6 +8,7 @@ package org.redkale.mq; import java.util.*; import java.util.concurrent.*; import java.util.logging.Logger; +import java.util.stream.Collectors; import javax.annotation.Resource; import org.redkale.boot.*; import static org.redkale.boot.Application.RESNAME_APP_NODEID; @@ -97,6 +98,27 @@ public abstract class MessageAgent { if (this.httpProducer != null) this.httpProducer.shutdown().join(); } + protected List getAllMessageConsumer() { + List consumers = new ArrayList<>(); + MessageConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.consumer; + if (one != null) consumers.add(one); + one = this.sncpMessageClient == null ? null : this.sncpMessageClient.consumer; + if (one != null) consumers.add(one); + consumers.addAll(messageNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList())); + return consumers; + } + + protected List getAllMessageProducer() { + List producers = new ArrayList<>(); + if (this.httpProducer != null) producers.add(this.httpProducer); + if (this.sncpProducer != null) producers.add(this.sncpProducer); + MessageProducer one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); + if (one != null) producers.add(one); + one = this.sncpMessageClient == null ? null : this.sncpMessageClient.getProducer(); + if (one != null) producers.add(one); + return producers; + } + public Logger getLogger() { return logger; }