增加TCP级别上的最大连接数限制

This commit is contained in:
Redkale
2017-11-11 10:37:25 +08:00
parent 123b94398a
commit 6e70f2043e
8 changed files with 38 additions and 18 deletions

View File

@@ -107,6 +107,7 @@
charset: 文本编码, 默认: UTF-8 charset: 文本编码, 默认: UTF-8
backlog: 默认10K backlog: 默认10K
threads 线程总数, 默认: CPU核数*16 threads 线程总数, 默认: CPU核数*16
maxconns最大连接数, 小于1表示无限制 默认: 0
maxbody: request.body最大值 默认: 64K maxbody: request.body最大值 默认: 64K
bufferCapacity: ByteBuffer的初始化大小 默认: 8K; 如果是HTTP协议则默认: 16K + 16B (兼容HTTP 2.0、WebSocket) bufferCapacity: ByteBuffer的初始化大小 默认: 8K; 如果是HTTP协议则默认: 16K + 16B (兼容HTTP 2.0、WebSocket)
bufferPoolSize ByteBuffer池的大小默认: CPU核数*512 bufferPoolSize ByteBuffer池的大小默认: CPU核数*512

View File

@@ -31,10 +31,10 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
protected volatile long writetime; protected volatile long writetime;
//关闭数 //关闭数
AtomicLong closedCounter = new AtomicLong(); AtomicLong closedCounter;
//在线数 //在线数
AtomicLong livingCounter = new AtomicLong(); AtomicLong livingCounter;
public final long getLastReadTime() { public final long getLastReadTime() {
return readtime; return readtime;

View File

@@ -32,6 +32,9 @@ public abstract class ProtocolServer {
//在线数 //在线数
protected final AtomicLong livingCounter = new AtomicLong(); protected final AtomicLong livingCounter = new AtomicLong();
//最大连接数小于1表示无限制
protected int maxconns;
public abstract void open() throws IOException; public abstract void open() throws IOException;
public abstract void bind(SocketAddress local, int backlog) throws IOException; public abstract void bind(SocketAddress local, int backlog) throws IOException;
@@ -42,6 +45,10 @@ public abstract class ProtocolServer {
public abstract void accept(); public abstract void accept();
public void setMaxconns(int maxconns) {
this.maxconns = maxconns;
}
public abstract void close() throws IOException; public abstract void close() throws IOException;
public abstract AsynchronousChannelGroup getChannelGroup(); public abstract AsynchronousChannelGroup getChannelGroup();
@@ -198,6 +205,13 @@ public abstract class ProtocolServer {
@Override @Override
public void completed(final AsynchronousSocketChannel channel, Void attachment) { public void completed(final AsynchronousSocketChannel channel, Void attachment) {
serchannel.accept(null, this); serchannel.accept(null, this);
if (maxconns > 0 && livingCounter.get() >= maxconns) {
try {
channel.close();
} catch (Exception e) {
}
return;
}
createCounter.incrementAndGet(); createCounter.incrementAndGet();
livingCounter.incrementAndGet(); livingCounter.incrementAndGet();
AsyncConnection conn = AsyncConnection.create(channel, null, context.readTimeoutSecond, context.writeTimeoutSecond); AsyncConnection conn = AsyncConnection.create(channel, null, context.readTimeoutSecond, context.writeTimeoutSecond);

View File

@@ -88,6 +88,9 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
//IO写入 的超时秒数小于1视为不设置 //IO写入 的超时秒数小于1视为不设置
protected int writeTimeoutSecond; protected int writeTimeoutSecond;
//最大连接数
protected int maxconns;
protected Server(long serverStartTime, String protocol, PrepareServlet<K, C, R, P, S> servlet) { protected Server(long serverStartTime, String protocol, PrepareServlet<K, C, R, P, S> servlet) {
this.serverStartTime = serverStartTime; this.serverStartTime = serverStartTime;
this.protocol = protocol; this.protocol = protocol;
@@ -99,6 +102,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.config = config; this.config = config;
this.address = new InetSocketAddress(config.getValue("host", "0.0.0.0"), config.getIntValue("port", 80)); this.address = new InetSocketAddress(config.getValue("host", "0.0.0.0"), config.getIntValue("port", 80));
this.charset = Charset.forName(config.getValue("charset", "UTF-8")); this.charset = Charset.forName(config.getValue("charset", "UTF-8"));
this.maxconns = config.getIntValue("maxconns", 0);
this.readTimeoutSecond = config.getIntValue("readTimeoutSecond", 0); this.readTimeoutSecond = config.getIntValue("readTimeoutSecond", 0);
this.writeTimeoutSecond = config.getIntValue("writeTimeoutSecond", 0); this.writeTimeoutSecond = config.getIntValue("writeTimeoutSecond", 0);
this.backlog = parseLenth(config.getValue("backlog"), 8 * 1024); this.backlog = parseLenth(config.getValue("backlog"), 8 * 1024);
@@ -187,7 +191,8 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
} }
serverChannel.bind(address, backlog); serverChannel.bind(address, backlog);
serverChannel.accept(); serverChannel.setMaxconns(this.maxconns);
serverChannel.accept();
final String threadName = "[" + Thread.currentThread().getName() + "] "; final String threadName = "[" + Thread.currentThread().getName() + "] ";
logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address
+ ", threads: " + threads + ", bufferCapacity: " + bufferCapacity + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize + ", threads: " + threads + ", bufferCapacity: " + bufferCapacity + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize

View File

@@ -317,8 +317,8 @@ public final class Rest {
mv.visitFieldInsn(PUTFIELD, newDynName, "liveinterval", "I"); mv.visitFieldInsn(PUTFIELD, newDynName, "liveinterval", "I");
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
pushInt(mv, rws.maxconns()); pushInt(mv, rws.wsmaxconns());
mv.visitFieldInsn(PUTFIELD, newDynName, "maxconns", "I"); mv.visitFieldInsn(PUTFIELD, newDynName, "wsmaxconns", "I");
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0); mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0);

View File

@@ -60,11 +60,11 @@ public @interface RestWebSocket {
int liveinterval() default WebSocketServlet.DEFAILT_LIVEINTERVAL; int liveinterval() default WebSocketServlet.DEFAILT_LIVEINTERVAL;
/** /**
* 最大连接数, 为0表示无限制 * 最大连接数, 小于1表示无限制
* *
* @return 最大连接数 * @return 最大连接数
*/ */
int maxconns() default 0; int wsmaxconns() default 0;
/** /**
* 是否屏蔽该类的转换 * 是否屏蔽该类的转换

View File

@@ -67,16 +67,16 @@ public class WebSocketEngine {
private int liveinterval; private int liveinterval;
@Comment("最大连接数, 为0表示无限制") @Comment("最大连接数, 为0表示无限制")
private int maxconns; private int wsmaxconns;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int maxconns, WebSocketNode node, Convert sendConvert, Logger logger) { protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int wsmaxconns, WebSocketNode node, Convert sendConvert, Logger logger) {
this.engineid = engineid; this.engineid = engineid;
this.single = single; this.single = single;
this.context = context; this.context = context;
this.sendConvert = sendConvert; this.sendConvert = sendConvert;
this.node = node; this.node = node;
this.liveinterval = liveinterval; this.liveinterval = liveinterval;
this.maxconns = maxconns; this.wsmaxconns = wsmaxconns;
this.logger = logger; this.logger = logger;
this.index = sequence.getAndIncrement(); this.index = sequence.getAndIncrement();
} }
@@ -86,7 +86,7 @@ public class WebSocketEngine {
if (conf != null && conf.getAnyValue("properties") != null) props = conf.getAnyValue("properties"); if (conf != null && conf.getAnyValue("properties") != null) props = conf.getAnyValue("properties");
this.liveinterval = props == null ? (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval)); this.liveinterval = props == null ? (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval));
if (liveinterval <= 0) return; if (liveinterval <= 0) return;
this.maxconns = props == null ? this.maxconns : props.getIntValue(WEBPARAM__MAXCONNS, this.maxconns); this.wsmaxconns = props == null ? this.wsmaxconns : props.getIntValue(WEBPARAM__WSMAXCONNS, this.wsmaxconns);
if (scheduler != null) return; if (scheduler != null) return;
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
final Thread t = new Thread(r, engineid + "-WebSocket-LiveInterval-Thread"); final Thread t = new Thread(r, engineid + "-WebSocket-LiveInterval-Thread");
@@ -99,7 +99,7 @@ public class WebSocketEngine {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
getLocalWebSockets().stream().filter(x -> (now - x.getLastSendTime()) > intervalms).forEach(x -> x.sendPing()); getLocalWebSockets().stream().filter(x -> (now - x.getLastSendTime()) > intervalms).forEach(x -> x.sendPing());
}, delay, liveinterval, TimeUnit.SECONDS); }, delay, liveinterval, TimeUnit.SECONDS);
if (logger.isLoggable(Level.FINEST)) logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(delay:" + delay + ", maxconns:" + maxconns + ", interval:" + liveinterval + "s) scheduler executor"); if (logger.isLoggable(Level.FINEST)) logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(delay:" + delay + ", wsmaxconns:" + wsmaxconns + ", interval:" + liveinterval + "s) scheduler executor");
} }
void destroy(AnyValue conf) { void destroy(AnyValue conf) {
@@ -274,14 +274,14 @@ public class WebSocketEngine {
} }
@Comment("获取最大连接数") @Comment("获取最大连接数")
public int getLocalMaxconns() { public int getLocalWsmaxconns() {
return this.maxconns; return this.wsmaxconns;
} }
@Comment("连接数是否达到上限") @Comment("连接数是否达到上限")
public boolean isLocalConnLimited() { public boolean isLocalConnLimited() {
if (this.maxconns < 1) return false; if (this.wsmaxconns < 1) return false;
return currconns.get() >= this.maxconns; return currconns.get() >= this.wsmaxconns;
} }
@Comment("获取所有连接") @Comment("获取所有连接")

View File

@@ -46,7 +46,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
public static final String WEBPARAM__LIVEINTERVAL = "liveinterval"; public static final String WEBPARAM__LIVEINTERVAL = "liveinterval";
@Comment("WebScoket服务器最大连接数为0表示无限制") @Comment("WebScoket服务器最大连接数为0表示无限制")
public static final String WEBPARAM__MAXCONNS = "maxconns"; public static final String WEBPARAM__WSMAXCONNS = "wsmaxconns";
@Comment("WebScoket服务器给客户端进行ping操作的默认间隔时间, 单位: 秒") @Comment("WebScoket服务器给客户端进行ping操作的默认间隔时间, 单位: 秒")
public static final int DEFAILT_LIVEINTERVAL = 15; public static final int DEFAILT_LIVEINTERVAL = 15;
@@ -144,7 +144,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return; return;
} }
if (this.node.localEngine.isLocalConnLimited()) { if (this.node.localEngine.isLocalConnLimited()) {
if (debug) logger.finest("WebSocket connections limit, maxconns=" + this.node.localEngine.getLocalMaxconns()); if (debug) logger.finest("WebSocket connections limit, wsmaxconns=" + this.node.localEngine.getLocalWsmaxconns());
response.finish(true); response.finish(true);
return; return;
} }