diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index c0d9a3721..aa71337f6 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -445,22 +445,7 @@ public final class Application { .addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.ping.interval", "30")) .addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.check.interval", "30")); this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining()); - if (cluster != null) { - cluster.setTransportFactory(this.sncpTransportFactory); - this.resourceFactory.inject(cluster); - cluster.init(cluster.getConfig()); - this.resourceFactory.register(ClusterAgent.class, cluster); - } this.clusterAgent = cluster; - if (mqs != null) { - for (MessageAgent agent : mqs) { - this.resourceFactory.inject(agent); - agent.init(agent.getConfig()); - this.resourceFactory.register(agent.getName(), MessageAgent.class, agent); - this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient()); - this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient()); - } - } this.messageAgents = mqs; Thread.currentThread().setContextClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader); @@ -688,6 +673,29 @@ public final class Application { }, Application.class, ResourceFactory.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class); //-------------------------------------------------------------------------- + if (this.clusterAgent != null) { + if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "ClusterAgent initing"); + long s = System.currentTimeMillis(); + clusterAgent.setTransportFactory(this.sncpTransportFactory); + this.resourceFactory.inject(clusterAgent); + clusterAgent.init(clusterAgent.getConfig()); + this.resourceFactory.register(ClusterAgent.class, clusterAgent); + logger.info("ClusterAgent init in " + (System.currentTimeMillis() - s) + " ms"); + } + if (this.messageAgents != null) { + if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "MessageAgent initing"); + long s = System.currentTimeMillis(); + for (MessageAgent agent : this.messageAgents) { + this.resourceFactory.inject(agent); + agent.init(agent.getConfig()); + this.resourceFactory.register(agent.getName(), MessageAgent.class, agent); + this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient()); + this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient()); + } + logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); + + } + initResources(); } @@ -859,6 +867,7 @@ public final class Application { timecd.await(); if (this.clusterAgent != null) this.clusterAgent.start(); if (this.messageAgents != null) { + if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "MessageAgent starting"); long s = System.currentTimeMillis(); final StringBuffer sb = new StringBuffer(); Set names = new HashSet<>(); @@ -873,7 +882,7 @@ public final class Application { ); } if (sb.length() > 0) logger.info(sb.toString().trim()); - logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") init in " + (System.currentTimeMillis() - s) + " ms"); + logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") start in " + (System.currentTimeMillis() - s) + " ms"); } //if (!singletonrun) signalHandle(); //if (!singletonrun) clearPersistData(); @@ -1098,12 +1107,13 @@ public final class Application { Collections.reverse(localServers); //倒序, 必须让watchs先关闭,watch包含服务发现和注销逻辑 if (this.messageAgents != null) { Set names = new HashSet<>(); + if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "MessageAgent stopping"); long s = System.currentTimeMillis(); for (MessageAgent agent : this.messageAgents) { names.add(agent.getName()); agent.stop().join(); } - logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") stop in " + (System.currentTimeMillis() - s) + " ms\r\n"); + logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") stop in " + (System.currentTimeMillis() - s) + " ms"); } localServers.stream().forEach((server) -> { try { @@ -1115,17 +1125,21 @@ public final class Application { } }); if (clusterAgent != null) { + if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "ClusterAgent destroying"); + long s = System.currentTimeMillis(); clusterAgent.deregister(this); clusterAgent.destroy(clusterAgent.getConfig()); + logger.info("ClusterAgent destroy in " + (System.currentTimeMillis() - s) + " ms"); } if (this.messageAgents != null) { Set names = new HashSet<>(); + if (logger.isLoggable(Level.FINER)) logger.log(Level.FINER, "MessageAgent destroying"); long s = System.currentTimeMillis(); for (MessageAgent agent : this.messageAgents) { names.add(agent.getName()); agent.destroy(agent.getConfig()); } - logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") destroy in " + (System.currentTimeMillis() - s) + " ms\r\n"); + logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") destroy in " + (System.currentTimeMillis() - s) + " ms"); } for (DataSource source : dataSources) { if (source == null) continue; diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 41fb3968e..db8a4eef5 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -93,6 +93,10 @@ public abstract class MessageAgent { if (this.httpProducer != null) this.httpProducer.shutdown().join(); } + public Logger getLogger() { + return logger; + } + public String getName() { return name; } @@ -129,7 +133,7 @@ public abstract class MessageAgent { if (this.sncpProducer == null) { synchronized (this) { if (this.sncpProducer == null) { - this.sncpProducer = createProducer(); + this.sncpProducer = createProducer("SncpProducer"); this.sncpProducer.startup().join(); } } @@ -141,7 +145,7 @@ public abstract class MessageAgent { if (this.httpProducer == null) { synchronized (this) { if (this.httpProducer == null) { - this.httpProducer = createProducer(); + this.httpProducer = createProducer("HttpProducer"); this.httpProducer.startup().join(); } } @@ -150,7 +154,7 @@ public abstract class MessageAgent { } //创建指定topic的生产处理器 - protected abstract MessageProducer createProducer(); + protected abstract MessageProducer createProducer(String name); //创建topic,如果已存在则跳过 public abstract boolean createTopic(String... topics); diff --git a/src/org/redkale/mq/MessageConsumer.java b/src/org/redkale/mq/MessageConsumer.java index 70a3433b9..a1ffd20e3 100644 --- a/src/org/redkale/mq/MessageConsumer.java +++ b/src/org/redkale/mq/MessageConsumer.java @@ -29,16 +29,17 @@ public abstract class MessageConsumer { protected final MessageProcessor processor; - protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + protected final Logger logger; protected volatile boolean closed; - protected MessageConsumer(MessageAgent messageAgent, String[] topics,final String consumerid, MessageProcessor processor) { + protected MessageConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageProcessor processor) { Objects.requireNonNull(messageAgent); Objects.requireNonNull(topics); Objects.requireNonNull(consumerid); Objects.requireNonNull(processor); this.messageAgent = messageAgent; + this.logger = messageAgent.logger; this.topics = topics; this.consumerid = consumerid; this.processor = processor; diff --git a/src/org/redkale/mq/MessageProducer.java b/src/org/redkale/mq/MessageProducer.java index 27462cd67..dcbabecc6 100644 --- a/src/org/redkale/mq/MessageProducer.java +++ b/src/org/redkale/mq/MessageProducer.java @@ -17,12 +17,19 @@ import java.util.logging.Logger; * * @since 2.1.0 */ -public abstract class MessageProducer { +public abstract class MessageProducer { - protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + protected final Logger logger; + + protected final String name; protected volatile boolean closed; + protected MessageProducer(String name, Logger logger) { + this.name = name; + this.logger = logger; + } + public abstract CompletableFuture apply(MessageRecord message); public abstract CompletableFuture startup();