From adc2106bec55fb8bf35495044a55bae3f1c2dda6 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Wed, 3 Jun 2020 14:15:42 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 7 +++++++ src/org/redkale/boot/NodeServer.java | 3 --- src/org/redkale/mq/MessageAgent.java | 6 +++++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index d292dab66..3286cca14 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -778,6 +778,13 @@ public final class Application { runServers(timecd, others); runServers(timecd, watchs); //必须在所有服务都启动后再启动WATCH服务 timecd.await(); + if (this.messageAgents != null) { + long s = System.currentTimeMillis(); + for (NodeServer ns : servers) { + ns.messageAgents.values().forEach(agent -> agent.start().join()); + } + logger.info(this.getClass().getSimpleName() + " messageagent init in " + (System.currentTimeMillis() - s) + " ms\r\n"); + } //if (!singletonrun) signalHandle(); //if (!singletonrun) clearPersistData(); logger.info(this.getClass().getSimpleName() + " started in " + (System.currentTimeMillis() - startTime) + " ms\r\n"); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index a426f307e..66640c915 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -550,9 +550,6 @@ public abstract class NodeServer { //Server.start执行之后调用 protected void postStartServer(Set localServices, Set remoteServices) { - if (!this.messageAgents.isEmpty()) { //MQ - this.messageAgents.values().forEach(agent -> agent.start()); - } } protected abstract ClassFilter createFilterClassFilter(); diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 567528fe1..e99a31a57 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -48,7 +48,11 @@ public abstract class MessageAgent { } public CompletableFuture start() { - return null; + this.httpNodes.values().forEach(node -> { + node.consumer.start(); + node.consumer.waitFor(); + }); + return CompletableFuture.completedFuture(null); } public CompletableFuture stop() {