From 71cec55f1c6e0bd68803b3922a32b0e3bff7041d Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 9 Oct 2024 18:07:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DWebSocket.readPending?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/Response.java | 3 ++ .../org/redkale/net/http/HttpRequest.java | 2 +- .../org/redkale/net/http/HttpResponse.java | 9 ++++ .../net/http/WebSocketReadHandler.java | 7 ++- .../redkale/net/http/WebSocketServlet.java | 49 ++++++++++--------- 5 files changed, 41 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index 3b2ab1845..b7f82c51c 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -162,6 +162,8 @@ public abstract class Response> { } this.responseSupplier = null; this.responseConsumer = null; + this.recycleListener = null; + this.afterFinishListeners = null; this.inited = false; return true; } @@ -180,6 +182,7 @@ public abstract class Response> { protected void refuseAlive() { this.request.keepAlive = false; + this.readRegistered = true; } protected void init(AsyncConnection channel) { diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java index 6b09ee6e7..9617861a1 100644 --- a/src/main/java/org/redkale/net/http/HttpRequest.java +++ b/src/main/java/org/redkale/net/http/HttpRequest.java @@ -401,7 +401,7 @@ public class HttpRequest extends Request { this.keepAlive = false; } // readCompleted=true时ProtocolCodec会继续读下一个request - this.readCompleted = !this.boundary && !maybews; + this.readCompleted = !this.boundary; this.bodyBytes.clear(); this.readState = READ_STATE_BODY; } diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index 558fbade6..d68ec223e 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -1032,6 +1032,15 @@ public class HttpResponse extends Response { super.finish(false, data.content(), 0, data.length()); } + void kill() { + refuseAlive(); + this.responseConsumer.accept(this); + } + + void skipReadRegistered() { + this.readRegistered = true; + } + /** 以304状态码输出 */ public void finish304() { skipHeader(); diff --git a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java index 265bf247d..efa679988 100644 --- a/src/main/java/org/redkale/net/http/WebSocketReadHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketReadHandler.java @@ -5,8 +5,6 @@ */ package org.redkale.net.http; -import static org.redkale.net.http.WebSocket.*; - import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.nio.charset.StandardCharsets; @@ -16,6 +14,7 @@ import java.util.function.BiConsumer; import java.util.logging.*; import org.redkale.convert.Convert; import org.redkale.net.AsyncIOThread; +import static org.redkale.net.http.WebSocket.*; import org.redkale.net.http.WebSocketPacket.FrameType; import org.redkale.util.*; @@ -131,7 +130,7 @@ public class WebSocketReadHandler implements CompletionHandler65535 length = (int) realbuf.getLong(); } else { @@ -188,7 +187,7 @@ public class WebSocketReadHandler implements CompletionHandler65535 length = (int) realbuf.getLong(); } else { diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index 3792c0de1..5e6f45117 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -244,32 +244,32 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override // 在IOThread中执行 @NonBlocking public final void execute(final HttpRequest request, final HttpResponse response) throws IOException { - final boolean debug = logger.isLoggable(Level.FINEST); + final boolean debug = logger.isLoggable(Level.FINER); + final boolean fine = logger.isLoggable(Level.FINE); if (!request.isWebSocket()) { - if (debug) { + if (fine) { logger.log( - Level.FINEST, + Level.FINE, "WebSocket connect abort, (Not GET Method)/(Connection!=Upgrade)/(Upgrade!=websocket). request=" + request); } - response.finish(true); + response.kill(); return; } final String key = request.getHeader("Sec-WebSocket-Key"); if (key == null) { - if (debug) { + if (fine) { logger.log( - Level.FINEST, - "WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request); + Level.FINE, "WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request); } - response.finish(true); + response.kill(); return; } if (this.webSocketNode.localEngine.isLocalConnLimited()) { logger.log( Level.WARNING, "WebSocket connections limit, wsmaxconns=" + this.webSocketNode.localEngine.getLocalWsMaxConns()); - response.finish(true); + response.kill(); return; } final WebSocket webSocket = this.createWebSocket(); @@ -287,24 +287,25 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); webSocket.inflater = new Inflater(true); } + response.skipReadRegistered(); initRestWebSocket(webSocket); CompletableFuture sessionFuture = webSocket.onOpen(request); if (sessionFuture == null) { if (debug) { - logger.log(Level.FINEST, "WebSocket connect abort, Not found sessionid. request=" + request); + logger.log(Level.FINER, "WebSocket connect abort, Not found sessionid. request=" + request); } - response.finish(true); + response.kill(); return; } BiConsumer sessionConsumer = (sessionid, ex) -> { if ((sessionid == null && webSocket.delayPackets == null) || ex != null) { if (debug || ex != null) { logger.log( - ex == null ? Level.FINEST : Level.FINE, + ex == null ? Level.FINER : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); } - response.finish(true); + response.kill(); return; } // onOpen成功或者存在delayPackets @@ -338,7 +339,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl Level.FINEST, "WebSocket connect abort, Create userid abort. request = " + request); } - response.finish(true); + response.kill(); return; } userFuture.whenComplete((userid, ex2) -> { @@ -350,7 +351,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl "WebSocket connect abort, Create userid abort. request = " + request, ex2); } - response.finish(true); + response.kill(); return; } Runnable runHandler = () -> { @@ -360,14 +361,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl Traces.currentTraceid(request.getTraceid()); if (rs) { CompletableFuture rcFuture = webSocket.onSingleRepeatConnect(); - Consumer task = (oldkilled) -> { + Consumer task = oldkilled -> { if (oldkilled) { webSocketNode.localEngine.addLocal(webSocket); response.removeChannel(); webSocket._readHandler.startRead(); - response.finish(true); + response.kill(); } else { // 关闭新连接 - response.finish(true); + response.kill(); } }; if (rcFuture == null) { @@ -375,7 +376,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } else { rcFuture.whenComplete((r, e) -> { if (e != null) { - response.finish(true); + response.kill(); } else { task.accept(r); } @@ -385,14 +386,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocketNode.localEngine.addLocal(webSocket); response.removeChannel(); webSocket._readHandler.startRead(); - response.finish(true); + response.kill(); } }); } else { webSocketNode.localEngine.addLocal(webSocket); response.removeChannel(); webSocket._readHandler.startRead(); - response.finish(true); + response.kill(); } }; if (webSocket.delayPackets != null) { // 存在待发送的消息 @@ -412,7 +413,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl + request, t); } - response.finish(true); + response.kill(); } else { runHandler.run(); } @@ -439,7 +440,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl + request, t); } - response.finish(true); + response.kill(); } else { createUseridHandler.run(); } @@ -452,7 +453,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public void failed(Throwable exc, Void attachment) { logger.log(Level.FINEST, "WebSocket connect abort, Response send abort. request = " + request, exc); - response.finish(true); + response.kill(); } }); };