This commit is contained in:
地平线
2015-08-13 17:58:05 +08:00
parent 8d840cfbb6
commit 110062076f
11 changed files with 335 additions and 257 deletions

View File

@@ -50,22 +50,6 @@
--> -->
<node addr="127.0.0.1" port="7070"/> <node addr="127.0.0.1" port="7070"/>
</group> </group>
<!--
远程client地址组资源. 注意: remote的name值不能为LOCAL不区分大小写
protocol 值只能是UDP TCP 默认UDP
clients: 连接池数, 默认: CPU核数*8
buffers: ByteBuffer对象池的大小 默认: CPU核数*16
group: 组名, 默认是空字符串, 通常不同机房使用不同的group值
-->
<remote name="myremote" protocol="UDP" group="">
<!--
weight: 权重百分比。 不指定则平均。 weight之和必须<=100
[注: weight尚未实现]
-->
<address addr="127.0.0.1" port="7070" weight="30"/>
<address addr="127.0.0.1" port="7071" weight="30"/>
<address addr="127.0.0.1" port="7072" weight="40"/>
</remote>
<!-- <!--
全局的参数配置, 可以通过@Resource(name="property.xxxxxx") 进行注入, 被注解的字段类型只能是String、primitive class 全局的参数配置, 可以通过@Resource(name="property.xxxxxx") 进行注入, 被注解的字段类型只能是String、primitive class
如果name是system.property.开头的值将会在进程启动时进行System.setProperty("yyyy", "YYYYYY")操作。 如果name是system.property.开头的值将会在进程启动时进行System.setProperty("yyyy", "YYYYYY")操作。
@@ -84,12 +68,9 @@
</properties> </properties>
</resources> </resources>
<!-- <!--
protocol: required server所启动的协议有HTTP、SNCP 目前只支持HTTP、SNCP。SNCP分TCP、UDP实现默认使用UDP实现TCP实现则使用SNCP.TCP值; protocol: required server所启动的协议有HTTP、SNCP 目前只支持HTTP、SNCP。SNCP分TCP、UDP实现默认使用TCP实现UDP实现则使用SNCP.UDP值;
host: 服务所占address 默认: 0.0.0.0 host: 服务所占address 默认: 0.0.0.0
port: required 服务所占端口 port: required 服务所占端口
group: 所属组的节点,多个节点值用;隔开如果配置文件中存在多个SNCP协议的Server节点需要显式指定group属性.
当 protocol == SNCP 时 group表示当前Server与哪些节点组关联。
当 protocol != SNCP 时 group只能是空或者一个group的节点值不能为多个节点值。
root: 如果是web类型服务则包含页面 默认:{APP_HOME}/root root: 如果是web类型服务则包含页面 默认:{APP_HOME}/root
lib: server额外的class目录 默认为空 lib: server额外的class目录 默认为空
charset: 文本编码, 默认: UTF-8 charset: 文本编码, 默认: UTF-8
@@ -114,6 +95,9 @@
autoload="false" 需要显著的指定Service类 autoload="false" 需要显著的指定Service类
includes 当autoload="true" 拉取类名与includes中的正则表达式匹配的类, 多个正则表达式用分号;隔开 includes 当autoload="true" 拉取类名与includes中的正则表达式匹配的类, 多个正则表达式用分号;隔开
excludes 当autoload="true" 排除类名与includes中的正则表达式匹配的类, 多个正则表达式用分号;隔开 excludes 当autoload="true" 排除类名与includes中的正则表达式匹配的类, 多个正则表达式用分号;隔开
groups: 所属组的节点,多个节点值用;隔开如果配置文件中存在多个SNCP协议的Server节点需要显式指定group属性.
当 protocol == SNCP 时 group表示当前Server与哪些节点组关联。
当 protocol != SNCP 时 group只能是空或者一个group的节点值不能为多个节点值。
--> -->
<services autoload="true" includes="" excludes=""> <services autoload="true" includes="" excludes="">
<!-- <!--

View File

@@ -56,6 +56,9 @@ public final class Application {
//当前SNCP Server所属的组 类型: String //当前SNCP Server所属的组 类型: String
public static final String RESNAME_SNCP_GROUP = "SNCP_GROUP"; public static final String RESNAME_SNCP_GROUP = "SNCP_GROUP";
//当前Service所属的组 类型: Set<String>、String[]
public static final String RESNAME_SNCP_GROUPS = Sncp.RESNAME_SNCP_GROUPS; //SNCP_GROUPS
//当前SNCP Server的IP地址+端口 类型: SocketAddress、InetSocketAddress、String //当前SNCP Server的IP地址+端口 类型: SocketAddress、InetSocketAddress、String
public static final String RESNAME_SNCP_NODE = "SNCP_NODE"; public static final String RESNAME_SNCP_NODE = "SNCP_NODE";
@@ -460,7 +463,7 @@ public final class Application {
public static <T extends Service> T singleton(Class<T> serviceClass, boolean remote) throws Exception { public static <T extends Service> T singleton(Class<T> serviceClass, boolean remote) throws Exception {
final Application application = Application.create(); final Application application = Application.create();
T service = remote ? Sncp.createRemoteService("", serviceClass, null, null) : Sncp.createLocalService("", serviceClass, null, null, null); T service = remote ? Sncp.createRemoteService("", serviceClass, null, null) : Sncp.createLocalService("", serviceClass, null, new LinkedHashSet<>(), null, null);
application.init(); application.init();
application.factory.register(service); application.factory.register(service);
new NodeSncpServer(application, new CountDownLatch(1), null).init(application.config); new NodeSncpServer(application, new CountDownLatch(1), null).init(application.config);
@@ -495,6 +498,7 @@ public final class Application {
} }
Set<InetSocketAddress> findGlobalGroup(String group) { Set<InetSocketAddress> findGlobalGroup(String group) {
if (group == null) return null;
Set<InetSocketAddress> set = globalGroups.get(group); Set<InetSocketAddress> set = globalGroups.get(group);
return set == null ? null : new LinkedHashSet<>(set); return set == null ? null : new LinkedHashSet<>(set);
} }

View File

@@ -8,6 +8,7 @@ package com.wentch.redkale.boot;
import com.wentch.redkale.util.Ignore; import com.wentch.redkale.util.Ignore;
import com.wentch.redkale.util.AutoLoad; import com.wentch.redkale.util.AutoLoad;
import com.wentch.redkale.util.AnyValue; import com.wentch.redkale.util.AnyValue;
import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
import java.io.*; import java.io.*;
import java.lang.annotation.*; import java.lang.annotation.*;
import java.lang.reflect.*; import java.lang.reflect.*;
@@ -111,14 +112,22 @@ public final class ClassFilter<T> {
if (cf == null) return; if (cf == null) return;
try { try {
Class clazz = Class.forName(clazzname); Class clazz = Class.forName(clazzname);
if (cf.accept(property, clazz, autoscan)) { if (!cf.accept(property, clazz, autoscan)) return;
if (conf != null) { if (cf.conf != null) {
if (property == null) { if (property == null) {
property = cf.conf; property = cf.conf;
} else {
if (property instanceof DefaultAnyValue) {
((DefaultAnyValue) property).addAll(cf.conf);
} else {
DefaultAnyValue dav = new DefaultAnyValue();
dav.addAll(property);
dav.addAll(cf.conf);
property = dav;
} }
} }
entrys.add(new FilterEntry(clazz, property));
} }
entrys.add(new FilterEntry(clazz, property));
} catch (Throwable cfe) { } catch (Throwable cfe) {
} }
} }
@@ -243,7 +252,7 @@ public final class ClassFilter<T> {
*/ */
public static final class FilterEntry<T> { public static final class FilterEntry<T> {
private String group; private final HashSet<String> groups = new LinkedHashSet<>();
private final String name; private final String name;
@@ -253,7 +262,8 @@ public final class ClassFilter<T> {
public FilterEntry(Class<T> type, AnyValue property) { public FilterEntry(Class<T> type, AnyValue property) {
this.type = type; this.type = type;
this.group = property == null ? "" : property.getValue("group", ""); String str = property == null ? null : property.getValue("groups");
if (str != null) groups.addAll(Arrays.asList(str.split(";")));
this.property = property; this.property = property;
this.name = property == null ? "" : property.getValue("name", ""); this.name = property == null ? "" : property.getValue("name", "");
} }
@@ -261,7 +271,7 @@ public final class ClassFilter<T> {
@Override @Override
public String toString() { public String toString() {
return this.getClass().getSimpleName() + "[thread=" + Thread.currentThread().getName() return this.getClass().getSimpleName() + "[thread=" + Thread.currentThread().getName()
+ ", type=" + this.type.getSimpleName() + ", name=" + name + ", group=" + group + "]"; + ", type=" + this.type.getSimpleName() + ", name=" + name + ", groups=" + groups + "]";
} }
@Override @Override
@@ -273,7 +283,7 @@ public final class ClassFilter<T> {
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (obj == null) return false; if (obj == null) return false;
if (getClass() != obj.getClass()) return false; if (getClass() != obj.getClass()) return false;
return (this.type == ((FilterEntry<?>) obj).type && this.group.equals(((FilterEntry<?>) obj).group) && this.name.equals(((FilterEntry<?>) obj).name)); return (this.type == ((FilterEntry<?>) obj).type && this.groups.equals(((FilterEntry<?>) obj).groups) && this.name.equals(((FilterEntry<?>) obj).name));
} }
public Class<T> getType() { public Class<T> getType() {
@@ -288,13 +298,10 @@ public final class ClassFilter<T> {
return property; return property;
} }
public String getGroup() { public HashSet<String> getGroups() {
return group; return groups;
} }
public void setGroup(String group) {
this.group = group;
}
} }
/** /**

View File

@@ -77,20 +77,17 @@ public final class NodeHttpServer extends NodeServer {
if (nodeService == null) { if (nodeService == null) {
Class<? extends Service> sc = (Class<? extends Service>) application.webSocketNodeClass; Class<? extends Service> sc = (Class<? extends Service>) application.webSocketNodeClass;
nodeService = Sncp.createLocalService(rcname, (Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc), nodeService = Sncp.createLocalService(rcname, (Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc),
getSncpAddress(), (sc == null ? null : nodeSameGroupTransports), (sc == null ? null : nodeDiffGroupTransports)); getSncpAddress(), new LinkedHashSet<>(), (sc == null ? null : sncpSameGroupTransports), (sc == null ? null : sncpDiffGroupTransports));
regFactory.register(rcname, WebSocketNode.class, nodeService); regFactory.register(rcname, WebSocketNode.class, nodeService);
WebSocketNode wsn = (WebSocketNode) nodeService; WebSocketNode wsn = (WebSocketNode) nodeService;
wsn.setLocalSncpAddress(getSncpAddress()); wsn.setLocalSncpAddress(getSncpAddress());
final Set<InetSocketAddress> alladdrs = new HashSet<>(); final Set<InetSocketAddress> alladdrs = new HashSet<>();
application.globalNodes.forEach((k, v) -> alladdrs.add(k)); application.globalNodes.forEach((k, v) -> alladdrs.add(k));
alladdrs.remove(getSncpAddress()); alladdrs.remove(getSncpAddress());
WebSocketNode remoteNode = (WebSocketNode) Sncp.createRemoteService(rcname, (Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc), final Class<? extends Service> serviceType = (sc == null ? WebSocketNodeService.class : sc);
getSncpAddress(), (sc == null ? null : loadTransport(getSncpGroup(), getNodeProtocol(), alladdrs)));
wsn.setRemoteWebSocketNode(remoteNode);
factory.inject(nodeService); factory.inject(nodeService);
factory.inject(remoteNode);
if (sncpServer != null) { if (sncpServer != null) {
ServiceWrapper wrapper = new ServiceWrapper((Class<? extends Service>) (sc == null ? WebSocketNodeService.class : sc), nodeService, getSncpGroup(), rcname, null); ServiceWrapper wrapper = new ServiceWrapper(serviceType, nodeService, rcname, getSncpGroup(), new LinkedHashSet<>(), null);
sncpServer.getSncpServer().addService(wrapper); sncpServer.getSncpServer().addService(wrapper);
} }
} }

View File

@@ -14,6 +14,7 @@ import com.wentch.redkale.util.AnyValue;
import com.wentch.redkale.util.Ignore; import com.wentch.redkale.util.Ignore;
import com.wentch.redkale.boot.ClassFilter.FilterEntry; import com.wentch.redkale.boot.ClassFilter.FilterEntry;
import com.wentch.redkale.net.*; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.http.*;
import com.wentch.redkale.source.*; import com.wentch.redkale.source.*;
import com.wentch.redkale.util.*; import com.wentch.redkale.util.*;
import com.wentch.redkale.util.AnyValue.DefaultAnyValue; import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
@@ -47,7 +48,7 @@ public abstract class NodeServer {
private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点没有分布式结构 private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点没有分布式结构
private String sncpGroup = ""; //当前Server的SNCP协议的组 private String sncpGroup = null; //当前Server的SNCP协议的组
private AnyValue nodeConf; private AnyValue nodeConf;
@@ -55,9 +56,9 @@ public abstract class NodeServer {
protected Consumer<ServiceWrapper> consumer; protected Consumer<ServiceWrapper> consumer;
protected final List<Transport> nodeSameGroupTransports = new ArrayList<>(); protected final List<Transport> sncpSameGroupTransports = new ArrayList<>();
protected final List<Transport> nodeDiffGroupTransports = new ArrayList<>(); protected final List<Transport> sncpDiffGroupTransports = new ArrayList<>();
protected final Set<ServiceWrapper> localServices = new LinkedHashSet<>(); protected final Set<ServiceWrapper> localServices = new LinkedHashSet<>();
@@ -78,27 +79,9 @@ public abstract class NodeServer {
if (isSNCP()) { // SNCP协议 if (isSNCP()) { // SNCP协议
String host = this.nodeConf.getValue("host", "0.0.0.0").replace("0.0.0.0", ""); String host = this.nodeConf.getValue("host", "0.0.0.0").replace("0.0.0.0", "");
this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.nodeConf.getIntValue("port")); this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.nodeConf.getIntValue("port"));
this.sncpGroup = application.globalNodes.getOrDefault(this.sncpAddress, ""); this.sncpGroup = application.globalNodes.get(this.sncpAddress);
if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found <group> info");
if (server != null) this.nodeProtocol = server.getProtocol(); if (server != null) this.nodeProtocol = server.getProtocol();
} else { // HTTP协议
this.sncpAddress = null;
this.sncpGroup = "";
this.nodeProtocol = Sncp.DEFAULT_PROTOCOL;
String mbgroup = this.nodeConf.getValue("group", "");
NodeServer sncpServer = null; //有匹配的就取匹配的, 没有且SNCP只有一个则取此SNCP。
for (NodeServer ns : application.servers) {
if (!ns.isSNCP()) continue;
if (sncpServer == null) sncpServer = ns;
if (ns.getSncpGroup().equals(mbgroup)) {
sncpServer = ns;
break;
}
}
if (sncpServer != null) {
this.sncpAddress = sncpServer.getSncpAddress();
this.sncpGroup = sncpServer.getSncpGroup();
this.nodeProtocol = sncpServer.getNodeProtocol();
}
} }
if (this.sncpAddress != null) { // 无分布式结构下 HTTP协议的sncpAddress 为 null if (this.sncpAddress != null) { // 无分布式结构下 HTTP协议的sncpAddress 为 null
this.factory.register(RESNAME_SNCP_NODE, SocketAddress.class, this.sncpAddress); this.factory.register(RESNAME_SNCP_NODE, SocketAddress.class, this.sncpAddress);
@@ -123,11 +106,12 @@ public abstract class NodeServer {
private void initResource() { private void initResource() {
final List<Transport>[] transportses = parseTransport(this.nodeConf.getValue("group", "").split(";")); final List<Transport>[] transportses = parseTransport(this.nodeConf.getValue("group", "").split(";"));
this.nodeSameGroupTransports.addAll(transportses[0]); this.sncpSameGroupTransports.addAll(transportses[0]);
this.nodeDiffGroupTransports.addAll(transportses[1]); this.sncpDiffGroupTransports.addAll(transportses[1]);
//--------------------------------------------------------------------------------------------- //---------------------------------------------------------------------------------------------
final ResourceFactory regFactory = application.factory; final ResourceFactory regFactory = application.factory;
final HashSet<String> defGroups = new LinkedHashSet<>();
factory.add(DataSource.class, (ResourceFactory rf, final Object src, Field field) -> { factory.add(DataSource.class, (ResourceFactory rf, final Object src, Field field) -> {
try { try {
Resource rs = field.getAnnotation(Resource.class); Resource rs = field.getAnnotation(Resource.class);
@@ -138,9 +122,9 @@ public abstract class NodeServer {
regFactory.register(rs.name(), DataSource.class, source); regFactory.register(rs.name(), DataSource.class, source);
Class<? extends Service> sc = (Class<? extends Service>) application.dataCacheListenerClass; Class<? extends Service> sc = (Class<? extends Service>) application.dataCacheListenerClass;
if (sc != null) { if (sc != null) {
Service cacheListenerService = Sncp.createLocalService(rs.name(), sc, this.sncpAddress, nodeSameGroupTransports, nodeDiffGroupTransports); Service cacheListenerService = Sncp.createLocalService(rs.name(), sc, this.sncpAddress, defGroups, sncpSameGroupTransports, sncpDiffGroupTransports);
regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService); regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService);
ServiceWrapper wrapper = new ServiceWrapper(sc, cacheListenerService, sncpGroup, rs.name(), null); ServiceWrapper wrapper = new ServiceWrapper(sc, cacheListenerService, rs.name(), sncpGroup, defGroups, null);
localServices.add(wrapper); localServices.add(wrapper);
if (consumer != null) consumer.accept(wrapper); if (consumer != null) consumer.accept(wrapper);
rf.inject(cacheListenerService); rf.inject(cacheListenerService);
@@ -173,6 +157,170 @@ public abstract class NodeServer {
return new List[]{sameGroupTransports0, diffGroupTransports0}; return new List[]{sameGroupTransports0, diffGroupTransports0};
} }
@SuppressWarnings("unchecked")
protected void loadService(ClassFilter serviceFilter) throws Exception {
if (serviceFilter == null) return;
final String threadName = "[" + Thread.currentThread().getName() + "] ";
final Set<FilterEntry<Service>> entrys = serviceFilter.getFilterEntrys();
ResourceFactory regFactory = isSNCP() ? application.factory : factory;
for (FilterEntry<Service> entry : entrys) { //service实现类
final Class<? extends Service> type = entry.getType();
if (type.isInterface()) continue;
if (Modifier.isFinal(type.getModifiers())) continue;
if (!Modifier.isPublic(type.getModifiers())) continue;
if (Modifier.isAbstract(type.getModifiers())) continue;
if (type.getAnnotation(Ignore.class) != null) continue;
if (!isSNCP() && factory.find(entry.getName(), type) != null) continue;
final Set<InetSocketAddress> sameGroupAddrs = new LinkedHashSet<>();
Set<InetSocketAddress> sg = application.findGlobalGroup(this.sncpGroup);
if (sg != null) sameGroupAddrs.addAll(sg);
final Map<String, Set<InetSocketAddress>> diffGroupAddrs = new HashMap<>();
final HashSet<String> groups = entry.getGroups();
for (String g : groups) {
if (g.isEmpty()) continue;
Set<InetSocketAddress> set = application.findGlobalGroup(g);
if (set == null) throw new RuntimeException(type.getName() + " has illegal group (" + groups + ")");
if (!g.equals(this.sncpGroup)) {
diffGroupAddrs.put(g, set);
}
}
List<Transport> diffGroupTransports = new ArrayList<>();
diffGroupAddrs.forEach((k, v) -> diffGroupTransports.add(loadTransport(k, server.getProtocol(), v)));
ServiceWrapper wrapper;
if ((sameGroupAddrs.isEmpty() && diffGroupAddrs.isEmpty()) || sameGroupAddrs.contains(this.sncpAddress)) { //本地模式
sameGroupAddrs.remove(this.sncpAddress);
List<Transport> sameGroupTransports = new ArrayList<>();
for (InetSocketAddress iaddr : sameGroupAddrs) {
Set<InetSocketAddress> tset = new HashSet<>();
tset.add(iaddr);
sameGroupTransports.add(loadTransport(this.sncpGroup, server.getProtocol(), tset));
}
Service service = Sncp.createLocalService(entry.getName(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports);
wrapper = new ServiceWrapper(type, service, this.sncpGroup, entry);
} else {
sameGroupAddrs.remove(this.sncpAddress);
StringBuilder g = new StringBuilder();
diffGroupAddrs.forEach((k, v) -> {
if (g.length() > 0) g.append(';');
g.append(k);
sameGroupAddrs.addAll(v);
});
if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")");
Service service = Sncp.createRemoteService(entry.getName(), type, this.sncpAddress, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
wrapper = new ServiceWrapper(type, service, "", entry);
}
if (factory.find(wrapper.getName(), wrapper.getType()) == null) {
regFactory.register(wrapper.getName(), wrapper.getType(), wrapper.getService());
if (wrapper.getService() instanceof DataSource) {
regFactory.register(wrapper.getName(), DataSource.class, wrapper.getService());
} else if (wrapper.getService() instanceof DataCacheListener) {
regFactory.register(wrapper.getName(), DataCacheListener.class, wrapper.getService());
} else if (wrapper.getService() instanceof DataSQLListener) {
regFactory.register(wrapper.getName(), DataSQLListener.class, wrapper.getService());
} else if (wrapper.getService() instanceof WebSocketNode) {
regFactory.register(wrapper.getName(), WebSocketNode.class, wrapper.getService());
}
if (wrapper.isRemote()) {
remoteServices.add(wrapper);
} else {
localServices.add(wrapper);
if (consumer != null) consumer.accept(wrapper);
}
} else if (isSNCP()) {
throw new RuntimeException(ServiceWrapper.class.getSimpleName() + "(class:" + type.getName() + ", name:" + entry.getName() + ", group:" + groups + ") is repeat.");
}
}
servicecdl.countDown();
servicecdl.await();
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
//---------------- inject ----------------
new HashSet<>(localServices).forEach(y -> {
factory.inject(y.getService());
});
remoteServices.forEach(y -> {
factory.inject(y.getService());
});
//----------------- init -----------------
localServices.parallelStream().forEach(y -> {
long s = System.currentTimeMillis();
y.getService().init(y.getConf());
long e = System.currentTimeMillis() - s;
if (e > 2 && sb != null) {
sb.append(threadName).append("LocalService(").append(y.getType()).append(':').append(y.getName()).append(") init ").append(e).append("ms").append(LINE_SEPARATOR);
}
});
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
}
protected Transport loadTransport(String group, String protocol, InetSocketAddress addr) {
if (addr == null) return null;
Set<InetSocketAddress> set = new HashSet<>();
set.add(addr);
return loadTransport(group, protocol, set);
}
protected Transport loadTransport(String group, String protocol, Set<InetSocketAddress> addrs) {
Transport transport = null;
if (!addrs.isEmpty()) {
synchronized (application.transports) {
for (Transport tran : application.transports) {
if (tran.match(addrs)) {
transport = tran;
break;
}
}
if (transport == null) {
transport = new Transport(group + "_" + application.transports.size(), protocol, application.watch, 32, addrs);
application.transports.add(transport);
}
}
}
return transport;
}
protected ClassFilter<Service> createServiceClassFilter(final AnyValue config) {
return createClassFilter(this.sncpGroup, config, null, Service.class, Annotation.class, "services", "service");
}
protected static ClassFilter createClassFilter(final String localGroup, final AnyValue config, Class<? extends Annotation> ref,
Class inter, Class<? extends Annotation> ref2, String properties, String property) {
ClassFilter cf = new ClassFilter(ref, inter, null);
if (properties == null && properties == null) return cf;
if (config == null) return cf;
AnyValue[] proplist = config.getAnyValues(properties);
if (proplist == null || proplist.length < 1) return cf;
cf = null;
for (AnyValue list : proplist) {
DefaultAnyValue prop = null;
String sc = list.getValue("groups");
if (sc == null) sc = localGroup;
if (sc != null) {
prop = new AnyValue.DefaultAnyValue();
prop.addValue("groups", sc);
}
ClassFilter filter = new ClassFilter(ref, inter, prop);
for (AnyValue av : list.getAnyValues(property)) {
filter.filter(av, av.getValue("value"), false);
}
if (list.getBoolValue("autoload", true)) {
String includes = list.getValue("includes", "");
String excludes = list.getValue("excludes", "");
filter.setIncludePatterns(includes.split(";"));
filter.setExcludePatterns(excludes.split(";"));
} else {
if (ref2 == null || ref2 == Annotation.class) { //service如果是autoload=false则不需要加载
filter.setRefused(true);
} else if (ref2 != Annotation.class) {
filter.setAnnotationClass(ref2);
}
}
cf = (cf == null) ? filter : cf.or(filter);
}
return cf;
}
public abstract InetSocketAddress getSocketAddress(); public abstract InetSocketAddress getSocketAddress();
public abstract boolean isSNCP(); public abstract boolean isSNCP();
@@ -207,160 +355,4 @@ public abstract class NodeServer {
server.shutdown(); server.shutdown();
} }
protected Transport loadTransport(String group, String protocol, InetSocketAddress addr) {
if (addr == null) return null;
Set<InetSocketAddress> set = new HashSet<>();
set.add(addr);
return loadTransport(group, protocol, set);
}
protected Transport loadTransport(String group, String protocol, Set<InetSocketAddress> addrs) {
Transport transport = null;
if (!addrs.isEmpty()) {
synchronized (application.transports) {
for (Transport tran : application.transports) {
if (tran.match(addrs)) {
transport = tran;
break;
}
}
if (transport == null) {
transport = new Transport(group + "_" + application.transports.size(), protocol, application.watch, 32, addrs);
application.transports.add(transport);
}
}
}
return transport;
}
@SuppressWarnings("unchecked")
protected void loadService(ClassFilter serviceFilter) throws Exception {
if (serviceFilter == null) return;
final String threadName = "[" + Thread.currentThread().getName() + "] ";
final Set<FilterEntry<Service>> entrys = serviceFilter.getFilterEntrys();
final String defgroups = nodeConf == null ? "" : nodeConf.getValue("group", ""); //Server节点获取group信息
ResourceFactory regFactory = isSNCP() ? application.factory : factory;
for (FilterEntry<Service> entry : entrys) { //service实现类
final Class<? extends Service> type = entry.getType();
if (type.isInterface()) continue;
if (Modifier.isFinal(type.getModifiers())) continue;
if (!Modifier.isPublic(type.getModifiers())) continue;
if (Modifier.isAbstract(type.getModifiers())) continue;
if (type.getAnnotation(Ignore.class) != null) continue;
if (!isSNCP() && factory.find(entry.getName(), type) != null) continue;
String groups = entry.getGroup();
if (groups == null || groups.isEmpty()) groups = defgroups;
final Set<InetSocketAddress> sameGroupAddrs = new LinkedHashSet<>();
final Map<String, Set<InetSocketAddress>> diffGroupAddrs = new HashMap<>();
for (String g : groups.split(";")) {
if (g.isEmpty()) continue;
Set<InetSocketAddress> set = application.findGlobalGroup(g);
if (set == null) throw new RuntimeException(type.getName() + " has illegal group (" + groups + ")");
if (g.equals(this.sncpGroup)) {
sameGroupAddrs.addAll(set);
} else {
diffGroupAddrs.put(g, set);
}
}
final boolean localable = this.sncpAddress == null || sameGroupAddrs.contains(this.sncpAddress);
Service service;
List<Transport> diffGroupTransports = new ArrayList<>();
diffGroupAddrs.forEach((k, v) -> diffGroupTransports.add(loadTransport(k, server.getProtocol(), v)));
if (localable || (sameGroupAddrs.isEmpty() && diffGroupTransports.isEmpty())) {
sameGroupAddrs.remove(this.sncpAddress);
List<Transport> sameGroupTransports = new ArrayList<>();
for (InetSocketAddress iaddr : sameGroupAddrs) {
Set<InetSocketAddress> tset = new HashSet<>();
tset.add(iaddr);
sameGroupTransports.add(loadTransport(this.sncpGroup, server.getProtocol(), tset));
}
service = Sncp.createLocalService(entry.getName(), type, this.sncpAddress, sameGroupTransports, diffGroupTransports);
} else {
StringBuilder g = new StringBuilder(this.sncpGroup);
diffGroupAddrs.forEach((k, v) -> {
if (g.length() > 0) g.append(';');
g.append(k);
sameGroupAddrs.addAll(v);
});
if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")");
service = Sncp.createRemoteService(entry.getName(), type, this.sncpAddress, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
}
ServiceWrapper wrapper = new ServiceWrapper(type, service, entry);
if (factory.find(wrapper.getName(), wrapper.getType()) == null) {
regFactory.register(wrapper.getName(), wrapper.getType(), wrapper.getService());
if (wrapper.isRemote()) {
remoteServices.add(wrapper);
} else {
localServices.add(wrapper);
if (consumer != null) consumer.accept(wrapper);
}
} else if (isSNCP()) {
throw new RuntimeException(ServiceWrapper.class.getSimpleName() + "(class:" + type.getName() + ", name:" + entry.getName() + ", group:" + groups + ") is repeat.");
}
}
servicecdl.countDown();
servicecdl.await();
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
//---------------- inject ----------------
new HashSet<>(localServices).forEach(y -> {
factory.inject(y.getService());
});
remoteServices.forEach(y -> {
factory.inject(y.getService());
});
//----------------- init -----------------
localServices.parallelStream().forEach(y -> {
long s = System.currentTimeMillis();
y.getService().init(y.getConf());
long e = System.currentTimeMillis() - s;
if (e > 2 && sb != null) {
sb.append(threadName).append("LocalService(").append(y.getType()).append(':').append(y.getName()).append(") init ").append(e).append("ms").append(LINE_SEPARATOR);
}
});
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
}
protected ClassFilter<Service> createServiceClassFilter(final AnyValue config) {
return createClassFilter(this.sncpGroup, config, null, Service.class, Annotation.class, "services", "service");
}
protected static ClassFilter createClassFilter(final String localGroup, final AnyValue config, Class<? extends Annotation> ref,
Class inter, Class<? extends Annotation> ref2, String properties, String property) {
ClassFilter cf = new ClassFilter(ref, inter, null);
if (properties == null && properties == null) return cf;
if (config == null) return cf;
AnyValue[] proplist = config.getAnyValues(properties);
if (proplist == null || proplist.length < 1) return cf;
cf = null;
for (AnyValue list : proplist) {
DefaultAnyValue prop = null;
String sc = list.getValue("group", "");
if (!sc.isEmpty()) {
prop = new AnyValue.DefaultAnyValue();
prop.addValue("group", sc);
}
ClassFilter filter = new ClassFilter(ref, inter, prop);
for (AnyValue av : list.getAnyValues(property)) {
filter.filter(av, av.getValue("value"), false);
}
if (list.getBoolValue("autoload", true)) {
String includes = list.getValue("includes", "");
String excludes = list.getValue("excludes", "");
filter.setIncludePatterns(includes.split(";"));
filter.setExcludePatterns(excludes.split(";"));
} else {
if (ref2 == null || ref2 == Annotation.class) { //service如果是autoload=false则不需要加载
filter.setRefused(true);
} else if (ref2 != Annotation.class) {
filter.setAnnotationClass(ref2);
}
}
cf = (cf == null) ? filter : cf.or(filter);
}
return cf;
}
} }

View File

@@ -25,6 +25,8 @@ public final class Transport {
protected final String name; protected final String name;
protected final int bufferPoolSize;
protected final String protocol; protected final String protocol;
protected final AsynchronousChannelGroup group; protected final AsynchronousChannelGroup group;
@@ -37,9 +39,14 @@ public final class Transport {
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
public Transport(Transport transport, InetSocketAddress localAddress, Collection<Transport> transports) {
this(transport.name, transport.protocol, null, transport.bufferPoolSize, parse(localAddress, transports));
}
public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) { public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) {
this.name = name; this.name = name;
this.protocol = protocol; this.protocol = protocol;
this.bufferPoolSize = bufferPoolSize;
AsynchronousChannelGroup g = null; AsynchronousChannelGroup g = null;
try { try {
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
@@ -54,8 +61,8 @@ public final class Transport {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.group = g; this.group = g;
AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.creatCounter"); AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "-" + name + "-" + protocol + ".Buffer.creatCounter");
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "-" + name + "-" + protocol + ".Buffer.cycleCounter");
int rcapacity = 8192; int rcapacity = 8192;
this.bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, this.bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
@@ -66,6 +73,15 @@ public final class Transport {
this.remoteAddres = addresses.toArray(new InetSocketAddress[addresses.size()]); this.remoteAddres = addresses.toArray(new InetSocketAddress[addresses.size()]);
} }
private static Collection<InetSocketAddress> parse(InetSocketAddress addr, Collection<Transport> transports) {
final Set<InetSocketAddress> set = new LinkedHashSet<>();
for (Transport t : transports) {
set.addAll(Arrays.asList(t.remoteAddres));
}
set.remove(addr);
return set;
}
public void close() { public void close() {
connPool.forEach((k, v) -> v.forEach(c -> c.dispose())); connPool.forEach((k, v) -> v.forEach(c -> c.dispose()));
} }

View File

@@ -33,6 +33,7 @@ public abstract class WebSocketNode {
protected InetSocketAddress localSncpAddress; //为SncpServer的服务address protected InetSocketAddress localSncpAddress; //为SncpServer的服务address
@SncpRemote
protected WebSocketNode remoteNode; protected WebSocketNode remoteNode;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合 //存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合
@@ -109,11 +110,7 @@ public abstract class WebSocketNode {
this.localSncpAddress = localSncpAddress; this.localSncpAddress = localSncpAddress;
} }
public final void setRemoteWebSocketNode(WebSocketNode node) { public final void putWebSocketEngine(WebSocketEngine engine) {
this.remoteNode = node;
}
public final void addWebSocketEngine(WebSocketEngine engine) {
engines.put(engine.getEngineid(), engine); engines.put(engine.getEngineid(), engine);
} }

View File

@@ -59,7 +59,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Nameable {
public void init(Context context, AnyValue conf) { public void init(Context context, AnyValue conf) {
InetSocketAddress addr = context.getServerAddress(); InetSocketAddress addr = context.getServerAddress();
this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-" + name()); this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-" + name());
this.node.addWebSocketEngine(engine); this.node.putWebSocketEngine(engine);
this.node.init(conf); this.node.init(conf);
} }

View File

@@ -25,19 +25,22 @@ public final class ServiceWrapper<T extends Service> {
private final String group; private final String group;
private final Set<String> groups;
private final String name; private final String name;
private final boolean remote; private final boolean remote;
public ServiceWrapper(Class<T> type, T service, ClassFilter.FilterEntry<Service> entry) { public ServiceWrapper(Class<T> type, T service, String group, ClassFilter.FilterEntry<Service> entry) {
this(type, service, entry.getGroup(), entry.getName(), entry.getProperty()); this(type, service, entry.getName(), group, entry.getGroups(), entry.getProperty());
} }
public ServiceWrapper(Class<T> type, T service, String group, String name, AnyValue conf) { public ServiceWrapper(Class<T> type, T service, String name, String group, Set<String> groups, AnyValue conf) {
this.type = type == null ? (Class<T>) service.getClass() : type; this.type = type == null ? (Class<T>) service.getClass() : type;
this.service = service; this.service = service;
this.conf = conf; this.conf = conf;
this.group = group; this.group = group;
this.groups = groups;
this.name = name; this.name = name;
this.remote = Sncp.isRemote(service); this.remote = Sncp.isRemote(service);
} }
@@ -81,4 +84,8 @@ public final class ServiceWrapper<T extends Service> {
return remote; return remote;
} }
public Set<String> getGroups() {
return groups;
}
} }

View File

@@ -13,6 +13,7 @@ import com.wentch.redkale.util.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.net.*; import java.net.*;
import java.util.*; import java.util.*;
import javax.annotation.*;
import jdk.internal.org.objectweb.asm.*; import jdk.internal.org.objectweb.asm.*;
import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES; import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES;
import static jdk.internal.org.objectweb.asm.Opcodes.*; import static jdk.internal.org.objectweb.asm.Opcodes.*;
@@ -24,6 +25,15 @@ import jdk.internal.org.objectweb.asm.Type;
*/ */
public abstract class Sncp { public abstract class Sncp {
//当前Service所属的组 类型: Set<String>、String[]
public static final String RESNAME_SNCP_GROUPS = "SNCP_GROUPS";
private static final java.lang.reflect.Type GROUPS_TYPE1 = new TypeToken<Set<String>>() {
}.getType();
private static final java.lang.reflect.Type GROUPS_TYPE2 = new TypeToken<String[]>() {
}.getType();
public static final String DEFAULT_PROTOCOL = "TCP"; public static final String DEFAULT_PROTOCOL = "TCP";
static final String LOCALPREFIX = "_DynLocal"; static final String LOCALPREFIX = "_DynLocal";
@@ -551,33 +561,74 @@ public abstract class Sncp {
* @param name * @param name
* @param serviceClass * @param serviceClass
* @param clientAddress * @param clientAddress
* @param groups
* @param sameGroupTransports * @param sameGroupTransports
* @param diffGroupTransports * @param diffGroupTransports
* @return * @return
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T extends Service> T createLocalService(final String name, final Class<T> serviceClass, public static <T extends Service> T createLocalService(final String name, final Class<T> serviceClass,
final InetSocketAddress clientAddress, Collection<Transport> sameGroupTransports, Collection<Transport> diffGroupTransports) { final InetSocketAddress clientAddress, HashSet<String> groups, Collection<Transport> sameGroupTransports, Collection<Transport> diffGroupTransports) {
try { try {
Class newClazz = createLocalServiceClass(name, serviceClass); final Class newClazz = createLocalServiceClass(name, serviceClass);
T rs = (T) newClazz.newInstance(); T rs = (T) newClazz.newInstance();
Field e = null; //--------------------------------------
try {
e = newClazz.getDeclaredField("_client");
} catch (NoSuchFieldException ne) {
return rs;
}
e.setAccessible(true);
e.set(rs, new SncpClient(name, hash(serviceClass), false, newClazz, true, clientAddress));
if (sameGroupTransports == null) sameGroupTransports = new ArrayList<>(); if (sameGroupTransports == null) sameGroupTransports = new ArrayList<>();
Field c = newClazz.getDeclaredField("_sameGroupTransports");
c.setAccessible(true);
c.set(rs, sameGroupTransports.toArray(new Transport[sameGroupTransports.size()]));
if (diffGroupTransports == null) diffGroupTransports = new ArrayList<>(); if (diffGroupTransports == null) diffGroupTransports = new ArrayList<>();
Field t = newClazz.getDeclaredField("_diffGroupTransports"); Transport remoteTransport = null;
t.setAccessible(true); {
t.set(rs, diffGroupTransports.toArray(new Transport[diffGroupTransports.size()])); Class loop = newClazz;
String[] groupArray = null;
do {
for (Field field : loop.getDeclaredFields()) {
int mod = field.getModifiers();
if (Modifier.isFinal(mod) || Modifier.isStatic(mod)) continue;
if (field.getAnnotation(SncpRemote.class) != null) {
field.setAccessible(true);
if (remoteTransport == null) {
List<Transport> list = new ArrayList<>();
list.addAll(sameGroupTransports);
list.addAll(diffGroupTransports);
if (!list.isEmpty()) remoteTransport = new Transport(list.get(0), clientAddress, list);
}
if (field.getType().isAssignableFrom(newClazz) && remoteTransport != null) {
field.set(rs, createRemoteService(name, serviceClass, clientAddress, remoteTransport));
}
continue;
}
Resource res = field.getAnnotation(Resource.class);
if (res == null || !res.name().equals(RESNAME_SNCP_GROUPS)) continue;
field.setAccessible(true);
if (groups == null) groups = new LinkedHashSet<>();
if (groupArray == null) groupArray = groups.toArray(new String[groups.size()]);
if (field.getGenericType().equals(GROUPS_TYPE1)) {
field.set(rs, groups);
} else if (field.getGenericType().equals(GROUPS_TYPE2)) {
field.set(rs, groupArray);
}
}
} while ((loop = loop.getSuperclass()) != Object.class);
}
{
Field e;
try {
e = newClazz.getDeclaredField("_client");
} catch (NoSuchFieldException ne) {
return rs;
}
e.setAccessible(true);
e.set(rs, new SncpClient(name, hash(serviceClass), false, newClazz, true, clientAddress));
}
{
Field c = newClazz.getDeclaredField("_sameGroupTransports");
c.setAccessible(true);
c.set(rs, sameGroupTransports.toArray(new Transport[sameGroupTransports.size()]));
}
{
Field t = newClazz.getDeclaredField("_diffGroupTransports");
t.setAccessible(true);
t.set(rs, diffGroupTransports.toArray(new Transport[diffGroupTransports.size()]));
}
return rs; return rs;
} catch (RuntimeException rex) { } catch (RuntimeException rex) {
throw rex; throw rex;

View File

@@ -0,0 +1,23 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net.sncp;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* 用于在 Service 中创建自身远程模式的对象
*
* @author zhangjx
*/
@Inherited
@Documented
@Target({FIELD})
@Retention(RUNTIME)
public @interface SncpRemote {
}