This commit is contained in:
Redkale
2018-04-17 13:46:30 +08:00
parent 754861e036
commit 826aaf0128
10 changed files with 64 additions and 13 deletions

View File

@@ -122,6 +122,7 @@
bufferCapacity: ByteBuffer的初始化大小 默认: 32K; (HTTP 2.0、WebSocket必须要16k以上)
bufferPoolSize ByteBuffer池的大小默认: 线程总数*4
responsePoolSize Response池的大小默认: 线程总数*2
aliveTimeoutSecond: KeepAlive读操作超时秒数 默认0 表示永久不超时,-1表示禁止KeepAlive
readTimeoutSecond: 读操作超时秒数, 默认0 表示永久不超时
writeTimeoutSecond: 写操作超时秒数, 默认0 表示永久不超时
interceptor: 启动/关闭NodeServer时被调用的拦截器实现类必须是org.redkale.boot.NodeInterceptor的子类默认为null

View File

@@ -61,6 +61,20 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
public abstract void setWriteTimeoutSecond(int writeTimeoutSecond);
@Override
public abstract Future<Integer> read(ByteBuffer dst);
@Override
public abstract <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler);
public abstract <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler);
@Override
public abstract Future<Integer> write(ByteBuffer src);
@Override
public abstract <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler);
public final <A> void write(ByteBuffer[] srcs, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(srcs, 0, srcs.length, attachment, handler);
}
@@ -277,6 +291,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
@@ -432,6 +451,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
@@ -529,6 +553,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
this.readtime = System.currentTimeMillis();
channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler);
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
this.writetime = System.currentTimeMillis();

View File

@@ -58,6 +58,9 @@ public class Context {
//请求内容的大小上限, 默认64K
protected final int maxbody;
//keep alive IO读取的超时时间
protected final int aliveTimeoutSecond;
//IO读取的超时时间
protected final int readTimeoutSecond;
@@ -79,7 +82,7 @@ public class Context {
public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
final int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
final PrepareServlet prepare, final int readTimeoutSecond, final int writeTimeoutSecond) {
final PrepareServlet prepare, final int aliveTimeoutSecond, final int readTimeoutSecond, final int writeTimeoutSecond) {
this.serverStartTime = serverStartTime;
this.logger = logger;
this.executor = executor;
@@ -92,6 +95,7 @@ public class Context {
this.address = address;
this.prepare = prepare;
this.resourceFactory = resourceFactory;
this.aliveTimeoutSecond = aliveTimeoutSecond;
this.readTimeoutSecond = readTimeoutSecond;
this.writeTimeoutSecond = writeTimeoutSecond;
this.jsonFactory = JsonFactory.root();
@@ -165,6 +169,10 @@ public class Context {
return logger;
}
public int getAliveTimeoutSecond() {
return aliveTimeoutSecond;
}
public int getReadTimeoutSecond() {
return readTimeoutSecond;
}

View File

@@ -7,6 +7,7 @@ package org.redkale.net;
import java.nio.*;
import java.nio.channels.*;
import java.util.concurrent.TimeUnit;
import java.util.logging.*;
import org.redkale.util.*;
@@ -38,6 +39,7 @@ public final class PrepareRunner implements Runnable {
@Override
public void run() {
final boolean keepalive = response != null;
final PrepareServlet prepare = context.prepare;
final ObjectPool<? extends Response> responsePool = context.responsePool;
if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
@@ -54,16 +56,18 @@ public final class PrepareRunner implements Runnable {
if (response == null) response = responsePool.get();
final ByteBuffer buffer = response.request.pollReadBuffer();
try {
channel.read(buffer, null, new CompletionHandler<Integer, Void>() {
channel.read(buffer, keepalive ? context.getAliveTimeoutSecond() : 0, TimeUnit.SECONDS, null,
new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer count, Void attachment1) {
if (count < 1 && buffer.remaining() == buffer.limit()) {
try {
response.request.offerReadBuffer(buffer);
response.finish(true);
channel.close();
} catch (Exception e) {
context.logger.log(Level.FINEST, "PrepareRunner close channel erroneous on no read bytes", e);
if (context.logger.isLoggable(Level.FINEST)) {
context.logger.log(Level.FINEST, "PrepareRunner close channel erroneous on no read bytes", e);
}
}
return;
}

View File

@@ -91,6 +91,9 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
//请求包大小的上限,单位:字节
protected int maxbody;
//Keep-Alive IO读取的超时秒数小于1视为不设置
protected int aliveTimeoutSecond;
//IO读取的超时秒数小于1视为不设置
protected int readTimeoutSecond;
@@ -113,6 +116,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.address = new InetSocketAddress(config.getValue("host", "0.0.0.0"), config.getIntValue("port", 80));
this.charset = Charset.forName(config.getValue("charset", "UTF-8"));
this.maxconns = config.getIntValue("maxconns", 0);
this.aliveTimeoutSecond = config.getIntValue("aliveTimeoutSecond", 0);
this.readTimeoutSecond = config.getIntValue("readTimeoutSecond", 0);
this.writeTimeoutSecond = config.getIntValue("writeTimeoutSecond", 0);
this.backlog = parseLenth(config.getValue("backlog"), 16 * 1024);

View File

@@ -36,9 +36,9 @@ public class HttpContext extends Context {
public HttpContext(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
final int bufferCapacity, final ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
PrepareServlet prepare, int readTimeoutSecond, int writeTimeoutSecond) {
PrepareServlet prepare, int aliveTimeoutSecond, int readTimeoutSecond, int writeTimeoutSecond) {
super(serverStartTime, logger, executor, sslContext, bufferCapacity, bufferPool, responsePool,
maxbody, charset, address, resourceFactory, prepare, readTimeoutSecond, writeTimeoutSecond);
maxbody, charset, address, resourceFactory, prepare, aliveTimeoutSecond, readTimeoutSecond, writeTimeoutSecond);
random.setSeed(Math.abs(System.nanoTime()));
}

View File

@@ -154,7 +154,9 @@ public class HttpRequest extends Request<HttpContext> {
case "Connection":
case "connection":
this.connection = value;
this.setKeepAlive(!"close".equalsIgnoreCase(value));
if (context.getAliveTimeoutSecond() >= 0) {
this.setKeepAlive(!"close".equalsIgnoreCase(value));
}
break;
case "user-agent":
header.addValue("User-Agent", value);

View File

@@ -380,8 +380,9 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, this.sslContext, rcapacity, bufferPool, responsePool,
this.maxbody, this.charset, this.address, this.resourceFactory, this.prepare, this.readTimeoutSecond, this.writeTimeoutSecond);
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, this.sslContext,
rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.resourceFactory,
this.prepare, this.aliveTimeoutSecond, this.readTimeoutSecond, this.writeTimeoutSecond);
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader),
plainType, jsonType, addHeaders, setHeaders, defCookie, options, ((HttpPrepareServlet) prepare).renders));
return httpcontext;

View File

@@ -25,8 +25,8 @@ public class SncpContext extends Context {
public SncpContext(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
PrepareServlet prepare, int readTimeoutSecond, int writeTimeoutSecond) {
PrepareServlet prepare, int aliveTimeoutSecond, int readTimeoutSecond, int writeTimeoutSecond) {
super(serverStartTime, logger, executor, sslContext, bufferCapacity, bufferPool, responsePool,
maxbody, charset, address, resourceFactory, prepare, readTimeoutSecond, writeTimeoutSecond);
maxbody, charset, address, resourceFactory, prepare, aliveTimeoutSecond, readTimeoutSecond, writeTimeoutSecond);
}
}

View File

@@ -106,8 +106,9 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
AtomicLong createResponseCounter = new AtomicLong();
AtomicLong cycleResponseCounter = new AtomicLong();
ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, this.sslContext, rcapacity, bufferPool, responsePool,
this.maxbody, this.charset, this.address, this.resourceFactory, this.prepare, this.readTimeoutSecond, this.writeTimeoutSecond);
SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, this.sslContext,
rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.resourceFactory,
this.prepare, this.aliveTimeoutSecond, this.readTimeoutSecond, this.writeTimeoutSecond);
responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext)));
return sncpcontext;
}