diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 83aca6734..6de533bf3 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -802,7 +802,10 @@ public abstract class WebSocket { * 显式地关闭WebSocket */ public final void close() { - if (this._runner != null) this._runner.closeRunner(CLOSECODE_FORCED, "user close"); + if (this._runner != null) { + CompletableFuture future = this._runner.closeRunner(CLOSECODE_FORCED, "user close"); + if (future != null) future.join(); + } } /** diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 01f51a172..486813705 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -144,12 +144,12 @@ public class WebSocketEngine { } @Comment("从WebSocketEngine删除指定WebSocket") - void removeThenClose(WebSocket socket) { + CompletableFuture removeThenClose(WebSocket socket) { Serializable userid = socket._userid; if (single) { currconns.decrementAndGet(); websockets.remove(userid); - if (node != null) node.disconnect(userid); + if (node != null) return node.disconnect(userid); } else { //非线程安全, 在常规场景中无需锁 List list = websockets2.get(userid); if (list != null) { @@ -157,10 +157,11 @@ public class WebSocketEngine { list.remove(socket); if (list.isEmpty()) { websockets2.remove(userid); - if (node != null) node.disconnect(userid); + if (node != null) return node.disconnect(userid); } } } + return null; } @Comment("更改WebSocket的userid") diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index bf9833b64..98d25cf82 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -80,7 +80,7 @@ public abstract class WebSocketNode { public final void postDestroy(AnyValue conf) { if (this.localEngine == null) return; //关掉所有本地本地WebSocket - this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid()).join()); + this.localEngine.getLocalWebSockets().forEach(g -> g.close()); if (sncpNodeAddresses != null && localSncpAddress != null) { sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, localSncpAddress); } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 444f7267c..1e6aaa830 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -74,7 +74,7 @@ class WebSocketRunner implements Runnable { @Override public void completed(Integer count, Void attachment1) { if (count < 1) { - if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid="+webSocket.getUserid()+") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); + if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid=" + webSocket.getUserid() + ") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); closeRunner(0, "read buffer count is " + count); return; } @@ -296,16 +296,17 @@ class WebSocketRunner implements Runnable { return closed; } - public void closeRunner(int code, String reason) { - if (closed) return; + public CompletableFuture closeRunner(int code, String reason) { + if (closed) return null; synchronized (this) { - if (closed) return; + if (closed) return null; closed = true; channel.dispose(); context.offerBuffer(readBuffer); readBuffer = null; - engine.removeThenClose(webSocket); + CompletableFuture future = engine.removeThenClose(webSocket); webSocket.onClose(code, reason); + return future; } }