This commit is contained in:
@@ -47,10 +47,10 @@
|
|||||||
<!--
|
<!--
|
||||||
第三方服务发现管理接口
|
第三方服务发现管理接口
|
||||||
name: cluster名称,不能包含特殊字符。
|
name: cluster名称,不能包含特殊字符。
|
||||||
value: 类名,必须是org.redkale.watch.ClusterAgent的子类
|
value: 类名,必须是org.redkale.boot.ClusterAgent的子类
|
||||||
protocols: 服务发现可以处理的协议, 默认值为: SNCP, 多个协议用分号;隔开
|
protocols: 服务发现可以处理的协议, 默认值为: SNCP, 多个协议用分号;隔开
|
||||||
ports: 服务发现可以处理的端口, 多个端口用分号;隔开
|
ports: 服务发现可以处理的端口, 多个端口用分号;隔开
|
||||||
<cluster name="cluster1" value="org.redkalex.consul.ConsulClusterAgent" protocols="SNCP;" ports="7070;7071">
|
<cluster name="cluster1" value="org.redkalex.consul.ConsulClusterAgent" protocols="SNCP" ports="7070;7071">
|
||||||
<property name="xxxxxx" value="XXXXXXXX"/>
|
<property name="xxxxxx" value="XXXXXXXX"/>
|
||||||
</cluster>
|
</cluster>
|
||||||
-->
|
-->
|
||||||
|
|||||||
@@ -312,6 +312,23 @@ public final class Application {
|
|||||||
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity / 1024 + "K; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity / 1024 + "K; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
||||||
}
|
}
|
||||||
AnyValue[] clusterConfs = resources.getAnyValues("cluster");
|
AnyValue[] clusterConfs = resources.getAnyValues("cluster");
|
||||||
|
if (clusterConfs != null && clusterConfs.length > 0) {
|
||||||
|
for (AnyValue clusterConf : clusterConfs) {
|
||||||
|
try {
|
||||||
|
Class type = classLoader.loadClass(clusterConf.getValue("value"));
|
||||||
|
if (!ClusterAgent.class.isAssignableFrom(type)) {
|
||||||
|
logger.log(Level.SEVERE, "load application cluster resource, but not " + ClusterAgent.class.getSimpleName() + " error: " + clusterConf);
|
||||||
|
} else {
|
||||||
|
ClusterAgent cluster = (ClusterAgent) type.getDeclaredConstructor().newInstance();
|
||||||
|
cluster.setNodeid(this.nodeid);
|
||||||
|
cluster.init(clusterConf);
|
||||||
|
clusters.add(cluster);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.SEVERE, "load application cluster resource error: " + clusterConf, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (transportGroup == null) {
|
if (transportGroup == null) {
|
||||||
final AtomicInteger counter = new AtomicInteger();
|
final AtomicInteger counter = new AtomicInteger();
|
||||||
|
|||||||
146
src/org/redkale/boot/ClusterAgent.java
Normal file
146
src/org/redkale/boot/ClusterAgent.java
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
/*
|
||||||
|
* To change this license header, choose License Headers in Project Properties.
|
||||||
|
* To change this template file, choose Tools | Templates
|
||||||
|
* and open the template in the editor.
|
||||||
|
*/
|
||||||
|
package org.redkale.boot;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import org.redkale.convert.json.JsonConvert;
|
||||||
|
import org.redkale.net.sncp.Sncp;
|
||||||
|
import org.redkale.service.Service;
|
||||||
|
import org.redkale.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 第三方服务发现管理接口cluster
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* 详情见: https://redkale.org
|
||||||
|
*
|
||||||
|
* @author zhangjx
|
||||||
|
*/
|
||||||
|
public abstract class ClusterAgent {
|
||||||
|
|
||||||
|
protected int nodeid;
|
||||||
|
|
||||||
|
protected String name;
|
||||||
|
|
||||||
|
protected String[] protocols; //必须全大写
|
||||||
|
|
||||||
|
protected int[] ports;
|
||||||
|
|
||||||
|
protected AnyValue config;
|
||||||
|
|
||||||
|
public void init(AnyValue config) {
|
||||||
|
this.config = config;
|
||||||
|
this.name = config.getValue("name", "");
|
||||||
|
{
|
||||||
|
String ps = config.getValue("protocols", "").toUpperCase();
|
||||||
|
if (ps == null || ps.isEmpty()) ps = "SNCP";
|
||||||
|
this.protocols = ps.split(";");
|
||||||
|
}
|
||||||
|
String ts = config.getValue("ports", "");
|
||||||
|
if (ts != null && !ts.isEmpty()) {
|
||||||
|
String[] its = ts.split(";");
|
||||||
|
List<Integer> list = new ArrayList<>();
|
||||||
|
for (String str : its) {
|
||||||
|
if (str.trim().isEmpty()) continue;
|
||||||
|
list.add(Integer.getInteger(str.trim()));
|
||||||
|
}
|
||||||
|
if (!list.isEmpty()) this.ports = list.stream().mapToInt(x -> x).toArray();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void destroy(AnyValue config) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean containsProtocol(String protocol) {
|
||||||
|
if (protocol == null || protocol.isEmpty()) return false;
|
||||||
|
return protocols == null || Utility.contains(protocols, protocol.toUpperCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean containsPort(int port) {
|
||||||
|
if (ports == null || ports.length == 0) return true;
|
||||||
|
return Utility.contains(ports, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
//注册服务
|
||||||
|
public void register(NodeServer ns, Set<Service> localServices, Set<Service> remoteServices) {
|
||||||
|
if (localServices.isEmpty()) return;
|
||||||
|
for (Service service : localServices) { //注册本地模式
|
||||||
|
register(ns, service);
|
||||||
|
}
|
||||||
|
//远程模式不注册
|
||||||
|
}
|
||||||
|
|
||||||
|
//注销服务
|
||||||
|
public void deregister(NodeServer ns, Set<Service> localServices, Set<Service> remoteServices) {
|
||||||
|
for (Service service : localServices) {//注销本地模式
|
||||||
|
deregister(ns, service);
|
||||||
|
}
|
||||||
|
//远程模式不注册
|
||||||
|
}
|
||||||
|
|
||||||
|
//注册服务
|
||||||
|
public abstract void register(NodeServer server, Service service);
|
||||||
|
|
||||||
|
//注销服务
|
||||||
|
public abstract void deregister(NodeServer server, Service service);
|
||||||
|
|
||||||
|
//格式: protocol:classtype-resourcename
|
||||||
|
public String generateServiceType(NodeServer ns, Service service) {
|
||||||
|
if (!Sncp.isSncpDyn(service)) return ns.server.getProtocol().toLowerCase() + ":" + service.getClass().getName();
|
||||||
|
return ns.server.getProtocol().toLowerCase() + ":" + Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service);
|
||||||
|
}
|
||||||
|
|
||||||
|
//格式: protocol:classtype-resourcename:nodeid
|
||||||
|
public String generateServiceId(NodeServer ns, Service service) {
|
||||||
|
return generateServiceType(ns, service) + ":" + this.nodeid;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return JsonConvert.root().convertTo(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNodeid() {
|
||||||
|
return nodeid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNodeid(int nodeid) {
|
||||||
|
this.nodeid = nodeid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setName(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String[] getProtocols() {
|
||||||
|
return protocols;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProtocols(String[] protocols) {
|
||||||
|
this.protocols = protocols;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int[] getPorts() {
|
||||||
|
return ports;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPorts(int[] ports) {
|
||||||
|
this.ports = ports;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AnyValue getConfig() {
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConfig(AnyValue config) {
|
||||||
|
this.config = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -515,10 +515,24 @@ public abstract class NodeServer {
|
|||||||
|
|
||||||
//Service.init执行之前调用
|
//Service.init执行之前调用
|
||||||
protected void preInitServices(Set<Service> localServices, Set<Service> remoteServices) {
|
protected void preInitServices(Set<Service> localServices, Set<Service> remoteServices) {
|
||||||
|
final ClusterAgent[] clusters = application.clusterAgents;
|
||||||
|
if (clusters == null || clusters.length == 0) return;
|
||||||
|
for (ClusterAgent cluster : clusters) {
|
||||||
|
if (!cluster.containsProtocol(server.getProtocol())) continue;
|
||||||
|
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
|
||||||
|
cluster.register(this, localServices, remoteServices);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Service.destroy执行之前调用
|
//Service.destroy执行之前调用
|
||||||
protected void preDestroyServices(Set<Service> localServices, Set<Service> remoteServices) {
|
protected void preDestroyServices(Set<Service> localServices, Set<Service> remoteServices) {
|
||||||
|
final ClusterAgent[] clusters = application.clusterAgents;
|
||||||
|
if (clusters == null || clusters.length == 0) return;
|
||||||
|
for (ClusterAgent cluster : clusters) {
|
||||||
|
if (!cluster.containsProtocol(server.getProtocol())) continue;
|
||||||
|
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
|
||||||
|
cluster.deregister(this, localServices, remoteServices);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract ClassFilter<Filter> createFilterClassFilter();
|
protected abstract ClassFilter<Filter> createFilterClassFilter();
|
||||||
|
|||||||
@@ -1,18 +0,0 @@
|
|||||||
/*
|
|
||||||
* To change this license header, choose License Headers in Project Properties.
|
|
||||||
* To change this template file, choose Tools | Templates
|
|
||||||
* and open the template in the editor.
|
|
||||||
*/
|
|
||||||
package org.redkale.watch;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 第三方服务发现管理接口cluster
|
|
||||||
*
|
|
||||||
*
|
|
||||||
* 详情见: https://redkale.org
|
|
||||||
*
|
|
||||||
* @author zhangjx
|
|
||||||
*/
|
|
||||||
public abstract class ClusterAgent {
|
|
||||||
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user