From d83e7c22ac7efe39974812807bb020f4a89f3ae0 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sat, 16 May 2020 19:59:49 +0800 Subject: [PATCH] --- src/META-INF/application-template.xml | 4 +- src/org/redkale/boot/Application.java | 17 +++ src/org/redkale/boot/ClusterAgent.java | 146 ++++++++++++++++++++++++ src/org/redkale/boot/NodeServer.java | 14 +++ src/org/redkale/watch/ClusterAgent.java | 18 --- 5 files changed, 179 insertions(+), 20 deletions(-) create mode 100644 src/org/redkale/boot/ClusterAgent.java delete mode 100644 src/org/redkale/watch/ClusterAgent.java diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index fe5f0b45c..b9ba481cd 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -47,10 +47,10 @@ diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 09b469676..930bf7768 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -312,6 +312,23 @@ public final class Application { logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity / 1024 + "K; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";"); } AnyValue[] clusterConfs = resources.getAnyValues("cluster"); + if (clusterConfs != null && clusterConfs.length > 0) { + for (AnyValue clusterConf : clusterConfs) { + try { + Class type = classLoader.loadClass(clusterConf.getValue("value")); + if (!ClusterAgent.class.isAssignableFrom(type)) { + logger.log(Level.SEVERE, "load application cluster resource, but not " + ClusterAgent.class.getSimpleName() + " error: " + clusterConf); + } else { + ClusterAgent cluster = (ClusterAgent) type.getDeclaredConstructor().newInstance(); + cluster.setNodeid(this.nodeid); + cluster.init(clusterConf); + clusters.add(cluster); + } + } catch (Exception e) { + logger.log(Level.SEVERE, "load application cluster resource error: " + clusterConf, e); + } + } + } } if (transportGroup == null) { final AtomicInteger counter = new AtomicInteger(); diff --git a/src/org/redkale/boot/ClusterAgent.java b/src/org/redkale/boot/ClusterAgent.java new file mode 100644 index 000000000..3467a46ef --- /dev/null +++ b/src/org/redkale/boot/ClusterAgent.java @@ -0,0 +1,146 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.boot; + +import java.util.*; +import org.redkale.convert.json.JsonConvert; +import org.redkale.net.sncp.Sncp; +import org.redkale.service.Service; +import org.redkale.util.*; + +/** + * 第三方服务发现管理接口cluster + * + * + * 详情见: https://redkale.org + * + * @author zhangjx + */ +public abstract class ClusterAgent { + + protected int nodeid; + + protected String name; + + protected String[] protocols; //必须全大写 + + protected int[] ports; + + protected AnyValue config; + + public void init(AnyValue config) { + this.config = config; + this.name = config.getValue("name", ""); + { + String ps = config.getValue("protocols", "").toUpperCase(); + if (ps == null || ps.isEmpty()) ps = "SNCP"; + this.protocols = ps.split(";"); + } + String ts = config.getValue("ports", ""); + if (ts != null && !ts.isEmpty()) { + String[] its = ts.split(";"); + List list = new ArrayList<>(); + for (String str : its) { + if (str.trim().isEmpty()) continue; + list.add(Integer.getInteger(str.trim())); + } + if (!list.isEmpty()) this.ports = list.stream().mapToInt(x -> x).toArray(); + } + } + + public void destroy(AnyValue config) { + } + + public boolean containsProtocol(String protocol) { + if (protocol == null || protocol.isEmpty()) return false; + return protocols == null || Utility.contains(protocols, protocol.toUpperCase()); + } + + public boolean containsPort(int port) { + if (ports == null || ports.length == 0) return true; + return Utility.contains(ports, port); + } + + //注册服务 + public void register(NodeServer ns, Set localServices, Set remoteServices) { + if (localServices.isEmpty()) return; + for (Service service : localServices) { //注册本地模式 + register(ns, service); + } + //远程模式不注册 + } + + //注销服务 + public void deregister(NodeServer ns, Set localServices, Set remoteServices) { + for (Service service : localServices) {//注销本地模式 + deregister(ns, service); + } + //远程模式不注册 + } + + //注册服务 + public abstract void register(NodeServer server, Service service); + + //注销服务 + public abstract void deregister(NodeServer server, Service service); + + //格式: protocol:classtype-resourcename + public String generateServiceType(NodeServer ns, Service service) { + if (!Sncp.isSncpDyn(service)) return ns.server.getProtocol().toLowerCase() + ":" + service.getClass().getName(); + return ns.server.getProtocol().toLowerCase() + ":" + Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service); + } + + //格式: protocol:classtype-resourcename:nodeid + public String generateServiceId(NodeServer ns, Service service) { + return generateServiceType(ns, service) + ":" + this.nodeid; + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } + + public int getNodeid() { + return nodeid; + } + + public void setNodeid(int nodeid) { + this.nodeid = nodeid; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String[] getProtocols() { + return protocols; + } + + public void setProtocols(String[] protocols) { + this.protocols = protocols; + } + + public int[] getPorts() { + return ports; + } + + public void setPorts(int[] ports) { + this.ports = ports; + } + + public AnyValue getConfig() { + return config; + } + + public void setConfig(AnyValue config) { + this.config = config; + } + +} diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 73dc2ba9d..94c43abfb 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -515,10 +515,24 @@ public abstract class NodeServer { //Service.init执行之前调用 protected void preInitServices(Set localServices, Set remoteServices) { + final ClusterAgent[] clusters = application.clusterAgents; + if (clusters == null || clusters.length == 0) return; + for (ClusterAgent cluster : clusters) { + if (!cluster.containsProtocol(server.getProtocol())) continue; + if (!cluster.containsPort(server.getSocketAddress().getPort())) continue; + cluster.register(this, localServices, remoteServices); + } } //Service.destroy执行之前调用 protected void preDestroyServices(Set localServices, Set remoteServices) { + final ClusterAgent[] clusters = application.clusterAgents; + if (clusters == null || clusters.length == 0) return; + for (ClusterAgent cluster : clusters) { + if (!cluster.containsProtocol(server.getProtocol())) continue; + if (!cluster.containsPort(server.getSocketAddress().getPort())) continue; + cluster.deregister(this, localServices, remoteServices); + } } protected abstract ClassFilter createFilterClassFilter(); diff --git a/src/org/redkale/watch/ClusterAgent.java b/src/org/redkale/watch/ClusterAgent.java deleted file mode 100644 index 2b5c900af..000000000 --- a/src/org/redkale/watch/ClusterAgent.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.watch; - -/** - * 第三方服务发现管理接口cluster - * - * - * 详情见: https://redkale.org - * - * @author zhangjx - */ -public abstract class ClusterAgent { - -}