From 23a0a80e3a84679362eebcb52b97c96bf632fd77 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Thu, 28 Dec 2017 10:24:15 +0800 Subject: [PATCH] =?UTF-8?q?Transport=E5=A2=9E=E5=8A=A0readTimeoutSecond?= =?UTF-8?q?=E3=80=81writeTimeoutSecond=E9=85=8D=E7=BD=AE=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/META-INF/application-template.xml | 13 ++++--- src/module-info.java | 3 +- src/org/redkale/boot/Application.java | 6 +++- src/org/redkale/boot/NodeServer.java | 1 + src/org/redkale/net/AsyncConnection.java | 4 +-- src/org/redkale/net/ProtocolServer.java | 2 +- src/org/redkale/net/TransportFactory.java | 40 +++++++++++++++++----- src/org/redkale/net/http/HttpResponse.java | 25 +++++--------- src/org/redkale/net/http/HttpServlet.java | 3 +- 9 files changed, 62 insertions(+), 35 deletions(-) diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index 6fd279038..9f3662461 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -37,9 +37,11 @@ threads: 线程总数, 默认: 节点数*CPU核数*8 bufferCapacity: ByteBuffer的初始化大小, 默认: 8K; bufferPoolSize: ByteBuffer池的大小,默认: 节点数*CPU核数*8 + readTimeoutSecond: TCP读取超时秒数, 默认为6秒, 为0表示无超时限制 + writeTimeoutSecond: TCP写入超时秒数, 默认为6秒, 为0表示无超时限制 strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类 --> - + @@ -92,7 +94,8 @@ System.setProperty("convert.json.writer.buffer.defsize", "4096"); System.setProperty("convert.bson.writer.buffer.defsize", "4096"); - 节点下也可包含非节点,其节点可以通过@Resource(name="properties.xxxxxx")进行注入, 被注解的字段类型只能是AnyValue、AnyValue[] + 节点下也可包含非节点. + 非其节点可以通过@Resource(name="properties.xxxxxx")进行注入, 被注解的字段类型只能是AnyValue、AnyValue[] --> @@ -147,7 +150,7 @@ - + @@ -172,7 +175,7 @@ - + diff --git a/src/module-info.java b/src/module-info.java index 57d404dac..9e31830ad 100644 --- a/src/module-info.java +++ b/src/module-info.java @@ -7,10 +7,11 @@ module org.redkale { requires java.se; - requires jdk.unsupported; + requires jdk.unsupported; //sun.misc.Unsafe exports javax.annotation; exports javax.persistence; + exports org.redkale.asm; exports org.redkale.boot; exports org.redkale.boot.watch; exports org.redkale.convert; diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index b02fd0ce1..f850fd94d 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -245,6 +245,8 @@ public final class Application { TransportStrategy strategy = null; int bufferCapacity = 8 * 1024; int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 16; + int readTimeoutSecond = TransportFactory.DEFAULT_READTIMEOUTSECOND; + int writeTimeoutSecond = TransportFactory.DEFAULT_WRITETIMEOUTSECOND; AtomicLong createBufferCounter = new AtomicLong(); AtomicLong cycleBufferCounter = new AtomicLong(); if (resources != null) { @@ -255,6 +257,8 @@ public final class Application { //--------------transportBufferPool----------- bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), bufferCapacity), 4 * 1024); bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), groupsize * Runtime.getRuntime().availableProcessors() * 8); + readTimeoutSecond = transportConf.getIntValue("readTimeoutSecond", readTimeoutSecond); + writeTimeoutSecond = transportConf.getIntValue("writeTimeoutSecond", writeTimeoutSecond); final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 8); final int capacity = bufferCapacity; transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, @@ -306,7 +310,7 @@ public final class Application { return true; }); } - this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy); + this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, readTimeoutSecond, writeTimeoutSecond, strategy); DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30")); this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining()); Thread.currentThread().setContextClassLoader(this.classLoader); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 1323912cf..6f25f7441 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -179,6 +179,7 @@ public abstract class NodeServer { for (AnyValue sourceConf : resources.getAnyValues("source")) { try { Class type = serverClassLoader.loadClass(sourceConf.getValue("value")); + if (type == DataSource.class) type = DataJdbcSource.class; if (!Service.class.isAssignableFrom(type)) { logger.log(Level.SEVERE, "load application source resource, but not Service error: " + sourceConf); } else if (CacheSource.class.isAssignableFrom(type)) { diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 623c65169..b2845073a 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -31,10 +31,10 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl protected volatile long writetime; //关闭数 - AtomicLong closedCounter; + protected AtomicLong closedCounter; //在线数 - AtomicLong livingCounter; + protected AtomicLong livingCounter; public final long getLastReadTime() { return readtime; diff --git a/src/org/redkale/net/ProtocolServer.java b/src/org/redkale/net/ProtocolServer.java index b48e08aa9..b166f0a66 100644 --- a/src/org/redkale/net/ProtocolServer.java +++ b/src/org/redkale/net/ProtocolServer.java @@ -217,7 +217,7 @@ public abstract class ProtocolServer { AsyncConnection conn = AsyncConnection.create(channel, null, context.readTimeoutSecond, context.writeTimeoutSecond); conn.livingCounter = livingCounter; conn.closedCounter = closedCounter; - context.submitAsync(new PrepareRunner(context, conn, null)); + context.runAsync(new PrepareRunner(context, conn, null)); } @Override diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 0eb9667fd..d69991e02 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -29,6 +29,12 @@ import org.redkale.util.*; */ public class TransportFactory { + @Comment("默认TCP读取超时秒数") + public static int DEFAULT_READTIMEOUTSECOND = 6; + + @Comment("默认TCP写入超时秒数") + public static int DEFAULT_WRITETIMEOUTSECOND = 6; + public static final String NAME_PINGINTERVAL = "pinginterval"; protected static final Logger logger = Logger.getLogger(TransportFactory.class.getSimpleName()); @@ -55,6 +61,12 @@ public class TransportFactory { //心跳周期, 单位:秒 protected int pinginterval; + //TCP读取超时秒数 + protected int readTimeoutSecond; + + //TCP写入超时秒数 + protected int writeTimeoutSecond; + //ping的定时器 private ScheduledThreadPoolExecutor pingScheduler; @@ -68,15 +80,18 @@ public class TransportFactory { protected final TransportStrategy strategy; protected TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup, - final TransportStrategy strategy) { + int readTimeoutSecond, int writeTimeoutSecond, final TransportStrategy strategy) { this.executor = executor; this.bufferPool = bufferPool; this.channelGroup = channelGroup; + this.readTimeoutSecond = readTimeoutSecond; + this.writeTimeoutSecond = writeTimeoutSecond; this.strategy = strategy; } - protected TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { - this(executor, bufferPool, channelGroup, null); + protected TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup, + int readTimeoutSecond, int writeTimeoutSecond) { + this(executor, bufferPool, channelGroup, readTimeoutSecond, writeTimeoutSecond, null); } public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) { @@ -100,10 +115,14 @@ public class TransportFactory { } public static TransportFactory create(int threads) { - return create(threads, threads * 2, 8 * 1024); + return create(threads, threads * 2, 8 * 1024, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND); } public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity) { + return create(threads, bufferPoolSize, bufferCapacity, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND); + } + + public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity, int readTimeoutSecond, int writeTimeoutSecond) { final ObjectPool transportPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false; @@ -123,16 +142,21 @@ public class TransportFactory { } catch (IOException e) { throw new RuntimeException(e); } - return create(transportExec, transportPool, transportGroup); + return create(transportExec, transportPool, transportGroup, readTimeoutSecond, writeTimeoutSecond); } public static TransportFactory create(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { - return new TransportFactory(executor, bufferPool, channelGroup, null); + return new TransportFactory(executor, bufferPool, channelGroup, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND, null); } public static TransportFactory create(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup, - final TransportStrategy strategy) { - return new TransportFactory(executor, bufferPool, channelGroup, strategy); + int readTimeoutSecond, int writeTimeoutSecond) { + return new TransportFactory(executor, bufferPool, channelGroup, readTimeoutSecond, writeTimeoutSecond, null); + } + + public static TransportFactory create(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup, + int readTimeoutSecond, int writeTimeoutSecond, final TransportStrategy strategy) { + return new TransportFactory(executor, bufferPool, channelGroup, readTimeoutSecond, writeTimeoutSecond, strategy); } public Transport createTransportTCP(String name, final InetSocketAddress clientAddress, final Collection addresses) { diff --git a/src/org/redkale/net/http/HttpResponse.java b/src/org/redkale/net/http/HttpResponse.java index 262645495..bf918a452 100644 --- a/src/org/redkale/net/http/HttpResponse.java +++ b/src/org/redkale/net/http/HttpResponse.java @@ -15,6 +15,7 @@ import java.text.*; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.logging.Level; import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; @@ -34,15 +35,6 @@ import org.redkale.util.*; */ public class HttpResponse extends Response { - /** - * HttpResponse.finish 方法内调用 - * 主要给@HttpCacheable使用 - */ - protected static interface BufferHandler { - - public ByteBuffer[] execute(final HttpResponse response, final ByteBuffer[] buffers); - } - private static final ByteBuffer buffer304 = ByteBuffer.wrap("HTTP/1.1 304 Not Modified\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer(); private static final ByteBuffer buffer404 = ByteBuffer.wrap("HTTP/1.1 404 Not Found\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer(); @@ -117,7 +109,7 @@ public class HttpResponse extends Response { private boolean headsended = false; - private BufferHandler bufferHandler; + private BiFunction bufferHandler; //------------------------------------------------ private final DefaultAnyValue header = new DefaultAnyValue(); @@ -498,7 +490,7 @@ public class HttpResponse extends Response { } if (context.getCharset() == null) { if (bufferHandler != null) { - bufferHandler.execute(this, new ByteBuffer[]{ByteBuffer.wrap(Utility.encodeUTF8(obj))}); + bufferHandler.apply(this, new ByteBuffer[]{ByteBuffer.wrap(Utility.encodeUTF8(obj))}); } final char[] chars = Utility.charArray(obj); this.contentLength = Utility.encodeUTF8Length(chars); @@ -513,7 +505,7 @@ public class HttpResponse extends Response { } else { ByteBuffer buffer = context.getCharset().encode(obj); if (bufferHandler != null) { - ByteBuffer[] bufs = bufferHandler.execute(this, new ByteBuffer[]{buffer}); + ByteBuffer[] bufs = bufferHandler.apply(this, new ByteBuffer[]{buffer}); if (bufs != null) buffer = bufs[0]; } this.contentLength = buffer.remaining(); @@ -621,7 +613,7 @@ public class HttpResponse extends Response { public void finish(boolean kill, ByteBuffer... buffers) { if (isClosed()) return; //避免重复关闭 if (bufferHandler != null) { - ByteBuffer[] bufs = bufferHandler.execute(this, buffers); + ByteBuffer[] bufs = bufferHandler.apply(this, buffers); if (bufs != null) buffers = bufs; } if (kill) refuseAlive(); @@ -801,7 +793,8 @@ public class HttpResponse extends Response { this.channel.write(hbuffer, hbuffer, new TransferFileHandler(file, offset, length)); } - private ByteBuffer createHeader() { + //Header大小不能超过一个ByteBuffer的容量 + protected ByteBuffer createHeader() { this.headsended = true; ByteBuffer buffer = this.context.pollBuffer(); buffer.put(("HTTP/1.1 " + this.status + " " + (this.status == 200 ? "OK" : httpCodes.get(this.status)) + "\r\n").getBytes()); @@ -1015,7 +1008,7 @@ public class HttpResponse extends Response { * * @return 拦截器 */ - protected BufferHandler getBufferHandler() { + protected BiFunction getBufferHandler() { return bufferHandler; } @@ -1024,7 +1017,7 @@ public class HttpResponse extends Response { * * @param bufferHandler 拦截器 */ - protected void setBufferHandler(BufferHandler bufferHandler) { + protected void setBufferHandler(BiFunction bufferHandler) { this.bufferHandler = bufferHandler; } diff --git a/src/org/redkale/net/http/HttpServlet.java b/src/org/redkale/net/http/HttpServlet.java index 10f76ef95..55748ea09 100644 --- a/src/org/redkale/net/http/HttpServlet.java +++ b/src/org/redkale/net/http/HttpServlet.java @@ -10,6 +10,7 @@ import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; import jdk.internal.org.objectweb.asm.*; import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES; import static jdk.internal.org.objectweb.asm.Opcodes.*; @@ -315,7 +316,7 @@ public class HttpServlet extends Servlet return false; } - public final HttpResponse.BufferHandler cacheHandler; + public final BiFunction cacheHandler; public final ConcurrentHashMap cache;