diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 3286cca14..b4b3517c8 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -780,10 +780,12 @@ public final class Application { timecd.await(); if (this.messageAgents != null) { long s = System.currentTimeMillis(); - for (NodeServer ns : servers) { - ns.messageAgents.values().forEach(agent -> agent.start().join()); + final StringBuffer sb = new StringBuffer(); + for (MessageAgent agent : this.messageAgents) { + agent.start(sb).join(); } - logger.info(this.getClass().getSimpleName() + " messageagent init in " + (System.currentTimeMillis() - s) + " ms\r\n"); + if (sb.length() > 0) logger.info(sb.toString()); + logger.info(this.getClass().getSimpleName() + " MessageAgent init in " + (System.currentTimeMillis() - s) + " ms\r\n"); } //if (!singletonrun) signalHandle(); //if (!singletonrun) clearPersistData(); @@ -995,6 +997,13 @@ public final class Application { } List localServers = new ArrayList<>(servers); //顺序sncps, others, watchs Collections.reverse(localServers); //倒序, 必须让watchs先关闭,watch包含服务发现和注销逻辑 + if (this.messageAgents != null) { + long s = System.currentTimeMillis(); + for (MessageAgent agent : this.messageAgents) { + agent.stop().join(); + } + logger.info(this.getClass().getSimpleName() + " MessageAgent stop in " + (System.currentTimeMillis() - s) + " ms\r\n"); + } localServers.stream().forEach((server) -> { try { server.shutdown(); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 66640c915..4cccf4742 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -73,6 +73,8 @@ public abstract class NodeServer { //server节点的配置 protected AnyValue serverConf; + protected final String threadName; + //加载server节点后的拦截器 protected NodeInterceptor interceptor; @@ -93,6 +95,7 @@ public abstract class NodeServer { private volatile int maxNameLength = 0; public NodeServer(Application application, Server server) { + this.threadName = Thread.currentThread().getName(); this.application = application; this.server = server; this.resourceFactory = server.getResourceFactory(); @@ -721,4 +724,7 @@ public abstract class NodeServer { return new LinkedHashSet<>(remoteServices); } + public String getThreadName() { + return this.threadName; + } } diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 3141928a5..800d7acce 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -7,6 +7,7 @@ package org.redkale.mq; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import javax.annotation.Resource; import org.redkale.boot.*; @@ -47,10 +48,16 @@ public abstract class MessageAgent { public void init(AnyValue config) { } - public CompletableFuture start() { + public CompletableFuture start(final StringBuffer sb) { + AtomicInteger maxlen = new AtomicInteger(); this.httpNodes.values().forEach(node -> { + if (node.consumer.topic.length() > maxlen.get()) maxlen.set(node.consumer.topic.length()); + }); + this.httpNodes.values().forEach(node -> { + long s = System.currentTimeMillis(); node.consumer.start(); node.consumer.waitFor(); + sb.append("[").append(node.server.getThreadName()).append("] MessageConsumer(topic=").append(fillString(node.consumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n"); }); return CompletableFuture.completedFuture(null); } @@ -157,6 +164,15 @@ public abstract class MessageAgent { return protocol + ".resp.node" + nodeid; } + protected static String fillString(String value, int maxlen) { + StringBuilder sb = new StringBuilder(maxlen); + sb.append(value); + for (int i = 0; i < maxlen - value.length(); i++) { + sb.append(' '); + } + return sb.toString(); + } + protected static class HttpMessageNode { public final NodeHttpServer server;