This commit is contained in:
Redkale
2020-05-16 22:24:37 +08:00
parent a1e37643d0
commit 7633687665
3 changed files with 37 additions and 18 deletions

View File

@@ -320,8 +320,7 @@ public final class Application {
logger.log(Level.SEVERE, "load application cluster resource, but not " + ClusterAgent.class.getSimpleName() + " error: " + clusterConf);
} else {
ClusterAgent cluster = (ClusterAgent) type.getDeclaredConstructor().newInstance();
cluster.setNodeid(this.nodeid);
cluster.init(clusterConf);
cluster.setConfig(clusterConf);
clusters.add(cluster);
}
} catch (Exception e) {
@@ -353,12 +352,17 @@ public final class Application {
return true;
});
}
this.clusterAgents = clusters == null ? null : clusters.toArray(new ClusterAgent[clusters.size()]);
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, (SSLContext) null, readTimeoutSeconds, writeTimeoutSeconds, strategy);
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("net.transport.poolmaxconns", "100"))
.addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"))
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.checkinterval", "30"));
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
for (ClusterAgent cluster : clusters) {
cluster.setNodeid(this.nodeid);
cluster.setTransportFactory(this.sncpTransportFactory);
cluster.init(cluster.getConfig());
}
this.clusterAgents = clusters.isEmpty() ? null : clusters.toArray(new ClusterAgent[clusters.size()]);
Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
}

View File

@@ -33,6 +33,8 @@ public abstract class ClusterAgent {
protected AnyValue config;
protected TransportFactory transportFactory;
public void init(AnyValue config) {
this.config = config;
this.name = config.getValue("name", "");
@@ -67,29 +69,23 @@ public abstract class ClusterAgent {
}
//注册服务
public void register(NodeServer ns, TransportFactory transportFactory, Set<Service> localServices, Set<Service> remoteServices) {
public void register(NodeServer ns, Set<Service> localServices, Set<Service> remoteServices) {
if (localServices.isEmpty()) return;
//注册本地模式
for (Service service : localServices) {
register(ns, transportFactory, service);
register(ns, service);
}
Server server = ns.getServer();
String netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL;
//远程模式加载IP列表, 只能是SNCP协议
for (Service service : remoteServices) {
if (!Sncp.isSncpDyn(service)) continue;
List<InetSocketAddress> addrs = queryAddress(ns, service);
if (addrs != null && !addrs.isEmpty()) {
Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), netprotocol, ns.getSncpAddress(), null, addrs);
}
updateTransport(ns, service);
}
}
//注销服务
public void deregister(NodeServer ns, TransportFactory transportFactory, Set<Service> localServices, Set<Service> remoteServices) {
public void deregister(NodeServer ns, Set<Service> localServices, Set<Service> remoteServices) {
//注销本地模式
for (Service service : localServices) {
deregister(ns, transportFactory, service);
deregister(ns, service);
}
//远程模式不注册
}
@@ -98,10 +94,21 @@ public abstract class ClusterAgent {
public abstract List<InetSocketAddress> queryAddress(NodeServer ns, Service service);
//注册服务
public abstract void register(NodeServer ns, TransportFactory transportFactory, Service service);
public abstract void register(NodeServer ns, Service service);
//注销服务
public abstract void deregister(NodeServer ns, TransportFactory transportFactory, Service service);
public abstract void deregister(NodeServer ns, Service service);
//格式: protocol:classtype-resourcename
public void updateTransport(NodeServer ns, Service service) {
Server server = ns.getServer();
String netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL;
if (!Sncp.isSncpDyn(service)) return;
List<InetSocketAddress> addrs = queryAddress(ns, service);
if (addrs != null && !addrs.isEmpty()) {
Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), netprotocol, ns.getSncpAddress(), null, addrs);
}
}
//格式: protocol:classtype-resourcename
public String generateServiceType(NodeServer ns, Service service) {
@@ -127,6 +134,14 @@ public abstract class ClusterAgent {
this.nodeid = nodeid;
}
public TransportFactory getTransportFactory() {
return transportFactory;
}
public void setTransportFactory(TransportFactory transportFactory) {
this.transportFactory = transportFactory;
}
public String getName() {
return name;
}

View File

@@ -520,7 +520,7 @@ public abstract class NodeServer {
for (ClusterAgent cluster : clusters) {
if (!cluster.containsProtocol(server.getProtocol())) continue;
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
cluster.register(this, application.getSncpTransportFactory(), localServices, remoteServices);
cluster.register(this, localServices, remoteServices);
}
}
@@ -531,7 +531,7 @@ public abstract class NodeServer {
for (ClusterAgent cluster : clusters) {
if (!cluster.containsProtocol(server.getProtocol())) continue;
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
cluster.deregister(this, application.getSncpTransportFactory(), localServices, remoteServices);
cluster.deregister(this, localServices, remoteServices);
}
}