This commit is contained in:
Redkale
2016-08-29 13:50:05 +08:00
parent 2c5876ba3e
commit 1ea816a60e
3 changed files with 91 additions and 32 deletions

View File

@@ -65,11 +65,7 @@ public final class Application {
final Map<InetSocketAddress, String> globalNodes = new HashMap<>();
final Map<String, Set<InetSocketAddress>> globalGroups = new HashMap<>();
final Map<String, String> globalGroupKinds = new HashMap<>();
final Map<String, String> globalGroupProtocols = new HashMap<>();
final Map<String, GroupInfo> globalGroups = new HashMap<>();
final InetAddress localAddress;
@@ -79,8 +75,6 @@ public final class Application {
final List<NodeServer> servers = new CopyOnWriteArrayList<>();
CountDownLatch servicecdl; //会出现两次赋值
final ObjectPool<ByteBuffer> transportBufferPool;
final ExecutorService transportExecutor;
@@ -89,6 +83,8 @@ public final class Application {
final ResourceFactory resourceFactory = ResourceFactory.root();
CountDownLatch servicecdl; //会出现两次赋值
//--------------------------------------------------------------------------------------------
private final boolean singletonrun;
@@ -333,16 +329,14 @@ public final class Application {
if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) {
throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol"));
}
Set<InetSocketAddress> addrs = globalGroups.get(group);
if (addrs == null) {
addrs = new LinkedHashSet<>();
globalGroupProtocols.put(group, protocol);
globalGroupKinds.put(group, conf.getValue("kind", ""));
globalGroups.put(group, addrs);
GroupInfo ginfo = globalGroups.get(group);
if (ginfo == null) {
ginfo = new GroupInfo(group, protocol, conf.getValue("kind", ""), new LinkedHashSet<>());
globalGroups.put(group, ginfo);
}
for (AnyValue node : conf.getAnyValues("node")) {
final InetSocketAddress addr = new InetSocketAddress(node.getValue("addr"), node.getIntValue("port"));
addrs.add(addr);
ginfo.addrs.add(addr);
String oldgroup = globalNodes.get(addr);
if (oldgroup != null) throw new RuntimeException(addr + " had one more group " + (globalNodes.get(addr)));
globalNodes.put(addr, group);
@@ -575,20 +569,9 @@ public final class Application {
return null;
}
String findGroupProtocol(String group) {
GroupInfo findGroupInfo(String group) {
if (group == null) return null;
return globalGroupProtocols.get(group);
}
String findGroupKind(String group) {
if (group == null) return null;
return globalGroupKinds.get(group);
}
Set<InetSocketAddress> findGlobalGroup(String group) {
if (group == null) return null;
Set<InetSocketAddress> set = globalGroups.get(group);
return set == null ? null : new LinkedHashSet<>(set);
return globalGroups.get(group);
}
private void shutdown() throws Exception {

View File

@@ -0,0 +1,74 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot;
import java.net.InetSocketAddress;
import java.util.*;
/**
*
* <p>
* 详情见: http://redkale.org
*
* @author zhangjx
*/
public class GroupInfo {
protected String name;
protected String protocol;
protected String kind;
protected Set<InetSocketAddress> addrs;
public GroupInfo() {
}
public GroupInfo(String name, String protocol, String kind, Set<InetSocketAddress> addrs) {
this.name = name;
this.protocol = protocol;
this.kind = kind;
this.addrs = addrs;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public String getKind() {
return kind;
}
public void setKind(String kind) {
this.kind = kind;
}
public Set<InetSocketAddress> getAddrs() {
return addrs;
}
public Set<InetSocketAddress> copyAddrs() {
return addrs == null ? null : new LinkedHashSet<>(addrs);
}
public void setAddrs(Set<InetSocketAddress> addrs) {
this.addrs = addrs;
}
}

View File

@@ -402,8 +402,9 @@ public abstract class NodeServer {
Set<InetSocketAddress> addrs = new HashSet();
transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses())));
Transport first = transports.get(0);
Transport newTransport = new Transport(groupid, application.findGroupProtocol(first.getName()), application.getWatchFactory(),
application.findGroupKind(first.getName()), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
GroupInfo ginfo = application.findGroupInfo(first.getName());
Transport newTransport = new Transport(groupid, ginfo.getProtocol(), application.getWatchFactory(),
ginfo.getKind(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
synchronized (application.resourceFactory) {
transport = application.resourceFactory.find(groupid, Transport.class);
if (transport == null) {
@@ -425,10 +426,11 @@ public abstract class NodeServer {
}
return transport;
}
Set<InetSocketAddress> addrs = application.findGlobalGroup(group);
GroupInfo ginfo = application.findGroupInfo(group);
Set<InetSocketAddress> addrs = ginfo.copyAddrs();
if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> ");
transport = new Transport(group, application.findGroupProtocol(group), application.getWatchFactory(),
application.findGroupKind(group), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
transport = new Transport(group, ginfo.getProtocol(), application.getWatchFactory(),
ginfo.getKind(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
application.resourceFactory.register(group, transport);
}
return transport;