diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index bb31c06d7..d4592f61d 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -37,6 +37,9 @@ import org.redkale.util.Comment; */ public abstract class WebSocket { + @Comment("强制关闭结果码") + public static final int CLOSECODE_FORCED = 1; + @Comment("消息不合法") public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2 @@ -367,6 +370,18 @@ public abstract class WebSocket { return _engine.node.getRpcNodeWebSocketAddresses(userid); } + /** + * 强制关闭用户的所有WebSocket + * + * @param userid Serializable + * + * @return int + */ + @Comment("强制关闭用户的所有WebSocket") + public CompletableFuture forceCloseWebSocket(Serializable userid) { + return _engine.node.forceCloseWebSocket(userid); + } + /** * 获取当前WebSocket下的属性,非线程安全 * @@ -612,7 +627,16 @@ public abstract class WebSocket { * 显式地关闭WebSocket */ public final void close() { - if (this._runner != null) this._runner.closeRunner(); + if (this._runner != null) this._runner.closeRunner(CLOSECODE_FORCED); + } + + /** + * 是否关闭 + * + * @return boolean + */ + public final boolean isClosed() { + return this._runner != null ? this._runner.closed : true; } @Override diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 83ea0915c..3912c3069 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -132,6 +132,23 @@ public class WebSocketEngine { } } + @Comment("强制关闭本地用户的WebSocket") + public int forceCloseLocalWebSocket(Serializable userid) { + if (single) { + WebSocket ws = websockets.get(userid); + if (ws == null) return 0; + ws.close(); + return 1; + } + List list = websockets2.get(userid); + if (list == null || list.isEmpty()) return 0; + List list2 = new ArrayList<>(list); + for (WebSocket ws : list2) { + ws.close(); + } + return list2.size(); + } + @Comment("给所有连接用户发送消息") public CompletableFuture broadcastMessage(final Object message, final boolean last) { return broadcastMessage(null, message, last); diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index dd1177b44..e06e1cfd2 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -79,6 +79,8 @@ public abstract class WebSocketNode { protected abstract CompletableFuture disconnect(Serializable userid, InetSocketAddress addr); + protected abstract CompletableFuture forceCloseWebSocket(Serializable userid, InetSocketAddress addr); + //-------------------------------------------------------------------------------- final CompletableFuture connect(final Serializable userid) { if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + userid + " on " + this.localEngine.getEngineid() + ")."); @@ -178,6 +180,37 @@ public abstract class WebSocketNode { }); } + /** + * 强制关闭用户WebSocket + * + * @param userid Serializable + * + * @return int + */ + public final CompletableFuture forceCloseWebSocket(final Serializable userid) { + CompletableFuture localFuture = null; + if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid)); + if (this.sncpNodeAddresses == null || this.remoteNode == null) { + if (finest) logger.finest("websocket remote node is null"); + //没有CacheSource就不会有分布式节点 + return localFuture; + } + //远程节点关闭 + CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(userid); + CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { + if (finest) logger.finest("websocket found userid:" + userid + " on " + addrs); + if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); + CompletableFuture future = null; + for (InetSocketAddress addr : addrs) { + if (addr == null || addr.equals(localSncpAddress)) continue; + future = future == null ? remoteNode.forceCloseWebSocket(userid, addr) + : future.thenCombine(remoteNode.forceCloseWebSocket(userid, addr), (a, b) -> a + b); + } + return future == null ? CompletableFuture.completedFuture(0) : future; + }); + return localFuture.thenCombine(remoteFuture, (a, b) -> a + b); + } + //-------------------------------------------------------------------------------- /** * 获取本地的WebSocketEngine,没有则返回null diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index df3467433..032a117a4 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -39,7 +39,7 @@ class WebSocketRunner implements Runnable { private ByteBuffer readBuffer; - protected volatile boolean closed = false; + volatile boolean closed = false; private AtomicBoolean writing = new AtomicBoolean(); @@ -75,7 +75,7 @@ class WebSocketRunner implements Runnable { @Override public void completed(Integer count, Void attachment1) { if (count < 1 && readBuffers.isEmpty()) { - closeRunner(); + closeRunner(0); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); return; } @@ -225,7 +225,7 @@ class WebSocketRunner implements Runnable { } } } catch (Throwable t) { - closeRunner(); + closeRunner(0); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t); } finally { if (exBuffers != null) { @@ -238,18 +238,18 @@ class WebSocketRunner implements Runnable { @Override public void failed(Throwable exc, Void attachment2) { - closeRunner(); + closeRunner(0); if (exc != null) { context.getLogger().log(Level.FINEST, "WebSocketRunner read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc); } } }); } else { - closeRunner(); + closeRunner(0); context.getLogger().log(Level.FINEST, "WebSocketRunner abort by AsyncConnection closed"); } } catch (Exception e) { - closeRunner(); + closeRunner(0); context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read bytes from channel, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e); } } @@ -315,7 +315,7 @@ class WebSocketRunner implements Runnable { channel.write(buffers, buffers, this); } } catch (Exception e) { - closeRunner(); + closeRunner(0); context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e); } writing.set(false); @@ -324,7 +324,7 @@ class WebSocketRunner implements Runnable { @Override public void failed(Throwable exc, ByteBuffer[] attachments) { writing.set(false); - closeRunner(); + closeRunner(0); if (exc != null) { context.getLogger().log(Level.FINE, "WebSocket sendMessage on CompletionHandler failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc); } @@ -332,14 +332,14 @@ class WebSocketRunner implements Runnable { }); } catch (Exception t) { writing.set(false); - closeRunner(); + closeRunner(0); context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t); futureResult.complete(RETCODE_SENDEXCEPTION); } return futureResult; } - public void closeRunner() { + public void closeRunner(int code) { if (closed) return; synchronized (this) { if (closed) return; @@ -351,7 +351,7 @@ class WebSocketRunner implements Runnable { context.offerBuffer(readBuffer); readBuffer = null; engine.remove(webSocket); - webSocket.onClose(0, null); + webSocket.onClose(code, null); } } diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 74a742c6e..f55e72272 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -87,4 +87,20 @@ public class WebSocketNodeService extends WebSocketNode implements Service { if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr); return future; } + + /** + * 强制关闭用户的WebSocket + * + * @param userid String + * @param sncpAddr InetSocketAddress + * + * @return 无返回值 + */ + @Override + public CompletableFuture forceCloseWebSocket(Serializable userid, InetSocketAddress sncpAddr) { + //不能从sncpNodeAddresses中移除,因为engine.forceCloseWebSocket 会调用到disconnect + if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + sncpAddr); + if (localEngine == null) return CompletableFuture.completedFuture(0); + return CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid)); + } }