This commit is contained in:
Redkale
2020-06-12 09:35:41 +08:00
parent 4cf160e5d3
commit d4bc751a20
4 changed files with 51 additions and 25 deletions

View File

@@ -445,22 +445,7 @@ public final class Application {
.addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.ping.interval", "30"))
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.check.interval", "30"));
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
if (cluster != null) {
cluster.setTransportFactory(this.sncpTransportFactory);
this.resourceFactory.inject(cluster);
cluster.init(cluster.getConfig());
this.resourceFactory.register(ClusterAgent.class, cluster);
}
this.clusterAgent = cluster;
if (mqs != null) {
for (MessageAgent agent : mqs) {
this.resourceFactory.inject(agent);
agent.init(agent.getConfig());
this.resourceFactory.register(agent.getName(), MessageAgent.class, agent);
this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient());
this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient());
}
}
this.messageAgents = mqs;
Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
@@ -688,6 +673,29 @@ public final class Application {
}, Application.class, ResourceFactory.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
//--------------------------------------------------------------------------
if (this.clusterAgent != null) {
if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "ClusterAgent initing");
long s = System.currentTimeMillis();
clusterAgent.setTransportFactory(this.sncpTransportFactory);
this.resourceFactory.inject(clusterAgent);
clusterAgent.init(clusterAgent.getConfig());
this.resourceFactory.register(ClusterAgent.class, clusterAgent);
logger.info("ClusterAgent init in " + (System.currentTimeMillis() - s) + " ms");
}
if (this.messageAgents != null) {
if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "MessageAgent initing");
long s = System.currentTimeMillis();
for (MessageAgent agent : this.messageAgents) {
this.resourceFactory.inject(agent);
agent.init(agent.getConfig());
this.resourceFactory.register(agent.getName(), MessageAgent.class, agent);
this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient());
this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient());
}
logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms");
}
initResources();
}
@@ -859,6 +867,7 @@ public final class Application {
timecd.await();
if (this.clusterAgent != null) this.clusterAgent.start();
if (this.messageAgents != null) {
if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "MessageAgent starting");
long s = System.currentTimeMillis();
final StringBuffer sb = new StringBuffer();
Set<String> names = new HashSet<>();
@@ -873,7 +882,7 @@ public final class Application {
);
}
if (sb.length() > 0) logger.info(sb.toString().trim());
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") init in " + (System.currentTimeMillis() - s) + " ms");
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") start in " + (System.currentTimeMillis() - s) + " ms");
}
//if (!singletonrun) signalHandle();
//if (!singletonrun) clearPersistData();
@@ -1098,12 +1107,13 @@ public final class Application {
Collections.reverse(localServers); //倒序, 必须让watchs先关闭watch包含服务发现和注销逻辑
if (this.messageAgents != null) {
Set<String> names = new HashSet<>();
if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "MessageAgent stopping");
long s = System.currentTimeMillis();
for (MessageAgent agent : this.messageAgents) {
names.add(agent.getName());
agent.stop().join();
}
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") stop in " + (System.currentTimeMillis() - s) + " ms\r\n");
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") stop in " + (System.currentTimeMillis() - s) + " ms");
}
localServers.stream().forEach((server) -> {
try {
@@ -1115,17 +1125,21 @@ public final class Application {
}
});
if (clusterAgent != null) {
if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "ClusterAgent destroying");
long s = System.currentTimeMillis();
clusterAgent.deregister(this);
clusterAgent.destroy(clusterAgent.getConfig());
logger.info("ClusterAgent destroy in " + (System.currentTimeMillis() - s) + " ms");
}
if (this.messageAgents != null) {
Set<String> names = new HashSet<>();
if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "MessageAgent destroying");
long s = System.currentTimeMillis();
for (MessageAgent agent : this.messageAgents) {
names.add(agent.getName());
agent.destroy(agent.getConfig());
}
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") destroy in " + (System.currentTimeMillis() - s) + " ms\r\n");
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") destroy in " + (System.currentTimeMillis() - s) + " ms");
}
for (DataSource source : dataSources) {
if (source == null) continue;

View File

@@ -93,6 +93,10 @@ public abstract class MessageAgent {
if (this.httpProducer != null) this.httpProducer.shutdown().join();
}
public Logger getLogger() {
return logger;
}
public String getName() {
return name;
}
@@ -129,7 +133,7 @@ public abstract class MessageAgent {
if (this.sncpProducer == null) {
synchronized (this) {
if (this.sncpProducer == null) {
this.sncpProducer = createProducer();
this.sncpProducer = createProducer("SncpProducer");
this.sncpProducer.startup().join();
}
}
@@ -141,7 +145,7 @@ public abstract class MessageAgent {
if (this.httpProducer == null) {
synchronized (this) {
if (this.httpProducer == null) {
this.httpProducer = createProducer();
this.httpProducer = createProducer("HttpProducer");
this.httpProducer.startup().join();
}
}
@@ -150,7 +154,7 @@ public abstract class MessageAgent {
}
//创建指定topic的生产处理器
protected abstract MessageProducer createProducer();
protected abstract MessageProducer createProducer(String name);
//创建topic如果已存在则跳过
public abstract boolean createTopic(String... topics);

View File

@@ -29,16 +29,17 @@ public abstract class MessageConsumer {
protected final MessageProcessor processor;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final Logger logger;
protected volatile boolean closed;
protected MessageConsumer(MessageAgent messageAgent, String[] topics,final String consumerid, MessageProcessor processor) {
protected MessageConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageProcessor processor) {
Objects.requireNonNull(messageAgent);
Objects.requireNonNull(topics);
Objects.requireNonNull(consumerid);
Objects.requireNonNull(processor);
this.messageAgent = messageAgent;
this.logger = messageAgent.logger;
this.topics = topics;
this.consumerid = consumerid;
this.processor = processor;

View File

@@ -17,12 +17,19 @@ import java.util.logging.Logger;
*
* @since 2.1.0
*/
public abstract class MessageProducer {
public abstract class MessageProducer {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final Logger logger;
protected final String name;
protected volatile boolean closed;
protected MessageProducer(String name, Logger logger) {
this.name = name;
this.logger = logger;
}
public abstract CompletableFuture<Void> apply(MessageRecord message);
public abstract CompletableFuture<Void> startup();