修复WebSocket.readPending问题

This commit is contained in:
redkale
2024-10-09 18:07:55 +08:00
parent 76623e9c6b
commit 71cec55f1c
5 changed files with 41 additions and 29 deletions

View File

@@ -162,6 +162,8 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
this.responseSupplier = null; this.responseSupplier = null;
this.responseConsumer = null; this.responseConsumer = null;
this.recycleListener = null;
this.afterFinishListeners = null;
this.inited = false; this.inited = false;
return true; return true;
} }
@@ -180,6 +182,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected void refuseAlive() { protected void refuseAlive() {
this.request.keepAlive = false; this.request.keepAlive = false;
this.readRegistered = true;
} }
protected void init(AsyncConnection channel) { protected void init(AsyncConnection channel) {

View File

@@ -401,7 +401,7 @@ public class HttpRequest extends Request<HttpContext> {
this.keepAlive = false; this.keepAlive = false;
} }
// readCompleted=true时ProtocolCodec会继续读下一个request // readCompleted=true时ProtocolCodec会继续读下一个request
this.readCompleted = !this.boundary && !maybews; this.readCompleted = !this.boundary;
this.bodyBytes.clear(); this.bodyBytes.clear();
this.readState = READ_STATE_BODY; this.readState = READ_STATE_BODY;
} }

View File

@@ -1032,6 +1032,15 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
super.finish(false, data.content(), 0, data.length()); super.finish(false, data.content(), 0, data.length());
} }
void kill() {
refuseAlive();
this.responseConsumer.accept(this);
}
void skipReadRegistered() {
this.readRegistered = true;
}
/** 以304状态码输出 */ /** 以304状态码输出 */
public void finish304() { public void finish304() {
skipHeader(); skipHeader();

View File

@@ -5,8 +5,6 @@
*/ */
package org.redkale.net.http; package org.redkale.net.http;
import static org.redkale.net.http.WebSocket.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@@ -16,6 +14,7 @@ import java.util.function.BiConsumer;
import java.util.logging.*; import java.util.logging.*;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.net.AsyncIOThread; import org.redkale.net.AsyncIOThread;
import static org.redkale.net.http.WebSocket.*;
import org.redkale.net.http.WebSocketPacket.FrameType; import org.redkale.net.http.WebSocketPacket.FrameType;
import org.redkale.util.*; import org.redkale.util.*;
@@ -131,7 +130,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
if (lengthCode <= 0x7D) { // 125 长度<=125 if (lengthCode <= 0x7D) { // 125 长度<=125
length = lengthCode; length = lengthCode;
} else if (lengthCode == 0x7E) { // 0x7E=126 长度:126~65535 } else if (lengthCode == 0x7E) { // 0x7E=126 长度:126~65535
length = (int) realbuf.getChar(); length = realbuf.getChar();
} else if (lengthCode == 0x7F) { // 0x7E=127 长度>65535 } else if (lengthCode == 0x7F) { // 0x7E=127 长度>65535
length = (int) realbuf.getLong(); length = (int) realbuf.getLong();
} else { } else {
@@ -188,7 +187,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
if (lengthCode <= 0x7D) { // 125 长度<=125 if (lengthCode <= 0x7D) { // 125 长度<=125
length = lengthCode; length = lengthCode;
} else if (lengthCode == 0x7E) { // 0x7E=126 长度:126~65535 } else if (lengthCode == 0x7E) { // 0x7E=126 长度:126~65535
length = (int) realbuf.getChar(); length = realbuf.getChar();
} else if (lengthCode == 0x7F) { // 0x7E=127 长度>65535 } else if (lengthCode == 0x7F) { // 0x7E=127 长度>65535
length = (int) realbuf.getLong(); length = (int) realbuf.getLong();
} else { } else {

View File

@@ -244,32 +244,32 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override // 在IOThread中执行 @Override // 在IOThread中执行
@NonBlocking @NonBlocking
public final void execute(final HttpRequest request, final HttpResponse response) throws IOException { public final void execute(final HttpRequest request, final HttpResponse response) throws IOException {
final boolean debug = logger.isLoggable(Level.FINEST); final boolean debug = logger.isLoggable(Level.FINER);
final boolean fine = logger.isLoggable(Level.FINE);
if (!request.isWebSocket()) { if (!request.isWebSocket()) {
if (debug) { if (fine) {
logger.log( logger.log(
Level.FINEST, Level.FINE,
"WebSocket connect abort, (Not GET Method)/(Connection!=Upgrade)/(Upgrade!=websocket). request=" "WebSocket connect abort, (Not GET Method)/(Connection!=Upgrade)/(Upgrade!=websocket). request="
+ request); + request);
} }
response.finish(true); response.kill();
return; return;
} }
final String key = request.getHeader("Sec-WebSocket-Key"); final String key = request.getHeader("Sec-WebSocket-Key");
if (key == null) { if (key == null) {
if (debug) { if (fine) {
logger.log( logger.log(
Level.FINEST, Level.FINE, "WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request);
"WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request);
} }
response.finish(true); response.kill();
return; return;
} }
if (this.webSocketNode.localEngine.isLocalConnLimited()) { if (this.webSocketNode.localEngine.isLocalConnLimited()) {
logger.log( logger.log(
Level.WARNING, Level.WARNING,
"WebSocket connections limit, wsmaxconns=" + this.webSocketNode.localEngine.getLocalWsMaxConns()); "WebSocket connections limit, wsmaxconns=" + this.webSocketNode.localEngine.getLocalWsMaxConns());
response.finish(true); response.kill();
return; return;
} }
final WebSocket webSocket = this.createWebSocket(); final WebSocket webSocket = this.createWebSocket();
@@ -287,24 +287,25 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
webSocket.inflater = new Inflater(true); webSocket.inflater = new Inflater(true);
} }
response.skipReadRegistered();
initRestWebSocket(webSocket); initRestWebSocket(webSocket);
CompletableFuture<String> sessionFuture = webSocket.onOpen(request); CompletableFuture<String> sessionFuture = webSocket.onOpen(request);
if (sessionFuture == null) { if (sessionFuture == null) {
if (debug) { if (debug) {
logger.log(Level.FINEST, "WebSocket connect abort, Not found sessionid. request=" + request); logger.log(Level.FINER, "WebSocket connect abort, Not found sessionid. request=" + request);
} }
response.finish(true); response.kill();
return; return;
} }
BiConsumer<String, Throwable> sessionConsumer = (sessionid, ex) -> { BiConsumer<String, Throwable> sessionConsumer = (sessionid, ex) -> {
if ((sessionid == null && webSocket.delayPackets == null) || ex != null) { if ((sessionid == null && webSocket.delayPackets == null) || ex != null) {
if (debug || ex != null) { if (debug || ex != null) {
logger.log( logger.log(
ex == null ? Level.FINEST : Level.FINE, ex == null ? Level.FINER : Level.FINE,
"WebSocket connect abort, Not found sessionid or occur error. request=" + request, "WebSocket connect abort, Not found sessionid or occur error. request=" + request,
ex); ex);
} }
response.finish(true); response.kill();
return; return;
} }
// onOpen成功或者存在delayPackets // onOpen成功或者存在delayPackets
@@ -338,7 +339,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Level.FINEST, Level.FINEST,
"WebSocket connect abort, Create userid abort. request = " + request); "WebSocket connect abort, Create userid abort. request = " + request);
} }
response.finish(true); response.kill();
return; return;
} }
userFuture.whenComplete((userid, ex2) -> { userFuture.whenComplete((userid, ex2) -> {
@@ -350,7 +351,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
"WebSocket connect abort, Create userid abort. request = " + request, "WebSocket connect abort, Create userid abort. request = " + request,
ex2); ex2);
} }
response.finish(true); response.kill();
return; return;
} }
Runnable runHandler = () -> { Runnable runHandler = () -> {
@@ -360,14 +361,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Traces.currentTraceid(request.getTraceid()); Traces.currentTraceid(request.getTraceid());
if (rs) { if (rs) {
CompletableFuture<Boolean> rcFuture = webSocket.onSingleRepeatConnect(); CompletableFuture<Boolean> rcFuture = webSocket.onSingleRepeatConnect();
Consumer<Boolean> task = (oldkilled) -> { Consumer<Boolean> task = oldkilled -> {
if (oldkilled) { if (oldkilled) {
webSocketNode.localEngine.addLocal(webSocket); webSocketNode.localEngine.addLocal(webSocket);
response.removeChannel(); response.removeChannel();
webSocket._readHandler.startRead(); webSocket._readHandler.startRead();
response.finish(true); response.kill();
} else { // 关闭新连接 } else { // 关闭新连接
response.finish(true); response.kill();
} }
}; };
if (rcFuture == null) { if (rcFuture == null) {
@@ -375,7 +376,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} else { } else {
rcFuture.whenComplete((r, e) -> { rcFuture.whenComplete((r, e) -> {
if (e != null) { if (e != null) {
response.finish(true); response.kill();
} else { } else {
task.accept(r); task.accept(r);
} }
@@ -385,14 +386,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocketNode.localEngine.addLocal(webSocket); webSocketNode.localEngine.addLocal(webSocket);
response.removeChannel(); response.removeChannel();
webSocket._readHandler.startRead(); webSocket._readHandler.startRead();
response.finish(true); response.kill();
} }
}); });
} else { } else {
webSocketNode.localEngine.addLocal(webSocket); webSocketNode.localEngine.addLocal(webSocket);
response.removeChannel(); response.removeChannel();
webSocket._readHandler.startRead(); webSocket._readHandler.startRead();
response.finish(true); response.kill();
} }
}; };
if (webSocket.delayPackets != null) { // 存在待发送的消息 if (webSocket.delayPackets != null) { // 存在待发送的消息
@@ -412,7 +413,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
+ request, + request,
t); t);
} }
response.finish(true); response.kill();
} else { } else {
runHandler.run(); runHandler.run();
} }
@@ -439,7 +440,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
+ request, + request,
t); t);
} }
response.finish(true); response.kill();
} else { } else {
createUseridHandler.run(); createUseridHandler.run();
} }
@@ -452,7 +453,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override @Override
public void failed(Throwable exc, Void attachment) { public void failed(Throwable exc, Void attachment) {
logger.log(Level.FINEST, "WebSocket connect abort, Response send abort. request = " + request, exc); logger.log(Level.FINEST, "WebSocket connect abort, Response send abort. request = " + request, exc);
response.finish(true); response.kill();
} }
}); });
}; };