This commit is contained in:
@@ -37,11 +37,11 @@
|
|||||||
threads: 线程总数, 默认: <group>节点数*CPU核数*2
|
threads: 线程总数, 默认: <group>节点数*CPU核数*2
|
||||||
bufferCapacity: ByteBuffer的初始化大小, 默认: 32K;
|
bufferCapacity: ByteBuffer的初始化大小, 默认: 32K;
|
||||||
bufferPoolSize: ByteBuffer池的大小,默认: 线程总数*4
|
bufferPoolSize: ByteBuffer池的大小,默认: 线程总数*4
|
||||||
readTimeoutSecond: TCP读取超时秒数, 默认为6秒, 为0表示无超时限制
|
readTimeoutSeconds: TCP读取超时秒数, 默认为6秒, 为0表示无超时限制
|
||||||
writeTimeoutSecond: TCP写入超时秒数, 默认为6秒, 为0表示无超时限制
|
writeTimeoutSeconds: TCP写入超时秒数, 默认为6秒, 为0表示无超时限制
|
||||||
strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类
|
strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类
|
||||||
-->
|
-->
|
||||||
<transport bufferCapacity="32K" bufferPoolSize="32" threads="32" readTimeoutSecond="6" writeTimeoutSecond="6"/>
|
<transport bufferCapacity="32K" bufferPoolSize="32" threads="32" readTimeoutSeconds="6" writeTimeoutSeconds="6"/>
|
||||||
|
|
||||||
<!--
|
<!--
|
||||||
一个组包含多个node, 同一Service服务可以由多个进程提供,这些进程称为一个GROUP,且同一GROUP内的进程必须在同一机房或局域网内
|
一个组包含多个node, 同一Service服务可以由多个进程提供,这些进程称为一个GROUP,且同一GROUP内的进程必须在同一机房或局域网内
|
||||||
@@ -122,9 +122,9 @@
|
|||||||
bufferCapacity: ByteBuffer的初始化大小, 默认: 32K; (HTTP 2.0、WebSocket,必须要16k以上)
|
bufferCapacity: ByteBuffer的初始化大小, 默认: 32K; (HTTP 2.0、WebSocket,必须要16k以上)
|
||||||
bufferPoolSize: ByteBuffer池的大小,默认: 线程总数*4
|
bufferPoolSize: ByteBuffer池的大小,默认: 线程总数*4
|
||||||
responsePoolSize: Response池的大小,默认: 线程总数*2
|
responsePoolSize: Response池的大小,默认: 线程总数*2
|
||||||
aliveTimeoutSecond: KeepAlive读操作超时秒数, 默认0, 表示永久不超时,-1表示禁止KeepAlive
|
aliveTimeoutSeconds: KeepAlive读操作超时秒数, 默认0, 表示永久不超时,-1表示禁止KeepAlive
|
||||||
readTimeoutSecond: 读操作超时秒数, 默认0, 表示永久不超时
|
readTimeoutSeconds: 读操作超时秒数, 默认0, 表示永久不超时
|
||||||
writeTimeoutSecond: 写操作超时秒数, 默认0, 表示永久不超时
|
writeTimeoutSeconds: 写操作超时秒数, 默认0, 表示永久不超时
|
||||||
interceptor: 启动/关闭NodeServer时被调用的拦截器实现类,必须是org.redkale.boot.NodeInterceptor的子类,默认为null
|
interceptor: 启动/关闭NodeServer时被调用的拦截器实现类,必须是org.redkale.boot.NodeInterceptor的子类,默认为null
|
||||||
-->
|
-->
|
||||||
<server protocol="HTTP" host="127.0.0.1" port="6060" root="root" lib="">
|
<server protocol="HTTP" host="127.0.0.1" port="6060" root="root" lib="">
|
||||||
|
|||||||
@@ -249,8 +249,8 @@ public final class Application {
|
|||||||
TransportStrategy strategy = null;
|
TransportStrategy strategy = null;
|
||||||
int bufferCapacity = 32 * 1024;
|
int bufferCapacity = 32 * 1024;
|
||||||
int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 8;
|
int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 8;
|
||||||
int readTimeoutSecond = TransportFactory.DEFAULT_READTIMEOUTSECOND;
|
int readTimeoutSeconds = TransportFactory.DEFAULT_READTIMEOUTSECONDS;
|
||||||
int writeTimeoutSecond = TransportFactory.DEFAULT_WRITETIMEOUTSECOND;
|
int writeTimeoutSeconds = TransportFactory.DEFAULT_WRITETIMEOUTSECONDS;
|
||||||
AtomicLong createBufferCounter = new AtomicLong();
|
AtomicLong createBufferCounter = new AtomicLong();
|
||||||
AtomicLong cycleBufferCounter = new AtomicLong();
|
AtomicLong cycleBufferCounter = new AtomicLong();
|
||||||
if (resources != null) {
|
if (resources != null) {
|
||||||
@@ -260,8 +260,8 @@ public final class Application {
|
|||||||
if (transportConf != null) {
|
if (transportConf != null) {
|
||||||
//--------------transportBufferPool-----------
|
//--------------transportBufferPool-----------
|
||||||
bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), bufferCapacity), 8 * 1024);
|
bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), bufferCapacity), 8 * 1024);
|
||||||
readTimeoutSecond = transportConf.getIntValue("readTimeoutSecond", readTimeoutSecond);
|
readTimeoutSeconds = transportConf.getIntValue("readTimeoutSeconds", readTimeoutSeconds);
|
||||||
writeTimeoutSecond = transportConf.getIntValue("writeTimeoutSecond", writeTimeoutSecond);
|
writeTimeoutSeconds = transportConf.getIntValue("writeTimeoutSeconds", writeTimeoutSeconds);
|
||||||
final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 2);
|
final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 2);
|
||||||
bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), threads * 4);
|
bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), threads * 4);
|
||||||
final int capacity = bufferCapacity;
|
final int capacity = bufferCapacity;
|
||||||
@@ -314,7 +314,7 @@ public final class Application {
|
|||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, (SSLContext) null, readTimeoutSecond, writeTimeoutSecond, strategy);
|
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, (SSLContext) null, readTimeoutSeconds, writeTimeoutSeconds, strategy);
|
||||||
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("net.transport.poolmaxconns", "100"))
|
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("net.transport.poolmaxconns", "100"))
|
||||||
.addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"))
|
.addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"))
|
||||||
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.checkinterval", "30"));
|
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("net.transport.checkinterval", "30"));
|
||||||
|
|||||||
@@ -53,13 +53,13 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
|
|
||||||
public abstract SocketAddress getLocalAddress();
|
public abstract SocketAddress getLocalAddress();
|
||||||
|
|
||||||
public abstract int getReadTimeoutSecond();
|
public abstract int getReadTimeoutSeconds();
|
||||||
|
|
||||||
public abstract int getWriteTimeoutSecond();
|
public abstract int getWriteTimeoutSeconds();
|
||||||
|
|
||||||
public abstract void setReadTimeoutSecond(int readTimeoutSecond);
|
public abstract void setReadTimeoutSeconds(int readTimeoutSeconds);
|
||||||
|
|
||||||
public abstract void setWriteTimeoutSecond(int writeTimeoutSecond);
|
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public abstract Future<Integer> read(ByteBuffer dst);
|
public abstract Future<Integer> read(ByteBuffer dst);
|
||||||
@@ -144,14 +144,14 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
*
|
*
|
||||||
* @param address 连接点子
|
* @param address 连接点子
|
||||||
* @param group 连接AsynchronousChannelGroup
|
* @param group 连接AsynchronousChannelGroup
|
||||||
* @param readTimeoutSecond 读取超时秒数
|
* @param readTimeoutSeconds 读取超时秒数
|
||||||
* @param writeTimeoutSecond 写入超时秒数
|
* @param writeTimeoutSeconds 写入超时秒数
|
||||||
*
|
*
|
||||||
* @return 连接CompletableFuture
|
* @return 连接CompletableFuture
|
||||||
*/
|
*/
|
||||||
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SocketAddress address,
|
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SocketAddress address,
|
||||||
final int readTimeoutSecond, final int writeTimeoutSecond) {
|
final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||||
return createTCP(group, null, address, false, readTimeoutSecond, writeTimeoutSecond);
|
return createTCP(group, null, address, false, readTimeoutSeconds, writeTimeoutSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -160,14 +160,14 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
* @param address 连接点子
|
* @param address 连接点子
|
||||||
* @param sslContext SSLContext
|
* @param sslContext SSLContext
|
||||||
* @param group 连接AsynchronousChannelGroup
|
* @param group 连接AsynchronousChannelGroup
|
||||||
* @param readTimeoutSecond 读取超时秒数
|
* @param readTimeoutSeconds 读取超时秒数
|
||||||
* @param writeTimeoutSecond 写入超时秒数
|
* @param writeTimeoutSeconds 写入超时秒数
|
||||||
*
|
*
|
||||||
* @return 连接CompletableFuture
|
* @return 连接CompletableFuture
|
||||||
*/
|
*/
|
||||||
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext,
|
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext,
|
||||||
final SocketAddress address, final int readTimeoutSecond, final int writeTimeoutSecond) {
|
final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||||
return createTCP(group, sslContext, address, false, readTimeoutSecond, writeTimeoutSecond);
|
return createTCP(group, sslContext, address, false, readTimeoutSeconds, writeTimeoutSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -177,13 +177,13 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
* @param sslContext SSLContext
|
* @param sslContext SSLContext
|
||||||
* @param group 连接AsynchronousChannelGroup
|
* @param group 连接AsynchronousChannelGroup
|
||||||
* @param noDelay TcpNoDelay
|
* @param noDelay TcpNoDelay
|
||||||
* @param readTimeoutSecond 读取超时秒数
|
* @param readTimeoutSeconds 读取超时秒数
|
||||||
* @param writeTimeoutSecond 写入超时秒数
|
* @param writeTimeoutSeconds 写入超时秒数
|
||||||
*
|
*
|
||||||
* @return 连接CompletableFuture
|
* @return 连接CompletableFuture
|
||||||
*/
|
*/
|
||||||
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext,
|
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext,
|
||||||
final SocketAddress address, final boolean noDelay, final int readTimeoutSecond, final int writeTimeoutSecond) {
|
final SocketAddress address, final boolean noDelay, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||||
final CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
|
final CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
|
||||||
try {
|
try {
|
||||||
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
|
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
|
||||||
@@ -196,7 +196,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
future.complete(create(channel, sslContext, address, readTimeoutSecond, writeTimeoutSecond));
|
future.complete(create(channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -212,9 +212,9 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
|
|
||||||
private static class BIOUDPAsyncConnection extends AsyncConnection {
|
private static class BIOUDPAsyncConnection extends AsyncConnection {
|
||||||
|
|
||||||
private int readTimeoutSecond;
|
private int readTimeoutSeconds;
|
||||||
|
|
||||||
private int writeTimeoutSecond;
|
private int writeTimeoutSeconds;
|
||||||
|
|
||||||
private final DatagramChannel channel;
|
private final DatagramChannel channel;
|
||||||
|
|
||||||
@@ -223,32 +223,32 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
private final boolean client;
|
private final boolean client;
|
||||||
|
|
||||||
public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr,
|
public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr,
|
||||||
final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
|
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
|
||||||
this.channel = ch;
|
this.channel = ch;
|
||||||
this.client = client0;
|
this.client = client0;
|
||||||
this.readTimeoutSecond = readTimeoutSecond0;
|
this.readTimeoutSeconds = readTimeoutSeconds0;
|
||||||
this.writeTimeoutSecond = writeTimeoutSecond0;
|
this.writeTimeoutSeconds = writeTimeoutSeconds0;
|
||||||
this.remoteAddress = addr;
|
this.remoteAddress = addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setReadTimeoutSecond(int readTimeoutSecond) {
|
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
|
||||||
this.readTimeoutSecond = readTimeoutSecond;
|
this.readTimeoutSeconds = readTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
|
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
|
||||||
this.writeTimeoutSecond = writeTimeoutSecond;
|
this.writeTimeoutSeconds = writeTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getReadTimeoutSecond() {
|
public int getReadTimeoutSeconds() {
|
||||||
return this.readTimeoutSecond;
|
return this.readTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getWriteTimeoutSecond() {
|
public int getWriteTimeoutSeconds() {
|
||||||
return this.writeTimeoutSecond;
|
return this.writeTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -347,15 +347,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
|
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
|
||||||
final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
|
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
|
||||||
return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0);
|
return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class BIOTCPAsyncConnection extends AsyncConnection {
|
private static class BIOTCPAsyncConnection extends AsyncConnection {
|
||||||
|
|
||||||
private int readTimeoutSecond;
|
private int readTimeoutSeconds;
|
||||||
|
|
||||||
private int writeTimeoutSecond;
|
private int writeTimeoutSeconds;
|
||||||
|
|
||||||
private final Socket socket;
|
private final Socket socket;
|
||||||
|
|
||||||
@@ -365,12 +365,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
|
|
||||||
private final SocketAddress remoteAddress;
|
private final SocketAddress remoteAddress;
|
||||||
|
|
||||||
public BIOTCPAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
|
public BIOTCPAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
|
||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
ReadableByteChannel rc = null;
|
ReadableByteChannel rc = null;
|
||||||
WritableByteChannel wc = null;
|
WritableByteChannel wc = null;
|
||||||
try {
|
try {
|
||||||
socket.setSoTimeout(Math.max(readTimeoutSecond0, writeTimeoutSecond0));
|
socket.setSoTimeout(Math.max(readTimeoutSeconds0, writeTimeoutSeconds0));
|
||||||
rc = Channels.newChannel(socket.getInputStream());
|
rc = Channels.newChannel(socket.getInputStream());
|
||||||
wc = Channels.newChannel(socket.getOutputStream());
|
wc = Channels.newChannel(socket.getOutputStream());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@@ -378,8 +378,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
}
|
}
|
||||||
this.readChannel = rc;
|
this.readChannel = rc;
|
||||||
this.writeChannel = wc;
|
this.writeChannel = wc;
|
||||||
this.readTimeoutSecond = readTimeoutSecond0;
|
this.readTimeoutSeconds = readTimeoutSeconds0;
|
||||||
this.writeTimeoutSecond = writeTimeoutSecond0;
|
this.writeTimeoutSeconds = writeTimeoutSeconds0;
|
||||||
SocketAddress addr = addr0;
|
SocketAddress addr = addr0;
|
||||||
if (addr == null) {
|
if (addr == null) {
|
||||||
try {
|
try {
|
||||||
@@ -407,23 +407,23 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getReadTimeoutSecond() {
|
public int getReadTimeoutSeconds() {
|
||||||
return readTimeoutSecond;
|
return readTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getWriteTimeoutSecond() {
|
public int getWriteTimeoutSeconds() {
|
||||||
return writeTimeoutSecond;
|
return writeTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setReadTimeoutSecond(int readTimeoutSecond) {
|
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
|
||||||
this.readTimeoutSecond = readTimeoutSecond;
|
this.readTimeoutSeconds = readTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
|
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
|
||||||
this.writeTimeoutSecond = writeTimeoutSecond;
|
this.writeTimeoutSeconds = writeTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -518,20 +518,20 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
|
|
||||||
private static class AIOTCPAsyncConnection extends AsyncConnection {
|
private static class AIOTCPAsyncConnection extends AsyncConnection {
|
||||||
|
|
||||||
private int readTimeoutSecond;
|
private int readTimeoutSeconds;
|
||||||
|
|
||||||
private int writeTimeoutSecond;
|
private int writeTimeoutSeconds;
|
||||||
|
|
||||||
private final AsynchronousSocketChannel channel;
|
private final AsynchronousSocketChannel channel;
|
||||||
|
|
||||||
private final SocketAddress remoteAddress;
|
private final SocketAddress remoteAddress;
|
||||||
|
|
||||||
public AIOTCPAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext,
|
public AIOTCPAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext,
|
||||||
final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
|
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||||
this.channel = ch;
|
this.channel = ch;
|
||||||
this.sslContext = sslContext;
|
this.sslContext = sslContext;
|
||||||
this.readTimeoutSecond = readTimeoutSecond0;
|
this.readTimeoutSeconds = readTimeoutSeconds;
|
||||||
this.writeTimeoutSecond = writeTimeoutSecond0;
|
this.writeTimeoutSeconds = writeTimeoutSeconds;
|
||||||
SocketAddress addr = addr0;
|
SocketAddress addr = addr0;
|
||||||
if (addr == null) {
|
if (addr == null) {
|
||||||
try {
|
try {
|
||||||
@@ -546,8 +546,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
@Override
|
@Override
|
||||||
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||||
this.readtime = System.currentTimeMillis();
|
this.readtime = System.currentTimeMillis();
|
||||||
if (readTimeoutSecond > 0) {
|
if (readTimeoutSeconds > 0) {
|
||||||
channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler);
|
channel.read(dst, readTimeoutSeconds, TimeUnit.SECONDS, attachment, handler);
|
||||||
} else {
|
} else {
|
||||||
channel.read(dst, attachment, handler);
|
channel.read(dst, attachment, handler);
|
||||||
}
|
}
|
||||||
@@ -562,8 +562,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
@Override
|
@Override
|
||||||
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||||
this.writetime = System.currentTimeMillis();
|
this.writetime = System.currentTimeMillis();
|
||||||
if (writeTimeoutSecond > 0) {
|
if (writeTimeoutSeconds > 0) {
|
||||||
channel.write(src, writeTimeoutSecond, TimeUnit.SECONDS, attachment, handler);
|
channel.write(src, writeTimeoutSeconds, TimeUnit.SECONDS, attachment, handler);
|
||||||
} else {
|
} else {
|
||||||
channel.write(src, attachment, handler);
|
channel.write(src, attachment, handler);
|
||||||
}
|
}
|
||||||
@@ -572,7 +572,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
@Override
|
@Override
|
||||||
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler<Integer, ? super A> handler) {
|
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler<Integer, ? super A> handler) {
|
||||||
this.writetime = System.currentTimeMillis();
|
this.writetime = System.currentTimeMillis();
|
||||||
channel.write(srcs, offset, length, writeTimeoutSecond > 0 ? writeTimeoutSecond : 60, TimeUnit.SECONDS,
|
channel.write(srcs, offset, length, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS,
|
||||||
attachment, new CompletionHandler<Long, A>() {
|
attachment, new CompletionHandler<Long, A>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -589,23 +589,23 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setReadTimeoutSecond(int readTimeoutSecond) {
|
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
|
||||||
this.readTimeoutSecond = readTimeoutSecond;
|
this.readTimeoutSeconds = readTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
|
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
|
||||||
this.writeTimeoutSecond = writeTimeoutSecond;
|
this.writeTimeoutSeconds = writeTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getReadTimeoutSecond() {
|
public int getReadTimeoutSeconds() {
|
||||||
return this.readTimeoutSecond;
|
return this.readTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getWriteTimeoutSecond() {
|
public int getWriteTimeoutSeconds() {
|
||||||
return this.writeTimeoutSecond;
|
return this.writeTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -654,15 +654,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
|||||||
return create(ch, null, 0, 0);
|
return create(ch, null, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSecond, final int writeTimeoutSecond) {
|
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||||
return new AIOTCPAsyncConnection(ch, null, addr0, readTimeoutSecond, writeTimeoutSecond);
|
return new AIOTCPAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSecond, final int writeTimeoutSecond) {
|
public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||||
return new AIOTCPAsyncConnection(ch, sslContext, addr0, readTimeoutSecond, writeTimeoutSecond);
|
return new AIOTCPAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) {
|
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) {
|
||||||
return new AIOTCPAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSecond, context.writeTimeoutSecond);
|
return new AIOTCPAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -59,13 +59,13 @@ public class Context {
|
|||||||
protected final int maxbody;
|
protected final int maxbody;
|
||||||
|
|
||||||
//keep alive IO读取的超时时间
|
//keep alive IO读取的超时时间
|
||||||
protected final int aliveTimeoutSecond;
|
protected final int aliveTimeoutSeconds;
|
||||||
|
|
||||||
//IO读取的超时时间
|
//IO读取的超时时间
|
||||||
protected final int readTimeoutSecond;
|
protected final int readTimeoutSeconds;
|
||||||
|
|
||||||
//IO写入的超时时间
|
//IO写入的超时时间
|
||||||
protected final int writeTimeoutSecond;
|
protected final int writeTimeoutSeconds;
|
||||||
|
|
||||||
//日志Logger
|
//日志Logger
|
||||||
protected final Logger logger;
|
protected final Logger logger;
|
||||||
@@ -82,7 +82,7 @@ public class Context {
|
|||||||
public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
|
public Context(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
|
||||||
int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
|
int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
|
||||||
final int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
|
final int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
|
||||||
final PrepareServlet prepare, final int aliveTimeoutSecond, final int readTimeoutSecond, final int writeTimeoutSecond) {
|
final PrepareServlet prepare, final int aliveTimeoutSeconds, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||||
this.serverStartTime = serverStartTime;
|
this.serverStartTime = serverStartTime;
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
@@ -95,9 +95,9 @@ public class Context {
|
|||||||
this.address = address;
|
this.address = address;
|
||||||
this.prepare = prepare;
|
this.prepare = prepare;
|
||||||
this.resourceFactory = resourceFactory;
|
this.resourceFactory = resourceFactory;
|
||||||
this.aliveTimeoutSecond = aliveTimeoutSecond;
|
this.aliveTimeoutSeconds = aliveTimeoutSeconds;
|
||||||
this.readTimeoutSecond = readTimeoutSecond;
|
this.readTimeoutSeconds = readTimeoutSeconds;
|
||||||
this.writeTimeoutSecond = writeTimeoutSecond;
|
this.writeTimeoutSeconds = writeTimeoutSeconds;
|
||||||
this.jsonFactory = JsonFactory.root();
|
this.jsonFactory = JsonFactory.root();
|
||||||
this.bsonFactory = BsonFactory.root();
|
this.bsonFactory = BsonFactory.root();
|
||||||
}
|
}
|
||||||
@@ -169,16 +169,16 @@ public class Context {
|
|||||||
return logger;
|
return logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getAliveTimeoutSecond() {
|
public int getAliveTimeoutSeconds() {
|
||||||
return aliveTimeoutSecond;
|
return aliveTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getReadTimeoutSecond() {
|
public int getReadTimeoutSeconds() {
|
||||||
return readTimeoutSecond;
|
return readTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getWriteTimeoutSecond() {
|
public int getWriteTimeoutSeconds() {
|
||||||
return writeTimeoutSecond;
|
return writeTimeoutSeconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
public JsonConvert getJsonConvert() {
|
public JsonConvert getJsonConvert() {
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ public final class PrepareRunner implements Runnable {
|
|||||||
if (response == null) response = responsePool.get();
|
if (response == null) response = responsePool.get();
|
||||||
final ByteBuffer buffer = response.request.pollReadBuffer();
|
final ByteBuffer buffer = response.request.pollReadBuffer();
|
||||||
try {
|
try {
|
||||||
channel.read(buffer, keepalive ? context.getAliveTimeoutSecond() : 0, TimeUnit.SECONDS, null,
|
channel.read(buffer, keepalive ? context.getAliveTimeoutSeconds() : 0, TimeUnit.SECONDS, null,
|
||||||
new CompletionHandler<Integer, Void>() {
|
new CompletionHandler<Integer, Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer count, Void attachment1) {
|
public void completed(Integer count, Void attachment1) {
|
||||||
|
|||||||
@@ -109,8 +109,8 @@ public abstract class ProtocolServer {
|
|||||||
@Override
|
@Override
|
||||||
public void accept() {
|
public void accept() {
|
||||||
final DatagramChannel serchannel = this.serverChannel;
|
final DatagramChannel serchannel = this.serverChannel;
|
||||||
final int readTimeoutSecond = this.context.readTimeoutSecond;
|
final int readTimeoutSeconds = this.context.readTimeoutSeconds;
|
||||||
final int writeTimeoutSecond = this.context.writeTimeoutSecond;
|
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;
|
||||||
final CountDownLatch cdl = new CountDownLatch(1);
|
final CountDownLatch cdl = new CountDownLatch(1);
|
||||||
this.running = true;
|
this.running = true;
|
||||||
new Thread() {
|
new Thread() {
|
||||||
@@ -122,7 +122,7 @@ public abstract class ProtocolServer {
|
|||||||
try {
|
try {
|
||||||
SocketAddress address = serchannel.receive(buffer);
|
SocketAddress address = serchannel.receive(buffer);
|
||||||
buffer.flip();
|
buffer.flip();
|
||||||
AsyncConnection conn = AsyncConnection.create(serchannel, address, false, readTimeoutSecond, writeTimeoutSecond);
|
AsyncConnection conn = AsyncConnection.create(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds);
|
||||||
context.runAsync(new PrepareRunner(context, conn, buffer, null));
|
context.runAsync(new PrepareRunner(context, conn, buffer, null));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
context.offerBuffer(buffer);
|
context.offerBuffer(buffer);
|
||||||
|
|||||||
@@ -92,13 +92,13 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
|
|||||||
protected int maxbody;
|
protected int maxbody;
|
||||||
|
|
||||||
//Keep-Alive IO读取的超时秒数,小于1视为不设置
|
//Keep-Alive IO读取的超时秒数,小于1视为不设置
|
||||||
protected int aliveTimeoutSecond;
|
protected int aliveTimeoutSeconds;
|
||||||
|
|
||||||
//IO读取的超时秒数,小于1视为不设置
|
//IO读取的超时秒数,小于1视为不设置
|
||||||
protected int readTimeoutSecond;
|
protected int readTimeoutSeconds;
|
||||||
|
|
||||||
//IO写入 的超时秒数,小于1视为不设置
|
//IO写入 的超时秒数,小于1视为不设置
|
||||||
protected int writeTimeoutSecond;
|
protected int writeTimeoutSeconds;
|
||||||
|
|
||||||
//最大连接数
|
//最大连接数
|
||||||
protected int maxconns;
|
protected int maxconns;
|
||||||
@@ -116,9 +116,9 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
|
|||||||
this.address = new InetSocketAddress(config.getValue("host", "0.0.0.0"), config.getIntValue("port", 80));
|
this.address = new InetSocketAddress(config.getValue("host", "0.0.0.0"), config.getIntValue("port", 80));
|
||||||
this.charset = Charset.forName(config.getValue("charset", "UTF-8"));
|
this.charset = Charset.forName(config.getValue("charset", "UTF-8"));
|
||||||
this.maxconns = config.getIntValue("maxconns", 0);
|
this.maxconns = config.getIntValue("maxconns", 0);
|
||||||
this.aliveTimeoutSecond = config.getIntValue("aliveTimeoutSecond", 0);
|
this.aliveTimeoutSeconds = config.getIntValue("aliveTimeoutSeconds", 0);
|
||||||
this.readTimeoutSecond = config.getIntValue("readTimeoutSecond", 0);
|
this.readTimeoutSeconds = config.getIntValue("readTimeoutSeconds", 0);
|
||||||
this.writeTimeoutSecond = config.getIntValue("writeTimeoutSecond", 0);
|
this.writeTimeoutSeconds = config.getIntValue("writeTimeoutSeconds", 0);
|
||||||
this.backlog = parseLenth(config.getValue("backlog"), 16 * 1024);
|
this.backlog = parseLenth(config.getValue("backlog"), 16 * 1024);
|
||||||
this.maxbody = parseLenth(config.getValue("maxbody"), 64 * 1024);
|
this.maxbody = parseLenth(config.getValue("maxbody"), 64 * 1024);
|
||||||
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), 32 * 1024);
|
int bufCapacity = parseLenth(config.getValue("bufferCapacity"), 32 * 1024);
|
||||||
|
|||||||
@@ -228,12 +228,12 @@ public final class Transport {
|
|||||||
DatagramChannel channel = DatagramChannel.open();
|
DatagramChannel channel = DatagramChannel.open();
|
||||||
channel.configureBlocking(true);
|
channel.configureBlocking(true);
|
||||||
channel.connect(udpaddr);
|
channel.connect(udpaddr);
|
||||||
return CompletableFuture.completedFuture(AsyncConnection.create(channel, udpaddr, true, factory.readTimeoutSecond, factory.writeTimeoutSecond));
|
return CompletableFuture.completedFuture(AsyncConnection.create(channel, udpaddr, true, factory.readTimeoutSeconds, factory.writeTimeoutSeconds));
|
||||||
}
|
}
|
||||||
if (!rand) { //指定地址
|
if (!rand) { //指定地址
|
||||||
TransportNode node = findTransportNode(addr);
|
TransportNode node = findTransportNode(addr);
|
||||||
if (node == null) {
|
if (node == null) {
|
||||||
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSecond, factory.writeTimeoutSecond);
|
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
|
||||||
}
|
}
|
||||||
final BlockingQueue<AsyncConnection> queue = node.conns;
|
final BlockingQueue<AsyncConnection> queue = node.conns;
|
||||||
if (!queue.isEmpty()) {
|
if (!queue.isEmpty()) {
|
||||||
@@ -242,7 +242,7 @@ public final class Transport {
|
|||||||
if (conn.isOpen()) return CompletableFuture.completedFuture(conn);
|
if (conn.isOpen()) return CompletableFuture.completedFuture(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSecond, factory.writeTimeoutSecond);
|
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
//---------------------随机取地址------------------------
|
//---------------------随机取地址------------------------
|
||||||
@@ -269,7 +269,7 @@ public final class Transport {
|
|||||||
@Override
|
@Override
|
||||||
public void completed(Void result, TransportNode attachment) {
|
public void completed(Void result, TransportNode attachment) {
|
||||||
attachment.disabletime = 0;
|
attachment.disabletime = 0;
|
||||||
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond);
|
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
|
||||||
if (future.isDone()) {
|
if (future.isDone()) {
|
||||||
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
|
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
|
||||||
} else {
|
} else {
|
||||||
@@ -319,7 +319,7 @@ public final class Transport {
|
|||||||
@Override
|
@Override
|
||||||
public void completed(Void result, TransportNode attachment) {
|
public void completed(Void result, TransportNode attachment) {
|
||||||
attachment.disabletime = 0;
|
attachment.disabletime = 0;
|
||||||
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond);
|
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
|
||||||
if (future.isDone()) {
|
if (future.isDone()) {
|
||||||
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
|
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -32,10 +32,10 @@ import org.redkale.util.*;
|
|||||||
public class TransportFactory {
|
public class TransportFactory {
|
||||||
|
|
||||||
@Comment("默认TCP读取超时秒数")
|
@Comment("默认TCP读取超时秒数")
|
||||||
public static int DEFAULT_READTIMEOUTSECOND = 6;
|
public static int DEFAULT_READTIMEOUTSECONDS = 6;
|
||||||
|
|
||||||
@Comment("默认TCP写入超时秒数")
|
@Comment("默认TCP写入超时秒数")
|
||||||
public static int DEFAULT_WRITETIMEOUTSECOND = 6;
|
public static int DEFAULT_WRITETIMEOUTSECONDS = 6;
|
||||||
|
|
||||||
public static final String NAME_POOLMAXCONNS = "poolmaxconns";
|
public static final String NAME_POOLMAXCONNS = "poolmaxconns";
|
||||||
|
|
||||||
@@ -74,10 +74,10 @@ public class TransportFactory {
|
|||||||
protected int pinginterval;
|
protected int pinginterval;
|
||||||
|
|
||||||
//TCP读取超时秒数
|
//TCP读取超时秒数
|
||||||
protected int readTimeoutSecond;
|
protected int readTimeoutSeconds;
|
||||||
|
|
||||||
//TCP写入超时秒数
|
//TCP写入超时秒数
|
||||||
protected int writeTimeoutSecond;
|
protected int writeTimeoutSeconds;
|
||||||
|
|
||||||
//ping和检查的定时器
|
//ping和检查的定时器
|
||||||
private ScheduledThreadPoolExecutor scheduler;
|
private ScheduledThreadPoolExecutor scheduler;
|
||||||
@@ -94,19 +94,19 @@ public class TransportFactory {
|
|||||||
protected final TransportStrategy strategy;
|
protected final TransportStrategy strategy;
|
||||||
|
|
||||||
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
||||||
SSLContext sslContext, int readTimeoutSecond, int writeTimeoutSecond, final TransportStrategy strategy) {
|
SSLContext sslContext, int readTimeoutSeconds, int writeTimeoutSeconds, final TransportStrategy strategy) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.bufferPool = bufferPool;
|
this.bufferPool = bufferPool;
|
||||||
this.channelGroup = channelGroup;
|
this.channelGroup = channelGroup;
|
||||||
this.sslContext = sslContext;
|
this.sslContext = sslContext;
|
||||||
this.readTimeoutSecond = readTimeoutSecond;
|
this.readTimeoutSeconds = readTimeoutSeconds;
|
||||||
this.writeTimeoutSecond = writeTimeoutSecond;
|
this.writeTimeoutSeconds = writeTimeoutSeconds;
|
||||||
this.strategy = strategy;
|
this.strategy = strategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
protected TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
||||||
SSLContext sslContext, int readTimeoutSecond, int writeTimeoutSecond) {
|
SSLContext sslContext, int readTimeoutSeconds, int writeTimeoutSeconds) {
|
||||||
this(executor, bufferPool, channelGroup, sslContext, readTimeoutSecond, writeTimeoutSecond, null);
|
this(executor, bufferPool, channelGroup, sslContext, readTimeoutSeconds, writeTimeoutSeconds, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) {
|
public void init(AnyValue conf, ByteBuffer pingBuffer, int pongLength) {
|
||||||
@@ -140,14 +140,14 @@ public class TransportFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static TransportFactory create(int threads) {
|
public static TransportFactory create(int threads) {
|
||||||
return create(threads, threads * 2, 8 * 1024, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND);
|
return create(threads, threads * 2, 8 * 1024, DEFAULT_READTIMEOUTSECONDS, DEFAULT_WRITETIMEOUTSECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity) {
|
public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity) {
|
||||||
return create(threads, bufferPoolSize, bufferCapacity, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND);
|
return create(threads, bufferPoolSize, bufferCapacity, DEFAULT_READTIMEOUTSECONDS, DEFAULT_WRITETIMEOUTSECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity, int readTimeoutSecond, int writeTimeoutSecond) {
|
public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity, int readTimeoutSeconds, int writeTimeoutSeconds) {
|
||||||
final ObjectPool<ByteBuffer> transportPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), bufferPoolSize,
|
final ObjectPool<ByteBuffer> transportPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), bufferPoolSize,
|
||||||
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
|
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
|
||||||
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false;
|
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false;
|
||||||
@@ -167,35 +167,35 @@ public class TransportFactory {
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
return create(transportExec, transportPool, transportGroup, readTimeoutSecond, writeTimeoutSecond);
|
return create(transportExec, transportPool, transportGroup, readTimeoutSeconds, writeTimeoutSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
||||||
return new TransportFactory(executor, bufferPool, channelGroup, null, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND, null);
|
return new TransportFactory(executor, bufferPool, channelGroup, null, DEFAULT_READTIMEOUTSECONDS, DEFAULT_WRITETIMEOUTSECONDS, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
||||||
int readTimeoutSecond, int writeTimeoutSecond) {
|
int readTimeoutSeconds, int writeTimeoutSeconds) {
|
||||||
return new TransportFactory(executor, bufferPool, channelGroup, null, readTimeoutSecond, writeTimeoutSecond, null);
|
return new TransportFactory(executor, bufferPool, channelGroup, null, readTimeoutSeconds, writeTimeoutSeconds, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
||||||
int readTimeoutSecond, int writeTimeoutSecond, final TransportStrategy strategy) {
|
int readTimeoutSeconds, int writeTimeoutSeconds, final TransportStrategy strategy) {
|
||||||
return new TransportFactory(executor, bufferPool, channelGroup, null, readTimeoutSecond, writeTimeoutSecond, strategy);
|
return new TransportFactory(executor, bufferPool, channelGroup, null, readTimeoutSeconds, writeTimeoutSeconds, strategy);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup, SSLContext sslContext) {
|
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup, SSLContext sslContext) {
|
||||||
return new TransportFactory(executor, bufferPool, channelGroup, sslContext, DEFAULT_READTIMEOUTSECOND, DEFAULT_WRITETIMEOUTSECOND, null);
|
return new TransportFactory(executor, bufferPool, channelGroup, sslContext, DEFAULT_READTIMEOUTSECONDS, DEFAULT_WRITETIMEOUTSECONDS, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
||||||
SSLContext sslContext, int readTimeoutSecond, int writeTimeoutSecond) {
|
SSLContext sslContext, int readTimeoutSeconds, int writeTimeoutSeconds) {
|
||||||
return new TransportFactory(executor, bufferPool, channelGroup, sslContext, readTimeoutSecond, writeTimeoutSecond, null);
|
return new TransportFactory(executor, bufferPool, channelGroup, sslContext, readTimeoutSeconds, writeTimeoutSeconds, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
public static TransportFactory create(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
||||||
SSLContext sslContext, int readTimeoutSecond, int writeTimeoutSecond, final TransportStrategy strategy) {
|
SSLContext sslContext, int readTimeoutSeconds, int writeTimeoutSeconds, final TransportStrategy strategy) {
|
||||||
return new TransportFactory(executor, bufferPool, channelGroup, sslContext, readTimeoutSecond, writeTimeoutSecond, strategy);
|
return new TransportFactory(executor, bufferPool, channelGroup, sslContext, readTimeoutSeconds, writeTimeoutSeconds, strategy);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Transport createTransportTCP(String name, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
|
public Transport createTransportTCP(String name, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
|
||||||
|
|||||||
@@ -36,9 +36,9 @@ public class HttpContext extends Context {
|
|||||||
public HttpContext(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
|
public HttpContext(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
|
||||||
final int bufferCapacity, final ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
|
final int bufferCapacity, final ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
|
||||||
int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
|
int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
|
||||||
PrepareServlet prepare, int aliveTimeoutSecond, int readTimeoutSecond, int writeTimeoutSecond) {
|
PrepareServlet prepare, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) {
|
||||||
super(serverStartTime, logger, executor, sslContext, bufferCapacity, bufferPool, responsePool,
|
super(serverStartTime, logger, executor, sslContext, bufferCapacity, bufferPool, responsePool,
|
||||||
maxbody, charset, address, resourceFactory, prepare, aliveTimeoutSecond, readTimeoutSecond, writeTimeoutSecond);
|
maxbody, charset, address, resourceFactory, prepare, aliveTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds);
|
||||||
|
|
||||||
random.setSeed(Math.abs(System.nanoTime()));
|
random.setSeed(Math.abs(System.nanoTime()));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -154,7 +154,7 @@ public class HttpRequest extends Request<HttpContext> {
|
|||||||
case "Connection":
|
case "Connection":
|
||||||
case "connection":
|
case "connection":
|
||||||
this.connection = value;
|
this.connection = value;
|
||||||
if (context.getAliveTimeoutSecond() >= 0) {
|
if (context.getAliveTimeoutSeconds() >= 0) {
|
||||||
this.setKeepAlive(!"close".equalsIgnoreCase(value));
|
this.setKeepAlive(!"close".equalsIgnoreCase(value));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -382,7 +382,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
|||||||
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
|
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
|
||||||
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, this.sslContext,
|
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, this.sslContext,
|
||||||
rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.resourceFactory,
|
rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.resourceFactory,
|
||||||
this.prepare, this.aliveTimeoutSecond, this.readTimeoutSecond, this.writeTimeoutSecond);
|
this.prepare, this.aliveTimeoutSeconds, this.readTimeoutSeconds, this.writeTimeoutSeconds);
|
||||||
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader),
|
responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, addrHeader),
|
||||||
plainType, jsonType, addHeaders, setHeaders, defCookie, options, ((HttpPrepareServlet) prepare).renders));
|
plainType, jsonType, addHeaders, setHeaders, defCookie, options, ((HttpPrepareServlet) prepare).renders));
|
||||||
return httpcontext;
|
return httpcontext;
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ class WebSocketRunner implements Runnable {
|
|||||||
final boolean debug = context.getLogger().isLoggable(Level.FINEST);
|
final boolean debug = context.getLogger().isLoggable(Level.FINEST);
|
||||||
try {
|
try {
|
||||||
webSocket.onConnected();
|
webSocket.onConnected();
|
||||||
channel.setReadTimeoutSecond(300); //读取超时5分钟
|
channel.setReadTimeoutSeconds(300); //读取超时5分钟
|
||||||
if (channel.isOpen()) {
|
if (channel.isOpen()) {
|
||||||
final int wsmaxbody = webSocket._engine.wsmaxbody;
|
final int wsmaxbody = webSocket._engine.wsmaxbody;
|
||||||
channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
|
channel.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
|
||||||
|
|||||||
@@ -25,8 +25,8 @@ public class SncpContext extends Context {
|
|||||||
public SncpContext(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
|
public SncpContext(long serverStartTime, Logger logger, ThreadPoolExecutor executor, SSLContext sslContext,
|
||||||
int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
|
int bufferCapacity, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
|
||||||
int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
|
int maxbody, Charset charset, InetSocketAddress address, ResourceFactory resourceFactory,
|
||||||
PrepareServlet prepare, int aliveTimeoutSecond, int readTimeoutSecond, int writeTimeoutSecond) {
|
PrepareServlet prepare, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) {
|
||||||
super(serverStartTime, logger, executor, sslContext, bufferCapacity, bufferPool, responsePool,
|
super(serverStartTime, logger, executor, sslContext, bufferCapacity, bufferPool, responsePool,
|
||||||
maxbody, charset, address, resourceFactory, prepare, aliveTimeoutSecond, readTimeoutSecond, writeTimeoutSecond);
|
maxbody, charset, address, resourceFactory, prepare, aliveTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -108,7 +108,7 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
|
|||||||
ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
|
ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
|
||||||
SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, this.sslContext,
|
SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, this.sslContext,
|
||||||
rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.resourceFactory,
|
rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.resourceFactory,
|
||||||
this.prepare, this.aliveTimeoutSecond, this.readTimeoutSecond, this.writeTimeoutSecond);
|
this.prepare, this.aliveTimeoutSeconds, this.readTimeoutSeconds, this.writeTimeoutSeconds);
|
||||||
responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext)));
|
responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext)));
|
||||||
return sncpcontext;
|
return sncpcontext;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user