WebSocket

This commit is contained in:
redkale
2023-10-09 09:28:21 +08:00
parent c58c603e6d
commit c66aafe7fe

View File

@@ -11,7 +11,9 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.*;
import java.util.stream.Stream;
import java.util.zip.*;
@@ -917,25 +919,46 @@ public abstract class WebSocket<G extends Serializable, T> {
}
}
//closeRunner
/**
* 会出现线程锁的问题
* io.vertx.core.VertxException: Thread blocked
* at java.base/jdk.internal.misc.Unsafe.park(Native Method)
* at java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:221)
* at java.base/java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1864)
* at java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3780)
* at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3725)
* at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
* at java.base/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2117)
* at org.redkale.net.http.WebSocket.close(WebSocket.java:916)
* at org.redkale.net.http.WebSocketEngine.forceCloseLocalWebSocket(WebSocketEngine.java:229)
* at org.redkale.net.http.WebSocketNode.forceCloseWebSocket(WebSocketNode.java:428)
* at org.redkale.net.http.WebSocketNode.forceCloseWebSocket(WebSocketNode.java:410)
* at org.redkale.net.http.WebSocket.forceCloseWebSocket(WebSocket.java:575)
* at org.redkale.net.http.WebSocket.onSingleRepeatConnect(WebSocket.java:856)
* at org.redkale.net.http.WebSocketServlet$1.lambda$completed$2(WebSocketServlet.java:314)
* at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
* at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
* at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
* at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
*/
CompletableFuture<Void> kill(int code, String reason) {
if (closed.compareAndSet(false, true)) {
if (_channel == null) {
return null;
}
Supplier<CompletableFuture> compose = () -> {
_channel.dispose();
if (_readHandler != null) {
_readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes);
}
if (_writeHandler != null) {
_writeHandler.byteArrayPool.accept(_writeHandler.writeArray);
}
return onClose(code, reason);
};
CompletableFuture<Void> future = _engine.removeLocalThenDisconnect(this);
_channel.dispose();
if (_readHandler != null) {
_readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes);
}
if (_writeHandler != null) {
_writeHandler.byteArrayPool.accept(_writeHandler.writeArray);
}
CompletableFuture closeFuture = onClose(code, reason);
if (closeFuture == null) {
return future;
}
return CompletableFuture.allOf(future, closeFuture);
return future == null ? compose.get()
: future.exceptionally(t -> null).thenCompose(v -> (CompletionStage) compose);
} else {
return null;
}