This commit is contained in:
Redkale
2020-05-17 23:37:22 +08:00
parent 82e5fb593e
commit 16dec2cde5
3 changed files with 32 additions and 40 deletions

View File

@@ -45,12 +45,12 @@
<transport bufferCapacity="32K" bufferPoolSize="32" threads="32" readTimeoutSeconds="6" writeTimeoutSeconds="6"/> <transport bufferCapacity="32K" bufferPoolSize="32" threads="32" readTimeoutSeconds="6" writeTimeoutSeconds="6"/>
<!-- <!--
【节点全局唯一】
第三方服务发现管理接口 第三方服务发现管理接口
name: cluster名称不能包含特殊字符。
value 类名必须是org.redkale.boot.ClusterAgent的子类 value 类名必须是org.redkale.boot.ClusterAgent的子类
protocols: 服务发现可以处理的协议, 默认值为: SNCP, 多个协议用分号;隔开 protocols: 服务发现可以处理的协议, 默认值为: SNCP, 多个协议用分号;隔开
ports: 服务发现可以处理的端口, 多个端口用分号;隔开 ports: 服务发现可以处理的端口, 多个端口用分号;隔开
<cluster name="cluster1" value="org.redkalex.cluster.ConsulClusterAgent" protocols="SNCP" ports="7070;7071"> <cluster value="org.redkalex.cluster.ConsulClusterAgent" protocols="SNCP" ports="7070;7071">
<property name="xxxxxx" value="XXXXXXXX"/> <property name="xxxxxx" value="XXXXXXXX"/>
</cluster> </cluster>
--> -->

View File

@@ -125,7 +125,7 @@ public final class Application {
final TransportFactory sncpTransportFactory; final TransportFactory sncpTransportFactory;
//第三方服务发现管理接口 //第三方服务发现管理接口
final ClusterAgent[] clusterAgents; final ClusterAgent clusterAgent;
//全局根ResourceFactory //全局根ResourceFactory
final ResourceFactory resourceFactory = ResourceFactory.root(); final ResourceFactory resourceFactory = ResourceFactory.root();
@@ -267,7 +267,7 @@ public final class Application {
AsynchronousChannelGroup transportGroup = null; AsynchronousChannelGroup transportGroup = null;
final AnyValue resources = config.getAnyValue("resources"); final AnyValue resources = config.getAnyValue("resources");
TransportStrategy strategy = null; TransportStrategy strategy = null;
List<ClusterAgent> clusters = new ArrayList<>(); ClusterAgent cluster = null;
int bufferCapacity = 32 * 1024; int bufferCapacity = 32 * 1024;
int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 8; int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 8;
int readTimeoutSeconds = TransportFactory.DEFAULT_READTIMEOUTSECONDS; int readTimeoutSeconds = TransportFactory.DEFAULT_READTIMEOUTSECONDS;
@@ -311,24 +311,21 @@ public final class Application {
} }
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity / 1024 + "K; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";"); logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity / 1024 + "K; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
} }
AnyValue[] clusterConfs = resources.getAnyValues("cluster"); AnyValue clusterConf = resources.getAnyValue("cluster");
if (clusterConfs != null && clusterConfs.length > 0) { if (clusterConf != null) {
for (AnyValue clusterConf : clusterConfs) {
try { try {
Class type = classLoader.loadClass(clusterConf.getValue("value")); Class type = classLoader.loadClass(clusterConf.getValue("value"));
if (!ClusterAgent.class.isAssignableFrom(type)) { if (!ClusterAgent.class.isAssignableFrom(type)) {
logger.log(Level.SEVERE, "load application cluster resource, but not " + ClusterAgent.class.getSimpleName() + " error: " + clusterConf); logger.log(Level.SEVERE, "load application cluster resource, but not " + ClusterAgent.class.getSimpleName() + " error: " + clusterConf);
} else { } else {
ClusterAgent cluster = (ClusterAgent) type.getDeclaredConstructor().newInstance(); cluster = (ClusterAgent) type.getDeclaredConstructor().newInstance();
cluster.setConfig(clusterConf); cluster.setConfig(clusterConf);
clusters.add(cluster);
} }
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "load application cluster resource error: " + clusterConf, e); logger.log(Level.SEVERE, "load application cluster resource error: " + clusterConf, e);
} }
} }
} }
}
if (transportGroup == null) { if (transportGroup == null) {
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
transportExec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8, (Runnable r) -> { transportExec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8, (Runnable r) -> {
@@ -357,12 +354,12 @@ public final class Application {
.addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30")) .addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"))
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.checkinterval", "30")); .addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.checkinterval", "30"));
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining()); this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
for (ClusterAgent cluster : clusters) { if (cluster != null) {
cluster.setNodeid(this.nodeid); cluster.setNodeid(this.nodeid);
cluster.setTransportFactory(this.sncpTransportFactory); cluster.setTransportFactory(this.sncpTransportFactory);
cluster.init(cluster.getConfig()); cluster.init(cluster.getConfig());
} }
this.clusterAgents = clusters.isEmpty() ? null : clusters.toArray(new ClusterAgent[clusters.size()]); this.clusterAgent = cluster;
Thread.currentThread().setContextClassLoader(this.classLoader); Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
} }
@@ -375,8 +372,8 @@ public final class Application {
return sncpTransportFactory; return sncpTransportFactory;
} }
public ClusterAgent[] getClusterAgents() { public ClusterAgent getClusterAgent() {
return clusterAgents; return clusterAgent;
} }
public RedkaleClassLoader getClassLoader() { public RedkaleClassLoader getClassLoader() {
@@ -947,10 +944,8 @@ public final class Application {
serversLatch.countDown(); serversLatch.countDown();
} }
}); });
if (clusterAgents != null) { if (clusterAgent != null) {
for (ClusterAgent cluster : clusterAgents) { clusterAgent.destroy(clusterAgent.getConfig());
if (cluster != null) cluster.destroy(cluster.getConfig());
}
} }
for (DataSource source : dataSources) { for (DataSource source : dataSources) {
if (source == null) continue; if (source == null) continue;

View File

@@ -518,29 +518,26 @@ public abstract class NodeServer {
//Service.init执行之前调用 //Service.init执行之前调用
protected void preInitServices(Set<Service> localServices, Set<Service> remoteServices) { protected void preInitServices(Set<Service> localServices, Set<Service> remoteServices) {
final ClusterAgent[] clusters = application.clusterAgents; final ClusterAgent cluster = application.clusterAgent;
if (clusters == null || clusters.length == 0) return; if (cluster == null) return;
NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class); NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class);
String protocol = pros.value().toUpperCase(); String protocol = pros.value().toUpperCase();
for (ClusterAgent cluster : clusters) { if (!cluster.containsProtocol(protocol)) return;
if (!cluster.containsProtocol(protocol)) continue; if (!cluster.containsPort(server.getSocketAddress().getPort())) return;
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
cluster.register(this, protocol, localServices, remoteServices); cluster.register(this, protocol, localServices, remoteServices);
}
} }
//Service.destroy执行之前调用 //Service.destroy执行之前调用
protected void preDestroyServices(Set<Service> localServices, Set<Service> remoteServices) { protected void preDestroyServices(Set<Service> localServices, Set<Service> remoteServices) {
final ClusterAgent[] clusters = application.clusterAgents; final ClusterAgent cluster = application.clusterAgent;
if (clusters == null || clusters.length == 0) return; if (cluster == null) return;
NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class); NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class);
String protocol = pros.value().toUpperCase(); String protocol = pros.value().toUpperCase();
for (ClusterAgent cluster : clusters) { if (!cluster.containsProtocol(protocol)) return;
if (!cluster.containsProtocol(protocol)) continue; if (!cluster.containsPort(server.getSocketAddress().getPort())) return;
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
cluster.deregister(this, protocol, localServices, remoteServices); cluster.deregister(this, protocol, localServices, remoteServices);
} }
}
protected abstract ClassFilter<Filter> createFilterClassFilter(); protected abstract ClassFilter<Filter> createFilterClassFilter();