From 826aaf01282be4f691337ddb7105041dd7ba95f8 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Tue, 17 Apr 2018 13:46:30 +0800 Subject: [PATCH] --- src/META-INF/application-template.xml | 1 + src/org/redkale/net/AsyncConnection.java | 30 +++++++++++++++++++++++ src/org/redkale/net/Context.java | 10 +++++++- src/org/redkale/net/PrepareRunner.java | 10 +++++--- src/org/redkale/net/Server.java | 4 +++ src/org/redkale/net/http/HttpContext.java | 4 +-- src/org/redkale/net/http/HttpRequest.java | 4 ++- src/org/redkale/net/http/HttpServer.java | 5 ++-- src/org/redkale/net/sncp/SncpContext.java | 4 +-- src/org/redkale/net/sncp/SncpServer.java | 5 ++-- 10 files changed, 64 insertions(+), 13 deletions(-) diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index 7bd2e46e1..5e12a1882 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -122,6 +122,7 @@ bufferCapacity: ByteBuffer的初始化大小, 默认: 32K; (HTTP 2.0、WebSocket,必须要16k以上) bufferPoolSize: ByteBuffer池的大小,默认: 线程总数*4 responsePoolSize: Response池的大小,默认: 线程总数*2 + aliveTimeoutSecond: KeepAlive读操作超时秒数, 默认0, 表示永久不超时,-1表示禁止KeepAlive readTimeoutSecond: 读操作超时秒数, 默认0, 表示永久不超时 writeTimeoutSecond: 写操作超时秒数, 默认0, 表示永久不超时 interceptor: 启动/关闭NodeServer时被调用的拦截器实现类,必须是org.redkale.boot.NodeInterceptor的子类,默认为null diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 45ccde03c..64764b19a 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -61,6 +61,20 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl public abstract void setWriteTimeoutSecond(int writeTimeoutSecond); + @Override + public abstract Future read(ByteBuffer dst); + + @Override + public abstract void read(ByteBuffer dst, A attachment, CompletionHandler handler); + + public abstract void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler); + + @Override + public abstract Future write(ByteBuffer src); + + @Override + public abstract void write(ByteBuffer src, A attachment, CompletionHandler handler); + public final void write(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { write(srcs, 0, srcs.length, attachment, handler); } @@ -277,6 +291,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } + @Override + public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + read(dst, attachment, handler); + } + @Override public Future read(ByteBuffer dst) { try { @@ -432,6 +451,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } + @Override + public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + read(dst, attachment, handler); + } + @Override public Future read(ByteBuffer dst) { try { @@ -529,6 +553,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } + @Override + public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + this.readtime = System.currentTimeMillis(); + channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler); + } + @Override public void write(ByteBuffer src, A attachment, CompletionHandler handler) { this.writetime = System.currentTimeMillis(); diff --git a/src/org/redkale/net/Context.java b/src/org/redkale/net/Context.java index cb3950562..ad0323b1b 100644 --- a/src/org/redkale/net/Context.java +++ b/src/org/redkale/net/Context.java @@ -58,6 +58,9 @@ public class Context { //请求内容的大小上限, 默认64K protected final int maxbody; + //keep alive IO读取的超时时间 + protected final int aliveTimeoutSecond; + //IO读取的超时时间 protected final int readTimeoutSecond; @@ -79,7 +82,7 @@ public class Context { public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext, int bufferCapacity, ObjectPool bufferPool, ObjectPool responsePool, final int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory, - final PrepareServlet prepare, final int readTimeoutSecond, final int writeTimeoutSecond) { + final PrepareServlet prepare, final int aliveTimeoutSecond, final int readTimeoutSecond, final int writeTimeoutSecond) { this.serverStartTime = serverStartTime; this.logger = logger; this.executor = executor; @@ -92,6 +95,7 @@ public class Context { this.address = address; this.prepare = prepare; this.resourceFactory = resourceFactory; + this.aliveTimeoutSecond = aliveTimeoutSecond; this.readTimeoutSecond = readTimeoutSecond; this.writeTimeoutSecond = writeTimeoutSecond; this.jsonFactory = JsonFactory.root(); @@ -165,6 +169,10 @@ public class Context { return logger; } + public int getAliveTimeoutSecond() { + return aliveTimeoutSecond; + } + public int getReadTimeoutSecond() { return readTimeoutSecond; } diff --git a/src/org/redkale/net/PrepareRunner.java b/src/org/redkale/net/PrepareRunner.java index 04d58d1f9..5fba9b307 100644 --- a/src/org/redkale/net/PrepareRunner.java +++ b/src/org/redkale/net/PrepareRunner.java @@ -7,6 +7,7 @@ package org.redkale.net; import java.nio.*; import java.nio.channels.*; +import java.util.concurrent.TimeUnit; import java.util.logging.*; import org.redkale.util.*; @@ -38,6 +39,7 @@ public final class PrepareRunner implements Runnable { @Override public void run() { + final boolean keepalive = response != null; final PrepareServlet prepare = context.prepare; final ObjectPool responsePool = context.responsePool; if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了 @@ -54,16 +56,18 @@ public final class PrepareRunner implements Runnable { if (response == null) response = responsePool.get(); final ByteBuffer buffer = response.request.pollReadBuffer(); try { - channel.read(buffer, null, new CompletionHandler() { + channel.read(buffer, keepalive ? context.getAliveTimeoutSecond() : 0, TimeUnit.SECONDS, null, + new CompletionHandler() { @Override public void completed(Integer count, Void attachment1) { if (count < 1 && buffer.remaining() == buffer.limit()) { try { response.request.offerReadBuffer(buffer); response.finish(true); - channel.close(); } catch (Exception e) { - context.logger.log(Level.FINEST, "PrepareRunner close channel erroneous on no read bytes", e); + if (context.logger.isLoggable(Level.FINEST)) { + context.logger.log(Level.FINEST, "PrepareRunner close channel erroneous on no read bytes", e); + } } return; } diff --git a/src/org/redkale/net/Server.java b/src/org/redkale/net/Server.java index b84f0277c..d1c2cbeb2 100644 --- a/src/org/redkale/net/Server.java +++ b/src/org/redkale/net/Server.java @@ -91,6 +91,9 @@ public abstract class Server bufferPool, ObjectPool responsePool, int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory, - PrepareServlet prepare, int readTimeoutSecond, int writeTimeoutSecond) { + PrepareServlet prepare, int aliveTimeoutSecond, int readTimeoutSecond, int writeTimeoutSecond) { super(serverStartTime, logger, executor, sslContext, bufferCapacity, bufferPool, responsePool, - maxbody, charset, address, resourceFactory, prepare, readTimeoutSecond, writeTimeoutSecond); + maxbody, charset, address, resourceFactory, prepare, aliveTimeoutSecond, readTimeoutSecond, writeTimeoutSecond); random.setSeed(Math.abs(System.nanoTime())); } diff --git a/src/org/redkale/net/http/HttpRequest.java b/src/org/redkale/net/http/HttpRequest.java index 855720c5c..86e89973b 100644 --- a/src/org/redkale/net/http/HttpRequest.java +++ b/src/org/redkale/net/http/HttpRequest.java @@ -154,7 +154,9 @@ public class HttpRequest extends Request { case "Connection": case "connection": this.connection = value; - this.setKeepAlive(!"close".equalsIgnoreCase(value)); + if (context.getAliveTimeoutSecond() >= 0) { + this.setKeepAlive(!"close".equalsIgnoreCase(value)); + } break; case "user-agent": header.addValue("User-Agent", value); diff --git a/src/org/redkale/net/http/HttpServer.java b/src/org/redkale/net/http/HttpServer.java index 321ec304a..1b0c0c1aa 100644 --- a/src/org/redkale/net/http/HttpServer.java +++ b/src/org/redkale/net/http/HttpServer.java @@ -380,8 +380,9 @@ public class HttpServer extends Server responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); - HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, this.sslContext, rcapacity, bufferPool, responsePool, - this.maxbody, this.charset, this.address, this.resourceFactory, this.prepare, this.readTimeoutSecond, this.writeTimeoutSecond); + HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, this.sslContext, + rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.resourceFactory, + this.prepare, this.aliveTimeoutSecond, this.readTimeoutSecond, this.writeTimeoutSecond); responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader), plainType, jsonType, addHeaders, setHeaders, defCookie, options, ((HttpPrepareServlet) prepare).renders)); return httpcontext; diff --git a/src/org/redkale/net/sncp/SncpContext.java b/src/org/redkale/net/sncp/SncpContext.java index 0961baa31..2e5697199 100644 --- a/src/org/redkale/net/sncp/SncpContext.java +++ b/src/org/redkale/net/sncp/SncpContext.java @@ -25,8 +25,8 @@ public class SncpContext extends Context { public SncpContext(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext, int bufferCapacity, ObjectPool bufferPool, ObjectPool responsePool, int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory, - PrepareServlet prepare, int readTimeoutSecond, int writeTimeoutSecond) { + PrepareServlet prepare, int aliveTimeoutSecond, int readTimeoutSecond, int writeTimeoutSecond) { super(serverStartTime, logger, executor, sslContext, bufferCapacity, bufferPool, responsePool, - maxbody, charset, address, resourceFactory, prepare, readTimeoutSecond, writeTimeoutSecond); + maxbody, charset, address, resourceFactory, prepare, aliveTimeoutSecond, readTimeoutSecond, writeTimeoutSecond); } } diff --git a/src/org/redkale/net/sncp/SncpServer.java b/src/org/redkale/net/sncp/SncpServer.java index 0156e4a33..8cbb571b2 100644 --- a/src/org/redkale/net/sncp/SncpServer.java +++ b/src/org/redkale/net/sncp/SncpServer.java @@ -106,8 +106,9 @@ public class SncpServer extends Server responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); - SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, this.sslContext, rcapacity, bufferPool, responsePool, - this.maxbody, this.charset, this.address, this.resourceFactory, this.prepare, this.readTimeoutSecond, this.writeTimeoutSecond); + SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, this.sslContext, + rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.resourceFactory, + this.prepare, this.aliveTimeoutSecond, this.readTimeoutSecond, this.writeTimeoutSecond); responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext))); return sncpcontext; }