This commit is contained in:
Redkale
2020-05-16 22:12:45 +08:00
parent 8440b58d6c
commit a1e37643d0
8 changed files with 36 additions and 46 deletions

View File

@@ -59,7 +59,6 @@
一个group节点对应一个 Transport 对象。
name: 服务组ID长度不能超过11个字节. 默认为空字符串。 注意: name不能包含$符号。
protocol 值范围UDP TCP 默认TCP
subprotocol: 子协议,预留字段。默认值为空
注意: 一个node只能所属一个group。只要存在protocol=SNCP的Server节点信息 就必须有group节点信息。
-->
<group name="" protocol="TCP">

View File

@@ -572,7 +572,7 @@ public final class Application {
if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) {
throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol"));
}
TransportGroupInfo ginfo = new TransportGroupInfo(group, protocol, conf.getValue("subprotocol", ""), new LinkedHashSet<>());
TransportGroupInfo ginfo = new TransportGroupInfo(group, protocol, new LinkedHashSet<>());
for (AnyValue node : conf.getAnyValues("node")) {
final InetSocketAddress addr = new InetSocketAddress(node.getValue("addr"), node.getIntValue("port"));
ginfo.putAddress(addr);

View File

@@ -74,14 +74,13 @@ public abstract class ClusterAgent {
register(ns, transportFactory, service);
}
Server server = ns.getServer();
String subprotocol = server instanceof SncpServer ? ((SncpServer) server).getSubprotocol() : "TCP";
String netprotocol = server instanceof SncpServer ? ((SncpServer) server).getNetprotocol() : Transport.DEFAULT_PROTOCOL;
//远程模式加载IP列表, 只能是SNCP协议
for (Service service : remoteServices) {
if (!Sncp.isSncpDyn(service)) continue;
List<InetSocketAddress> addrs = queryAddress(ns, service);
if (addrs != null && !addrs.isEmpty()) {
SncpClient client = Sncp.getSncpClient(service);
if (client != null) client.setRemoteGroupTransport(transportFactory.createTransport(Sncp.getResourceType(service).getName(), server.getProtocol(), subprotocol, ns.getSncpAddress(), addrs));
Sncp.updateTransport(service, transportFactory, Sncp.getResourceType(service).getName() + "-" + Sncp.getResourceName(service), netprotocol, ns.getSncpAddress(), null, addrs);
}
}
}

View File

@@ -38,8 +38,6 @@ public final class Transport {
protected final String name; //即<group>的name属性
protected final String subprotocol; //即<group>的subprotocol属性
protected final boolean tcp;
protected final String protocol;
@@ -58,18 +56,16 @@ public final class Transport {
//负载均衡策略
protected final TransportStrategy strategy;
protected Transport(String name, String subprotocol, TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
protected Transport(String name, TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
this(name, DEFAULT_PROTOCOL, subprotocol, factory, transportBufferPool, transportChannelGroup, sslContext, clientAddress, addresses, strategy);
this(name, DEFAULT_PROTOCOL, factory, transportBufferPool, transportChannelGroup, sslContext, clientAddress, addresses, strategy);
}
protected Transport(String name, String protocol, String subprotocol,
final TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
protected Transport(String name, String protocol, final TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
this.name = name;
this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
this.protocol = protocol;
this.factory = factory;
factory.transportReferences.add(new WeakReference<>(this));
@@ -138,10 +134,6 @@ public final class Transport {
return name;
}
public String getSubprotocol() {
return subprotocol;
}
public void close() {
TransportNode[] nodes = this.transportNodes;
if (nodes == null) return;
@@ -198,6 +190,10 @@ public final class Transport {
return group;
}
public String getProtocol() {
return protocol;
}
public boolean isTCP() {
return tcp;
}

View File

@@ -203,16 +203,11 @@ public class TransportFactory {
}
public Transport createTransportTCP(String name, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
return new Transport(name, "TCP", "", this, this.bufferPool, this.channelGroup, this.sslContext, clientAddress, addresses, strategy);
return new Transport(name, "TCP", this, this.bufferPool, this.channelGroup, this.sslContext, clientAddress, addresses, strategy);
}
public Transport createTransport(String name, String protocol, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
return new Transport(name, protocol, "", this, this.bufferPool, this.channelGroup, this.sslContext, clientAddress, addresses, strategy);
}
public Transport createTransport(String name, String protocol, String subprotocol,
final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
return new Transport(name, protocol, subprotocol, this, this.bufferPool, this.channelGroup, this.sslContext, clientAddress, addresses, strategy);
return new Transport(name, protocol, this, this.bufferPool, this.channelGroup, this.sslContext, clientAddress, addresses, strategy);
}
public String findGroupName(InetSocketAddress addr) {
@@ -251,7 +246,6 @@ public class TransportFactory {
if (!checkName(info.name)) throw new RuntimeException("Transport.group.name only 0-9 a-z A-Z _ cannot begin 0-9");
TransportGroupInfo old = groupInfos.get(info.name);
if (old != null && !old.protocol.equals(info.protocol)) throw new RuntimeException("Transport.group.name repeat but protocol is different");
if (old != null && !old.subprotocol.equals(info.subprotocol)) throw new RuntimeException("Transport.group.name repeat but subprotocol is different");
for (InetSocketAddress addr : info.addresses) {
if (!groupAddrs.getOrDefault(addr, info.name).equals(info.name)) throw new RuntimeException(addr + " repeat but different group.name");
}
@@ -277,7 +271,7 @@ public class TransportFactory {
}
if (info == null) info = new TransportGroupInfo("TCP");
if (sncpAddress != null) addresses.remove(sncpAddress);
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, this.sslContext, sncpAddress, addresses, this.strategy);
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, this, this.bufferPool, this.channelGroup, this.sslContext, sncpAddress, addresses, this.strategy);
}
public ExecutorService getExecutor() {

View File

@@ -24,30 +24,27 @@ public class TransportGroupInfo {
protected String protocol; //协议 取值范围: TCP、UDP
protected String subprotocol; //子协议,预留使用
protected Set<InetSocketAddress> addresses; //地址列表, 对应 resources-&#62;group-&#62;node节点信息
public TransportGroupInfo() {
}
public TransportGroupInfo(String name, InetSocketAddress... addrs) {
this(name, "TCP", "", Utility.ofSet(addrs));
this(name, "TCP", Utility.ofSet(addrs));
}
public TransportGroupInfo(String name, Set<InetSocketAddress> addrs) {
this(name, "TCP", "", addrs);
this(name, "TCP", addrs);
}
public TransportGroupInfo(String name, String protocol, String subprotocol, InetSocketAddress... addrs) {
this(name, protocol, subprotocol, Utility.ofSet(addrs));
public TransportGroupInfo(String name, String protocol, InetSocketAddress... addrs) {
this(name, protocol, Utility.ofSet(addrs));
}
public TransportGroupInfo(String name, String protocol, String subprotocol, Set<InetSocketAddress> addrs) {
public TransportGroupInfo(String name, String protocol, Set<InetSocketAddress> addrs) {
Objects.requireNonNull(name, "Transport.group.name can not null");
this.name = name;
this.protocol = protocol == null ? "TCP" : protocol;
this.subprotocol = subprotocol == null ? "" : subprotocol;
this.addresses = addrs;
}
@@ -68,15 +65,6 @@ public class TransportGroupInfo {
this.protocol = protocol == null ? "TCP" : protocol;
}
public String getSubprotocol() {
return subprotocol;
}
public void setSubprotocol(String subprotocol) {
this.subprotocol = subprotocol == null ? "" : subprotocol;
this.subprotocol = subprotocol;
}
public Set<InetSocketAddress> getAddresses() {
return addresses;
}

View File

@@ -139,6 +139,20 @@ public abstract class Sncp {
}
}
public static boolean updateTransport(Service service,
final TransportFactory transportFactory, String name, String protocol, InetSocketAddress clientAddress,
final Set<String> groups, final Collection<InetSocketAddress> addresses) {
if (!isSncpDyn(service)) return false;
SncpClient client = getSncpClient(service);
client.setRemoteGroups(groups);
if (client.getRemoteGroupTransport() != null) {
client.getRemoteGroupTransport().updateRemoteAddresses(addresses);
} else {
client.setRemoteGroupTransport(transportFactory.createTransport(name, protocol, clientAddress, addresses));
}
return true;
}
static void checkAsyncModifier(Class param, Method method) {
if (param == CompletionHandler.class) return;
if (Modifier.isFinal(param.getModifiers())) {

View File

@@ -30,7 +30,7 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
private final AtomicInteger maxNameLength = new AtomicInteger();
//协议层协议名
protected static final String subprotocol = "TCP";
protected static final String netprotocol = "TCP";
public SncpServer() {
this(System.currentTimeMillis(), ResourceFactory.root());
@@ -41,7 +41,7 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
}
public SncpServer(long serverStartTime, ResourceFactory resourceFactory) {
super(serverStartTime, subprotocol, resourceFactory, new SncpPrepareServlet());
super(serverStartTime, netprotocol, resourceFactory, new SncpPrepareServlet());
}
@Override
@@ -49,8 +49,8 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
super.init(config);
}
public String getSubprotocol() {
return subprotocol;
public String getNetprotocol() {
return netprotocol;
}
public List<SncpServlet> getSncpServlets() {