This commit is contained in:
Redkale
2020-06-05 07:50:10 +08:00
parent f1fcbd7396
commit 6dacbafc29
2 changed files with 18 additions and 9 deletions

View File

@@ -24,7 +24,7 @@ import javax.xml.parsers.*;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.convert.Convert;
import org.redkale.convert.bson.BsonFactory;
import org.redkale.convert.json.JsonFactory;
import org.redkale.convert.json.*;
import org.redkale.mq.MessageAgent;
import org.redkale.net.*;
import org.redkale.net.http.MimeType;
@@ -781,7 +781,9 @@ public final class Application {
if (this.messageAgents != null) {
long s = System.currentTimeMillis();
final StringBuffer sb = new StringBuffer();
Set<String> names = new HashSet<>();
for (MessageAgent agent : this.messageAgents) {
names.add(agent.getName());
Map<String, Long> map = agent.start().join();
AtomicInteger maxlen = new AtomicInteger();
map.keySet().forEach(str -> {
@@ -791,7 +793,7 @@ public final class Application {
);
}
if (sb.length() > 0) logger.info(sb.toString().trim());
logger.info(this.getClass().getSimpleName() + " MessageAgent init in " + (System.currentTimeMillis() - s) + " ms");
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") init in " + (System.currentTimeMillis() - s) + " ms");
}
//if (!singletonrun) signalHandle();
//if (!singletonrun) clearPersistData();
@@ -1015,11 +1017,13 @@ public final class Application {
List<NodeServer> localServers = new ArrayList<>(servers); //顺序sncps, others, watchs
Collections.reverse(localServers); //倒序, 必须让watchs先关闭watch包含服务发现和注销逻辑
if (this.messageAgents != null) {
Set<String> names = new HashSet<>();
long s = System.currentTimeMillis();
for (MessageAgent agent : this.messageAgents) {
names.add(agent.getName());
agent.stop().join();
}
logger.info(this.getClass().getSimpleName() + " MessageAgent stop in " + (System.currentTimeMillis() - s) + " ms\r\n");
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") stop in " + (System.currentTimeMillis() - s) + " ms\r\n");
}
localServers.stream().forEach((server) -> {
try {
@@ -1034,11 +1038,13 @@ public final class Application {
clusterAgent.destroy(clusterAgent.getConfig());
}
if (this.messageAgents != null) {
Set<String> names = new HashSet<>();
long s = System.currentTimeMillis();
for (MessageAgent agent : this.messageAgents) {
names.add(agent.getName());
agent.destroy(agent.getConfig());
}
logger.info(this.getClass().getSimpleName() + " MessageAgent destroy in " + (System.currentTimeMillis() - s) + " ms\r\n");
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") destroy in " + (System.currentTimeMillis() - s) + " ms\r\n");
}
for (DataSource source : dataSources) {
if (source == null) continue;

View File

@@ -73,21 +73,24 @@ public abstract class MessageAgent {
public CompletableFuture<Map<String, Long>> start() {
final LinkedHashMap<String, Long> map = new LinkedHashMap<>();
if (this.sncpRespStartms >= 0) map.put(this.sncpRespConsumer.topic, this.sncpRespStartms);
final List<CompletableFuture> futures = new ArrayList<>();
this.messageNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
node.consumer.startup().join();
map.put(node.consumer.topic, System.currentTimeMillis() - s);
futures.add(node.consumer.startup().whenComplete((r, t) -> map.put(node.consumer.topic, System.currentTimeMillis() - s)));
});
return CompletableFuture.completedFuture(map);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(r -> map);
}
//Application.shutdown 在执行server.shutdown之前执行
public CompletableFuture<Void> stop() {
List<CompletableFuture> futures = new ArrayList<>();
this.messageNodes.values().forEach(node -> {
node.consumer.shutdown().join();
futures.add(node.consumer.shutdown());
});
return CompletableFuture.completedFuture(null);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
}
//Application.shutdown 在所有server.shutdown执行后执行
public void destroy(AnyValue config) {
if (this.sncpRespConsumer != null) this.sncpRespConsumer.shutdown().join();
if (this.producer != null) this.producer.shutdown().join();