This commit is contained in:
redkale
2024-10-21 20:26:08 +08:00
parent 5fbe56bfc1
commit 72fe0dc2ba
4 changed files with 29 additions and 29 deletions

View File

@@ -975,7 +975,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
/** /**
* 将指定byte[]按响应结果输出 * 将指定byte[]按响应结果输出
* *
* @param kill kill * @param kill abort
* @param contentType ContentType * @param contentType ContentType
* @param bs 输出内容 * @param bs 输出内容
* @param offset 偏移量 * @param offset 偏移量
@@ -988,7 +988,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
/** /**
* 将指定byte[]按响应结果输出 * 将指定byte[]按响应结果输出
* *
* @param kill kill * @param kill abort
* @param contentType ContentType * @param contentType ContentType
* @param bodyContent 输出内容 * @param bodyContent 输出内容
* @param bodyOffset 偏移量 * @param bodyOffset 偏移量
@@ -1031,11 +1031,11 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
if (cacheHandler != null) { if (cacheHandler != null) {
cacheHandler.accept(this, data.getBytes()); cacheHandler.accept(this, data.getBytes());
} }
// 不能用finish(boolean kill, final ByteTuple array) 否则会调this.finishFuture // 不能用finish(boolean abort, final ByteTuple array) 否则会调this.finishFuture
super.finish(false, data.content(), 0, data.length()); super.finish(false, data.content(), 0, data.length());
} }
void kill() { void abort() {
refuseAlive(); refuseAlive();
this.responseConsumer.accept(this); this.responseConsumer.accept(this);
} }

View File

@@ -952,7 +952,7 @@ public abstract class WebSocket<G extends Serializable, T> {
if (this.inflater != null) { if (this.inflater != null) {
this.inflater.end(); this.inflater.end();
} }
CompletableFuture<Void> future = kill(CLOSECODE_SERVERCLOSE, "user close"); CompletableFuture<Void> future = abort(CLOSECODE_SERVERCLOSE, "user close");
if (future != null) { if (future != null) {
future.join(); future.join();
} }
@@ -978,7 +978,7 @@ public abstract class WebSocket<G extends Serializable, T> {
* java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) at * java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) at
* java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) * java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
*/ */
CompletableFuture<Void> kill(int code, String reason) { CompletableFuture<Void> abort(int code, String reason) {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
if (_channel == null) { if (_channel == null) {
return null; return null;
@@ -1027,7 +1027,7 @@ public abstract class WebSocket<G extends Serializable, T> {
@Override @Override
public void failed(Throwable exc, Void attachment) { public void failed(Throwable exc, Void attachment) {
super.completeExceptionally(exc); super.completeExceptionally(exc);
kill(RETCODE_SENDEXCEPTION, "websocket send message failed on CompletionHandler"); abort(RETCODE_SENDEXCEPTION, "websocket send message failed on CompletionHandler");
} }
} }
} }

View File

@@ -297,7 +297,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
"WebSocket(" + webSocket + ") abort on read buffer count, force to close channel, live " "WebSocket(" + webSocket + ") abort on read buffer count, force to close channel, live "
+ (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds"); + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds");
} }
webSocket.kill(CLOSECODE_ILLPACKET, "read buffer count is " + count); webSocket.abort(CLOSECODE_ILLPACKET, "read buffer count is " + count);
return; return;
} }
try { try {
@@ -371,13 +371,13 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
Level.FINEST, Level.FINEST,
"WebSocket(" + webSocket + ") onMessage by CLOSE FrameType : " + packet); "WebSocket(" + webSocket + ") onMessage by CLOSE FrameType : " + packet);
} }
webSocket.kill(CLOSECODE_CLIENTCLOSE, "received CLOSE frame-type message"); webSocket.abort(CLOSECODE_CLIENTCLOSE, "received CLOSE frame-type message");
return; return;
} else { } else {
logger.log( logger.log(
Level.WARNING, Level.WARNING,
"WebSocket(" + webSocket + ") onMessage by unknown FrameType : " + packet); "WebSocket(" + webSocket + ") onMessage by unknown FrameType : " + packet);
webSocket.kill(CLOSECODE_ILLPACKET, "received unknown frame-type message"); webSocket.abort(CLOSECODE_ILLPACKET, "received unknown frame-type message");
return; return;
} }
} }
@@ -387,7 +387,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
webSocket._channel.read(this); webSocket._channel.read(this);
} catch (Throwable e) { } catch (Throwable e) {
logger.log(Level.WARNING, "WebSocket(" + webSocket + ") onMessage by received error", e); logger.log(Level.WARNING, "WebSocket(" + webSocket + ") onMessage by received error", e);
webSocket.kill(CLOSECODE_WSEXCEPTION, "websocket-received error"); webSocket.abort(CLOSECODE_WSEXCEPTION, "websocket-received error");
} }
} }
@@ -406,9 +406,9 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
+ (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds", + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds",
exc); exc);
} }
webSocket.kill(CLOSECODE_WSEXCEPTION, "read websocket-packet failed"); webSocket.abort(CLOSECODE_WSEXCEPTION, "read websocket-packet failed");
} else { } else {
webSocket.kill(CLOSECODE_WSEXCEPTION, "decode websocket-packet error"); webSocket.abort(CLOSECODE_WSEXCEPTION, "decode websocket-packet error");
} }
} }
} }

View File

@@ -254,7 +254,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
"WebSocket connect abort, (Not GET Method)/(Connection!=Upgrade)/(Upgrade!=websocket). request=" "WebSocket connect abort, (Not GET Method)/(Connection!=Upgrade)/(Upgrade!=websocket). request="
+ request); + request);
} }
response.kill(); response.abort();
return; return;
} }
final String key = request.getHeader("Sec-WebSocket-Key"); final String key = request.getHeader("Sec-WebSocket-Key");
@@ -263,14 +263,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
logger.log( logger.log(
Level.FINE, "WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request); Level.FINE, "WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request);
} }
response.kill(); response.abort();
return; return;
} }
if (this.webSocketNode.localEngine.isLocalConnLimited()) { if (this.webSocketNode.localEngine.isLocalConnLimited()) {
logger.log( logger.log(
Level.WARNING, Level.WARNING,
"WebSocket connections limit, wsmaxconns=" + this.webSocketNode.localEngine.getLocalWsMaxConns()); "WebSocket connections limit, wsmaxconns=" + this.webSocketNode.localEngine.getLocalWsMaxConns());
response.kill(); response.abort();
return; return;
} }
final WebSocket webSocket = this.createWebSocket(); final WebSocket webSocket = this.createWebSocket();
@@ -295,7 +295,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (debug) { if (debug) {
logger.log(Level.FINER, "WebSocket connect abort, Not found sessionid. request=" + request); logger.log(Level.FINER, "WebSocket connect abort, Not found sessionid. request=" + request);
} }
response.kill(); response.abort();
return; return;
} }
BiConsumer<String, Throwable> sessionConsumer = (sessionid, ex) -> { BiConsumer<String, Throwable> sessionConsumer = (sessionid, ex) -> {
@@ -306,7 +306,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
"WebSocket connect abort, Not found sessionid or occur error. request=" + request, "WebSocket connect abort, Not found sessionid or occur error. request=" + request,
ex); ex);
} }
response.kill(); response.abort();
return; return;
} }
// onOpen成功或者存在delayPackets // onOpen成功或者存在delayPackets
@@ -338,7 +338,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Level.FINEST, Level.FINEST,
"WebSocket connect abort, Create userid abort. request = " + request); "WebSocket connect abort, Create userid abort. request = " + request);
} }
response.kill(); response.abort();
return; return;
} }
userFuture.whenComplete((userid, ex2) -> { userFuture.whenComplete((userid, ex2) -> {
@@ -350,7 +350,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
"WebSocket connect abort, Create userid abort. request = " + request, "WebSocket connect abort, Create userid abort. request = " + request,
ex2); ex2);
} }
response.kill(); response.abort();
return; return;
} }
if (userid != null if (userid != null
@@ -362,7 +362,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Level.SEVERE, Level.SEVERE,
"WebSocket userid must be Integer/Long/String/BigInteger type, but " "WebSocket userid must be Integer/Long/String/BigInteger type, but "
+ userid.getClass().getName()); + userid.getClass().getName());
response.kill(); response.abort();
return; return;
} }
@@ -378,9 +378,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocketNode.localEngine.addLocal(webSocket); webSocketNode.localEngine.addLocal(webSocket);
response.removeChannel(); response.removeChannel();
webSocket._readHandler.startRead(); webSocket._readHandler.startRead();
response.kill(); response.abort();
} else { // 关闭新连接 } else { // 关闭新连接
response.kill(); response.abort();
} }
}; };
if (rcFuture == null) { if (rcFuture == null) {
@@ -388,7 +388,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} else { } else {
rcFuture.whenComplete((r, e) -> { rcFuture.whenComplete((r, e) -> {
if (e != null) { if (e != null) {
response.kill(); response.abort();
} else { } else {
task.accept(r); task.accept(r);
} }
@@ -398,14 +398,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocketNode.localEngine.addLocal(webSocket); webSocketNode.localEngine.addLocal(webSocket);
response.removeChannel(); response.removeChannel();
webSocket._readHandler.startRead(); webSocket._readHandler.startRead();
response.kill(); response.abort();
} }
}); });
} else { } else {
webSocketNode.localEngine.addLocal(webSocket); webSocketNode.localEngine.addLocal(webSocket);
response.removeChannel(); response.removeChannel();
webSocket._readHandler.startRead(); webSocket._readHandler.startRead();
response.kill(); response.abort();
} }
}; };
if (webSocket.delayPackets != null) { // 存在待发送的消息 if (webSocket.delayPackets != null) { // 存在待发送的消息
@@ -424,7 +424,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
+ request, + request,
t); t);
} }
response.kill(); response.abort();
} else { } else {
runHandler.run(); runHandler.run();
} }
@@ -450,7 +450,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
+ request, + request,
t); t);
} }
response.kill(); response.abort();
} else { } else {
createUseridHandler.run(); createUseridHandler.run();
} }
@@ -463,7 +463,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override @Override
public void failed(Throwable exc, Void attachment) { public void failed(Throwable exc, Void attachment) {
logger.log(Level.FINEST, "WebSocket connect abort, Response send abort. request = " + request, exc); logger.log(Level.FINEST, "WebSocket connect abort, Response send abort. request = " + request, exc);
response.kill(); response.abort();
} }
}); });
}; };