From 7633687665faf855d1a5ff99a87dcfe48f24aa33 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sat, 16 May 2020 22:24:37 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 10 +++++-- src/org/redkale/boot/ClusterAgent.java | 41 ++++++++++++++++++-------- src/org/redkale/boot/NodeServer.java | 4 +-- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 7c536640f..965d1300f 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -320,8 +320,7 @@ public final class Application { logger.log(Level.SEVERE, "load application cluster resource, but not " + ClusterAgent.class.getSimpleName() + " error: " + clusterConf); } else { ClusterAgent cluster = (ClusterAgent) type.getDeclaredConstructor().newInstance(); - cluster.setNodeid(this.nodeid); - cluster.init(clusterConf); + cluster.setConfig(clusterConf); clusters.add(cluster); } } catch (Exception e) { @@ -353,12 +352,17 @@ public final class Application { return true; }); } - this.clusterAgents = clusters == null ? null : clusters.toArray(new ClusterAgent[clusters.size()]); this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, (SSLContext) null, readTimeoutSeconds, writeTimeoutSeconds, strategy); DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("net.transport.poolmaxconns", "100")) .addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30")) .addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.checkinterval", "30")); this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining()); + for (ClusterAgent cluster : clusters) { + cluster.setNodeid(this.nodeid); + cluster.setTransportFactory(this.sncpTransportFactory); + cluster.init(cluster.getConfig()); + } + this.clusterAgents = clusters.isEmpty() ? null : clusters.toArray(new ClusterAgent[clusters.size()]); Thread.currentThread().setContextClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader); } diff --git a/src/org/redkale/boot/ClusterAgent.java b/src/org/redkale/boot/ClusterAgent.java index e804bc465..46c162634 100644 --- a/src/org/redkale/boot/ClusterAgent.java +++ b/src/org/redkale/boot/ClusterAgent.java @@ -33,6 +33,8 @@ public abstract class ClusterAgent { protected AnyValue config; + protected TransportFactory transportFactory; + public void init(AnyValue config) { this.config = config; this.name = config.getValue("name", ""); @@ -67,29 +69,23 @@ public abstract class ClusterAgent { } //注册服务 - public void register(NodeServer ns, TransportFactory transportFactory, Set localServices, Set remoteServices) { + public void register(NodeServer ns, Set localServices, Set remoteServices) { if (localServices.isEmpty()) return; //注册本地模式 for (Service service : localServices) { - register(ns, transportFactory, service); + register(ns, service); } - Server server = ns.getServer(); - String netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL; //远程模式加载IP列表, 只能是SNCP协议 for (Service service : remoteServices) { - if (!Sncp.isSncpDyn(service)) continue; - List addrs = queryAddress(ns, service); - if (addrs != null && !addrs.isEmpty()) { - Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), netprotocol, ns.getSncpAddress(), null, addrs); - } + updateTransport(ns, service); } } //注销服务 - public void deregister(NodeServer ns, TransportFactory transportFactory, Set localServices, Set remoteServices) { + public void deregister(NodeServer ns, Set localServices, Set remoteServices) { //注销本地模式 for (Service service : localServices) { - deregister(ns, transportFactory, service); + deregister(ns, service); } //远程模式不注册 } @@ -98,10 +94,21 @@ public abstract class ClusterAgent { public abstract List queryAddress(NodeServer ns, Service service); //注册服务 - public abstract void register(NodeServer ns, TransportFactory transportFactory, Service service); + public abstract void register(NodeServer ns, Service service); //注销服务 - public abstract void deregister(NodeServer ns, TransportFactory transportFactory, Service service); + public abstract void deregister(NodeServer ns, Service service); + + //格式: protocol:classtype-resourcename + public void updateTransport(NodeServer ns, Service service) { + Server server = ns.getServer(); + String netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL; + if (!Sncp.isSncpDyn(service)) return; + List addrs = queryAddress(ns, service); + if (addrs != null && !addrs.isEmpty()) { + Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), netprotocol, ns.getSncpAddress(), null, addrs); + } + } //格式: protocol:classtype-resourcename public String generateServiceType(NodeServer ns, Service service) { @@ -127,6 +134,14 @@ public abstract class ClusterAgent { this.nodeid = nodeid; } + public TransportFactory getTransportFactory() { + return transportFactory; + } + + public void setTransportFactory(TransportFactory transportFactory) { + this.transportFactory = transportFactory; + } + public String getName() { return name; } diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index cf3549226..94c43abfb 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -520,7 +520,7 @@ public abstract class NodeServer { for (ClusterAgent cluster : clusters) { if (!cluster.containsProtocol(server.getProtocol())) continue; if (!cluster.containsPort(server.getSocketAddress().getPort())) continue; - cluster.register(this, application.getSncpTransportFactory(), localServices, remoteServices); + cluster.register(this, localServices, remoteServices); } } @@ -531,7 +531,7 @@ public abstract class NodeServer { for (ClusterAgent cluster : clusters) { if (!cluster.containsProtocol(server.getProtocol())) continue; if (!cluster.containsPort(server.getSocketAddress().getPort())) continue; - cluster.deregister(this, application.getSncpTransportFactory(), localServices, remoteServices); + cluster.deregister(this, localServices, remoteServices); } }