From 4c32422493e2c7b1bd4b8b07d8355ddfcba3e796 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sat, 2 Jun 2018 15:49:10 +0800 Subject: [PATCH] =?UTF-8?q?RestWebSocket=E5=A2=9E=E5=8A=A0wsthreads?= =?UTF-8?q?=E5=B1=9E=E6=80=A7=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/Transport.java | 2 +- src/org/redkale/net/http/RestWebSocket.java | 7 +++ src/org/redkale/net/http/WebSocketEngine.java | 9 +++- src/org/redkale/net/http/WebSocketNode.java | 50 +++++++++++++++++-- src/org/redkale/net/http/WebSocketRunner.java | 4 +- .../redkale/net/http/WebSocketServlet.java | 8 ++- .../redkale/service/WebSocketNodeService.java | 8 ++- 7 files changed, 78 insertions(+), 10 deletions(-) diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index f682e1f9a..893758805 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -418,7 +418,7 @@ public final class Transport { public TransportNode(int poolmaxconns, InetSocketAddress address, long disabletime) { this.address = address; this.disabletime = disabletime; - this.conns = new ArrayBlockingQueue<>(poolmaxconns); + this.conns = new LinkedBlockingQueue<>(poolmaxconns); } public int getPoolmaxconns() { diff --git a/src/org/redkale/net/http/RestWebSocket.java b/src/org/redkale/net/http/RestWebSocket.java index d7cc2c67d..1f921f868 100644 --- a/src/org/redkale/net/http/RestWebSocket.java +++ b/src/org/redkale/net/http/RestWebSocket.java @@ -81,6 +81,13 @@ public @interface RestWebSocket { */ int wsmaxconns() default 0; + /** + * 操作WebSocketNode对应CacheSource并发数, 为-1表示无限制,为0表示系统默认值(CPU*8) + * + * @return 最大连接数 + */ + int wsthreads() default 0; + /** * 最大消息体长度, 小于1表示无限制 * diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 77fe39819..01f51a172 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -70,6 +70,9 @@ public class WebSocketEngine { @Comment("最大连接数, 为0表示无限制") protected int wsmaxconns; + @Comment("操作WebSocketNode对应CacheSource并发数, 为-1表示无限制,为0表示系统默认值(CPU*8)") + protected int wsthreads; + @Comment("最大消息体长度, 小于1表示无限制") protected int wsmaxbody; @@ -77,7 +80,7 @@ public class WebSocketEngine { protected Cryptor cryptor; protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, - int wsmaxconns, int wsmaxbody, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) { + int wsmaxconns, int wsthreads, int wsmaxbody, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) { this.engineid = engineid; this.single = single; this.context = context; @@ -85,6 +88,7 @@ public class WebSocketEngine { this.node = node; this.liveinterval = liveinterval; this.wsmaxconns = wsmaxconns; + this.wsthreads = wsthreads; this.wsmaxbody = wsmaxbody; this.cryptor = cryptor; this.logger = logger; @@ -97,6 +101,7 @@ public class WebSocketEngine { this.liveinterval = props == null ? (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval)); if (liveinterval <= 0) return; if (props != null) this.wsmaxconns = props.getIntValue(WEBPARAM__WSMAXCONNS, this.wsmaxconns); + if (props != null) this.wsthreads = props.getIntValue(WEBPARAM__WSTHREADS, this.wsthreads); if (props != null) this.wsmaxbody = props.getIntValue(WEBPARAM__WSMAXBODY, this.wsmaxbody); if (scheduler != null) return; this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { @@ -139,7 +144,7 @@ public class WebSocketEngine { } @Comment("从WebSocketEngine删除指定WebSocket") - void remove(WebSocket socket) { + void removeThenClose(WebSocket socket) { Serializable userid = socket._userid; if (single) { currconns.decrementAndGet(); diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index f4ee6a60b..7f98dcab5 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -60,13 +60,25 @@ public abstract class WebSocketNode { //当前节点的本地WebSocketEngine protected WebSocketEngine localEngine; + protected Semaphore semaphore; + public void init(AnyValue conf) { if (sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class); + if (localEngine != null) { + int wsthreads = localEngine.wsthreads; + if (wsthreads == 0) wsthreads = Runtime.getRuntime().availableProcessors() * 8; + if (wsthreads > 0) this.semaphore = new Semaphore(wsthreads); + } } public void destroy(AnyValue conf) { } + @Local + public final Semaphore getSemaphore() { + return semaphore; + } + @Local public final void postDestroy(AnyValue conf) { if (this.localEngine == null) return; @@ -136,7 +148,12 @@ public abstract class WebSocketNode { * @return 地址列表 */ public CompletableFuture> getRpcNodeAddresses(final Serializable userid) { - if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); + if (this.sncpNodeAddresses != null) { + tryAcquireSemaphore(); + CompletableFuture> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); + if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore()); + return result; + } List rs = new ArrayList<>(); rs.add(this.localSncpAddress); return CompletableFuture.completedFuture(rs); @@ -177,7 +194,10 @@ public abstract class WebSocketNode { if (this.localEngine != null && this.sncpNodeAddresses == null) { return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid)); } - return this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid); + tryAcquireSemaphore(); + CompletableFuture rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid); + if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); + return rs; } /** @@ -190,7 +210,10 @@ public abstract class WebSocketNode { if (this.localEngine != null && this.sncpNodeAddresses == null) { return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); } - return this.sncpNodeAddresses.getLongAsync(SOURCE_SNCP_USERCOUNT_KEY, 0L).thenApply(v -> v.intValue()); + tryAcquireSemaphore(); + CompletableFuture rs = this.sncpNodeAddresses.getLongAsync(SOURCE_SNCP_USERCOUNT_KEY, 0L).thenApply(v -> v.intValue()); + if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); + return rs; } /** @@ -210,7 +233,9 @@ public abstract class WebSocketNode { return localFuture; } //远程节点关闭 + tryAcquireSemaphore(); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); @@ -486,7 +511,9 @@ public abstract class WebSocketNode { } final Object remoteMessage = formatRemoteMessage(message); CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(wsrange, message, last); + tryAcquireSemaphore(); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); @@ -515,7 +542,9 @@ public abstract class WebSocketNode { } //远程节点发送消息 final Object remoteMessage = formatRemoteMessage(message); + tryAcquireSemaphore(); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (addrs == null || addrs.isEmpty()) { if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node "); @@ -541,4 +570,19 @@ public abstract class WebSocketNode { if (sendConvert instanceof BinaryConvert) ((BinaryConvert) sendConvert).convertTo(message); return JsonConvert.root().convertTo(message); } + + protected boolean tryAcquireSemaphore() { + if (this.semaphore == null) return true; + try { + System.out.println("---------this.semaphore.tryAcquire" ); + return this.semaphore.tryAcquire(6, TimeUnit.SECONDS); + } catch (Exception e) { + return false; + } + } + + protected void releaseSemaphore() { + if (this.semaphore != null) this.semaphore.release(); + System.out.println("---------this.semaphore.release: " + this.semaphore ); + } } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index e9cab4d11..d48836bb3 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -79,7 +79,7 @@ class WebSocketRunner implements Runnable { @Override public void completed(Integer count, Void attachment1) { if (count < 1) { - if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); + if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid="+webSocket.getUserid()+") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); closeRunner(0, "read buffer count is " + count); return; } @@ -329,7 +329,7 @@ class WebSocketRunner implements Runnable { channel.dispose(); context.offerBuffer(readBuffer); readBuffer = null; - engine.remove(webSocket); + engine.removeThenClose(webSocket); webSocket.onClose(code, reason); QueueEntry entry = writeQueue.poll(); while (entry != null) { diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 07bd8ad60..f889d2744 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -50,6 +50,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Comment("WebScoket服务器最大连接数,为0表示无限制") public static final String WEBPARAM__WSMAXCONNS = "wsmaxconns"; + @Comment("WebScoket服务器操作WebSocketNode对应CacheSource并发数, 为-1表示无限制,为0表示系统默认值(CPU*8)") + public static final String WEBPARAM__WSTHREADS = "wsthreads"; + @Comment("最大消息体长度, 小于1表示无限制") public static final String WEBPARAM__WSMAXBODY = "wsmaxbody"; @@ -76,6 +79,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //同RestWebSocket.wsmaxconns protected int wsmaxconns = 0; + //同RestWebSocket.wsthreads + protected int wsthreads = 0; + //同RestWebSocket.wsmaxbody protected int wsmaxbody = 32 * 1024; @@ -147,7 +153,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", - this.single, context, liveinterval, wsmaxconns, wsmaxbody, this.cryptor, this.node, this.sendConvert, logger); + this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, this.cryptor, this.node, this.sendConvert, logger); this.node.init(conf); this.node.localEngine.init(conf); diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 2ebd66e76..637970083 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -47,7 +47,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { executor = ((WorkThread) thread).getExecutor(); } if (executor == null) executor = ForkJoinPool.commonPool(); - + return CompletableFuture.supplyAsync(() -> { final List rs = new ArrayList<>(); this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr())); @@ -77,9 +77,11 @@ public class WebSocketNodeService extends WebSocketNode implements Service { */ @Override public CompletableFuture connect(Serializable userid, InetSocketAddress sncpAddr) { + tryAcquireSemaphore(); CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); future = future.thenAccept((a) -> sncpNodeAddresses.incr(SOURCE_SNCP_USERCOUNT_KEY)); future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr)); + if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); return future; } @@ -94,8 +96,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service { */ @Override public CompletableFuture disconnect(Serializable userid, InetSocketAddress sncpAddr) { + tryAcquireSemaphore(); CompletableFuture future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); future = future.thenAccept((a) -> sncpNodeAddresses.decr(SOURCE_SNCP_USERCOUNT_KEY)); + if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr); return future; } @@ -111,8 +115,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service { */ @Override public CompletableFuture changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) { + tryAcquireSemaphore(); CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, sncpAddr); future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, sncpAddr)); + if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore()); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr); return future; }