diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 203db80a9..939942164 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -433,6 +433,10 @@ class WebSocketRunner implements Runnable { return futureResult; } + public boolean isClosed() { + return closed; + } + public void closeRunner(int code) { if (closed) return; synchronized (this) { diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index f3e1f0276..6e0a2ec06 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -195,11 +195,72 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes)); response.sendBody((ByteBuffer) null, null, new CompletionHandler() { + WebSocketRunner temprunner = null; + @Override public void completed(Integer result, Void attachment) { HttpContext context = response.getContext(); - if (webSocket.delayPackets != null) { - WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); + + Runnable createUseridHandler = () -> { + CompletableFuture userFuture = webSocket.createUserid(); + if (userFuture == null) { + if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request); + response.finish(true); + return; + } + userFuture.whenComplete((userid, ex2) -> { + if ((userid == null && webSocket.delayPackets == null) || ex2 != null) { + if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); + response.finish(true); + return; + } + Runnable runHandler = () -> { + temprunner = null; + webSocket._userid = userid; + if (single && !anyuser) { + WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> { + if (rs) webSocket.onSingleRepeatConnect(); + WebSocketServlet.this.node.localEngine.add(webSocket); + WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); + webSocket._runner = runner; + context.runAsync(runner); + response.finish(true); + }); + } else { + WebSocketServlet.this.node.localEngine.add(webSocket); + WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); + webSocket._runner = runner; + context.runAsync(runner); + response.finish(true); + } + }; + if (webSocket.delayPackets != null) { //存在待发送的消息 + if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); + List delayPackets = webSocket.delayPackets; + webSocket.delayPackets = null; + CompletableFuture cf = null; + for (WebSocketPacket packet : delayPackets) { + if (cf == null) { + cf = temprunner.sendMessage(packet); + } else { + cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b); + } + } + cf.whenComplete((Integer v, Throwable t) -> { + if (userid == null || t != null || (temprunner != null && temprunner.isClosed())) { + if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); + response.finish(true); + } else { + runHandler.run(); + } + }); + } else { + runHandler.run(); + } + }); + }; + if (webSocket.delayPackets != null) { //存在待发送的消息 + if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); List delayPackets = webSocket.delayPackets; webSocket.delayPackets = null; CompletableFuture cf = null; @@ -210,54 +271,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b); } } - cf.whenComplete((v, t) -> response.finish(true)); - if (sessionid == null) return; - } - CompletableFuture userFuture = webSocket.createUserid(); - if (userFuture == null) { - if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request); - response.finish(true); - return; - } - userFuture.whenComplete((userid, ex2) -> { - if ((userid == null && webSocket.delayPackets == null) || ex2 != null) { - if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); - response.finish(true); - return; - } - if (webSocket.delayPackets != null) { - WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); - List delayPackets = webSocket.delayPackets; - webSocket.delayPackets = null; - CompletableFuture cf = null; - for (WebSocketPacket packet : delayPackets) { - if (cf == null) { - cf = temprunner.sendMessage(packet); - } else { - cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b); - } - } - cf.whenComplete((v, t) -> response.finish(true)); - if (userid == null) return; - } - webSocket._userid = userid; - if (single && !anyuser) { - WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> { - if (rs) webSocket.onSingleRepeatConnect(); - WebSocketServlet.this.node.localEngine.add(webSocket); - WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); - webSocket._runner = runner; - context.runAsync(runner); + cf.whenComplete((Integer v, Throwable t) -> { + if (sessionid == null || t != null || (temprunner != null && temprunner.isClosed())) { + if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); response.finish(true); - }); - } else { - WebSocketServlet.this.node.localEngine.add(webSocket); - WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel()); - webSocket._runner = runner; - context.runAsync(runner); - response.finish(true); - } - }); + } else { + createUseridHandler.run(); + } + }); + } else { + createUseridHandler.run(); + } } @Override