From b6980f7cf87afd610f64b5080f590013cb470814 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Fri, 5 Jun 2020 11:29:54 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 2 ++ src/org/redkale/boot/NodeHttpServer.java | 23 +++++++++++++++-- src/org/redkale/boot/NodeServer.java | 20 +++++++++++---- src/org/redkale/boot/NodeSncpServer.java | 1 + src/org/redkale/cluster/ClusterAgent.java | 30 ++++++++++++----------- 5 files changed, 55 insertions(+), 21 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 626bc8bdc..da8f496d4 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -778,6 +778,7 @@ public final class Application { runServers(timecd, others); runServers(timecd, watchs); //必须在所有服务都启动后再启动WATCH服务 timecd.await(); + if (this.clusterAgent != null) this.clusterAgent.start(); if (this.messageAgents != null) { long s = System.currentTimeMillis(); final StringBuffer sb = new StringBuffer(); @@ -1035,6 +1036,7 @@ public final class Application { } }); if (clusterAgent != null) { + clusterAgent.stop(); clusterAgent.destroy(clusterAgent.getConfig()); } if (this.messageAgents != null) { diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index fffd8e7d2..cc3d919a1 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -14,6 +14,7 @@ import java.util.logging.Level; import javax.annotation.*; import static org.redkale.boot.Application.RESNAME_SNCP_ADDR; import org.redkale.boot.ClassFilter.FilterEntry; +import org.redkale.cluster.ClusterAgent; import org.redkale.mq.MessageAgent; import org.redkale.net.*; import org.redkale.net.http.*; @@ -159,7 +160,7 @@ public class NodeHttpServer extends NodeServer { if (!prefix0.isEmpty() && prefix0.charAt(prefix0.length() - 1) == '/') prefix0 = prefix0.substring(0, prefix0.length() - 1); if (!prefix0.isEmpty() && prefix0.charAt(0) != '/') prefix0 = '/' + prefix0; final String prefix = prefix0; - final String threadName = "[" + Thread.currentThread().getName() + "] "; + final String localThreadName = "[" + Thread.currentThread().getName() + "] "; List> list = new ArrayList(servletFilter.getFilterEntrys()); list.sort((FilterEntry o1, FilterEntry o2) -> { //必须保证WebSocketServlet优先加载, 因为要确保其他的HttpServlet可以注入本地模式的WebSocketNode boolean ws1 = WebSocketServlet.class.isAssignableFrom(o1.getType()); @@ -198,7 +199,7 @@ public class NodeHttpServer extends NodeServer { if (as.getKey().length() > max) max = as.getKey().length(); } for (AbstractMap.SimpleEntry as : ss) { - sb.append(threadName).append(" Load ").append(as.getKey()); + sb.append(localThreadName).append(" Load ").append(as.getKey()); for (int i = 0; i < max - as.getKey().length(); i++) { sb.append(' '); } @@ -276,6 +277,7 @@ public class NodeHttpServer extends NodeServer { WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class); if (ws != null && !ws.repair()) prefix2 = ""; resourceFactory.inject(servlet, NodeHttpServer.this); + dynServletMap.put(service, servlet); if (agent != null) agent.putService(this, service, servlet); //if (finest) logger.finest(localThreadName + " Create RestServlet(resource.name='" + name + "') = " + servlet); if (ss != null) { @@ -364,4 +366,21 @@ public class NodeHttpServer extends NodeServer { sb.append(localThreadName).append(" All HttpServlets load cost ").append(System.currentTimeMillis() - starts).append(" ms").append(LINE_SEPARATOR); } } + + @Override //loadServlet执行之后调用 + protected void postLoadServlets() { + final ClusterAgent cluster = application.clusterAgent; + if (cluster != null) { + NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class); + String protocol = pros.value().toUpperCase(); + if (!cluster.containsProtocol(protocol)) return; + if (!cluster.containsPort(server.getSocketAddress().getPort())) return; + cluster.register(this, protocol, dynServletMap.keySet(), new HashSet<>()); + } + } + + @Override + protected void afterClusterDeregisterOnPreDestroyServices(ClusterAgent cluster, String protocol) { + cluster.deregister(this, protocol, dynServletMap.keySet(), new HashSet<>()); + } } diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 911d2ff53..66c672252 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -87,6 +87,8 @@ public abstract class NodeServer { //远程模式的Service对象集合 protected final Set remoteServices = new LinkedHashSet<>(); + protected final Map dynServletMap = new LinkedHashMap<>(); + //MessageAgent对象集合 protected final Map messageAgents = new HashMap<>(); @@ -189,6 +191,7 @@ public abstract class NodeServer { if (!application.singletonrun) { //非singleton模式下才加载Filter、Servlet loadFilter(filterFilter, otherFilter); loadServlet(servletFilter, otherFilter); + postLoadServlets(); } if (this.interceptor != null) this.resourceFactory.inject(this.interceptor); } @@ -444,7 +447,7 @@ public abstract class NodeServer { } else { service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, agent, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } - if(service instanceof WebSocketNodeService)((WebSocketNodeService) service).setName(resourceName); + if (service instanceof WebSocketNodeService) ((WebSocketNodeService) service).setName(resourceName); final Class restype = Sncp.getResourceType(service); if (rf.find(resourceName, restype) == null) { regFactory.register(resourceName, restype, service); @@ -550,24 +553,31 @@ public abstract class NodeServer { if (!cluster.containsProtocol(protocol)) return; if (!cluster.containsPort(server.getSocketAddress().getPort())) return; cluster.register(this, protocol, localServices, remoteServices); + } + + //loadServlet执行之后调用 + protected void postLoadServlets() { } //Service.destroy执行之前调用 protected void preDestroyServices(Set localServices, Set remoteServices) { if (application.clusterAgent != null) { //服务注销 - final ClusterAgent agent = application.clusterAgent; + final ClusterAgent cluster = application.clusterAgent; NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class); String protocol = pros.value().toUpperCase(); - if (agent.containsProtocol(protocol) && agent.containsPort(server.getSocketAddress().getPort())) { - agent.deregister(this, protocol, localServices, remoteServices); + if (cluster.containsProtocol(protocol) && cluster.containsPort(server.getSocketAddress().getPort())) { + cluster.deregister(this, protocol, localServices, remoteServices); + afterClusterDeregisterOnPreDestroyServices(cluster, protocol); } } if (!this.messageAgents.isEmpty()) { //MQ - } } + protected void afterClusterDeregisterOnPreDestroyServices(ClusterAgent cluster, String protocol) { + } + //Server.start执行之后调用 protected void postStartServer(Set localServices, Set remoteServices) { } diff --git a/src/org/redkale/boot/NodeSncpServer.java b/src/org/redkale/boot/NodeSncpServer.java index 9ab3c96ac..41bae9ce8 100644 --- a/src/org/redkale/boot/NodeSncpServer.java +++ b/src/org/redkale/boot/NodeSncpServer.java @@ -36,6 +36,7 @@ public class NodeSncpServer extends NodeServer { this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> { if (x.getClass().getAnnotation(Local.class) != null) return; SncpDynServlet servlet = sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet + dynServletMap.put(x, servlet); if (agent != null) agent.putService(this, x, servlet); }; } diff --git a/src/org/redkale/cluster/ClusterAgent.java b/src/org/redkale/cluster/ClusterAgent.java index 46d060937..3b9653d34 100644 --- a/src/org/redkale/cluster/ClusterAgent.java +++ b/src/org/redkale/cluster/ClusterAgent.java @@ -86,7 +86,7 @@ public abstract class ClusterAgent { if (localServices.isEmpty()) return; //注册本地模式 for (Service service : localServices) { - if (!canRegister(service)) continue; + if (!canRegister(protocol, service)) continue; register(ns, protocol, service); ClusterEntry entry = new ClusterEntry(ns, protocol, service); localEntrys.put(entry.serviceid, entry); @@ -99,36 +99,38 @@ public abstract class ClusterAgent { remoteEntrys.put(entry.serviceid, entry); } } - afterRegister(ns, protocol); } //注销服务 public void deregister(NodeServer ns, String protocol, Set localServices, Set remoteServices) { //注销本地模式 for (Service service : localServices) { - if (!canRegister(service)) continue; + if (!canRegister(protocol, service)) continue; deregister(ns, protocol, service); } - int s = intervalCheckSeconds(); - if (s > 0) { //暂停,弥补其他依赖本进程服务的周期偏差 - try { - Thread.sleep(s * 1000); - } catch (InterruptedException ex) { - } - logger.info(this.getClass().getSimpleName() + " sleep " + s + " s after deregister"); - } //远程模式不注册 } - protected boolean canRegister(Service service) { - if (service.getClass().getAnnotation(Local.class) != null) return false; + protected boolean canRegister(String protocol, Service service) { + if ("SNCP".equalsIgnoreCase(protocol) && service.getClass().getAnnotation(Local.class) != null) return false; if (service instanceof WebSocketNode) { if (((WebSocketNode) service).getLocalWebSocketEngine() == null) return false; } return true; } - protected void afterRegister(NodeServer ns, String protocol) { + public void start() { + } + + public void stop() { + int s = intervalCheckSeconds(); + if (s > 0) { //暂停,弥补其他依赖本进程服务的周期偏差 + try { + Thread.sleep(s * 1000); + } catch (InterruptedException ex) { + } + logger.info(this.getClass().getSimpleName() + " sleep " + s + "s after deregister"); + } } public int intervalCheckSeconds() {