From c921f657b12f32107db1502d526dc935e658b6bf Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 30 Jun 2020 20:33:07 +0800 Subject: [PATCH] --- src/org/redkale/mq/MessageAgent.java | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 2741050f3..acaf0427f 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -8,6 +8,7 @@ package org.redkale.mq; import java.util.*; import java.util.concurrent.*; import java.util.logging.Logger; +import java.util.stream.Collectors; import javax.annotation.Resource; import org.redkale.boot.*; import static org.redkale.boot.Application.RESNAME_APP_NODEID; @@ -97,6 +98,27 @@ public abstract class MessageAgent { if (this.httpProducer != null) this.httpProducer.shutdown().join(); } + protected List getAllMessageConsumer() { + List consumers = new ArrayList<>(); + MessageConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.consumer; + if (one != null) consumers.add(one); + one = this.sncpMessageClient == null ? null : this.sncpMessageClient.consumer; + if (one != null) consumers.add(one); + consumers.addAll(messageNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList())); + return consumers; + } + + protected List getAllMessageProducer() { + List producers = new ArrayList<>(); + if (this.httpProducer != null) producers.add(this.httpProducer); + if (this.sncpProducer != null) producers.add(this.sncpProducer); + MessageProducer one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); + if (one != null) producers.add(one); + one = this.sncpMessageClient == null ? null : this.sncpMessageClient.getProducer(); + if (one != null) producers.add(one); + return producers; + } + public Logger getLogger() { return logger; }