From 6dacbafc29dc2fd438899017d4fa6b302e58efb2 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Fri, 5 Jun 2020 07:50:10 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 14 ++++++++++---- src/org/redkale/mq/MessageAgent.java | 13 ++++++++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index b3040c05d..626bc8bdc 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -24,7 +24,7 @@ import javax.xml.parsers.*; import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.convert.Convert; import org.redkale.convert.bson.BsonFactory; -import org.redkale.convert.json.JsonFactory; +import org.redkale.convert.json.*; import org.redkale.mq.MessageAgent; import org.redkale.net.*; import org.redkale.net.http.MimeType; @@ -781,7 +781,9 @@ public final class Application { if (this.messageAgents != null) { long s = System.currentTimeMillis(); final StringBuffer sb = new StringBuffer(); + Set names = new HashSet<>(); for (MessageAgent agent : this.messageAgents) { + names.add(agent.getName()); Map map = agent.start().join(); AtomicInteger maxlen = new AtomicInteger(); map.keySet().forEach(str -> { @@ -791,7 +793,7 @@ public final class Application { ); } if (sb.length() > 0) logger.info(sb.toString().trim()); - logger.info(this.getClass().getSimpleName() + " MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); + logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") init in " + (System.currentTimeMillis() - s) + " ms"); } //if (!singletonrun) signalHandle(); //if (!singletonrun) clearPersistData(); @@ -1015,11 +1017,13 @@ public final class Application { List localServers = new ArrayList<>(servers); //顺序sncps, others, watchs Collections.reverse(localServers); //倒序, 必须让watchs先关闭,watch包含服务发现和注销逻辑 if (this.messageAgents != null) { + Set names = new HashSet<>(); long s = System.currentTimeMillis(); for (MessageAgent agent : this.messageAgents) { + names.add(agent.getName()); agent.stop().join(); } - logger.info(this.getClass().getSimpleName() + " MessageAgent stop in " + (System.currentTimeMillis() - s) + " ms\r\n"); + logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") stop in " + (System.currentTimeMillis() - s) + " ms\r\n"); } localServers.stream().forEach((server) -> { try { @@ -1034,11 +1038,13 @@ public final class Application { clusterAgent.destroy(clusterAgent.getConfig()); } if (this.messageAgents != null) { + Set names = new HashSet<>(); long s = System.currentTimeMillis(); for (MessageAgent agent : this.messageAgents) { + names.add(agent.getName()); agent.destroy(agent.getConfig()); } - logger.info(this.getClass().getSimpleName() + " MessageAgent destroy in " + (System.currentTimeMillis() - s) + " ms\r\n"); + logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") destroy in " + (System.currentTimeMillis() - s) + " ms\r\n"); } for (DataSource source : dataSources) { if (source == null) continue; diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 6ff71b8e6..ce0832ff0 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -73,21 +73,24 @@ public abstract class MessageAgent { public CompletableFuture> start() { final LinkedHashMap map = new LinkedHashMap<>(); if (this.sncpRespStartms >= 0) map.put(this.sncpRespConsumer.topic, this.sncpRespStartms); + final List futures = new ArrayList<>(); this.messageNodes.values().forEach(node -> { long s = System.currentTimeMillis(); - node.consumer.startup().join(); - map.put(node.consumer.topic, System.currentTimeMillis() - s); + futures.add(node.consumer.startup().whenComplete((r, t) -> map.put(node.consumer.topic, System.currentTimeMillis() - s))); }); - return CompletableFuture.completedFuture(map); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(r -> map); } + //Application.shutdown 在执行server.shutdown之前执行 public CompletableFuture stop() { + List futures = new ArrayList<>(); this.messageNodes.values().forEach(node -> { - node.consumer.shutdown().join(); + futures.add(node.consumer.shutdown()); }); - return CompletableFuture.completedFuture(null); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); } + //Application.shutdown 在所有server.shutdown执行后执行 public void destroy(AnyValue config) { if (this.sncpRespConsumer != null) this.sncpRespConsumer.shutdown().join(); if (this.producer != null) this.producer.shutdown().join();