From 0ee30aca7e1432796325040a452ffaad8feb8a7a Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Wed, 3 Jun 2020 16:45:28 +0800 Subject: [PATCH] --- src/org/redkale/boot/NodeHttpServer.java | 2 +- src/org/redkale/boot/NodeServer.java | 10 +++--- src/org/redkale/boot/NodeSncpServer.java | 8 ++--- src/org/redkale/mq/MessageAgent.java | 18 +++++++++-- src/org/redkale/mq/SncpMessageProcessor.java | 34 ++++++++++++++++++++ src/org/redkale/mq/SncpRespConsumer.java | 19 +++++++++++ 6 files changed, 79 insertions(+), 12 deletions(-) create mode 100644 src/org/redkale/mq/SncpMessageProcessor.java create mode 100644 src/org/redkale/mq/SncpRespConsumer.java diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index a19844813..16940d250 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -361,7 +361,7 @@ public class NodeHttpServer extends NodeServer { } sb.append(" mapping to ").append(Arrays.toString(as.getValue())).append(LINE_SEPARATOR); } - sb.append(threadName).append(" All HttpServlets load cost " + (System.currentTimeMillis() - starts) + " ms" + LINE_SEPARATOR); + sb.append(threadName).append(" All HttpServlets load cost ").append(System.currentTimeMillis() - starts).append(" ms").append(LINE_SEPARATOR); } } } diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 4cccf4742..19ebf257f 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -393,7 +393,7 @@ public abstract class NodeServer { protected void loadService(ClassFilter serviceFilter, ClassFilter otherFilter) throws Exception { if (serviceFilter == null) return; final long starts = System.currentTimeMillis(); - final String threadName = "[" + Thread.currentThread().getName() + "] "; + final String localThreadName = "[" + Thread.currentThread().getName() + "] "; final Set> entrys = (Set) serviceFilter.getAllFilterEntrys(); ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory; final ResourceFactory appResourceFactory = application.getResourceFactory(); @@ -480,7 +480,7 @@ public abstract class NodeServer { if (sb != null) { remoteServices.forEach(y -> { - sb.append(threadName).append(Sncp.toSimpleString(y, maxNameLength, maxClassNameLength)).append(" load and inject").append(LINE_SEPARATOR); + sb.append(localThreadName).append(Sncp.toSimpleString(y, maxNameLength, maxClassNameLength)).append(" load and inject").append(LINE_SEPARATOR); }); } //----------------- init ----------------- @@ -506,16 +506,16 @@ public abstract class NodeServer { y.init(Sncp.getConf(y)); long e = System.currentTimeMillis() - s; String serstr = Sncp.toSimpleString(y, maxNameLength, maxClassNameLength); - if (slist != null) slist.add(new StringBuilder().append(threadName).append(serstr).append(" load and init in ").append(e).append(" ms").append(LINE_SEPARATOR).toString()); + if (slist != null) slist.add(new StringBuilder().append(localThreadName).append(serstr).append(" load and init in ").append(e).append(" ms").append(LINE_SEPARATOR).toString()); }); if (slist != null && sb != null) { List wlist = new ArrayList<>(slist); //直接使用CopyOnWriteArrayList偶尔会出现莫名的异常(CopyOnWriteArrayList源码1185行) for (String s : wlist) { sb.append(s); } - sb.append(threadName).append("All Services load cost ").append(System.currentTimeMillis() - starts).append(" ms" + LINE_SEPARATOR); + sb.append(localThreadName).append("All Services load cost ").append(System.currentTimeMillis() - starts).append(" ms" + LINE_SEPARATOR); } - if (sb != null && preinite > 10) sb.append(threadName).append("ALL cluster agents load ").append(preinite).append(" ms" + LINE_SEPARATOR); + if (sb != null && preinite > 10) sb.append(localThreadName).append("ALL cluster agents load ").append(preinite).append(" ms" + LINE_SEPARATOR); if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString()); } diff --git a/src/org/redkale/boot/NodeSncpServer.java b/src/org/redkale/boot/NodeSncpServer.java index aa528ee87..350f39275 100644 --- a/src/org/redkale/boot/NodeSncpServer.java +++ b/src/org/redkale/boot/NodeSncpServer.java @@ -58,11 +58,11 @@ public class NodeSncpServer extends NodeServer { //------------------------------------------------------------------- if (sncpServer == null) return; //调试时server才可能为null final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null; - final String threadName = "[" + Thread.currentThread().getName() + "] "; + final String localThreadName = "[" + Thread.currentThread().getName() + "] "; List servlets = sncpServer.getSncpServlets(); Collections.sort(servlets); for (SncpServlet en : servlets) { - if (sb != null) sb.append(threadName).append(" Load ").append(en).append(LINE_SEPARATOR); + if (sb != null) sb.append(localThreadName).append(" Load ").append(en).append(LINE_SEPARATOR); } if (sb != null && sb.length() > 0) logger.log(Level.FINE, sb.toString()); } @@ -84,7 +84,7 @@ public class NodeSncpServer extends NodeServer { @SuppressWarnings("unchecked") protected void loadSncpFilter(final AnyValue servletsConf, final ClassFilter classFilter) throws Exception { final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null; - final String threadName = "[" + Thread.currentThread().getName() + "] "; + final String localThreadName = "[" + Thread.currentThread().getName() + "] "; List> list = new ArrayList(classFilter.getFilterEntrys()); for (FilterEntry en : list) { Class clazz = (Class) en.getType(); @@ -93,7 +93,7 @@ public class NodeSncpServer extends NodeServer { resourceFactory.inject(filter, this); DefaultAnyValue filterConf = (DefaultAnyValue) en.getProperty(); this.sncpServer.addSncpFilter(filter, filterConf); - if (sb != null) sb.append(threadName).append(" Load ").append(clazz.getName()).append(LINE_SEPARATOR); + if (sb != null) sb.append(localThreadName).append(" Load ").append(clazz.getName()).append(LINE_SEPARATOR); } if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString()); } diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 800d7acce..4dafd3243 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -42,6 +42,8 @@ public abstract class MessageAgent { protected MessageConsumer sncpRespConsumer; + protected SncpMessageProcessor sncpRespProcessor; + //本地Service消息接收处理器, key:topic protected ConcurrentHashMap httpNodes = new ConcurrentHashMap<>(); @@ -49,15 +51,21 @@ public abstract class MessageAgent { } public CompletableFuture start(final StringBuffer sb) { - AtomicInteger maxlen = new AtomicInteger(); + AtomicInteger maxlen = new AtomicInteger(sncpRespConsumer == null ? 0 : sncpRespConsumer.topic.length()); this.httpNodes.values().forEach(node -> { if (node.consumer.topic.length() > maxlen.get()) maxlen.set(node.consumer.topic.length()); }); + if (this.sncpRespConsumer != null) { + long s = System.currentTimeMillis(); + this.sncpRespConsumer.start(); + this.sncpRespConsumer.waitFor(); + sb.append("MessageConsumer(topic=").append(fillString(this.sncpRespConsumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n"); + } this.httpNodes.values().forEach(node -> { long s = System.currentTimeMillis(); node.consumer.start(); node.consumer.waitFor(); - sb.append("[").append(node.server.getThreadName()).append("] MessageConsumer(topic=").append(fillString(node.consumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n"); + sb.append("MessageConsumer(topic=").append(fillString(node.consumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n"); }); return CompletableFuture.completedFuture(null); } @@ -120,6 +128,12 @@ public abstract class MessageAgent { //创建指定topic的消费处理器 public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor); + public final synchronized void putSncpResp(NodeSncpServer ns) { + if (this.sncpRespConsumer != null) return; + this.sncpRespProcessor = new SncpMessageProcessor(logger, this); + this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor); + } + public final synchronized void putHttpService(NodeHttpServer ns, Service service, HttpServlet servlet) { String topic = generateHttpReqTopic(service); if (httpNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); diff --git a/src/org/redkale/mq/SncpMessageProcessor.java b/src/org/redkale/mq/SncpMessageProcessor.java new file mode 100644 index 000000000..e73084a78 --- /dev/null +++ b/src/org/redkale/mq/SncpMessageProcessor.java @@ -0,0 +1,34 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +import java.util.logging.Logger; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class SncpMessageProcessor implements MessageProcessor { + + protected final Logger logger; + + protected final MessageAgent agent; + + public SncpMessageProcessor(Logger logger, MessageAgent agent) { + this.logger = logger; + this.agent = agent; + } + + @Override + public void process(MessageRecord message) { + + } +} diff --git a/src/org/redkale/mq/SncpRespConsumer.java b/src/org/redkale/mq/SncpRespConsumer.java new file mode 100644 index 000000000..80ce97e72 --- /dev/null +++ b/src/org/redkale/mq/SncpRespConsumer.java @@ -0,0 +1,19 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.mq; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.1.0 + */ +public class SncpRespConsumer { + +}