From c66aafe7fec5410512da3eadcb7a8eaf1f265b18 Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 9 Oct 2023 09:28:21 +0800 Subject: [PATCH] WebSocket --- .../java/org/redkale/net/http/WebSocket.java | 49 ++++++++++++++----- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index 42f65450b..1cfb3931a 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -11,7 +11,9 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import java.util.logging.*; import java.util.stream.Stream; import java.util.zip.*; @@ -917,25 +919,46 @@ public abstract class WebSocket { } } - //closeRunner + /** + * 会出现线程锁的问题 + * io.vertx.core.VertxException: Thread blocked + * at java.base/jdk.internal.misc.Unsafe.park(Native Method) + * at java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:221) + * at java.base/java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1864) + * at java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3780) + * at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3725) + * at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898) + * at java.base/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2117) + * at org.redkale.net.http.WebSocket.close(WebSocket.java:916) + * at org.redkale.net.http.WebSocketEngine.forceCloseLocalWebSocket(WebSocketEngine.java:229) + * at org.redkale.net.http.WebSocketNode.forceCloseWebSocket(WebSocketNode.java:428) + * at org.redkale.net.http.WebSocketNode.forceCloseWebSocket(WebSocketNode.java:410) + * at org.redkale.net.http.WebSocket.forceCloseWebSocket(WebSocket.java:575) + * at org.redkale.net.http.WebSocket.onSingleRepeatConnect(WebSocket.java:856) + * at org.redkale.net.http.WebSocketServlet$1.lambda$completed$2(WebSocketServlet.java:314) + * at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) + * at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) + * at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) + * at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) + */ CompletableFuture kill(int code, String reason) { if (closed.compareAndSet(false, true)) { if (_channel == null) { return null; } + Supplier compose = () -> { + _channel.dispose(); + if (_readHandler != null) { + _readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes); + } + if (_writeHandler != null) { + _writeHandler.byteArrayPool.accept(_writeHandler.writeArray); + } + return onClose(code, reason); + }; CompletableFuture future = _engine.removeLocalThenDisconnect(this); - _channel.dispose(); - if (_readHandler != null) { - _readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes); - } - if (_writeHandler != null) { - _writeHandler.byteArrayPool.accept(_writeHandler.writeArray); - } - CompletableFuture closeFuture = onClose(code, reason); - if (closeFuture == null) { - return future; - } - return CompletableFuture.allOf(future, closeFuture); + return future == null ? compose.get() + : future.exceptionally(t -> null).thenCompose(v -> (CompletionStage) compose); } else { return null; }