This commit is contained in:
@@ -50,7 +50,7 @@
|
|||||||
value: 类名,必须是org.redkale.boot.ClusterAgent的子类
|
value: 类名,必须是org.redkale.boot.ClusterAgent的子类
|
||||||
protocols: 服务发现可以处理的协议, 默认值为: SNCP, 多个协议用分号;隔开
|
protocols: 服务发现可以处理的协议, 默认值为: SNCP, 多个协议用分号;隔开
|
||||||
ports: 服务发现可以处理的端口, 多个端口用分号;隔开
|
ports: 服务发现可以处理的端口, 多个端口用分号;隔开
|
||||||
<cluster name="cluster1" value="org.redkalex.consul.ConsulClusterAgent" protocols="SNCP" ports="7070;7071">
|
<cluster name="cluster1" value="org.redkalex.cluster.ConsulClusterAgent" protocols="SNCP" ports="7070;7071">
|
||||||
<property name="xxxxxx" value="XXXXXXXX"/>
|
<property name="xxxxxx" value="XXXXXXXX"/>
|
||||||
</cluster>
|
</cluster>
|
||||||
-->
|
-->
|
||||||
|
|||||||
@@ -817,15 +817,13 @@ public final class Application {
|
|||||||
for (FilterEntry<NodeServer> entry : entrys) {
|
for (FilterEntry<NodeServer> entry : entrys) {
|
||||||
final Class<? extends NodeServer> type = entry.getType();
|
final Class<? extends NodeServer> type = entry.getType();
|
||||||
NodeProtocol pros = type.getAnnotation(NodeProtocol.class);
|
NodeProtocol pros = type.getAnnotation(NodeProtocol.class);
|
||||||
for (String p : pros.value()) {
|
String p = pros.value().toUpperCase();
|
||||||
p = p.toUpperCase();
|
if ("SNCP".equals(p) || "HTTP".equals(p)) continue;
|
||||||
if ("SNCP".equals(p) || "HTTP".equals(p)) continue;
|
final Class<? extends NodeServer> old = nodeClasses.get(p);
|
||||||
final Class<? extends NodeServer> old = nodeClasses.get(p);
|
if (old != null && old != type) {
|
||||||
if (old != null && old != type) {
|
throw new RuntimeException("Protocol(" + p + ") had NodeServer-Class(" + old.getName() + ") but repeat NodeServer-Class(" + type.getName() + ")");
|
||||||
throw new RuntimeException("Protocol(" + p + ") had NodeServer-Class(" + old.getName() + ") but repeat NodeServer-Class(" + type.getName() + ")");
|
|
||||||
}
|
|
||||||
nodeClasses.put(p, type);
|
|
||||||
}
|
}
|
||||||
|
nodeClasses.put(p, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,8 +5,10 @@
|
|||||||
*/
|
*/
|
||||||
package org.redkale.boot;
|
package org.redkale.boot;
|
||||||
|
|
||||||
|
import java.lang.ref.WeakReference;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import org.redkale.net.*;
|
import org.redkale.net.*;
|
||||||
import org.redkale.net.sncp.*;
|
import org.redkale.net.sncp.*;
|
||||||
@@ -35,6 +37,10 @@ public abstract class ClusterAgent {
|
|||||||
|
|
||||||
protected TransportFactory transportFactory;
|
protected TransportFactory transportFactory;
|
||||||
|
|
||||||
|
protected final ConcurrentHashMap<String, ClusterEntry> localEntrys = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
protected final ConcurrentHashMap<String, ClusterEntry> remoteEntrys = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public void init(AnyValue config) {
|
public void init(AnyValue config) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.name = config.getValue("name", "");
|
this.name = config.getValue("name", "");
|
||||||
@@ -49,7 +55,7 @@ public abstract class ClusterAgent {
|
|||||||
List<Integer> list = new ArrayList<>();
|
List<Integer> list = new ArrayList<>();
|
||||||
for (String str : its) {
|
for (String str : its) {
|
||||||
if (str.trim().isEmpty()) continue;
|
if (str.trim().isEmpty()) continue;
|
||||||
list.add(Integer.getInteger(str.trim()));
|
list.add(Integer.parseInt(str.trim()));
|
||||||
}
|
}
|
||||||
if (!list.isEmpty()) this.ports = list.stream().mapToInt(x -> x).toArray();
|
if (!list.isEmpty()) this.ports = list.stream().mapToInt(x -> x).toArray();
|
||||||
}
|
}
|
||||||
@@ -69,56 +75,60 @@ public abstract class ClusterAgent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//注册服务
|
//注册服务
|
||||||
public void register(NodeServer ns, Set<Service> localServices, Set<Service> remoteServices) {
|
public void register(NodeServer ns, String protocol, Set<Service> localServices, Set<Service> remoteServices) {
|
||||||
if (localServices.isEmpty()) return;
|
if (localServices.isEmpty()) return;
|
||||||
//注册本地模式
|
//注册本地模式
|
||||||
for (Service service : localServices) {
|
for (Service service : localServices) {
|
||||||
register(ns, service);
|
register(ns, protocol, service);
|
||||||
|
ClusterEntry entry = new ClusterEntry(ns, protocol, service);
|
||||||
|
localEntrys.put(entry.serviceid, entry);
|
||||||
}
|
}
|
||||||
//远程模式加载IP列表, 只能是SNCP协议
|
//远程模式加载IP列表, 只能是SNCP协议
|
||||||
for (Service service : remoteServices) {
|
for (Service service : remoteServices) {
|
||||||
updateTransport(ns, service);
|
updateTransport(ns, protocol, service);
|
||||||
|
ClusterEntry entry = new ClusterEntry(ns, protocol, service);
|
||||||
|
remoteEntrys.put(entry.serviceid, entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//注销服务
|
//注销服务
|
||||||
public void deregister(NodeServer ns, Set<Service> localServices, Set<Service> remoteServices) {
|
public void deregister(NodeServer ns, String protocol, Set<Service> localServices, Set<Service> remoteServices) {
|
||||||
//注销本地模式
|
//注销本地模式
|
||||||
for (Service service : localServices) {
|
for (Service service : localServices) {
|
||||||
deregister(ns, service);
|
deregister(ns, protocol, service);
|
||||||
}
|
}
|
||||||
//远程模式不注册
|
//远程模式不注册
|
||||||
}
|
}
|
||||||
|
|
||||||
//获取远程服务的可用ip列表
|
//获取远程服务的可用ip列表
|
||||||
public abstract List<InetSocketAddress> queryAddress(NodeServer ns, Service service);
|
public abstract List<InetSocketAddress> queryAddress(NodeServer ns, String protocol, Service service);
|
||||||
|
|
||||||
//注册服务
|
//注册服务
|
||||||
public abstract void register(NodeServer ns, Service service);
|
public abstract void register(NodeServer ns, String protocol, Service service);
|
||||||
|
|
||||||
//注销服务
|
//注销服务
|
||||||
public abstract void deregister(NodeServer ns, Service service);
|
public abstract void deregister(NodeServer ns, String protocol, Service service);
|
||||||
|
|
||||||
//格式: protocol:classtype-resourcename
|
//格式: protocol:classtype-resourcename
|
||||||
public void updateTransport(NodeServer ns, Service service) {
|
public void updateTransport(NodeServer ns, String protocol, Service service) {
|
||||||
Server server = ns.getServer();
|
Server server = ns.getServer();
|
||||||
String netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL;
|
String netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL;
|
||||||
if (!Sncp.isSncpDyn(service)) return;
|
if (!Sncp.isSncpDyn(service)) return;
|
||||||
List<InetSocketAddress> addrs = queryAddress(ns, service);
|
List<InetSocketAddress> addrs = queryAddress(ns, protocol, service);
|
||||||
if (addrs != null && !addrs.isEmpty()) {
|
if (addrs != null && !addrs.isEmpty()) {
|
||||||
Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), netprotocol, ns.getSncpAddress(), null, addrs);
|
Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), netprotocol, ns.getSncpAddress(), null, addrs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//格式: protocol:classtype-resourcename
|
//格式: protocol:classtype-resourcename
|
||||||
public String generateServiceType(NodeServer ns, Service service) {
|
public String generateServiceType(NodeServer ns, String protocol, Service service) {
|
||||||
if (!Sncp.isSncpDyn(service)) return ns.server.getProtocol().toLowerCase() + ":" + service.getClass().getName();
|
if (!Sncp.isSncpDyn(service)) return protocol.toLowerCase() + ":" + service.getClass().getName();
|
||||||
return ns.server.getProtocol().toLowerCase() + ":" + Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service);
|
return protocol.toLowerCase() + ":" + Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service);
|
||||||
}
|
}
|
||||||
|
|
||||||
//格式: protocol:classtype-resourcename:nodeid
|
//格式: protocol:classtype-resourcename:nodeid
|
||||||
public String generateServiceId(NodeServer ns, Service service) {
|
public String generateServiceId(NodeServer ns, String protocol, Service service) {
|
||||||
return generateServiceType(ns, service) + ":" + this.nodeid;
|
return generateServiceType(ns, protocol, service) + ":" + this.nodeid;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -174,4 +184,21 @@ public abstract class ClusterAgent {
|
|||||||
this.config = config;
|
this.config = config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class ClusterEntry {
|
||||||
|
|
||||||
|
public String serviceid;
|
||||||
|
|
||||||
|
public String servicetype;
|
||||||
|
|
||||||
|
public WeakReference<Service> serviceref;
|
||||||
|
|
||||||
|
public InetSocketAddress address;
|
||||||
|
|
||||||
|
public ClusterEntry(NodeServer ns, String protocol, Service service) {
|
||||||
|
this.serviceid = generateServiceId(ns, protocol, service);
|
||||||
|
this.servicetype = generateServiceType(ns, protocol, service);
|
||||||
|
this.address = ns.getSocketAddress();
|
||||||
|
this.serviceref = new WeakReference(service);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ import org.redkale.watch.*;
|
|||||||
*
|
*
|
||||||
* @author zhangjx
|
* @author zhangjx
|
||||||
*/
|
*/
|
||||||
@NodeProtocol({"HTTP"})
|
@NodeProtocol("HTTP")
|
||||||
public class NodeHttpServer extends NodeServer {
|
public class NodeHttpServer extends NodeServer {
|
||||||
|
|
||||||
protected final boolean rest; //是否加载REST服务, 为true加载rest节点信息并将所有可REST化的Service生成RestServlet
|
protected final boolean rest; //是否加载REST服务, 为true加载rest节点信息并将所有可REST化的Service生成RestServlet
|
||||||
|
|||||||
@@ -20,5 +20,5 @@ import java.lang.annotation.*;
|
|||||||
@Documented
|
@Documented
|
||||||
public @interface NodeProtocol {
|
public @interface NodeProtocol {
|
||||||
|
|
||||||
String[] value();
|
String value();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -517,10 +517,12 @@ public abstract class NodeServer {
|
|||||||
protected void preInitServices(Set<Service> localServices, Set<Service> remoteServices) {
|
protected void preInitServices(Set<Service> localServices, Set<Service> remoteServices) {
|
||||||
final ClusterAgent[] clusters = application.clusterAgents;
|
final ClusterAgent[] clusters = application.clusterAgents;
|
||||||
if (clusters == null || clusters.length == 0) return;
|
if (clusters == null || clusters.length == 0) return;
|
||||||
|
NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class);
|
||||||
|
String protocol = pros.value().toUpperCase();
|
||||||
for (ClusterAgent cluster : clusters) {
|
for (ClusterAgent cluster : clusters) {
|
||||||
if (!cluster.containsProtocol(server.getProtocol())) continue;
|
if (!cluster.containsProtocol(protocol)) continue;
|
||||||
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
|
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
|
||||||
cluster.register(this, localServices, remoteServices);
|
cluster.register(this, protocol, localServices, remoteServices);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -528,10 +530,12 @@ public abstract class NodeServer {
|
|||||||
protected void preDestroyServices(Set<Service> localServices, Set<Service> remoteServices) {
|
protected void preDestroyServices(Set<Service> localServices, Set<Service> remoteServices) {
|
||||||
final ClusterAgent[] clusters = application.clusterAgents;
|
final ClusterAgent[] clusters = application.clusterAgents;
|
||||||
if (clusters == null || clusters.length == 0) return;
|
if (clusters == null || clusters.length == 0) return;
|
||||||
|
NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class);
|
||||||
|
String protocol = pros.value().toUpperCase();
|
||||||
for (ClusterAgent cluster : clusters) {
|
for (ClusterAgent cluster : clusters) {
|
||||||
if (!cluster.containsProtocol(server.getProtocol())) continue;
|
if (!cluster.containsProtocol(protocol)) continue;
|
||||||
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
|
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
|
||||||
cluster.deregister(this, localServices, remoteServices);
|
cluster.deregister(this, protocol, localServices, remoteServices);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ import org.redkale.util.AnyValue.DefaultAnyValue;
|
|||||||
*
|
*
|
||||||
* @author zhangjx
|
* @author zhangjx
|
||||||
*/
|
*/
|
||||||
@NodeProtocol({"SNCP"})
|
@NodeProtocol("SNCP")
|
||||||
public class NodeSncpServer extends NodeServer {
|
public class NodeSncpServer extends NodeServer {
|
||||||
|
|
||||||
protected final SncpServer sncpServer;
|
protected final SncpServer sncpServer;
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ import org.redkale.watch.*;
|
|||||||
*
|
*
|
||||||
* @author zhangjx
|
* @author zhangjx
|
||||||
*/
|
*/
|
||||||
@NodeProtocol({"WATCH"})
|
@NodeProtocol("WATCH")
|
||||||
public class NodeWatchServer extends NodeHttpServer {
|
public class NodeWatchServer extends NodeHttpServer {
|
||||||
|
|
||||||
public NodeWatchServer(Application application, AnyValue serconf) {
|
public NodeWatchServer(Application application, AnyValue serconf) {
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ public class ServerWatchService extends AbstractWatchService {
|
|||||||
protocol += "/HTTP";
|
protocol += "/HTTP";
|
||||||
} else {
|
} else {
|
||||||
NodeProtocol np = node.getClass().getAnnotation(NodeProtocol.class);
|
NodeProtocol np = node.getClass().getAnnotation(NodeProtocol.class);
|
||||||
if (np != null && np.value().length > 0) protocol += "/" + np.value()[0];
|
protocol += "/" + np.value();
|
||||||
}
|
}
|
||||||
rs.put("name", server.getName());
|
rs.put("name", server.getName());
|
||||||
rs.put("protocol", protocol);
|
rs.put("protocol", protocol);
|
||||||
|
|||||||
Reference in New Issue
Block a user