From 982fc8440c2938b4d63a295bd94c2a610ab56e74 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Wed, 3 Jun 2020 17:53:55 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 4 ++-- src/org/redkale/boot/NodeServer.java | 27 +++++++++++++++++++----- src/org/redkale/boot/NodeSncpServer.java | 10 ++++++--- src/org/redkale/mq/MessageAgent.java | 23 ++++++++++++++------ src/org/redkale/net/sncp/SncpServer.java | 3 ++- 5 files changed, 50 insertions(+), 17 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index b4b3517c8..523064609 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -784,8 +784,8 @@ public final class Application { for (MessageAgent agent : this.messageAgents) { agent.start(sb).join(); } - if (sb.length() > 0) logger.info(sb.toString()); - logger.info(this.getClass().getSimpleName() + " MessageAgent init in " + (System.currentTimeMillis() - s) + " ms\r\n"); + if (sb.length() > 0) logger.info(sb.toString().trim()); + logger.info(this.getClass().getSimpleName() + " MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); } //if (!singletonrun) signalHandle(); //if (!singletonrun) clearPersistData(); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 19ebf257f..282a9916f 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -68,7 +68,7 @@ public abstract class NodeServer { private InetSocketAddress sncpAddress; //加载Service时的处理函数 - protected Consumer consumer; + protected BiConsumer consumer; //server节点的配置 protected AnyValue serverConf; @@ -90,6 +90,9 @@ public abstract class NodeServer { //MessageAgent对象集合 protected final Map messageAgents = new HashMap<>(); + //需要远程模式Service的MessageAgent对象集合 + protected final Map sncpRemoteAgents = new HashMap<>(); + private volatile int maxClassNameLength = 0; private volatile int maxNameLength = 0; @@ -375,7 +378,7 @@ public abstract class NodeServer { rf.inject(nodeService); //动态加载的Service也存在按需加载的注入资源 localServices.add(nodeService); interceptorServices.add(nodeService); - if (consumer != null) consumer.accept(nodeService); + if (consumer != null) consumer.accept(null, nodeService); } } catch (Exception e) { logger.log(Level.SEVERE, "WebSocketNode inject error", e); @@ -441,13 +444,19 @@ public abstract class NodeServer { } else if (isSNCP() && !entry.isAutoload()) { throw new RuntimeException(restype.getSimpleName() + "(class:" + serviceImplClass.getName() + ", name:" + resourceName + ", group:" + groups + ") is repeat."); } + MessageAgent agent = null; + if (entry.getProperty() != null && entry.getProperty().getValue("mq") != null) { + agent = application.getMessageAgent(entry.getProperty().getValue("mq")); + if (agent != null) messageAgents.put(agent.getName(), agent); + } if (Sncp.isRemote(service)) { remoteServices.add(service); + if (agent != null) sncpRemoteAgents.put(agent.getName(), agent); } else { if (field != null) rf.inject(service); //动态加载的Service也存在按需加载的注入资源 localServices.add(service); interceptorServices.add(service); - if (consumer != null) consumer.accept(service); + if (consumer != null) consumer.accept(agent, service); } } catch (RuntimeException ex) { throw ex; @@ -483,6 +492,12 @@ public abstract class NodeServer { sb.append(localThreadName).append(Sncp.toSimpleString(y, maxNameLength, maxClassNameLength)).append(" load and inject").append(LINE_SEPARATOR); }); } + if (isSNCP() && !sncpRemoteAgents.isEmpty()) { + sncpRemoteAgents.values().forEach(agent -> { + agent.putSncpResp((NodeSncpServer) this); + agent.startSncpRespConsumer(); + }); + } //----------------- init ----------------- List swlist = new ArrayList<>(localServices); swlist.sort((o1, o2) -> { @@ -587,14 +602,16 @@ public abstract class NodeServer { for (AnyValue list : proplist) { AnyValue.DefaultAnyValue prop = null; String sc = list.getValue("groups"); + String mq = list.getValue("mq"); if (sc != null) { sc = sc.trim(); if (sc.endsWith(";")) sc = sc.substring(0, sc.length() - 1); } if (sc == null) sc = localGroup; - if (sc != null) { + if (sc != null || mq != null) { prop = new AnyValue.DefaultAnyValue(); - prop.addValue("groups", sc); + if (sc != null) prop.addValue("groups", sc); + if (mq != null) prop.addValue("mq", mq); } ClassFilter filter = new ClassFilter(this.serverClassLoader, ref, inter, excludeSuperClasses, prop); for (AnyValue av : list.getAnyValues(property)) { // 节点 diff --git a/src/org/redkale/boot/NodeSncpServer.java b/src/org/redkale/boot/NodeSncpServer.java index 350f39275..a60809623 100644 --- a/src/org/redkale/boot/NodeSncpServer.java +++ b/src/org/redkale/boot/NodeSncpServer.java @@ -10,6 +10,7 @@ import java.net.*; import java.util.*; import java.util.logging.Level; import org.redkale.boot.ClassFilter.FilterEntry; +import org.redkale.mq.MessageAgent; import org.redkale.net.*; import org.redkale.net.sncp.*; import org.redkale.service.Service; @@ -32,7 +33,10 @@ public class NodeSncpServer extends NodeServer { private NodeSncpServer(Application application, AnyValue serconf) { super(application, createServer(application, serconf)); this.sncpServer = (SncpServer) this.server; - this.consumer = sncpServer == null || application.singletonrun ? null : x -> sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet + this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> { + SncpDynServlet servlet = sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet + if (agent != null) agent.putService(this, x, servlet); + }; } public static NodeServer createNodeServer(Application application, AnyValue serconf) { @@ -48,8 +52,8 @@ public class NodeSncpServer extends NodeServer { return sncpServer == null ? null : sncpServer.getSocketAddress(); } - public void consumerAccept(Service service) { - if (this.consumer != null) this.consumer.accept(service); + public void consumerAccept(MessageAgent agent, Service service) { + if (this.consumer != null) this.consumer.accept(agent, service); } @Override diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 60d983dd9..544b1c04e 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -45,22 +45,32 @@ public abstract class MessageAgent { protected SncpMessageProcessor sncpRespProcessor; + //sncpRespConsumer启动耗时, 小于0表示未启动 + protected long sncpRespStartms = -1; + //本地Service消息接收处理器, key:topic - protected ConcurrentHashMap messageNodes = new ConcurrentHashMap<>(); + protected HashMap messageNodes = new LinkedHashMap<>(); public void init(AnyValue config) { } + public final synchronized void startSncpRespConsumer() { + if (this.sncpRespStartms >= 0) return; + long s = System.currentTimeMillis(); + if (this.sncpRespConsumer != null) { + this.sncpRespConsumer.start(); + this.sncpRespConsumer.waitFor(); + } + this.sncpRespStartms = System.currentTimeMillis() - s; + } + public CompletableFuture start(final StringBuffer sb) { AtomicInteger maxlen = new AtomicInteger(sncpRespConsumer == null ? 0 : sncpRespConsumer.topic.length()); this.messageNodes.values().forEach(node -> { if (node.consumer.topic.length() > maxlen.get()) maxlen.set(node.consumer.topic.length()); }); - if (this.sncpRespConsumer != null) { - long s = System.currentTimeMillis(); - this.sncpRespConsumer.start(); - this.sncpRespConsumer.waitFor(); - sb.append("MessageConsumer(topic=").append(fillString(this.sncpRespConsumer.topic, maxlen.get())).append(") init and start in ").append(System.currentTimeMillis() - s).append(" ms\r\n"); + if (this.sncpRespStartms >= 0) { + sb.append("MessageConsumer(topic=").append(fillString(this.sncpRespConsumer.topic, maxlen.get())).append(") init and start in ").append(this.sncpRespStartms).append(" ms\r\n"); } this.messageNodes.values().forEach(node -> { long s = System.currentTimeMillis(); @@ -152,6 +162,7 @@ public abstract class MessageAgent { //格式: sncp.req.user protected String generateSncpReqTopic(Service service) { String resname = Sncp.getResourceName(service); + if (service instanceof WebSocketNode) return "sncp.req.wsn" + (resname.isEmpty() ? "" : ("-" + resname)); return "sncp.req." + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname)); } diff --git a/src/org/redkale/net/sncp/SncpServer.java b/src/org/redkale/net/sncp/SncpServer.java index ca5ab1912..6d320a5ce 100644 --- a/src/org/redkale/net/sncp/SncpServer.java +++ b/src/org/redkale/net/sncp/SncpServer.java @@ -97,10 +97,11 @@ public class SncpServer extends Server