This commit is contained in:
Redkale
2017-05-29 14:03:11 +08:00
parent 0da74be2fa
commit 06773ccdc0
9 changed files with 599 additions and 682 deletions

View File

@@ -14,7 +14,6 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import javax.persistence.Transient;
import static org.redkale.boot.Application.*;
@@ -129,7 +128,7 @@ public abstract class NodeServer {
if (isSNCP()) { // SNCP协议
String host = this.serverConf.getValue("host", "0.0.0.0").replace("0.0.0.0", "");
this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.serverConf.getIntValue("port"));
this.sncpGroup = application.globalNodes.get(this.sncpAddress);
this.sncpGroup = application.transportFactory.findGroupName(this.sncpAddress);
//单向SNCP服务不需要对等group
//if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found <group> info");
}
@@ -189,6 +188,7 @@ public abstract class NodeServer {
final NodeServer self = this;
//---------------------------------------------------------------------------------------------
final ResourceFactory appResFactory = application.getResourceFactory();
final TransportFactory appTranFactory = application.getTransportFactory();
final AnyValue resources = application.config.getAnyValue("resources");
final Map<String, AnyValue> cacheResource = new HashMap<>();
//final Map<String, AnyValue> dataResources = new HashMap<>();
@@ -245,13 +245,13 @@ public abstract class NodeServer {
appResFactory.register(resourceName, DataSource.class, source);
SncpClient client = Sncp.getSncpClient((Service) src);
Transport sameGroupTransport = Sncp.getSameGroupTransport((Service) src);
List<Transport> diffGroupTransports = Arrays.asList(Sncp.getDiffGroupTransports((Service) src));
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
if ((src instanceof DataSource) && sncpAddr != null && resourceFactory.find(resourceName, DataCacheListener.class) == null) { //只有DataSourceService 才能赋值 DataCacheListener
final NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
Set<String> gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports);
Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), appResFactory, DataCacheListenerService.class, sncpAddr, sncpServer.getSncpGroup(), gs, Sncp.getConf((Service) src), sameGroupTransport, diffGroupTransports);
final Set<String> groups = new HashSet<>();
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
Service cacheListenerService = Sncp.createLocalService(resourceName, DataCacheListenerService.class, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf((Service) src));
appResFactory.register(resourceName, DataCacheListener.class, cacheListenerService);
localServices.add(cacheListenerService);
sncpServer.consumerAccept(cacheListenerService);
@@ -275,15 +275,13 @@ public abstract class NodeServer {
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不需要注入 CacheSource
final Service srcService = (Service) src;
SncpClient client = Sncp.getSncpClient(srcService);
Transport sameGroupTransport = Sncp.getSameGroupTransport(srcService);
Transport[] dts = Sncp.getDiffGroupTransports((Service) src);
List<Transport> diffGroupTransports = dts == null ? new ArrayList<>() : Arrays.asList(dts);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final AnyValue sourceConf = cacheResource.get(resourceName);
final Class sourceType = sourceConf == null ? CacheMemorySource.class : Class.forName(sourceConf.getValue("type"));
@SuppressWarnings("unchecked")
final CacheSource source = (CacheSource) Sncp.createLocalService(resourceName, getExecutor(), appResFactory, (Class<? extends Service>) sourceType,
sncpAddr, Sncp.getSncpGroup(srcService), Sncp.getGroups(srcService), Sncp.getConf(srcService), sameGroupTransport, diffGroupTransports);
final Set<String> groups = new HashSet<>();
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
final CacheSource source = (CacheSource) Sncp.createLocalService(resourceName, sourceType, appResFactory, appTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
@@ -301,7 +299,6 @@ public abstract class NodeServer {
if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource
NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
Set<String> gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports);
sncpServer.getSncpServer().addSncpServlet((Service) source);
//logger.info("[" + Thread.currentThread().getName() + "] Load Service " + source);
}
@@ -323,7 +320,8 @@ public abstract class NodeServer {
final String threadName = "[" + Thread.currentThread().getName() + "] ";
final Set<FilterEntry<? extends Service>> entrys = (Set) serviceFilter.getAllFilterEntrys();
ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory;
final ResourceFactory appResourceFactory = application.getResourceFactory();
final TransportFactory appTransportFactory = application.getTransportFactory();
for (FilterEntry<? extends Service> entry : entrys) { //service实现类
final Class<? extends Service> serviceImplClass = entry.getType();
if (Modifier.isFinal(serviceImplClass.getModifiers())) continue; //修饰final的类跳过
@@ -354,13 +352,12 @@ public abstract class NodeServer {
Service service;
boolean ws = src instanceof WebSocketServlet;
if (ws || localed) { //本地模式
service = Sncp.createLocalService(resourceName, getExecutor(), application.getResourceFactory(), serviceImplClass,
NodeServer.this.sncpAddress, NodeServer.this.sncpGroup, groups, entry.getProperty(), loadTransport(NodeServer.this.sncpGroup), loadTransports(groups));
service = Sncp.createLocalService(resourceName, serviceImplClass, appResourceFactory, appTransportFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
} else {
service = Sncp.createRemoteService(resourceName, getExecutor(), serviceImplClass, NodeServer.this.sncpAddress, null, groups, entry.getProperty(), loadTransport(groups));
service = Sncp.createRemoteService(resourceName, serviceImplClass, appTransportFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
}
if (SncpClient.parseMethod(serviceImplClass).isEmpty()) return; //class没有可用的方法 通常为BaseService
//final ServiceWrapper wrapper = new ServiceWrapper(serviceImplClass, service, resourceName, localed ? NodeServer.this.sncpGroup : null, groups, entry.getProperty());
final Class restype = Sncp.getResourceType(service);
if (rf.find(resourceName, restype) == null) {
regFactory.register(resourceName, restype, service);
@@ -447,62 +444,62 @@ public abstract class NodeServer {
maxClassNameLength = Math.max(maxClassNameLength, Sncp.getResourceType(y).getName().length() + 1);
}
protected List<Transport> loadTransports(final HashSet<String> groups) {
if (groups == null) return null;
final List<Transport> transports = new ArrayList<>();
for (String group : groups) {
if (this.sncpGroup == null || !this.sncpGroup.equals(group)) {
transports.add(loadTransport(group));
}
}
return transports;
}
protected Transport loadTransport(final HashSet<String> groups) {
if (groups == null || groups.isEmpty()) return null;
final String groupid = new ArrayList<>(groups).stream().sorted().collect(Collectors.joining(";")); //按字母排列顺序
Transport transport = application.resourceFactory.find(groupid, Transport.class);
if (transport != null) return transport;
final List<Transport> transports = new ArrayList<>();
for (String group : groups) {
transports.add(loadTransport(group));
}
Set<InetSocketAddress> addrs = new HashSet();
transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses())));
Transport first = transports.get(0);
GroupInfo ginfo = application.findGroupInfo(first.getName());
Transport newTransport = new Transport(groupid, ginfo.getProtocol(),
ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
synchronized (application.resourceFactory) {
transport = application.resourceFactory.find(groupid, Transport.class);
if (transport == null) {
transport = newTransport;
application.resourceFactory.register(groupid, transport);
}
}
return transport;
}
protected Transport loadTransport(final String group) {
if (group == null) return null;
Transport transport;
synchronized (application.resourceFactory) {
transport = application.resourceFactory.find(group, Transport.class);
if (transport != null) {
if (this.sncpAddress != null && !this.sncpAddress.equals(transport.getClientAddress())) {
throw new RuntimeException(transport + "repeat create on newClientAddress = " + this.sncpAddress + ", oldClientAddress = " + transport.getClientAddress());
}
return transport;
}
GroupInfo ginfo = application.findGroupInfo(group);
Set<InetSocketAddress> addrs = ginfo.copyAddrs();
if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> ");
transport = new Transport(group, ginfo.getProtocol(), ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
application.resourceFactory.register(group, transport);
}
return transport;
}
/*
* protected List<Transport> loadTransports(final HashSet<String> groups) {
* if (groups == null) return null;
* final List<Transport> transports = new ArrayList<>();
* for (String group : groups) {
* if (this.sncpGroup == null || !this.sncpGroup.equals(group)) {
* transports.add(loadTransport(group));
* }
* }
* return transports;
* }
* protected Transport loadTransport(final HashSet<String> groups) {
* if (groups == null || groups.isEmpty()) return null;
* final String groupid = new ArrayList<>(groups).stream().sorted().collect(Collectors.joining(";")); //按字母排列顺序
* Transport transport = application.resourceFactory.find(groupid, Transport.class);
* if (transport != null) return transport;
* final List<Transport> transports = new ArrayList<>();
* for (String group : groups) {
* transports.add(loadTransport(group));
* }
* Set<InetSocketAddress> addrs = new HashSet();
* transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses())));
* Transport first = transports.get(0);
* TransportGroupInfo ginfo = application.transportFactory.findGroupInfo(first.getName());
* Transport newTransport = new Transport(groupid, ginfo.getProtocol(),
* ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
* synchronized (application.resourceFactory) {
* transport = application.resourceFactory.find(groupid, Transport.class);
* if (transport == null) {
* transport = newTransport;
* application.resourceFactory.register(groupid, transport);
* }
* }
* return transport;
* }
*
* protected Transport loadTransport(final String group) {
* if (group == null) return null;
* Transport transport;
* synchronized (application.resourceFactory) {
* transport = application.resourceFactory.find(group, Transport.class);
* if (transport != null) {
* if (this.sncpAddress != null && !this.sncpAddress.equals(transport.getClientAddress())) {
* throw new RuntimeException(transport + "repeat create on newClientAddress = " + this.sncpAddress + ", oldClientAddress = " + transport.getClientAddress());
* }
* return transport;
* }
* TransportGroupInfo ginfo = application.findGroupInfo(group);
* Set<InetSocketAddress> addrs = ginfo.copyAddresses();
* if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> ");
* transport = new Transport(group, ginfo.getProtocol(), ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
* application.resourceFactory.register(group, transport);
* }
* return transport;
* }
*/
protected abstract ClassFilter<Filter> createFilterClassFilter();
protected abstract ClassFilter<Servlet> createServletClassFilter();