增加SERVER_EXECUTOR线程池的资源注入

This commit is contained in:
Redkale
2018-01-12 09:14:10 +08:00
parent 21af487aab
commit 1cd5fe0b02
4 changed files with 32 additions and 7 deletions

View File

@@ -92,6 +92,11 @@ public final class Application {
*/
public static final String RESNAME_SERVER_ROOT = Server.RESNAME_SERVER_ROOT;
/**
* 当前Server的线程池
*/
public static final String RESNAME_SERVER_EXECUTOR = Server.RESNAME_SERVER_EXECUTOR;
//本地IP地址
final InetAddress localAddress;

View File

@@ -140,6 +140,10 @@ public abstract class NodeServer {
}
//必须要进行初始化, 构建Service时需要使用Context中的ExecutorService
server.init(this.serverConf);
//init之后才有Executor
resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, Executor.class, server.getExecutor());
resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ExecutorService.class, server.getExecutor());
resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ThreadPoolExecutor.class, server.getExecutor());
initResource(); //给 DataSource、CacheSource 注册依赖注入时的监听回调事件。
String interceptorClass = this.serverConf.getValue("interceptor", "");

View File

@@ -31,6 +31,8 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
public static final String RESNAME_SERVER_ROOT = "SERVER_ROOT";
public static final String RESNAME_SERVER_EXECUTOR = "SERVER_EXECUTOR";
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
//-------------------------------------------------------------
@@ -90,7 +92,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
//最大连接数
protected int maxconns;
protected Server(long serverStartTime, String protocol, PrepareServlet<K, C, R, P, S> servlet) {
this.serverStartTime = serverStartTime;
this.protocol = protocol;
@@ -146,6 +148,10 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.prepare.destroy(context, config);
}
public ThreadPoolExecutor getExecutor() {
return executor;
}
public InetSocketAddress getSocketAddress() {
return address;
}
@@ -192,7 +198,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
}
serverChannel.bind(address, backlog);
serverChannel.setMaxconns(this.maxconns);
serverChannel.accept();
serverChannel.accept();
final String threadName = "[" + Thread.currentThread().getName() + "] ";
logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address
+ ", threads: " + threads + ", bufferCapacity: " + bufferCapacity + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize

View File

@@ -6,7 +6,8 @@
package org.redkale.service;
import java.util.concurrent.*;
import org.redkale.net.WorkThread;
import javax.annotation.Resource;
import org.redkale.net.*;
/**
*
@@ -14,16 +15,25 @@ import org.redkale.net.WorkThread;
*/
public abstract class AbstractService implements Service {
//如果开启了SNCP此处线程池为SncpServer的线程池
@Resource(name = Server.RESNAME_SERVER_EXECUTOR)
private ExecutorService serverExecutor;
protected void runAsync(Runnable runner) {
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
((WorkThread) thread).runAsync(runner);
if (serverExecutor != null) {
serverExecutor.execute(runner);
} else {
ForkJoinPool.commonPool().execute(runner);
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
((WorkThread) thread).runAsync(runner);
} else {
ForkJoinPool.commonPool().execute(runner);
}
}
}
protected ExecutorService getExecutor() {
if (serverExecutor != null) return serverExecutor;
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
return ((WorkThread) thread).getExecutor();