This commit is contained in:
RedKale
2016-04-13 14:34:48 +08:00
parent 43ef1fb22f
commit 04098f976b
3 changed files with 54 additions and 11 deletions

View File

@@ -86,7 +86,8 @@
bufferPoolSize ByteBuffer池的大小默认: CPU核数*512
responsePoolSize Response池的大小默认: CPU核数*256
readTimeoutSecond: 读操作超时秒数, 默认0 表示永久不超时
writeTimeoutSecond: 写操作超时秒数, 默认0 表示永久不超时
writeTimeoutSecond: 写操作超时秒数, 默认0 表示永久不超时
nodeInterceptor: 启动/关闭NodeServer时被调用的拦截器实现类必须是org.redkale.boot.NodeInterceptor的子类默认为null
-->
<server protocol="HTTP" host="127.0.0.1" port="6060" root="root" lib="">

View File

@@ -0,0 +1,21 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot;
/**
*
* @author zhangjx
*/
public class NodeInterceptor {
public void preStart(NodeServer server) {
}
public void preShutdown(NodeServer server) {
}
}

View File

@@ -6,6 +6,7 @@
package org.redkale.boot;
import java.io.*;
import static java.lang.Class.forName;
import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.InetSocketAddress;
@@ -65,6 +66,8 @@ public abstract class NodeServer {
protected AnyValue serverConf;
protected NodeInterceptor interceptor;
protected final Set<ServiceWrapper> localServiceWrappers = new LinkedHashSet<>();
protected final Set<ServiceWrapper> remoteServiceWrappers = new LinkedHashSet<>();
@@ -139,7 +142,11 @@ public abstract class NodeServer {
}
initResource(); //给 DataSource、CacheSource 注册依赖注入时的监听回调事件。
String interceptorClass = config.getValue("nodeInterceptor", "");
if (!interceptorClass.isEmpty()) {
Class clazz = forName(interceptorClass);
this.interceptor = (NodeInterceptor) clazz.newInstance();
}
ClassFilter<Servlet> servletFilter = createServletClassFilter();
ClassFilter<Service> serviceFilter = createServiceClassFilter();
long s = System.currentTimeMillis();
@@ -203,7 +210,7 @@ public abstract class NodeServer {
} catch (Exception e) {
logger.log(Level.SEVERE, "DataSource inject error", e);
}
},DataSource.class);
}, DataSource.class);
resourceFactory.register((ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> {
try {
if (field.getAnnotation(Resource.class) == null) return;
@@ -252,7 +259,7 @@ public abstract class NodeServer {
} catch (Exception e) {
logger.log(Level.SEVERE, "DataSource inject error", e);
}
},CacheSource.class);
}, CacheSource.class);
}
@SuppressWarnings("unchecked")
@@ -272,9 +279,9 @@ public abstract class NodeServer {
if (groups.isEmpty() && isSNCP() && this.sncpGroup != null) groups.add(this.sncpGroup);
final boolean localed = (this.sncpAddress == null && entry.isEmptyGroups() && !type.isInterface() && !Modifier.isAbstract(type.getModifiers())) //非SNCP的Server通常是单点服务
|| groups.contains(this.sncpGroup) //本地IP含在内的
|| (this.sncpGroup == null && entry.isEmptyGroups()) //空的SNCP配置
|| type.getAnnotation(LocalService.class) != null;//本地模式
|| groups.contains(this.sncpGroup) //本地IP含在内的
|| (this.sncpGroup == null && entry.isEmptyGroups()) //空的SNCP配置
|| type.getAnnotation(LocalService.class) != null;//本地模式
if (localed && (type.isInterface() || Modifier.isAbstract(type.getModifiers()))) continue; //本地模式不能实例化接口和抽象类的Service类
Service service;
@@ -283,7 +290,7 @@ public abstract class NodeServer {
} else {
service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(groups));
}
if(SncpClient.parseMethod(type).isEmpty()) continue; //class没有可用的方法 通常为BaseService
if (SncpClient.parseMethod(type).isEmpty()) continue; //class没有可用的方法 通常为BaseService
final ServiceWrapper wrapper = new ServiceWrapper(type, service, entry.getName(), localed ? this.sncpGroup : null, groups, entry.getProperty());
for (final Class restype : wrapper.getTypes()) {
if (resourceFactory.find(wrapper.getName(), restype) == null) {
@@ -360,7 +367,7 @@ public abstract class NodeServer {
transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses())));
Transport first = transports.get(0);
Transport newTransport = new Transport(groupid, application.findGroupProtocol(first.getName()), application.getWatchFactory(),
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
synchronized (application.resourceFactory) {
transport = application.resourceFactory.find(groupid, Transport.class);
if (transport == null) {
@@ -385,7 +392,7 @@ public abstract class NodeServer {
Set<InetSocketAddress> addrs = application.findGlobalGroup(group);
if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> ");
transport = new Transport(group, application.findGroupProtocol(group), application.getWatchFactory(),
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
application.resourceFactory.register(group, transport);
}
return transport;
@@ -398,7 +405,7 @@ public abstract class NodeServer {
}
protected ClassFilter createClassFilter(final String localGroup, Class<? extends Annotation> ref,
Class inter, Class<? extends Annotation> ref2, String properties, String property) {
Class inter, Class<? extends Annotation> ref2, String properties, String property) {
ClassFilter cf = new ClassFilter(ref, inter, null);
if (properties == null && properties == null) return cf;
if (this.serverConf == null) return cf;
@@ -473,10 +480,12 @@ public abstract class NodeServer {
}
public void start() throws IOException {
if (interceptor != null) interceptor.preStart(this);
server.start();
}
public void shutdown() throws IOException {
if (interceptor != null) interceptor.preShutdown(this);
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
localServiceWrappers.forEach(y -> {
long s = System.currentTimeMillis();
@@ -490,4 +499,16 @@ public abstract class NodeServer {
server.shutdown();
}
public <T extends Server> T getServer() {
return (T) server;
}
public Set<ServiceWrapper> getLocalServiceWrappers() {
return new LinkedHashSet<>(localServiceWrappers);
}
public Set<ServiceWrapper> getRemoteServiceWrappers() {
return new LinkedHashSet<>(remoteServiceWrappers);
}
}