From a77f45075770e3e48451d1bdef778be84488ffb7 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sun, 17 May 2020 15:49:32 +0800 Subject: [PATCH] --- src/META-INF/application-template.xml | 2 +- src/org/redkale/boot/Application.java | 14 ++--- src/org/redkale/boot/ClusterAgent.java | 59 ++++++++++++++----- src/org/redkale/boot/NodeHttpServer.java | 2 +- src/org/redkale/boot/NodeProtocol.java | 2 +- src/org/redkale/boot/NodeServer.java | 12 ++-- src/org/redkale/boot/NodeSncpServer.java | 2 +- src/org/redkale/boot/NodeWatchServer.java | 2 +- .../boot/watch/ServerWatchService.java | 2 +- 9 files changed, 63 insertions(+), 34 deletions(-) diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index e67a5398b..3d22cdee1 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -50,7 +50,7 @@ value: 类名,必须是org.redkale.boot.ClusterAgent的子类 protocols: 服务发现可以处理的协议, 默认值为: SNCP, 多个协议用分号;隔开 ports: 服务发现可以处理的端口, 多个端口用分号;隔开 - + --> diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 965d1300f..068a7d57d 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -817,15 +817,13 @@ public final class Application { for (FilterEntry entry : entrys) { final Class type = entry.getType(); NodeProtocol pros = type.getAnnotation(NodeProtocol.class); - for (String p : pros.value()) { - p = p.toUpperCase(); - if ("SNCP".equals(p) || "HTTP".equals(p)) continue; - final Class old = nodeClasses.get(p); - if (old != null && old != type) { - throw new RuntimeException("Protocol(" + p + ") had NodeServer-Class(" + old.getName() + ") but repeat NodeServer-Class(" + type.getName() + ")"); - } - nodeClasses.put(p, type); + String p = pros.value().toUpperCase(); + if ("SNCP".equals(p) || "HTTP".equals(p)) continue; + final Class old = nodeClasses.get(p); + if (old != null && old != type) { + throw new RuntimeException("Protocol(" + p + ") had NodeServer-Class(" + old.getName() + ") but repeat NodeServer-Class(" + type.getName() + ")"); } + nodeClasses.put(p, type); } } } diff --git a/src/org/redkale/boot/ClusterAgent.java b/src/org/redkale/boot/ClusterAgent.java index 46c162634..3e2c421dc 100644 --- a/src/org/redkale/boot/ClusterAgent.java +++ b/src/org/redkale/boot/ClusterAgent.java @@ -5,8 +5,10 @@ */ package org.redkale.boot; +import java.lang.ref.WeakReference; import java.net.InetSocketAddress; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; import org.redkale.net.sncp.*; @@ -35,6 +37,10 @@ public abstract class ClusterAgent { protected TransportFactory transportFactory; + protected final ConcurrentHashMap localEntrys = new ConcurrentHashMap<>(); + + protected final ConcurrentHashMap remoteEntrys = new ConcurrentHashMap<>(); + public void init(AnyValue config) { this.config = config; this.name = config.getValue("name", ""); @@ -49,7 +55,7 @@ public abstract class ClusterAgent { List list = new ArrayList<>(); for (String str : its) { if (str.trim().isEmpty()) continue; - list.add(Integer.getInteger(str.trim())); + list.add(Integer.parseInt(str.trim())); } if (!list.isEmpty()) this.ports = list.stream().mapToInt(x -> x).toArray(); } @@ -69,56 +75,60 @@ public abstract class ClusterAgent { } //注册服务 - public void register(NodeServer ns, Set localServices, Set remoteServices) { + public void register(NodeServer ns, String protocol, Set localServices, Set remoteServices) { if (localServices.isEmpty()) return; //注册本地模式 for (Service service : localServices) { - register(ns, service); + register(ns, protocol, service); + ClusterEntry entry = new ClusterEntry(ns, protocol, service); + localEntrys.put(entry.serviceid, entry); } //远程模式加载IP列表, 只能是SNCP协议 for (Service service : remoteServices) { - updateTransport(ns, service); + updateTransport(ns, protocol, service); + ClusterEntry entry = new ClusterEntry(ns, protocol, service); + remoteEntrys.put(entry.serviceid, entry); } } //注销服务 - public void deregister(NodeServer ns, Set localServices, Set remoteServices) { + public void deregister(NodeServer ns, String protocol, Set localServices, Set remoteServices) { //注销本地模式 for (Service service : localServices) { - deregister(ns, service); + deregister(ns, protocol, service); } //远程模式不注册 } //获取远程服务的可用ip列表 - public abstract List queryAddress(NodeServer ns, Service service); + public abstract List queryAddress(NodeServer ns, String protocol, Service service); //注册服务 - public abstract void register(NodeServer ns, Service service); + public abstract void register(NodeServer ns, String protocol, Service service); //注销服务 - public abstract void deregister(NodeServer ns, Service service); + public abstract void deregister(NodeServer ns, String protocol, Service service); //格式: protocol:classtype-resourcename - public void updateTransport(NodeServer ns, Service service) { + public void updateTransport(NodeServer ns, String protocol, Service service) { Server server = ns.getServer(); String netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL; if (!Sncp.isSncpDyn(service)) return; - List addrs = queryAddress(ns, service); + List addrs = queryAddress(ns, protocol, service); if (addrs != null && !addrs.isEmpty()) { Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), netprotocol, ns.getSncpAddress(), null, addrs); } } //格式: protocol:classtype-resourcename - public String generateServiceType(NodeServer ns, Service service) { - if (!Sncp.isSncpDyn(service)) return ns.server.getProtocol().toLowerCase() + ":" + service.getClass().getName(); - return ns.server.getProtocol().toLowerCase() + ":" + Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service); + public String generateServiceType(NodeServer ns, String protocol, Service service) { + if (!Sncp.isSncpDyn(service)) return protocol.toLowerCase() + ":" + service.getClass().getName(); + return protocol.toLowerCase() + ":" + Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service); } //格式: protocol:classtype-resourcename:nodeid - public String generateServiceId(NodeServer ns, Service service) { - return generateServiceType(ns, service) + ":" + this.nodeid; + public String generateServiceId(NodeServer ns, String protocol, Service service) { + return generateServiceType(ns, protocol, service) + ":" + this.nodeid; } @Override @@ -174,4 +184,21 @@ public abstract class ClusterAgent { this.config = config; } + public class ClusterEntry { + + public String serviceid; + + public String servicetype; + + public WeakReference serviceref; + + public InetSocketAddress address; + + public ClusterEntry(NodeServer ns, String protocol, Service service) { + this.serviceid = generateServiceId(ns, protocol, service); + this.servicetype = generateServiceType(ns, protocol, service); + this.address = ns.getSocketAddress(); + this.serviceref = new WeakReference(service); + } + } } diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index 6f57275b0..380f9c429 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -30,7 +30,7 @@ import org.redkale.watch.*; * * @author zhangjx */ -@NodeProtocol({"HTTP"}) +@NodeProtocol("HTTP") public class NodeHttpServer extends NodeServer { protected final boolean rest; //是否加载REST服务, 为true加载rest节点信息并将所有可REST化的Service生成RestServlet diff --git a/src/org/redkale/boot/NodeProtocol.java b/src/org/redkale/boot/NodeProtocol.java index d38cb2280..f37f8da57 100644 --- a/src/org/redkale/boot/NodeProtocol.java +++ b/src/org/redkale/boot/NodeProtocol.java @@ -20,5 +20,5 @@ import java.lang.annotation.*; @Documented public @interface NodeProtocol { - String[] value(); + String value(); } diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 94c43abfb..1f2602c5a 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -517,10 +517,12 @@ public abstract class NodeServer { protected void preInitServices(Set localServices, Set remoteServices) { final ClusterAgent[] clusters = application.clusterAgents; if (clusters == null || clusters.length == 0) return; + NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class); + String protocol = pros.value().toUpperCase(); for (ClusterAgent cluster : clusters) { - if (!cluster.containsProtocol(server.getProtocol())) continue; + if (!cluster.containsProtocol(protocol)) continue; if (!cluster.containsPort(server.getSocketAddress().getPort())) continue; - cluster.register(this, localServices, remoteServices); + cluster.register(this, protocol, localServices, remoteServices); } } @@ -528,10 +530,12 @@ public abstract class NodeServer { protected void preDestroyServices(Set localServices, Set remoteServices) { final ClusterAgent[] clusters = application.clusterAgents; if (clusters == null || clusters.length == 0) return; + NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class); + String protocol = pros.value().toUpperCase(); for (ClusterAgent cluster : clusters) { - if (!cluster.containsProtocol(server.getProtocol())) continue; + if (!cluster.containsProtocol(protocol)) continue; if (!cluster.containsPort(server.getSocketAddress().getPort())) continue; - cluster.deregister(this, localServices, remoteServices); + cluster.deregister(this, protocol, localServices, remoteServices); } } diff --git a/src/org/redkale/boot/NodeSncpServer.java b/src/org/redkale/boot/NodeSncpServer.java index 822f8c19e..64a06e143 100644 --- a/src/org/redkale/boot/NodeSncpServer.java +++ b/src/org/redkale/boot/NodeSncpServer.java @@ -24,7 +24,7 @@ import org.redkale.util.AnyValue.DefaultAnyValue; * * @author zhangjx */ -@NodeProtocol({"SNCP"}) +@NodeProtocol("SNCP") public class NodeSncpServer extends NodeServer { protected final SncpServer sncpServer; diff --git a/src/org/redkale/boot/NodeWatchServer.java b/src/org/redkale/boot/NodeWatchServer.java index f1addcdf0..b7ea1f695 100644 --- a/src/org/redkale/boot/NodeWatchServer.java +++ b/src/org/redkale/boot/NodeWatchServer.java @@ -16,7 +16,7 @@ import org.redkale.watch.*; * * @author zhangjx */ -@NodeProtocol({"WATCH"}) +@NodeProtocol("WATCH") public class NodeWatchServer extends NodeHttpServer { public NodeWatchServer(Application application, AnyValue serconf) { diff --git a/src/org/redkale/boot/watch/ServerWatchService.java b/src/org/redkale/boot/watch/ServerWatchService.java index ba750b28f..f536afd94 100644 --- a/src/org/redkale/boot/watch/ServerWatchService.java +++ b/src/org/redkale/boot/watch/ServerWatchService.java @@ -81,7 +81,7 @@ public class ServerWatchService extends AbstractWatchService { protocol += "/HTTP"; } else { NodeProtocol np = node.getClass().getAnnotation(NodeProtocol.class); - if (np != null && np.value().length > 0) protocol += "/" + np.value()[0]; + protocol += "/" + np.value(); } rs.put("name", server.getName()); rs.put("protocol", protocol);