This commit is contained in:
Redkale
2016-08-26 06:37:47 +08:00
parent 54462e8c26
commit 554932201b
3 changed files with 25 additions and 9 deletions

View File

@@ -67,6 +67,8 @@ public final class Application {
final Map<String, Set<InetSocketAddress>> globalGroups = new HashMap<>(); final Map<String, Set<InetSocketAddress>> globalGroups = new HashMap<>();
final Map<String, String> globalGroupKinds = new HashMap<>();
final Map<String, String> globalGroupProtocols = new HashMap<>(); final Map<String, String> globalGroupProtocols = new HashMap<>();
final InetAddress localAddress; final InetAddress localAddress;
@@ -335,6 +337,7 @@ public final class Application {
if (addrs == null) { if (addrs == null) {
addrs = new LinkedHashSet<>(); addrs = new LinkedHashSet<>();
globalGroupProtocols.put(group, protocol); globalGroupProtocols.put(group, protocol);
globalGroupKinds.put(group, conf.getValue("kind", ""));
globalGroups.put(group, addrs); globalGroups.put(group, addrs);
} }
for (AnyValue node : conf.getAnyValues("node")) { for (AnyValue node : conf.getAnyValues("node")) {
@@ -577,6 +580,11 @@ public final class Application {
return globalGroupProtocols.get(group); return globalGroupProtocols.get(group);
} }
String findGroupKind(String group) {
if (group == null) return null;
return globalGroupKinds.get(group);
}
Set<InetSocketAddress> findGlobalGroup(String group) { Set<InetSocketAddress> findGlobalGroup(String group) {
if (group == null) return null; if (group == null) return null;
Set<InetSocketAddress> set = globalGroups.get(group); Set<InetSocketAddress> set = globalGroups.get(group);

View File

@@ -53,7 +53,7 @@ public abstract class NodeServer {
//日志是否为FINEST级别 //日志是否为FINEST级别
protected final boolean finest; protected final boolean finest;
//进程主类 //进程主类
protected final Application application; protected final Application application;
@@ -403,7 +403,7 @@ public abstract class NodeServer {
transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses()))); transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses())));
Transport first = transports.get(0); Transport first = transports.get(0);
Transport newTransport = new Transport(groupid, application.findGroupProtocol(first.getName()), application.getWatchFactory(), Transport newTransport = new Transport(groupid, application.findGroupProtocol(first.getName()), application.getWatchFactory(),
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); application.findGroupKind(first.getName()), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
synchronized (application.resourceFactory) { synchronized (application.resourceFactory) {
transport = application.resourceFactory.find(groupid, Transport.class); transport = application.resourceFactory.find(groupid, Transport.class);
if (transport == null) { if (transport == null) {
@@ -428,7 +428,7 @@ public abstract class NodeServer {
Set<InetSocketAddress> addrs = application.findGlobalGroup(group); Set<InetSocketAddress> addrs = application.findGlobalGroup(group);
if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> "); if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> ");
transport = new Transport(group, application.findGroupProtocol(group), application.getWatchFactory(), transport = new Transport(group, application.findGroupProtocol(group), application.getWatchFactory(),
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); application.findGroupKind(group), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
application.resourceFactory.register(group, transport); application.resourceFactory.register(group, transport);
} }
return transport; return transport;

View File

@@ -44,6 +44,8 @@ public final class Transport {
protected final String name; //即<group>的name属性 protected final String name; //即<group>的name属性
protected final String kind; //即<group>的kind属性
protected final boolean tcp; protected final boolean tcp;
protected final String protocol; protected final String protocol;
@@ -54,21 +56,22 @@ public final class Transport {
protected final InetSocketAddress clientAddress; protected final InetSocketAddress clientAddress;
protected InetSocketAddress[] remoteAddres = new InetSocketAddress[0]; protected InetSocketAddress[] remoteAddres = new InetSocketAddress[0];
protected final ObjectPool<ByteBuffer> bufferPool; protected final ObjectPool<ByteBuffer> bufferPool;
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
public Transport(String name, WatchFactory watch, final ObjectPool<ByteBuffer> transportBufferPool, public Transport(String name, WatchFactory watch, String kind, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) { final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
this(name, DEFAULT_PROTOCOL, watch, transportBufferPool, transportChannelGroup, clientAddress, addresses); this(name, DEFAULT_PROTOCOL, watch, kind, transportBufferPool, transportChannelGroup, clientAddress, addresses);
} }
public Transport(String name, String protocol, WatchFactory watch, final ObjectPool<ByteBuffer> transportBufferPool, public Transport(String name, String protocol, WatchFactory watch, String kind, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) { final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
this.name = name; this.name = name;
this.watch = watch; this.watch = watch;
this.kind = kind == null ? "" : kind.trim();
this.protocol = protocol; this.protocol = protocol;
this.tcp = "TCP".equalsIgnoreCase(protocol); this.tcp = "TCP".equalsIgnoreCase(protocol);
this.group = transportChannelGroup; this.group = transportChannelGroup;
@@ -87,6 +90,7 @@ public final class Transport {
//必须按字母排列顺序确保相同内容的transport列表组合的name相同而不会因为list的顺序不同产生不同的name //必须按字母排列顺序确保相同内容的transport列表组合的name相同而不会因为list的顺序不同产生不同的name
this.name = tmpgroup.stream().sorted().collect(Collectors.joining(";")); this.name = tmpgroup.stream().sorted().collect(Collectors.joining(";"));
this.watch = first.watch; this.watch = first.watch;
this.kind = first.kind;
this.protocol = first.protocol; this.protocol = first.protocol;
this.tcp = "TCP".equalsIgnoreCase(first.protocol); this.tcp = "TCP".equalsIgnoreCase(first.protocol);
this.group = first.group; this.group = first.group;
@@ -114,6 +118,10 @@ public final class Transport {
return name; return name;
} }
public String getKind() {
return kind;
}
public void close() { public void close() {
connPool.forEach((k, v) -> v.forEach(c -> c.dispose())); connPool.forEach((k, v) -> v.forEach(c -> c.dispose()));
} }