From e88c4fa2e39dcf7689dded8cd6641a9cc83e0267 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 24 Jul 2018 20:11:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=BF=9B=E7=A8=8B=E5=85=B3?= =?UTF-8?q?=E9=97=AD=E6=97=B6WebSocket=E6=B2=A1=E6=9C=89=E6=89=A7=E8=A1=8C?= =?UTF-8?q?onClose=E6=96=B9=E6=B3=95=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/WebSocket.java | 5 ++++- src/org/redkale/net/http/WebSocketEngine.java | 7 ++++--- src/org/redkale/net/http/WebSocketNode.java | 2 +- src/org/redkale/net/http/WebSocketRunner.java | 11 ++++++----- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 83aca6734..6de533bf3 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -802,7 +802,10 @@ public abstract class WebSocket { * 显式地关闭WebSocket */ public final void close() { - if (this._runner != null) this._runner.closeRunner(CLOSECODE_FORCED, "user close"); + if (this._runner != null) { + CompletableFuture future = this._runner.closeRunner(CLOSECODE_FORCED, "user close"); + if (future != null) future.join(); + } } /** diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 01f51a172..486813705 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -144,12 +144,12 @@ public class WebSocketEngine { } @Comment("从WebSocketEngine删除指定WebSocket") - void removeThenClose(WebSocket socket) { + CompletableFuture removeThenClose(WebSocket socket) { Serializable userid = socket._userid; if (single) { currconns.decrementAndGet(); websockets.remove(userid); - if (node != null) node.disconnect(userid); + if (node != null) return node.disconnect(userid); } else { //非线程安全, 在常规场景中无需锁 List list = websockets2.get(userid); if (list != null) { @@ -157,10 +157,11 @@ public class WebSocketEngine { list.remove(socket); if (list.isEmpty()) { websockets2.remove(userid); - if (node != null) node.disconnect(userid); + if (node != null) return node.disconnect(userid); } } } + return null; } @Comment("更改WebSocket的userid") diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index bf9833b64..98d25cf82 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -80,7 +80,7 @@ public abstract class WebSocketNode { public final void postDestroy(AnyValue conf) { if (this.localEngine == null) return; //关掉所有本地本地WebSocket - this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid()).join()); + this.localEngine.getLocalWebSockets().forEach(g -> g.close()); if (sncpNodeAddresses != null && localSncpAddress != null) { sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, localSncpAddress); } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 444f7267c..1e6aaa830 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -74,7 +74,7 @@ class WebSocketRunner implements Runnable { @Override public void completed(Integer count, Void attachment1) { if (count < 1) { - if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid="+webSocket.getUserid()+") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); + if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid=" + webSocket.getUserid() + ") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); closeRunner(0, "read buffer count is " + count); return; } @@ -296,16 +296,17 @@ class WebSocketRunner implements Runnable { return closed; } - public void closeRunner(int code, String reason) { - if (closed) return; + public CompletableFuture closeRunner(int code, String reason) { + if (closed) return null; synchronized (this) { - if (closed) return; + if (closed) return null; closed = true; channel.dispose(); context.offerBuffer(readBuffer); readBuffer = null; - engine.removeThenClose(webSocket); + CompletableFuture future = engine.removeThenClose(webSocket); webSocket.onClose(code, reason); + return future; } }