diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 56276e1ec..2786e07b1 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -64,6 +64,9 @@ public abstract class WebSocket { @Comment("WebSocket已离线") public static final int RETCODE_WSOFFLINE = 1 << 8; //256 + @Comment("WebSocket将延迟发送") + public static final int RETCODE_DEAYSEND = 1 << 9; //512 + WebSocketRunner _runner; //不可能为空 WebSocketEngine _engine; //不可能为空 @@ -90,6 +93,8 @@ public abstract class WebSocket { private Map attributes = new HashMap<>(); //非线程安全 + List delayPackets; + protected WebSocket() { } @@ -225,6 +230,11 @@ public abstract class WebSocket { * @return 0表示成功, 非0表示错误码 */ CompletableFuture sendPacket(WebSocketPacket packet) { + if (this._runner == null) { + if (delayPackets == null) delayPackets = new ArrayList<>(); + delayPackets.add(packet); + return CompletableFuture.completedFuture(RETCODE_DEAYSEND); + } CompletableFuture rs = this._runner.sendMessage(packet); if (_engine.logger.isLoggable(Level.FINEST) && packet != WebSocketPacket.DEFAULT_PING_PACKET) { _engine.logger.finest("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this); diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index aa2cade38..596e4892e 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -178,7 +178,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return; } sessionFuture.whenComplete((sessionid, ex) -> { - if (sessionid == null || ex != null) { + if ((sessionid == null && webSocket.delayPackets == null) || ex != null) { if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); response.finish(true); return; @@ -198,6 +198,21 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public void completed(Integer result, Void attachment) { HttpContext context = response.getContext(); + if (sessionid == null && webSocket.delayPackets != null) { + WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); + List delayPackets = webSocket.delayPackets; + webSocket.delayPackets = null; + CompletableFuture cf = null; + for (WebSocketPacket packet : delayPackets) { + if (cf == null) { + cf = temprunner.sendMessage(packet); + } else { + cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b); + } + } + cf.whenComplete((v, t) -> response.finish(true)); + return; + } CompletableFuture userFuture = webSocket.createUserid(); if (userFuture == null) { if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request); @@ -205,11 +220,26 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return; } userFuture.whenComplete((userid, ex2) -> { - if (userid == null || ex2 != null) { + if ((userid == null && webSocket.delayPackets == null) || ex2 != null) { if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); response.finish(true); return; } + if (userid == null && webSocket.delayPackets != null) { + WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); + List delayPackets = webSocket.delayPackets; + webSocket.delayPackets = null; + CompletableFuture cf = null; + for (WebSocketPacket packet : delayPackets) { + if (cf == null) { + cf = temprunner.sendMessage(packet); + } else { + cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b); + } + } + cf.whenComplete((v, t) -> response.finish(true)); + return; + } webSocket._userid = userid; if (single && !anyuser) { WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> {