This commit is contained in:
Redkale
2020-06-03 16:23:16 +08:00
parent cc23b44409
commit b994bcbcfb
3 changed files with 35 additions and 4 deletions

View File

@@ -780,10 +780,12 @@ public final class Application {
timecd.await();
if (this.messageAgents != null) {
long s = System.currentTimeMillis();
for (NodeServer ns : servers) {
ns.messageAgents.values().forEach(agent -> agent.start().join());
final StringBuffer sb = new StringBuffer();
for (MessageAgent agent : this.messageAgents) {
agent.start(sb).join();
}
logger.info(this.getClass().getSimpleName() + " messageagent init in " + (System.currentTimeMillis() - s) + " ms\r\n");
if (sb.length() > 0) logger.info(sb.toString());
logger.info(this.getClass().getSimpleName() + " MessageAgent init in " + (System.currentTimeMillis() - s) + " ms\r\n");
}
//if (!singletonrun) signalHandle();
//if (!singletonrun) clearPersistData();
@@ -995,6 +997,13 @@ public final class Application {
}
List<NodeServer> localServers = new ArrayList<>(servers); //顺序sncps, others, watchs
Collections.reverse(localServers); //倒序, 必须让watchs先关闭watch包含服务发现和注销逻辑
if (this.messageAgents != null) {
long s = System.currentTimeMillis();
for (MessageAgent agent : this.messageAgents) {
agent.stop().join();
}
logger.info(this.getClass().getSimpleName() + " MessageAgent stop in " + (System.currentTimeMillis() - s) + " ms\r\n");
}
localServers.stream().forEach((server) -> {
try {
server.shutdown();

View File

@@ -73,6 +73,8 @@ public abstract class NodeServer {
//server节点的配置
protected AnyValue serverConf;
protected final String threadName;
//加载server节点后的拦截器
protected NodeInterceptor interceptor;
@@ -93,6 +95,7 @@ public abstract class NodeServer {
private volatile int maxNameLength = 0;
public NodeServer(Application application, Server server) {
this.threadName = Thread.currentThread().getName();
this.application = application;
this.server = server;
this.resourceFactory = server.getResourceFactory();
@@ -721,4 +724,7 @@ public abstract class NodeServer {
return new LinkedHashSet<>(remoteServices);
}
public String getThreadName() {
return this.threadName;
}
}

View File

@@ -7,6 +7,7 @@ package org.redkale.mq;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javax.annotation.Resource;
import org.redkale.boot.*;
@@ -47,10 +48,16 @@ public abstract class MessageAgent {
public void init(AnyValue config) {
}
public CompletableFuture<Void> start() {
public CompletableFuture<Void> start(final StringBuffer sb) {
AtomicInteger maxlen = new AtomicInteger();
this.httpNodes.values().forEach(node -> {
if (node.consumer.topic.length() > maxlen.get()) maxlen.set(node.consumer.topic.length());
});
this.httpNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
node.consumer.start();
node.consumer.waitFor();
sb.append("[").append(node.server.getThreadName()).append("] MessageConsumer(topic=").append(fillString(node.consumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n");
});
return CompletableFuture.completedFuture(null);
}
@@ -157,6 +164,15 @@ public abstract class MessageAgent {
return protocol + ".resp.node" + nodeid;
}
protected static String fillString(String value, int maxlen) {
StringBuilder sb = new StringBuilder(maxlen);
sb.append(value);
for (int i = 0; i < maxlen - value.length(); i++) {
sb.append(' ');
}
return sb.toString();
}
protected static class HttpMessageNode {
public final NodeHttpServer server;