This commit is contained in:
Redkale
2017-05-23 14:25:24 +08:00
parent 6f4c9dca48
commit 6562ac0a2b
3 changed files with 48 additions and 9 deletions

View File

@@ -30,6 +30,7 @@
所有服务所需的资源
-->
<resources>
<!--
【节点全局唯一】
transport节点只能有一个用于配置所有Transport的池参数没配置该节点将自动创建一个。
@@ -38,8 +39,9 @@
bufferPoolSize ByteBuffer池的大小默认: <group>节点数*CPU核数*8
-->
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32"/>
<!--
一个组包含多个NODE 同一Service服务可以由多个进程提供这些进程称为一个GROUP且同一GROUP内的进程必须在同一机房或局域网内
一个组包含多个node 同一Service服务可以由多个进程提供这些进程称为一个GROUP且同一GROUP内的进程必须在同一机房或局域网内
一个group节点对应一个 Transport 对象。
name: 服务组ID长度不能超过11个字节. 默认为空字符串。 注意: name不能包含$符号。
protocol值范围UDP TCP 默认TCP
@@ -57,6 +59,17 @@
-->
<node addr="127.0.0.1" port="7070"/>
</group>
<!--
全局的数据源设置, 可以是CacheSource、DataSource JDBC的DataSource通常通过persistence.xml配置此处多用于CacheSource的配置
name: 资源名,用于依赖注入。
type类名必须是CacheSource或DataSource的子类且必须实现Service接口。
xxx: 其他属性与子节点通过Service.init方法传入的AnyValue获取。
-->
<source name="redis" type="org.redkalex.cache.RedisCacheSource" xxx="16">
<node addr="127.0.0.1" port="7070"/>
</source>
<!--
【节点全局唯一】
全局的参数配置, 可以通过@Resource(name="property.xxxxxx") 进行注入<property>的信息, 被注解的字段类型只能是String、primitive class
@@ -93,7 +106,7 @@
backlog: 默认10K
threads 线程总数, 默认: CPU核数*16
maxbody: request.body最大值 默认: 64K
bufferCapacity: ByteBuffer的初始化大小 默认: 8K; 如果是HTTP协议则默认: 16K + 8B (兼容HTTP 2.0)
bufferCapacity: ByteBuffer的初始化大小 默认: 8K; 如果是HTTP协议则默认: 16K + 16B (兼容HTTP 2.0、WebSocket)
bufferPoolSize ByteBuffer池的大小默认: CPU核数*512
responsePoolSize Response池的大小默认: CPU核数*256
readTimeoutSecond: 读操作超时秒数, 默认0 表示永久不超时

View File

@@ -36,8 +36,8 @@ import org.w3c.dom.*;
* <pre>
* 程序启动执行步骤:
* 1、读取application.xml
* 2、进行classpath扫描动态加载Service与Servlet
* 3、优先加载所有SNCP协议的服务再加载其他协议服务
* 2、进行classpath扫描动态加载Service、WebSocket与Servlet
* 3、优先加载所有SNCP协议的服务再加载其他协议服务 最后加载WATCH协议的服务
* 4、最后进行Service、Servlet与其他资源之间的依赖注入
* </pre>
* <p>
@@ -696,17 +696,19 @@ public final class Application {
});
for (DataSource source : dataSources) {
if (source == null) continue;
try {
source.getClass().getMethod("close").invoke(source);
} catch (Exception e) {
logger.log(Level.FINER, "close DataSource erroneous", e);
logger.log(Level.FINER, source.getClass() + " close DataSource erroneous", e);
}
}
for (CacheSource source : cacheSources) {
if (source == null) continue;
try {
source.getClass().getMethod("close").invoke(source);
} catch (Exception e) {
logger.log(Level.FINER, "close CacheSource erroneous", e);
logger.log(Level.FINER, source.getClass() + " close CacheSource erroneous", e);
}
}
if (this.transportChannelGroup != null) {

View File

@@ -184,6 +184,29 @@ public abstract class NodeServer {
final NodeServer self = this;
//---------------------------------------------------------------------------------------------
final ResourceFactory appResFactory = application.getResourceFactory();
final AnyValue resources = application.config.getAnyValue("resources");
final Map<String, AnyValue> cacheResource = new HashMap<>();
//final Map<String, AnyValue> dataResources = new HashMap<>();
if (resources != null) {
for (AnyValue sourceConf : resources.getAnyValues("source")) {
try {
Class type = Class.forName(sourceConf.getValue("type"));
if (!Service.class.isAssignableFrom(type)) {
logger.log(Level.SEVERE, "load application source resource, but not Service error: " + sourceConf);
} else if (CacheSource.class.isAssignableFrom(type)) {
cacheResource.put(sourceConf.getValue("name", ""), sourceConf);
} else if (DataSource.class.isAssignableFrom(type)) {
//dataResources.put(sourceConf.getValue("name", ""), sourceConf);
//暂时不支持DataSource通过<resources>设置
logger.log(Level.SEVERE, "load application source resource, but not CacheSource error: " + sourceConf);
} else {
logger.log(Level.SEVERE, "load application source resource, but not CacheSource error: " + sourceConf);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "load application source resource error: " + sourceConf, e);
}
}
}
//------------------------------------- 注册Resource --------------------------------------------------------
resourceFactory.register((ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> {
try {
@@ -193,7 +216,6 @@ public abstract class NodeServer {
Class type = field.getType();
if (type != AnyValue.class && type != AnyValue[].class) return;
Object resource = null;
final AnyValue resources = application.config.getAnyValue("resources");
final AnyValue properties = resources == null ? null : resources.getAnyValue("properties");
if (properties != null && type == AnyValue.class) {
resource = properties.getAnyValue(res.name().substring("properties.".length()));
@@ -251,7 +273,9 @@ public abstract class NodeServer {
Transport[] dts = Sncp.getDiffGroupTransports((Service) src);
List<Transport> diffGroupTransports = dts == null ? new ArrayList<>() : Arrays.asList(dts);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final CacheMemorySource source = Sncp.createLocalService(resourceName, getExecutor(), appResFactory, CacheMemorySource.class, sncpAddr, Sncp.getSncpGroup(srcService), Sncp.getGroups(srcService), Sncp.getConf(srcService), sameGroupTransport, diffGroupTransports);
final AnyValue sourceConf = cacheResource.get(resourceName);
Class sourceType = sourceConf == null ? CacheMemorySource.class : Class.forName(sourceConf.getValue("type"));
final CacheMemorySource source = Sncp.createLocalService(resourceName, getExecutor(), appResFactory, sourceType, sncpAddr, Sncp.getSncpGroup(srcService), Sncp.getGroups(srcService), Sncp.getConf(srcService), sameGroupTransport, diffGroupTransports);
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
@@ -262,7 +286,7 @@ public abstract class NodeServer {
appResFactory.register(resourceName, CacheSource.class, source);
field.set(src, source);
rf.inject(source, self); //
if (source instanceof Service) ((Service) source).init(null);
if (source instanceof Service) ((Service) source).init(sourceConf);
if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource
NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);