Transport增加readTimeoutSecond、writeTimeoutSecond配置项

This commit is contained in:
Redkale
2017-12-28 10:24:15 +08:00
parent bb493d43b5
commit 23a0a80e3a
9 changed files with 62 additions and 35 deletions

View File

@@ -37,9 +37,11 @@
threads 线程总数, 默认: <group>节点数*CPU核数*8
bufferCapacity: ByteBuffer的初始化大小 默认: 8K;
bufferPoolSize ByteBuffer池的大小默认: <group>节点数*CPU核数*8
readTimeoutSecond: TCP读取超时秒数, 默认为6秒 为0表示无超时限制
writeTimeoutSecond: TCP写入超时秒数, 默认为6秒 为0表示无超时限制
strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类
-->
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32"/>
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32" readTimeoutSecond="6" writeTimeoutSecond="6"/>
<!--
一个组包含多个node 同一Service服务可以由多个进程提供这些进程称为一个GROUP且同一GROUP内的进程必须在同一机房或局域网内
@@ -64,7 +66,7 @@
<!--
全局的数据源设置, 可以是CacheSource、DataSource JDBC的DataSource通常通过persistence.xml配置此处多用于CacheSource的配置
name: 资源名,用于依赖注入。
value类名必须是CacheSource或DataSource的子类且必须实现Service接口。
value类名必须是CacheSource或DataSource的子类且必须实现Service接口。如果是DataSource.class系统自动映射成DataJdbcSource.class
groups: 指定groups。
xxx: 其他属性与子节点通过Service.init方法传入的AnyValue获取。
-->
@@ -92,7 +94,8 @@
System.setProperty("convert.json.writer.buffer.defsize", "4096");
System.setProperty("convert.bson.writer.buffer.defsize", "4096");
<properties>节点下也可包含非<property>节点,其节点可以通过@Resource(name="properties.xxxxxx")进行注入, 被注解的字段类型只能是AnyValue、AnyValue[]
<properties>节点下也可包含非<property>节点.
非<property>其节点可以通过@Resource(name="properties.xxxxxx")进行注入, 被注解的字段类型只能是AnyValue、AnyValue[]
-->
<properties load="config.properties">
<property name="system.property.yyyy" value="YYYYYY"/>
@@ -147,7 +150,7 @@
<service value="com.xxx.XXX2Service" name="" groups="xxx;yyy"/>
<!-- 给Service增加配置属性 -->
<service value="com.xxx.XXX1Service">
<!-- property节点值在 public void init(AnyValue conf) 方法中可以通过 AnyValue properties = conf.getAnyValue("properties");获取 -->
<!-- property值在public void init(AnyValue conf)方法中可以通过AnyValue properties=conf.getAnyValue("properties")获取 -->
<property name="xxxxxx" value="XXXXXXXX"/>
<property name="xxxxxx" value="XXXXXXXX"/>
</service>
@@ -172,7 +175,7 @@
<!-- 给Filter增加配置属性 -->
<filter value="com.xxx.XXX12Filter">
<!-- property节点值在 public void init(AnyValue conf) 方法中可以通过 AnyValue properties = conf.getAnyValue("properties");获取 -->
<!-- property值在public void init(AnyValue conf)方法中可以通过AnyValue properties=conf.getAnyValue("properties")获取 -->
<property name="xxxxxx" value="XXXXXXXX"/>
<property name="xxxxxx" value="XXXXXXXX"/>
</filter>

View File

@@ -7,10 +7,11 @@
module org.redkale {
requires java.se;
requires jdk.unsupported;
requires jdk.unsupported; //sun.misc.Unsafe
exports javax.annotation;
exports javax.persistence;
exports org.redkale.asm;
exports org.redkale.boot;
exports org.redkale.boot.watch;
exports org.redkale.convert;

View File

@@ -245,6 +245,8 @@ public final class Application {
TransportStrategy strategy = null;
int bufferCapacity = 8 * 1024;
int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 16;
int readTimeoutSecond = TransportFactory.DEFAULT_READTIMEOUTSECOND;
int writeTimeoutSecond = TransportFactory.DEFAULT_WRITETIMEOUTSECOND;
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
if (resources != null) {
@@ -255,6 +257,8 @@ public final class Application {
//--------------transportBufferPool-----------
bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), bufferCapacity), 4 * 1024);
bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), groupsize * Runtime.getRuntime().availableProcessors() * 8);
readTimeoutSecond = transportConf.getIntValue("readTimeoutSecond", readTimeoutSecond);
writeTimeoutSecond = transportConf.getIntValue("writeTimeoutSecond", writeTimeoutSecond);
final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 8);
final int capacity = bufferCapacity;
transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
@@ -306,7 +310,7 @@ public final class Application {
return true;
});
}
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy);
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, readTimeoutSecond, writeTimeoutSecond, strategy);
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"));
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
Thread.currentThread().setContextClassLoader(this.classLoader);

View File

@@ -179,6 +179,7 @@ public abstract class NodeServer {
for (AnyValue sourceConf : resources.getAnyValues("source")) {
try {
Class type = serverClassLoader.loadClass(sourceConf.getValue("value"));
if (type == DataSource.class) type = DataJdbcSource.class;
if (!Service.class.isAssignableFrom(type)) {
logger.log(Level.SEVERE, "load application source resource, but not Service error: " + sourceConf);
} else if (CacheSource.class.isAssignableFrom(type)) {

View File

@@ -31,10 +31,10 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
protected volatile long writetime;
//关闭数
AtomicLong closedCounter;
protected AtomicLong closedCounter;
//在线数
AtomicLong livingCounter;
protected AtomicLong livingCounter;
public final long getLastReadTime() {
return readtime;

View File

@@ -217,7 +217,7 @@ public abstract class ProtocolServer {
AsyncConnection conn = AsyncConnection.create(channel, null, context.readTimeoutSecond, context.writeTimeoutSecond);
conn.livingCounter = livingCounter;
conn.closedCounter = closedCounter;
context.submitAsync(new PrepareRunner(context, conn, null));
context.runAsync(new PrepareRunner(context, conn, null));
}
@Override

View File

@@ -29,6 +29,12 @@ import org.redkale.util.*;
*/
public class TransportFactory {
@Comment("默认TCP读取超时秒数")
public static int DEFAULT_READTIMEOUTSECOND = 6;
@Comment("默认TCP写入超时秒数")
public static int DEFAULT_WRITETIMEOUTSECOND = 6;
public static final String NAME_PINGINTERVAL = "pinginterval";
protected static final Logger logger = Logger.getLogger(TransportFactory.class.getSimpleName());
@@ -55,6 +61,12 @@ public class TransportFactory {
//心跳周期, 单位:秒
protected int pinginterval;
//TCP读取超时秒数
protected int readTimeoutSecond;
//TCP写入超时秒数
protected int writeTimeoutSecond;
//ping的定时器
private ScheduledThreadPoolExecutor pingScheduler;
@@ -68,15 +80,18 @@ public class TransportFactory {
protected final TransportStrategy strategy;
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
final TransportStrategy strategy) {
int readTimeoutSecond, int writeTimeoutSecond, final TransportStrategy strategy) {
this.executor = executor;
this.bufferPool = bufferPool;
this.channelGroup = channelGroup;
this.readTimeoutSecond = readTimeoutSecond;
this.writeTimeoutSecond = writeTimeoutSecond;
this.strategy = strategy;
}
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
this(executor, bufferPool, channelGroup, null);
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
int readTimeoutSecond, int writeTimeoutSecond) {
this(executor, bufferPool, channelGroup, readTimeoutSecond, writeTimeoutSecond, null);
}
public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) {
@@ -100,10 +115,14 @@ public class TransportFactory {
}
public static TransportFactory create(int threads) {
return create(threads, threads * 2, 8 * 1024);
return create(threads, threads * 2, 8 * 1024, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND);
}
public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity) {
return create(threads, bufferPoolSize, bufferCapacity, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND);
}
public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity, int readTimeoutSecond, int writeTimeoutSecond) {
final ObjectPool<ByteBuffer> transportPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false;
@@ -123,16 +142,21 @@ public class TransportFactory {
} catch (IOException e) {
throw new RuntimeException(e);
}
return create(transportExec, transportPool, transportGroup);
return create(transportExec, transportPool, transportGroup, readTimeoutSecond, writeTimeoutSecond);
}
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
return new TransportFactory(executor, bufferPool, channelGroup, null);
return new TransportFactory(executor, bufferPool, channelGroup, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND, null);
}
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
final TransportStrategy strategy) {
return new TransportFactory(executor, bufferPool, channelGroup, strategy);
int readTimeoutSecond, int writeTimeoutSecond) {
return new TransportFactory(executor, bufferPool, channelGroup, readTimeoutSecond, writeTimeoutSecond, null);
}
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
int readTimeoutSecond, int writeTimeoutSecond, final TransportStrategy strategy) {
return new TransportFactory(executor, bufferPool, channelGroup, readTimeoutSecond, writeTimeoutSecond, strategy);
}
public Transport createTransportTCP(String name, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {

View File

@@ -15,6 +15,7 @@ import java.text.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.logging.Level;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
@@ -34,15 +35,6 @@ import org.redkale.util.*;
*/
public class HttpResponse extends Response<HttpContext, HttpRequest> {
/**
* HttpResponse.finish 方法内调用
* 主要给@HttpCacheable使用
*/
protected static interface BufferHandler {
public ByteBuffer[] execute(final HttpResponse response, final ByteBuffer[] buffers);
}
private static final ByteBuffer buffer304 = ByteBuffer.wrap("HTTP/1.1 304 Not Modified\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
private static final ByteBuffer buffer404 = ByteBuffer.wrap("HTTP/1.1 404 Not Found\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
@@ -117,7 +109,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private boolean headsended = false;
private BufferHandler bufferHandler;
private BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> bufferHandler;
//------------------------------------------------
private final DefaultAnyValue header = new DefaultAnyValue();
@@ -498,7 +490,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
}
if (context.getCharset() == null) {
if (bufferHandler != null) {
bufferHandler.execute(this, new ByteBuffer[]{ByteBuffer.wrap(Utility.encodeUTF8(obj))});
bufferHandler.apply(this, new ByteBuffer[]{ByteBuffer.wrap(Utility.encodeUTF8(obj))});
}
final char[] chars = Utility.charArray(obj);
this.contentLength = Utility.encodeUTF8Length(chars);
@@ -513,7 +505,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} else {
ByteBuffer buffer = context.getCharset().encode(obj);
if (bufferHandler != null) {
ByteBuffer[] bufs = bufferHandler.execute(this, new ByteBuffer[]{buffer});
ByteBuffer[] bufs = bufferHandler.apply(this, new ByteBuffer[]{buffer});
if (bufs != null) buffer = bufs[0];
}
this.contentLength = buffer.remaining();
@@ -621,7 +613,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finish(boolean kill, ByteBuffer... buffers) {
if (isClosed()) return; //避免重复关闭
if (bufferHandler != null) {
ByteBuffer[] bufs = bufferHandler.execute(this, buffers);
ByteBuffer[] bufs = bufferHandler.apply(this, buffers);
if (bufs != null) buffers = bufs;
}
if (kill) refuseAlive();
@@ -801,7 +793,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.channel.write(hbuffer, hbuffer, new TransferFileHandler(file, offset, length));
}
private ByteBuffer createHeader() {
//Header大小不能超过一个ByteBuffer的容量
protected ByteBuffer createHeader() {
this.headsended = true;
ByteBuffer buffer = this.context.pollBuffer();
buffer.put(("HTTP/1.1 " + this.status + " " + (this.status == 200 ? "OK" : httpCodes.get(this.status)) + "\r\n").getBytes());
@@ -1015,7 +1008,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*
* @return 拦截器
*/
protected BufferHandler getBufferHandler() {
protected BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> getBufferHandler() {
return bufferHandler;
}
@@ -1024,7 +1017,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*
* @param bufferHandler 拦截器
*/
protected void setBufferHandler(BufferHandler bufferHandler) {
protected void setBufferHandler(BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> bufferHandler) {
this.bufferHandler = bufferHandler;
}

View File

@@ -10,6 +10,7 @@ import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import jdk.internal.org.objectweb.asm.*;
import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES;
import static jdk.internal.org.objectweb.asm.Opcodes.*;
@@ -315,7 +316,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
return false;
}
public final HttpResponse.BufferHandler cacheHandler;
public final BiFunction<HttpResponse, ByteBuffer[], ByteBuffer[]> cacheHandler;
public final ConcurrentHashMap<String, CacheEntry> cache;