This commit is contained in:
Redkale
2020-06-05 11:29:54 +08:00
parent 2fb3472de0
commit b6980f7cf8
5 changed files with 55 additions and 21 deletions

View File

@@ -778,6 +778,7 @@ public final class Application {
runServers(timecd, others);
runServers(timecd, watchs); //必须在所有服务都启动后再启动WATCH服务
timecd.await();
if (this.clusterAgent != null) this.clusterAgent.start();
if (this.messageAgents != null) {
long s = System.currentTimeMillis();
final StringBuffer sb = new StringBuffer();
@@ -1035,6 +1036,7 @@ public final class Application {
}
});
if (clusterAgent != null) {
clusterAgent.stop();
clusterAgent.destroy(clusterAgent.getConfig());
}
if (this.messageAgents != null) {

View File

@@ -14,6 +14,7 @@ import java.util.logging.Level;
import javax.annotation.*;
import static org.redkale.boot.Application.RESNAME_SNCP_ADDR;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.cluster.ClusterAgent;
import org.redkale.mq.MessageAgent;
import org.redkale.net.*;
import org.redkale.net.http.*;
@@ -159,7 +160,7 @@ public class NodeHttpServer extends NodeServer {
if (!prefix0.isEmpty() && prefix0.charAt(prefix0.length() - 1) == '/') prefix0 = prefix0.substring(0, prefix0.length() - 1);
if (!prefix0.isEmpty() && prefix0.charAt(0) != '/') prefix0 = '/' + prefix0;
final String prefix = prefix0;
final String threadName = "[" + Thread.currentThread().getName() + "] ";
final String localThreadName = "[" + Thread.currentThread().getName() + "] ";
List<FilterEntry<? extends Servlet>> list = new ArrayList(servletFilter.getFilterEntrys());
list.sort((FilterEntry<? extends Servlet> o1, FilterEntry<? extends Servlet> o2) -> { //必须保证WebSocketServlet优先加载 因为要确保其他的HttpServlet可以注入本地模式的WebSocketNode
boolean ws1 = WebSocketServlet.class.isAssignableFrom(o1.getType());
@@ -198,7 +199,7 @@ public class NodeHttpServer extends NodeServer {
if (as.getKey().length() > max) max = as.getKey().length();
}
for (AbstractMap.SimpleEntry<String, String[]> as : ss) {
sb.append(threadName).append(" Load ").append(as.getKey());
sb.append(localThreadName).append(" Load ").append(as.getKey());
for (int i = 0; i < max - as.getKey().length(); i++) {
sb.append(' ');
}
@@ -276,6 +277,7 @@ public class NodeHttpServer extends NodeServer {
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
if (ws != null && !ws.repair()) prefix2 = "";
resourceFactory.inject(servlet, NodeHttpServer.this);
dynServletMap.put(service, servlet);
if (agent != null) agent.putService(this, service, servlet);
//if (finest) logger.finest(localThreadName + " Create RestServlet(resource.name='" + name + "') = " + servlet);
if (ss != null) {
@@ -364,4 +366,21 @@ public class NodeHttpServer extends NodeServer {
sb.append(localThreadName).append(" All HttpServlets load cost ").append(System.currentTimeMillis() - starts).append(" ms").append(LINE_SEPARATOR);
}
}
@Override //loadServlet执行之后调用
protected void postLoadServlets() {
final ClusterAgent cluster = application.clusterAgent;
if (cluster != null) {
NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class);
String protocol = pros.value().toUpperCase();
if (!cluster.containsProtocol(protocol)) return;
if (!cluster.containsPort(server.getSocketAddress().getPort())) return;
cluster.register(this, protocol, dynServletMap.keySet(), new HashSet<>());
}
}
@Override
protected void afterClusterDeregisterOnPreDestroyServices(ClusterAgent cluster, String protocol) {
cluster.deregister(this, protocol, dynServletMap.keySet(), new HashSet<>());
}
}

View File

@@ -87,6 +87,8 @@ public abstract class NodeServer {
//远程模式的Service对象集合
protected final Set<Service> remoteServices = new LinkedHashSet<>();
protected final Map<Service, Servlet> dynServletMap = new LinkedHashMap<>();
//MessageAgent对象集合
protected final Map<String, MessageAgent> messageAgents = new HashMap<>();
@@ -189,6 +191,7 @@ public abstract class NodeServer {
if (!application.singletonrun) { //非singleton模式下才加载Filter、Servlet
loadFilter(filterFilter, otherFilter);
loadServlet(servletFilter, otherFilter);
postLoadServlets();
}
if (this.interceptor != null) this.resourceFactory.inject(this.interceptor);
}
@@ -444,7 +447,7 @@ public abstract class NodeServer {
} else {
service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, agent, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
}
if(service instanceof WebSocketNodeService)((WebSocketNodeService) service).setName(resourceName);
if (service instanceof WebSocketNodeService) ((WebSocketNodeService) service).setName(resourceName);
final Class restype = Sncp.getResourceType(service);
if (rf.find(resourceName, restype) == null) {
regFactory.register(resourceName, restype, service);
@@ -550,24 +553,31 @@ public abstract class NodeServer {
if (!cluster.containsProtocol(protocol)) return;
if (!cluster.containsPort(server.getSocketAddress().getPort())) return;
cluster.register(this, protocol, localServices, remoteServices);
}
//loadServlet执行之后调用
protected void postLoadServlets() {
}
//Service.destroy执行之前调用
protected void preDestroyServices(Set<Service> localServices, Set<Service> remoteServices) {
if (application.clusterAgent != null) { //服务注销
final ClusterAgent agent = application.clusterAgent;
final ClusterAgent cluster = application.clusterAgent;
NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class);
String protocol = pros.value().toUpperCase();
if (agent.containsProtocol(protocol) && agent.containsPort(server.getSocketAddress().getPort())) {
agent.deregister(this, protocol, localServices, remoteServices);
if (cluster.containsProtocol(protocol) && cluster.containsPort(server.getSocketAddress().getPort())) {
cluster.deregister(this, protocol, localServices, remoteServices);
afterClusterDeregisterOnPreDestroyServices(cluster, protocol);
}
}
if (!this.messageAgents.isEmpty()) { //MQ
}
}
protected void afterClusterDeregisterOnPreDestroyServices(ClusterAgent cluster, String protocol) {
}
//Server.start执行之后调用
protected void postStartServer(Set<Service> localServices, Set<Service> remoteServices) {
}

View File

@@ -36,6 +36,7 @@ public class NodeSncpServer extends NodeServer {
this.consumer = sncpServer == null || application.singletonrun ? null : (agent, x) -> {
if (x.getClass().getAnnotation(Local.class) != null) return;
SncpDynServlet servlet = sncpServer.addSncpServlet(x); //singleton模式下不生成SncpServlet
dynServletMap.put(x, servlet);
if (agent != null) agent.putService(this, x, servlet);
};
}

View File

@@ -86,7 +86,7 @@ public abstract class ClusterAgent {
if (localServices.isEmpty()) return;
//注册本地模式
for (Service service : localServices) {
if (!canRegister(service)) continue;
if (!canRegister(protocol, service)) continue;
register(ns, protocol, service);
ClusterEntry entry = new ClusterEntry(ns, protocol, service);
localEntrys.put(entry.serviceid, entry);
@@ -99,36 +99,38 @@ public abstract class ClusterAgent {
remoteEntrys.put(entry.serviceid, entry);
}
}
afterRegister(ns, protocol);
}
//注销服务
public void deregister(NodeServer ns, String protocol, Set<Service> localServices, Set<Service> remoteServices) {
//注销本地模式
for (Service service : localServices) {
if (!canRegister(service)) continue;
if (!canRegister(protocol, service)) continue;
deregister(ns, protocol, service);
}
int s = intervalCheckSeconds();
if (s > 0) { //暂停,弥补其他依赖本进程服务的周期偏差
try {
Thread.sleep(s * 1000);
} catch (InterruptedException ex) {
}
logger.info(this.getClass().getSimpleName() + " sleep " + s + " s after deregister");
}
//远程模式不注册
}
protected boolean canRegister(Service service) {
if (service.getClass().getAnnotation(Local.class) != null) return false;
protected boolean canRegister(String protocol, Service service) {
if ("SNCP".equalsIgnoreCase(protocol) && service.getClass().getAnnotation(Local.class) != null) return false;
if (service instanceof WebSocketNode) {
if (((WebSocketNode) service).getLocalWebSocketEngine() == null) return false;
}
return true;
}
protected void afterRegister(NodeServer ns, String protocol) {
public void start() {
}
public void stop() {
int s = intervalCheckSeconds();
if (s > 0) { //暂停,弥补其他依赖本进程服务的周期偏差
try {
Thread.sleep(s * 1000);
} catch (InterruptedException ex) {
}
logger.info(this.getClass().getSimpleName() + " sleep " + s + "s after deregister");
}
}
public int intervalCheckSeconds() {