WebSocket增加最大连接数设置功能

This commit is contained in:
Redkale
2017-11-07 17:39:30 +08:00
parent 4f3c2e071a
commit 2b1d09b027
4 changed files with 45 additions and 2 deletions

View File

@@ -319,6 +319,13 @@ public final class Rest {
}
mv.visitFieldInsn(PUTFIELD, newDynName, "liveinterval", "I");
mv.visitVarInsn(ALOAD, 0);
if (rws.maxconns()< 6) {
mv.visitInsn(ICONST_0 + rws.maxconns());
} else {
mv.visitIntInsn(BIPUSH, rws.maxconns());
}
mv.visitFieldInsn(PUTFIELD, newDynName, "maxconns", "I");
mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z");
mv.visitInsn(RETURN);

View File

@@ -59,6 +59,13 @@ public @interface RestWebSocket {
*/
int liveinterval() default WebSocketServlet.DEFAILT_LIVEINTERVAL;
/**
* 最大连接数, 为0表示无限制
*
* @return 最大连接数
*/
int maxconns() default 0;
/**
* 是否屏蔽该类的转换
*

View File

@@ -53,6 +53,9 @@ public class WebSocketEngine {
@Comment("在线用户ID对应的WebSocket组用于单用户多连接模式")
private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>();
@Comment("当前连接数")
private final AtomicInteger currconns = new AtomicInteger();
@Comment("用于PING的定时器")
private ScheduledThreadPoolExecutor scheduler;
@@ -65,13 +68,17 @@ public class WebSocketEngine {
@Comment("PING的间隔秒数")
private int liveinterval;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) {
@Comment("最大连接数, 为0表示无限制")
private int maxconns;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int maxconns, WebSocketNode node, Convert sendConvert, Logger logger) {
this.engineid = engineid;
this.single = single;
this.context = context;
this.sendConvert = sendConvert;
this.node = node;
this.liveinterval = liveinterval;
this.maxconns = maxconns;
this.logger = logger;
this.finest = logger.isLoggable(Level.FINEST);
this.index = sequence.getAndIncrement();
@@ -102,6 +109,7 @@ public class WebSocketEngine {
@Comment("添加WebSocket")
void add(WebSocket socket) {
if (single) {
currconns.incrementAndGet();
websockets.put(socket._userid, socket);
} else { //非线程安全, 在常规场景中无需锁
List<WebSocket> list = websockets2.get(socket._userid);
@@ -109,6 +117,7 @@ public class WebSocketEngine {
list = new CopyOnWriteArrayList<>();
websockets2.put(socket._userid, list);
}
currconns.incrementAndGet();
list.add(socket);
}
if (node != null) node.connect(socket._userid);
@@ -118,11 +127,13 @@ public class WebSocketEngine {
void remove(WebSocket socket) {
Serializable userid = socket._userid;
if (single) {
currconns.decrementAndGet();
websockets.remove(userid);
if (node != null) node.disconnect(userid);
} else { //非线程安全, 在常规场景中无需锁
List<WebSocket> list = websockets2.get(userid);
if (list != null) {
currconns.decrementAndGet();
list.remove(socket);
if (list.isEmpty()) {
websockets2.remove(userid);
@@ -262,6 +273,17 @@ public class WebSocketEngine {
}
}
@Comment("获取最大连接数")
public int getLocalMaxconns() {
return this.maxconns;
}
@Comment("连接数是否达到上限")
public boolean isLocalConnLimited() {
if (this.maxconns < 1) return false;
return currconns.get() >= this.maxconns;
}
@Comment("获取所有连接")
public Collection<WebSocket> getLocalWebSockets() {
if (single) return websockets.values();

View File

@@ -60,6 +60,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected int liveinterval = DEFAILT_LIVEINTERVAL;
protected int maxconns = 0;
@Resource(name = "jsonconvert")
protected Convert jsonConvert;
@@ -106,7 +108,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
}
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, this.node, this.sendConvert, logger);
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, maxconns, this.node, this.sendConvert, logger);
this.node.init(conf);
this.node.localEngine.init(conf);
}
@@ -137,6 +139,11 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
response.finish(true);
return;
}
if (this.node.localEngine.isLocalConnLimited()) {
if (debug) logger.finest("WebSocket connections limit, maxconns=" + this.node.localEngine.getLocalMaxconns());
response.finish(true);
return;
}
final WebSocket webSocket = this.createWebSocket();
webSocket._engine = this.node.localEngine;
webSocket._messageTextType = this.messageTextType;