From 2b1d09b027f23864aa2e2b1c4451be969aae9177 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Tue, 7 Nov 2017 17:39:30 +0800 Subject: [PATCH] =?UTF-8?q?WebSocket=E5=A2=9E=E5=8A=A0=E6=9C=80=E5=A4=A7?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=95=B0=E8=AE=BE=E7=BD=AE=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/Rest.java | 7 ++++++ src/org/redkale/net/http/RestWebSocket.java | 7 ++++++ src/org/redkale/net/http/WebSocketEngine.java | 24 ++++++++++++++++++- .../redkale/net/http/WebSocketServlet.java | 9 ++++++- 4 files changed, 45 insertions(+), 2 deletions(-) diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index 93c1d366b..a5194d68b 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -319,6 +319,13 @@ public final class Rest { } mv.visitFieldInsn(PUTFIELD, newDynName, "liveinterval", "I"); mv.visitVarInsn(ALOAD, 0); + if (rws.maxconns()< 6) { + mv.visitInsn(ICONST_0 + rws.maxconns()); + } else { + mv.visitIntInsn(BIPUSH, rws.maxconns()); + } + mv.visitFieldInsn(PUTFIELD, newDynName, "maxconns", "I"); + mv.visitVarInsn(ALOAD, 0); mv.visitInsn(rws.single() ? ICONST_1 : ICONST_0); mv.visitFieldInsn(PUTFIELD, newDynName, "single", "Z"); mv.visitInsn(RETURN); diff --git a/src/org/redkale/net/http/RestWebSocket.java b/src/org/redkale/net/http/RestWebSocket.java index 1f3ab7957..4a38fe3c6 100644 --- a/src/org/redkale/net/http/RestWebSocket.java +++ b/src/org/redkale/net/http/RestWebSocket.java @@ -59,6 +59,13 @@ public @interface RestWebSocket { */ int liveinterval() default WebSocketServlet.DEFAILT_LIVEINTERVAL; + /** + * 最大连接数, 为0表示无限制 + * + * @return 最大连接数 + */ + int maxconns() default 0; + /** * 是否屏蔽该类的转换 * diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index e0c4716e3..15e7769b9 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -53,6 +53,9 @@ public class WebSocketEngine { @Comment("在线用户ID对应的WebSocket组,用于单用户多连接模式") private final Map> websockets2 = new ConcurrentHashMap<>(); + @Comment("当前连接数") + private final AtomicInteger currconns = new AtomicInteger(); + @Comment("用于PING的定时器") private ScheduledThreadPoolExecutor scheduler; @@ -65,13 +68,17 @@ public class WebSocketEngine { @Comment("PING的间隔秒数") private int liveinterval; - protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) { + @Comment("最大连接数, 为0表示无限制") + private int maxconns; + + protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int maxconns, WebSocketNode node, Convert sendConvert, Logger logger) { this.engineid = engineid; this.single = single; this.context = context; this.sendConvert = sendConvert; this.node = node; this.liveinterval = liveinterval; + this.maxconns = maxconns; this.logger = logger; this.finest = logger.isLoggable(Level.FINEST); this.index = sequence.getAndIncrement(); @@ -102,6 +109,7 @@ public class WebSocketEngine { @Comment("添加WebSocket") void add(WebSocket socket) { if (single) { + currconns.incrementAndGet(); websockets.put(socket._userid, socket); } else { //非线程安全, 在常规场景中无需锁 List list = websockets2.get(socket._userid); @@ -109,6 +117,7 @@ public class WebSocketEngine { list = new CopyOnWriteArrayList<>(); websockets2.put(socket._userid, list); } + currconns.incrementAndGet(); list.add(socket); } if (node != null) node.connect(socket._userid); @@ -118,11 +127,13 @@ public class WebSocketEngine { void remove(WebSocket socket) { Serializable userid = socket._userid; if (single) { + currconns.decrementAndGet(); websockets.remove(userid); if (node != null) node.disconnect(userid); } else { //非线程安全, 在常规场景中无需锁 List list = websockets2.get(userid); if (list != null) { + currconns.decrementAndGet(); list.remove(socket); if (list.isEmpty()) { websockets2.remove(userid); @@ -262,6 +273,17 @@ public class WebSocketEngine { } } + @Comment("获取最大连接数") + public int getLocalMaxconns() { + return this.maxconns; + } + + @Comment("连接数是否达到上限") + public boolean isLocalConnLimited() { + if (this.maxconns < 1) return false; + return currconns.get() >= this.maxconns; + } + @Comment("获取所有连接") public Collection getLocalWebSockets() { if (single) return websockets.values(); diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 91475eaca..eb4ed4c33 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -60,6 +60,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl protected int liveinterval = DEFAILT_LIVEINTERVAL; + protected int maxconns = 0; + @Resource(name = "jsonconvert") protected Convert jsonConvert; @@ -106,7 +108,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service - this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, this.node, this.sendConvert, logger); + this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.single, context, liveinterval, maxconns, this.node, this.sendConvert, logger); this.node.init(conf); this.node.localEngine.init(conf); } @@ -137,6 +139,11 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl response.finish(true); return; } + if (this.node.localEngine.isLocalConnLimited()) { + if (debug) logger.finest("WebSocket connections limit, maxconns=" + this.node.localEngine.getLocalMaxconns()); + response.finish(true); + return; + } final WebSocket webSocket = this.createWebSocket(); webSocket._engine = this.node.localEngine; webSocket._messageTextType = this.messageTextType;