This commit is contained in:
@@ -8,7 +8,8 @@ package org.redkale.boot;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.*;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import org.redkale.net.sncp.Sncp;
|
||||
import org.redkale.net.*;
|
||||
import org.redkale.net.sncp.*;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.*;
|
||||
|
||||
@@ -66,30 +67,42 @@ public abstract class ClusterAgent {
|
||||
}
|
||||
|
||||
//注册服务
|
||||
public void register(NodeServer ns, Set<Service> localServices, Set<Service> remoteServices) {
|
||||
public void register(NodeServer ns, TransportFactory transportFactory, Set<Service> localServices, Set<Service> remoteServices) {
|
||||
if (localServices.isEmpty()) return;
|
||||
for (Service service : localServices) { //注册本地模式
|
||||
register(ns, service);
|
||||
//注册本地模式
|
||||
for (Service service : localServices) {
|
||||
register(ns, transportFactory, service);
|
||||
}
|
||||
Server server = ns.getServer();
|
||||
String subprotocol = server instanceof SncpServer ? ((SncpServer) server).getSubprotocol() : "TCP";
|
||||
//远程模式加载IP列表, 只能是SNCP协议
|
||||
for (Service service : remoteServices) {
|
||||
if (!Sncp.isSncpDyn(service)) continue;
|
||||
List<InetSocketAddress> addrs = queryAddress(ns, service);
|
||||
if (addrs != null && !addrs.isEmpty()) {
|
||||
SncpClient client = Sncp.getSncpClient(service);
|
||||
if (client != null) client.setRemoteGroupTransport(transportFactory.createTransport(Sncp.getResourceType(service).getName(), server.getProtocol(), subprotocol, ns.getSncpAddress(), addrs));
|
||||
}
|
||||
}
|
||||
//远程模式不注册
|
||||
}
|
||||
|
||||
//注销服务
|
||||
public void deregister(NodeServer ns, Set<Service> localServices, Set<Service> remoteServices) {
|
||||
for (Service service : localServices) {//注销本地模式
|
||||
deregister(ns, service);
|
||||
public void deregister(NodeServer ns, TransportFactory transportFactory, Set<Service> localServices, Set<Service> remoteServices) {
|
||||
//注销本地模式
|
||||
for (Service service : localServices) {
|
||||
deregister(ns, transportFactory, service);
|
||||
}
|
||||
//远程模式不注册
|
||||
}
|
||||
|
||||
//获取远程服务的可用ip列表
|
||||
public abstract List<InetSocketAddress> queryAddress(NodeServer server, Service service);
|
||||
public abstract List<InetSocketAddress> queryAddress(NodeServer ns, Service service);
|
||||
|
||||
//注册服务
|
||||
public abstract void register(NodeServer server, Service service);
|
||||
public abstract void register(NodeServer ns, TransportFactory transportFactory, Service service);
|
||||
|
||||
//注销服务
|
||||
public abstract void deregister(NodeServer server, Service service);
|
||||
public abstract void deregister(NodeServer ns, TransportFactory transportFactory, Service service);
|
||||
|
||||
//格式: protocol:classtype-resourcename
|
||||
public String generateServiceType(NodeServer ns, Service service) {
|
||||
|
||||
@@ -520,7 +520,7 @@ public abstract class NodeServer {
|
||||
for (ClusterAgent cluster : clusters) {
|
||||
if (!cluster.containsProtocol(server.getProtocol())) continue;
|
||||
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
|
||||
cluster.register(this, localServices, remoteServices);
|
||||
cluster.register(this, application.getSncpTransportFactory(), localServices, remoteServices);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -531,7 +531,7 @@ public abstract class NodeServer {
|
||||
for (ClusterAgent cluster : clusters) {
|
||||
if (!cluster.containsProtocol(server.getProtocol())) continue;
|
||||
if (!cluster.containsPort(server.getSocketAddress().getPort())) continue;
|
||||
cluster.deregister(this, localServices, remoteServices);
|
||||
cluster.deregister(this, application.getSncpTransportFactory(), localServices, remoteServices);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -266,23 +266,7 @@ public class TransportFactory {
|
||||
return true;
|
||||
}
|
||||
|
||||
public Transport loadSameGroupTransport(InetSocketAddress sncpAddress) {
|
||||
return loadTransport(groupAddrs.get(sncpAddress), sncpAddress);
|
||||
}
|
||||
|
||||
public Transport[] loadDiffGroupTransports(InetSocketAddress sncpAddress, final Set<String> diffGroups) {
|
||||
if (diffGroups == null) return null;
|
||||
final String sncpGroup = groupAddrs.get(sncpAddress);
|
||||
final List<Transport> transports = new ArrayList<>();
|
||||
for (String group : diffGroups) {
|
||||
if (sncpGroup == null || !sncpGroup.equals(group)) {
|
||||
transports.add(loadTransport(group, sncpAddress));
|
||||
}
|
||||
}
|
||||
return transports.toArray(new Transport[transports.size()]);
|
||||
}
|
||||
|
||||
public Transport loadRemoteTransport(InetSocketAddress sncpAddress, final Set<String> groups) {
|
||||
public Transport loadTransport(InetSocketAddress sncpAddress, final Set<String> groups) {
|
||||
if (groups == null) return null;
|
||||
Set<InetSocketAddress> addresses = new HashSet<>();
|
||||
TransportGroupInfo info = null;
|
||||
@@ -296,13 +280,6 @@ public class TransportFactory {
|
||||
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, this.sslContext, sncpAddress, addresses, this.strategy);
|
||||
}
|
||||
|
||||
private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) {
|
||||
if (groupName == null) return null;
|
||||
TransportGroupInfo info = groupInfos.get(groupName);
|
||||
if (info == null) return null;
|
||||
return new Transport(groupName, info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, this.sslContext, sncpAddress, info.addresses, this.strategy);
|
||||
}
|
||||
|
||||
public ExecutorService getExecutor() {
|
||||
return executor;
|
||||
}
|
||||
|
||||
@@ -547,7 +547,7 @@ public abstract class Sncp {
|
||||
T rs = (T) newClazz.getDeclaredConstructor().newInstance();
|
||||
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
|
||||
client.setRemoteGroups(groups);
|
||||
client.setRemoteGroupTransport(transportFactory.loadRemoteTransport(clientAddress, groups));
|
||||
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
|
||||
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client");
|
||||
c.setAccessible(true);
|
||||
c.set(rs, client);
|
||||
@@ -728,7 +728,7 @@ public abstract class Sncp {
|
||||
T rs = (T) newClazz.getDeclaredConstructor().newInstance();
|
||||
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
|
||||
client.setRemoteGroups(groups);
|
||||
client.setRemoteGroupTransport(transportFactory.loadRemoteTransport(clientAddress, groups));
|
||||
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
|
||||
{
|
||||
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client");
|
||||
c.setAccessible(true);
|
||||
|
||||
@@ -64,10 +64,10 @@ public final class SncpClient {
|
||||
@Resource
|
||||
protected BsonConvert bsonConvert;
|
||||
|
||||
//远程模式
|
||||
//远程模式, 可能为null
|
||||
protected Set<String> remoteGroups;
|
||||
|
||||
//远程模式
|
||||
//远程模式, 可能为null
|
||||
protected Transport remoteGroupTransport;
|
||||
|
||||
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, final TransportFactory factory,
|
||||
|
||||
@@ -29,6 +29,9 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
|
||||
|
||||
private final AtomicInteger maxNameLength = new AtomicInteger();
|
||||
|
||||
//协议层协议名
|
||||
protected static final String subprotocol = "TCP";
|
||||
|
||||
public SncpServer() {
|
||||
this(System.currentTimeMillis(), ResourceFactory.root());
|
||||
}
|
||||
@@ -38,7 +41,7 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
|
||||
}
|
||||
|
||||
public SncpServer(long serverStartTime, ResourceFactory resourceFactory) {
|
||||
super(serverStartTime, "TCP", resourceFactory, new SncpPrepareServlet());
|
||||
super(serverStartTime, subprotocol, resourceFactory, new SncpPrepareServlet());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -46,6 +49,10 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
|
||||
super.init(config);
|
||||
}
|
||||
|
||||
public String getSubprotocol() {
|
||||
return subprotocol;
|
||||
}
|
||||
|
||||
public List<SncpServlet> getSncpServlets() {
|
||||
return this.prepare.getServlets();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user