Transport 改版

This commit is contained in:
wentch
2016-01-12 15:12:40 +08:00
parent f0cc76a1d6
commit 585d5866e6
24 changed files with 491 additions and 411 deletions

View File

@@ -23,7 +23,6 @@ import java.util.logging.*;
import javax.annotation.*;
import javax.persistence.*;
import org.redkale.net.*;
import org.redkale.net.http.*;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
@@ -57,27 +56,19 @@ public abstract class NodeServer {
private String sncpGroup = null; //当前Server的SNCP协议的组
private String nodeProtocol = Transport.DEFAULT_PROTOCOL;
private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点没有分布式结构
protected Consumer<ServiceWrapper> consumer;
protected AnyValue nodeConf;
protected final HashSet<String> sncpDefaultGroups = new LinkedHashSet<>();
protected final List<Transport> sncpSameGroupTransports = new ArrayList<>();
protected final List<Transport> sncpDiffGroupTransports = new ArrayList<>();
protected AnyValue serverConf;
protected final Set<ServiceWrapper> localServiceWrappers = new LinkedHashSet<>();
protected final Set<ServiceWrapper> remoteServiceWrappers = new LinkedHashSet<>();
public NodeServer(Application application, ResourceFactory factory, Server server) {
public NodeServer(Application application, Server server) {
this.application = application;
this.factory = factory;
this.factory = application.getResourceFactory().createChild();
this.server = server;
this.logger = Logger.getLogger(this.getClass().getSimpleName());
this.fine = logger.isLoggable(Level.FINE);
@@ -115,16 +106,15 @@ public abstract class NodeServer {
}
public void init(AnyValue config) throws Exception {
this.nodeConf = config == null ? AnyValue.create() : config;
this.serverConf = config == null ? AnyValue.create() : config;
if (isSNCP()) { // SNCP协议
String host = this.nodeConf.getValue("host", "0.0.0.0").replace("0.0.0.0", "");
this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.nodeConf.getIntValue("port"));
String host = this.serverConf.getValue("host", "0.0.0.0").replace("0.0.0.0", "");
this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.serverConf.getIntValue("port"));
this.sncpGroup = application.globalNodes.get(this.sncpAddress);
if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found <group> info");
if (server != null) this.nodeProtocol = server.getProtocol();
}
initGroup();
if (this.sncpAddress != null) this.factory.register(RESNAME_SERVER_ADDR, this.sncpAddress);
if (this.sncpAddress != null) this.factory.register(RESNAME_SERVER_ADDR, this.sncpAddress); //单点服务不会有 sncpAddress、sncpGroup
if (this.sncpGroup != null) this.factory.register(RESNAME_SERVER_GROUP, this.sncpGroup);
{
//设置root文件夹
@@ -137,8 +127,9 @@ public abstract class NodeServer {
Server.loadLib(logger, config.getValue("lib", "") + ";" + homepath + "/lib/*;" + homepath + "/classes");
if (server != null) server.init(config);
}
initResource();
//prepare();
initResource(); //给 DataSource、CacheSource 注册依赖注入时的监听回调事件。
ClassFilter<Servlet> servletFilter = createServletClassFilter();
ClassFilter<Service> serviceFilter = createServiceClassFilter();
long s = System.currentTimeMillis();
@@ -166,25 +157,23 @@ public abstract class NodeServer {
DataSource source = new DataDefaultSource(resourceName);
application.dataSources.add(source);
regFactory.register(resourceName, DataSource.class, source);
List<Transport> sameGroupTransports = sncpSameGroupTransports;
List<Transport> diffGroupTransports = sncpDiffGroupTransports;
Transport sameGroupTransport = null;
List<Transport> diffGroupTransports = null;
try {
Field ts = src.getClass().getDeclaredField("_sameGroupTransports");
Field ts = src.getClass().getDeclaredField("_sameGroupTransport");
ts.setAccessible(true);
Transport[] lts = (Transport[]) ts.get(src);
sameGroupTransports = Arrays.asList(lts);
sameGroupTransport = (Transport) ts.get(src);
ts = src.getClass().getDeclaredField("_diffGroupTransports");
ts.setAccessible(true);
lts = (Transport[]) ts.get(src);
diffGroupTransports = Arrays.asList(lts);
diffGroupTransports = Arrays.asList((Transport[]) ts.get(src));
} catch (Exception e) {
//src 不含 MultiRun 方法
}
if (factory.find(resourceName, DataCacheListener.class) == null) {
Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports);
Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, this.sncpAddress, sameGroupTransport, diffGroupTransports);
regFactory.register(resourceName, DataCacheListener.class, cacheListenerService);
ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpGroup, sncpDefaultGroups, null);
ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpGroup, null, null);
localServiceWrappers.add(wrapper);
if (consumer != null) consumer.accept(wrapper);
rf.inject(cacheListenerService, self);
@@ -199,22 +188,20 @@ public abstract class NodeServer {
try {
if (field.getAnnotation(Resource.class) == null) return;
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 CacheSource
List<Transport> sameGroupTransports = sncpSameGroupTransports;
List<Transport> diffGroupTransports = sncpDiffGroupTransports;
Transport sameGroupTransport = null;
List<Transport> diffGroupTransports = null;
try {
Field ts = src.getClass().getDeclaredField("_sameGroupTransports");
Field ts = src.getClass().getDeclaredField("_sameGroupTransport");
ts.setAccessible(true);
Transport[] lts = (Transport[]) ts.get(src);
sameGroupTransports = Arrays.asList(lts);
sameGroupTransport = (Transport) ts.get(src);
ts = src.getClass().getDeclaredField("_diffGroupTransports");
ts.setAccessible(true);
lts = (Transport[]) ts.get(src);
diffGroupTransports = Arrays.asList(lts);
diffGroupTransports = Arrays.asList((Transport[]) ts.get(src));
} catch (Exception e) {
//src 不含 MultiRun 方法
}
CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports);
CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sameGroupTransport, diffGroupTransports);
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
@@ -232,7 +219,7 @@ public abstract class NodeServer {
sncpServer = (NodeSncpServer) node;
}
}
ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, getSncpGroup(), sncpDefaultGroups, null);
ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, getSncpGroup(), null, null);
sncpServer.getSncpServer().addService(wrapper);
}
logger.fine("[" + Thread.currentThread().getName() + "] Load Source " + source);
@@ -242,120 +229,37 @@ public abstract class NodeServer {
});
}
private void initGroup() {
final AnyValue[] services = this.nodeConf.getAnyValues("services");
final String[] groups = services.length < 1 ? new String[]{""} : services[0].getValue("groups", "").split(";");
this.sncpDefaultGroups.addAll(Arrays.asList(groups));
if (!isSNCP()) {
NodeSncpServer sncpServer = null;
for (NodeServer node : application.servers) {
if (!node.isSNCP()) continue;
if (!this.sncpDefaultGroups.contains(node.sncpGroup)) continue;
sncpServer = (NodeSncpServer) node;
break;
}
if (sncpServer == null && (groups.length == 1 && groups[0].isEmpty())) {
for (NodeServer node : application.servers) {
if (!node.isSNCP()) continue;
sncpServer = (NodeSncpServer) node;
break;
}
}
if (sncpServer != null) {
this.sncpAddress = sncpServer.getSncpAddress();
this.sncpGroup = sncpServer.getSncpGroup();
this.sncpDefaultGroups.clear();
this.sncpDefaultGroups.addAll(sncpServer.sncpDefaultGroups);
this.sncpSameGroupTransports.addAll(sncpServer.sncpSameGroupTransports);
this.sncpDiffGroupTransports.addAll(sncpServer.sncpDiffGroupTransports);
return;
}
}
final Set<InetSocketAddress> sameGroupAddrs = application.findGlobalGroup(this.sncpGroup);
final Map<String, Set<InetSocketAddress>> diffGroupAddrs = new HashMap<>();
for (String groupitem : groups) {
final Set<InetSocketAddress> addrs = application.findGlobalGroup(groupitem);
if (addrs == null || groupitem.equals(this.sncpGroup)) continue;
diffGroupAddrs.put(groupitem, addrs);
}
if (sameGroupAddrs != null) {
sameGroupAddrs.remove(this.sncpAddress);
for (InetSocketAddress iaddr : sameGroupAddrs) {
sncpSameGroupTransports.add(loadTransport(this.sncpGroup, getNodeProtocol(), iaddr));
}
}
diffGroupAddrs.forEach((k, v) -> sncpDiffGroupTransports.add(loadTransport(k, getNodeProtocol(), v)));
}
@SuppressWarnings("unchecked")
protected void loadService(ClassFilter serviceFilter) throws Exception {
if (serviceFilter == null) return;
final String threadName = "[" + Thread.currentThread().getName() + "] ";
final Set<FilterEntry<Service>> entrys = serviceFilter.getFilterEntrys();
ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : factory;
final Set<InetSocketAddress> sg = application.findGlobalGroup(this.sncpGroup);
for (FilterEntry<Service> entry : entrys) { //service实现类
final Class<? extends Service> type = entry.getType();
if (Modifier.isFinal(type.getModifiers())) continue;
if (Modifier.isFinal(type.getModifiers())) continue; //修饰final的类跳过
if (!Modifier.isPublic(type.getModifiers())) continue;
if (!isSNCP() && factory.find(entry.getName(), type) != null) continue;
final Set<InetSocketAddress> sameGroupAddrs = new LinkedHashSet<>();
final Map<String, Set<InetSocketAddress>> diffGroupAddrs = new HashMap<>();
final HashSet<String> groups = entry.getGroups();
for (String g : groups) {
if (g.isEmpty()) continue;
if (g.equals(this.sncpGroup) && sg != null) sameGroupAddrs.addAll(sg);
Set<InetSocketAddress> set = application.findGlobalGroup(g);
if (set == null) throw new RuntimeException(type.getName() + " has illegal group (" + groups + ")");
if (!g.equals(this.sncpGroup)) {
diffGroupAddrs.put(g, set);
}
}
List<Transport> diffGroupTransports = new ArrayList<>();
diffGroupAddrs.forEach((k, v) -> diffGroupTransports.add(loadTransport(k, server.getProtocol(), v)));
final boolean localed = (sameGroupAddrs.isEmpty() && diffGroupAddrs.isEmpty()) || sameGroupAddrs.contains(this.sncpAddress) || type.getAnnotation(LocalService.class) != null;//本地模式
if (entry.getName().contains("$")) throw new RuntimeException("<name> value cannot contains '$' in " + entry.getProperty());
if (!isSNCP() && factory.find(entry.getName(), type) != null) continue; //非SNCP的Server加载Service时需要判断是否在SNCP的Server已经加载过了。
final HashSet<String> groups = entry.getGroups(); //groups.isEmpty()表示<services>没有配置groups属性。
if (groups.isEmpty() && isSNCP()) groups.add(this.sncpGroup);
final boolean localed = this.sncpAddress == null //非SNCP的Server通常是单点服务
|| groups.contains(this.sncpGroup) //本地IP含在内的
|| type.getAnnotation(LocalService.class) != null;//本地模式
if (localed && (type.isInterface() || Modifier.isAbstract(type.getModifiers()))) continue; //本地模式不能实例化接口和抽象类的Service类
final ServiceResource st = type.getAnnotation(ServiceResource.class);
final Class<? extends Service> resType = st == null ? type : st.value();
if (st != null && (!isSNCP() && factory.find(entry.getName(), resType) != null)) continue;
ServiceWrapper wrapper;
Service service;
if (localed) { //本地模式
sameGroupAddrs.remove(this.sncpAddress);
List<Transport> sameGroupTransports = new ArrayList<>();
for (InetSocketAddress iaddr : sameGroupAddrs) {
Set<InetSocketAddress> tset = new HashSet<>();
tset.add(iaddr);
sameGroupTransports.add(loadTransport(this.sncpGroup, server.getProtocol(), tset));
}
Service service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports);
wrapper = new ServiceWrapper(resType, service, this.sncpGroup, entry);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + service);
service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(this.sncpGroup), loadTransports(groups));
} else {
sameGroupAddrs.remove(this.sncpAddress);
StringBuilder g = new StringBuilder();
diffGroupAddrs.forEach((k, v) -> {
if (g.length() > 0) g.append(';');
g.append(k);
sameGroupAddrs.addAll(v);
});
if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")");
Service service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
wrapper = new ServiceWrapper(resType, service, "", entry);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + service);
service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(groups));
}
final ServiceWrapper wrapper = new ServiceWrapper(type, service, entry.getName(), localed ? this.sncpGroup : null, groups, entry.getProperty());
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + wrapper.getService());
if (factory.find(wrapper.getName(), wrapper.getType()) == null) {
regFactory.register(wrapper.getName(), wrapper.getType(), wrapper.getService());
if (wrapper.getService() instanceof DataSource) {
regFactory.register(wrapper.getName(), DataSource.class, wrapper.getService());
} else if (wrapper.getService() instanceof CacheSource) {
regFactory.register(wrapper.getName(), CacheSource.class, wrapper.getService());
} else if (wrapper.getService() instanceof DataCacheListener) {
regFactory.register(wrapper.getName(), DataCacheListener.class, wrapper.getService());
} else if (wrapper.getService() instanceof DataSQLListener) {
regFactory.register(wrapper.getName(), DataSQLListener.class, wrapper.getService());
} else if (wrapper.getService() instanceof WebSocketNode) {
regFactory.register(wrapper.getName(), WebSocketNode.class, wrapper.getService());
}
regFactory.register(wrapper.getName(), wrapper.getService());
if (wrapper.isRemote()) {
remoteServiceWrappers.add(wrapper);
} else {
@@ -392,30 +296,69 @@ public abstract class NodeServer {
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
}
protected Transport loadTransport(String group, String protocol, InetSocketAddress addr) {
if (addr == null) return null;
Set<InetSocketAddress> set = new HashSet<>();
set.add(addr);
return loadTransport(group, protocol, set);
protected List<Transport> loadTransports(final HashSet<String> groups) {
if (groups == null) return null;
final List<Transport> transports = new ArrayList<>();
for (String group : groups) {
transports.add(loadTransport(group));
}
return transports;
}
protected Transport loadTransport(String group, String protocol, Set<InetSocketAddress> addrs) {
Transport transport = null;
if (!addrs.isEmpty()) {
synchronized (application.transports) {
for (Transport tran : application.transports) {
if (tran.match(addrs)) {
transport = tran;
break;
}
}
if (transport == null) {
transport = new Transport(group + "_" + application.transports.size(), protocol, application.getWatchFactory(), 32, addrs);
logger.info(transport + " created");
application.transports.add(transport);
}
protected Transport loadTransport(final HashSet<String> groups) {
if (groups == null || groups.isEmpty()) return null;
List<String> tmpgroup = new ArrayList<>(groups);
Collections.sort(tmpgroup); //按字母排列顺序
boolean flag = false;
StringBuilder sb = new StringBuilder();
for (String g : tmpgroup) {
if (flag) sb.append(';');
sb.append(g);
flag = true;
}
final String groupid = sb.toString();
Transport transport = application.transports.get(groupid);
if (transport != null) return transport;
final List<Transport> transports = new ArrayList<>();
for (String group : groups) {
transports.add(loadTransport(group));
}
Set<InetSocketAddress> addrs = new HashSet();
for (Transport t : transports) {
for (InetSocketAddress addr : t.getRemoteAddresses()) {
addrs.add(addr);
}
}
Transport first = transports.get(0);
Transport newTransport = new Transport(groupid, application.findGroupProtocol(first.getName()), application.getWatchFactory(),
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
synchronized (application.transports) {
transport = application.transports.get(groupid);
if (transport == null) {
transport = newTransport;
application.transports.put(groupid, transport);
}
}
return transport;
}
protected Transport loadTransport(final String group) {
if (group == null) return null;
Transport transport;
synchronized (application.transports) {
transport = application.transports.get(group);
if (transport != null) {
if (this.sncpAddress != null && !this.sncpAddress.equals(transport.getClientAddress())) {
throw new RuntimeException(transport + "repeat create on newClientAddress = " + this.sncpAddress + ", oldClientAddress = " + transport.getClientAddress());
}
return transport;
}
Set<InetSocketAddress> addrs = application.findGlobalGroup(group);
if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> ");
transport = new Transport(group, application.findGroupProtocol(group), application.getWatchFactory(),
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
application.transports.put(group, transport);
}
return transport;
}
@@ -429,13 +372,17 @@ public abstract class NodeServer {
Class inter, Class<? extends Annotation> ref2, String properties, String property) {
ClassFilter cf = new ClassFilter(ref, inter, null);
if (properties == null && properties == null) return cf;
if (this.nodeConf == null) return cf;
AnyValue[] proplist = this.nodeConf.getAnyValues(properties);
if (this.serverConf == null) return cf;
AnyValue[] proplist = this.serverConf.getAnyValues(properties);
if (proplist == null || proplist.length < 1) return cf;
cf = null;
for (AnyValue list : proplist) {
DefaultAnyValue prop = null;
String sc = list.getValue("groups");
if (sc != null) {
sc = sc.trim();
if (sc.endsWith(";")) sc = sc.substring(0, sc.length() - 1);
}
if (sc == null) sc = localGroup;
if (sc != null) {
prop = new AnyValue.DefaultAnyValue();
@@ -496,10 +443,6 @@ public abstract class NodeServer {
return sncpGroup;
}
public String getNodeProtocol() {
return nodeProtocol;
}
public void start() throws IOException {
server.start();
}