This commit is contained in:
Redkale
2020-06-03 16:45:28 +08:00
parent b994bcbcfb
commit 0ee30aca7e
6 changed files with 79 additions and 12 deletions

View File

@@ -361,7 +361,7 @@ public class NodeHttpServer extends NodeServer {
} }
sb.append(" mapping to ").append(Arrays.toString(as.getValue())).append(LINE_SEPARATOR); sb.append(" mapping to ").append(Arrays.toString(as.getValue())).append(LINE_SEPARATOR);
} }
sb.append(threadName).append(" All HttpServlets load cost " + (System.currentTimeMillis() - starts) + " ms" + LINE_SEPARATOR); sb.append(threadName).append(" All HttpServlets load cost ").append(System.currentTimeMillis() - starts).append(" ms").append(LINE_SEPARATOR);
} }
} }
} }

View File

@@ -393,7 +393,7 @@ public abstract class NodeServer {
protected void loadService(ClassFilter<? extends Service> serviceFilter, ClassFilter otherFilter) throws Exception { protected void loadService(ClassFilter<? extends Service> serviceFilter, ClassFilter otherFilter) throws Exception {
if (serviceFilter == null) return; if (serviceFilter == null) return;
final long starts = System.currentTimeMillis(); final long starts = System.currentTimeMillis();
final String threadName = "[" + Thread.currentThread().getName() + "] "; final String localThreadName = "[" + Thread.currentThread().getName() + "] ";
final Set<FilterEntry<? extends Service>> entrys = (Set) serviceFilter.getAllFilterEntrys(); final Set<FilterEntry<? extends Service>> entrys = (Set) serviceFilter.getAllFilterEntrys();
ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory; ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory;
final ResourceFactory appResourceFactory = application.getResourceFactory(); final ResourceFactory appResourceFactory = application.getResourceFactory();
@@ -480,7 +480,7 @@ public abstract class NodeServer {
if (sb != null) { if (sb != null) {
remoteServices.forEach(y -> { remoteServices.forEach(y -> {
sb.append(threadName).append(Sncp.toSimpleString(y, maxNameLength, maxClassNameLength)).append(" load and inject").append(LINE_SEPARATOR); sb.append(localThreadName).append(Sncp.toSimpleString(y, maxNameLength, maxClassNameLength)).append(" load and inject").append(LINE_SEPARATOR);
}); });
} }
//----------------- init ----------------- //----------------- init -----------------
@@ -506,16 +506,16 @@ public abstract class NodeServer {
y.init(Sncp.getConf(y)); y.init(Sncp.getConf(y));
long e = System.currentTimeMillis() - s; long e = System.currentTimeMillis() - s;
String serstr = Sncp.toSimpleString(y, maxNameLength, maxClassNameLength); String serstr = Sncp.toSimpleString(y, maxNameLength, maxClassNameLength);
if (slist != null) slist.add(new StringBuilder().append(threadName).append(serstr).append(" load and init in ").append(e).append(" ms").append(LINE_SEPARATOR).toString()); if (slist != null) slist.add(new StringBuilder().append(localThreadName).append(serstr).append(" load and init in ").append(e).append(" ms").append(LINE_SEPARATOR).toString());
}); });
if (slist != null && sb != null) { if (slist != null && sb != null) {
List<String> wlist = new ArrayList<>(slist); //直接使用CopyOnWriteArrayList偶尔会出现莫名的异常(CopyOnWriteArrayList源码1185行) List<String> wlist = new ArrayList<>(slist); //直接使用CopyOnWriteArrayList偶尔会出现莫名的异常(CopyOnWriteArrayList源码1185行)
for (String s : wlist) { for (String s : wlist) {
sb.append(s); sb.append(s);
} }
sb.append(threadName).append("All Services load cost ").append(System.currentTimeMillis() - starts).append(" ms" + LINE_SEPARATOR); sb.append(localThreadName).append("All Services load cost ").append(System.currentTimeMillis() - starts).append(" ms" + LINE_SEPARATOR);
} }
if (sb != null && preinite > 10) sb.append(threadName).append("ALL cluster agents load ").append(preinite).append(" ms" + LINE_SEPARATOR); if (sb != null && preinite > 10) sb.append(localThreadName).append("ALL cluster agents load ").append(preinite).append(" ms" + LINE_SEPARATOR);
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString()); if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
} }

View File

@@ -58,11 +58,11 @@ public class NodeSncpServer extends NodeServer {
//------------------------------------------------------------------- //-------------------------------------------------------------------
if (sncpServer == null) return; //调试时server才可能为null if (sncpServer == null) return; //调试时server才可能为null
final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null; final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null;
final String threadName = "[" + Thread.currentThread().getName() + "] "; final String localThreadName = "[" + Thread.currentThread().getName() + "] ";
List<SncpServlet> servlets = sncpServer.getSncpServlets(); List<SncpServlet> servlets = sncpServer.getSncpServlets();
Collections.sort(servlets); Collections.sort(servlets);
for (SncpServlet en : servlets) { for (SncpServlet en : servlets) {
if (sb != null) sb.append(threadName).append(" Load ").append(en).append(LINE_SEPARATOR); if (sb != null) sb.append(localThreadName).append(" Load ").append(en).append(LINE_SEPARATOR);
} }
if (sb != null && sb.length() > 0) logger.log(Level.FINE, sb.toString()); if (sb != null && sb.length() > 0) logger.log(Level.FINE, sb.toString());
} }
@@ -84,7 +84,7 @@ public class NodeSncpServer extends NodeServer {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void loadSncpFilter(final AnyValue servletsConf, final ClassFilter<? extends Filter> classFilter) throws Exception { protected void loadSncpFilter(final AnyValue servletsConf, final ClassFilter<? extends Filter> classFilter) throws Exception {
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null; final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
final String threadName = "[" + Thread.currentThread().getName() + "] "; final String localThreadName = "[" + Thread.currentThread().getName() + "] ";
List<FilterEntry<? extends Filter>> list = new ArrayList(classFilter.getFilterEntrys()); List<FilterEntry<? extends Filter>> list = new ArrayList(classFilter.getFilterEntrys());
for (FilterEntry<? extends Filter> en : list) { for (FilterEntry<? extends Filter> en : list) {
Class<SncpFilter> clazz = (Class<SncpFilter>) en.getType(); Class<SncpFilter> clazz = (Class<SncpFilter>) en.getType();
@@ -93,7 +93,7 @@ public class NodeSncpServer extends NodeServer {
resourceFactory.inject(filter, this); resourceFactory.inject(filter, this);
DefaultAnyValue filterConf = (DefaultAnyValue) en.getProperty(); DefaultAnyValue filterConf = (DefaultAnyValue) en.getProperty();
this.sncpServer.addSncpFilter(filter, filterConf); this.sncpServer.addSncpFilter(filter, filterConf);
if (sb != null) sb.append(threadName).append(" Load ").append(clazz.getName()).append(LINE_SEPARATOR); if (sb != null) sb.append(localThreadName).append(" Load ").append(clazz.getName()).append(LINE_SEPARATOR);
} }
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString()); if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
} }

View File

@@ -42,6 +42,8 @@ public abstract class MessageAgent {
protected MessageConsumer sncpRespConsumer; protected MessageConsumer sncpRespConsumer;
protected SncpMessageProcessor sncpRespProcessor;
//本地Service消息接收处理器 key:topic //本地Service消息接收处理器 key:topic
protected ConcurrentHashMap<String, HttpMessageNode> httpNodes = new ConcurrentHashMap<>(); protected ConcurrentHashMap<String, HttpMessageNode> httpNodes = new ConcurrentHashMap<>();
@@ -49,15 +51,21 @@ public abstract class MessageAgent {
} }
public CompletableFuture<Void> start(final StringBuffer sb) { public CompletableFuture<Void> start(final StringBuffer sb) {
AtomicInteger maxlen = new AtomicInteger(); AtomicInteger maxlen = new AtomicInteger(sncpRespConsumer == null ? 0 : sncpRespConsumer.topic.length());
this.httpNodes.values().forEach(node -> { this.httpNodes.values().forEach(node -> {
if (node.consumer.topic.length() > maxlen.get()) maxlen.set(node.consumer.topic.length()); if (node.consumer.topic.length() > maxlen.get()) maxlen.set(node.consumer.topic.length());
}); });
if (this.sncpRespConsumer != null) {
long s = System.currentTimeMillis();
this.sncpRespConsumer.start();
this.sncpRespConsumer.waitFor();
sb.append("MessageConsumer(topic=").append(fillString(this.sncpRespConsumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n");
}
this.httpNodes.values().forEach(node -> { this.httpNodes.values().forEach(node -> {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
node.consumer.start(); node.consumer.start();
node.consumer.waitFor(); node.consumer.waitFor();
sb.append("[").append(node.server.getThreadName()).append("] MessageConsumer(topic=").append(fillString(node.consumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n"); sb.append("MessageConsumer(topic=").append(fillString(node.consumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n");
}); });
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
@@ -120,6 +128,12 @@ public abstract class MessageAgent {
//创建指定topic的消费处理器 //创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor); public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor);
public final synchronized void putSncpResp(NodeSncpServer ns) {
if (this.sncpRespConsumer != null) return;
this.sncpRespProcessor = new SncpMessageProcessor(logger, this);
this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor);
}
public final synchronized void putHttpService(NodeHttpServer ns, Service service, HttpServlet servlet) { public final synchronized void putHttpService(NodeHttpServer ns, Service service, HttpServlet servlet) {
String topic = generateHttpReqTopic(service); String topic = generateHttpReqTopic(service);
if (httpNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat"); if (httpNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");

View File

@@ -0,0 +1,34 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.logging.Logger;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class SncpMessageProcessor implements MessageProcessor {
protected final Logger logger;
protected final MessageAgent agent;
public SncpMessageProcessor(Logger logger, MessageAgent agent) {
this.logger = logger;
this.agent = agent;
}
@Override
public void process(MessageRecord message) {
}
}

View File

@@ -0,0 +1,19 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class SncpRespConsumer {
}