diff --git a/src/org/redkale/boot/ClusterAgent.java b/src/org/redkale/boot/ClusterAgent.java index 0236176ba..c6b27ee1e 100644 --- a/src/org/redkale/boot/ClusterAgent.java +++ b/src/org/redkale/boot/ClusterAgent.java @@ -8,7 +8,8 @@ package org.redkale.boot; import java.net.InetSocketAddress; import java.util.*; import org.redkale.convert.json.JsonConvert; -import org.redkale.net.sncp.Sncp; +import org.redkale.net.*; +import org.redkale.net.sncp.*; import org.redkale.service.Service; import org.redkale.util.*; @@ -66,30 +67,42 @@ public abstract class ClusterAgent { } //注册服务 - public void register(NodeServer ns, Set localServices, Set remoteServices) { + public void register(NodeServer ns, TransportFactory transportFactory, Set localServices, Set remoteServices) { if (localServices.isEmpty()) return; - for (Service service : localServices) { //注册本地模式 - register(ns, service); + //注册本地模式 + for (Service service : localServices) { + register(ns, transportFactory, service); + } + Server server = ns.getServer(); + String subprotocol = server instanceof SncpServer ? ((SncpServer) server).getSubprotocol() : "TCP"; + //远程模式加载IP列表, 只能是SNCP协议 + for (Service service : remoteServices) { + if (!Sncp.isSncpDyn(service)) continue; + List addrs = queryAddress(ns, service); + if (addrs != null && !addrs.isEmpty()) { + SncpClient client = Sncp.getSncpClient(service); + if (client != null) client.setRemoteGroupTransport(transportFactory.createTransport(Sncp.getResourceType(service).getName(), server.getProtocol(), subprotocol, ns.getSncpAddress(), addrs)); + } } - //远程模式不注册 } //注销服务 - public void deregister(NodeServer ns, Set localServices, Set remoteServices) { - for (Service service : localServices) {//注销本地模式 - deregister(ns, service); + public void deregister(NodeServer ns, TransportFactory transportFactory, Set localServices, Set remoteServices) { + //注销本地模式 + for (Service service : localServices) { + deregister(ns, transportFactory, service); } //远程模式不注册 } //获取远程服务的可用ip列表 - public abstract List queryAddress(NodeServer server, Service service); + public abstract List queryAddress(NodeServer ns, Service service); //注册服务 - public abstract void register(NodeServer server, Service service); + public abstract void register(NodeServer ns, TransportFactory transportFactory, Service service); //注销服务 - public abstract void deregister(NodeServer server, Service service); + public abstract void deregister(NodeServer ns, TransportFactory transportFactory, Service service); //格式: protocol:classtype-resourcename public String generateServiceType(NodeServer ns, Service service) { diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 94c43abfb..cf3549226 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -520,7 +520,7 @@ public abstract class NodeServer { for (ClusterAgent cluster : clusters) { if (!cluster.containsProtocol(server.getProtocol())) continue; if (!cluster.containsPort(server.getSocketAddress().getPort())) continue; - cluster.register(this, localServices, remoteServices); + cluster.register(this, application.getSncpTransportFactory(), localServices, remoteServices); } } @@ -531,7 +531,7 @@ public abstract class NodeServer { for (ClusterAgent cluster : clusters) { if (!cluster.containsProtocol(server.getProtocol())) continue; if (!cluster.containsPort(server.getSocketAddress().getPort())) continue; - cluster.deregister(this, localServices, remoteServices); + cluster.deregister(this, application.getSncpTransportFactory(), localServices, remoteServices); } } diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index d776b230c..3dd657ef9 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -266,23 +266,7 @@ public class TransportFactory { return true; } - public Transport loadSameGroupTransport(InetSocketAddress sncpAddress) { - return loadTransport(groupAddrs.get(sncpAddress), sncpAddress); - } - - public Transport[] loadDiffGroupTransports(InetSocketAddress sncpAddress, final Set diffGroups) { - if (diffGroups == null) return null; - final String sncpGroup = groupAddrs.get(sncpAddress); - final List transports = new ArrayList<>(); - for (String group : diffGroups) { - if (sncpGroup == null || !sncpGroup.equals(group)) { - transports.add(loadTransport(group, sncpAddress)); - } - } - return transports.toArray(new Transport[transports.size()]); - } - - public Transport loadRemoteTransport(InetSocketAddress sncpAddress, final Set groups) { + public Transport loadTransport(InetSocketAddress sncpAddress, final Set groups) { if (groups == null) return null; Set addresses = new HashSet<>(); TransportGroupInfo info = null; @@ -296,13 +280,6 @@ public class TransportFactory { return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, this.sslContext, sncpAddress, addresses, this.strategy); } - private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) { - if (groupName == null) return null; - TransportGroupInfo info = groupInfos.get(groupName); - if (info == null) return null; - return new Transport(groupName, info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, this.sslContext, sncpAddress, info.addresses, this.strategy); - } - public ExecutorService getExecutor() { return executor; } diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index 90c0baab3..a3ea643a2 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -547,7 +547,7 @@ public abstract class Sncp { T rs = (T) newClazz.getDeclaredConstructor().newInstance(); SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); - client.setRemoteGroupTransport(transportFactory.loadRemoteTransport(clientAddress, groups)); + client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); c.set(rs, client); @@ -728,7 +728,7 @@ public abstract class Sncp { T rs = (T) newClazz.getDeclaredConstructor().newInstance(); SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); - client.setRemoteGroupTransport(transportFactory.loadRemoteTransport(clientAddress, groups)); + client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 8188d71c0..70f0af85f 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -64,10 +64,10 @@ public final class SncpClient { @Resource protected BsonConvert bsonConvert; - //远程模式 + //远程模式, 可能为null protected Set remoteGroups; - //远程模式 + //远程模式, 可能为null protected Transport remoteGroupTransport; public SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, final TransportFactory factory, diff --git a/src/org/redkale/net/sncp/SncpServer.java b/src/org/redkale/net/sncp/SncpServer.java index 619e3c7c1..45c70f469 100644 --- a/src/org/redkale/net/sncp/SncpServer.java +++ b/src/org/redkale/net/sncp/SncpServer.java @@ -29,6 +29,9 @@ public class SncpServer extends Server getSncpServlets() { return this.prepare.getServlets(); }