From 9ff161c97d61eee93948dba99c316610144f0202 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Wed, 20 Dec 2017 08:54:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0WebSocket=E5=9C=A8onOpen?= =?UTF-8?q?=E5=92=8CcreateUserid=E6=96=B9=E6=B3=95=E9=87=8C=E8=BF=94?= =?UTF-8?q?=E5=9B=9E=E6=97=A0=E6=95=88=E7=94=A8=E6=88=B7=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E5=89=8D=E4=B9=9F=E5=8F=AF=E4=BB=A5=E5=8F=91=E9=80=81=E4=BF=A1?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/WebSocket.java | 10 ++++++ .../redkale/net/http/WebSocketServlet.java | 34 +++++++++++++++++-- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 56276e1ec..2786e07b1 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -64,6 +64,9 @@ public abstract class WebSocket { @Comment("WebSocket已离线") public static final int RETCODE_WSOFFLINE = 1 << 8; //256 + @Comment("WebSocket将延迟发送") + public static final int RETCODE_DEAYSEND = 1 << 9; //512 + WebSocketRunner _runner; //不可能为空 WebSocketEngine _engine; //不可能为空 @@ -90,6 +93,8 @@ public abstract class WebSocket { private Map attributes = new HashMap<>(); //非线程安全 + List delayPackets; + protected WebSocket() { } @@ -225,6 +230,11 @@ public abstract class WebSocket { * @return 0表示成功, 非0表示错误码 */ CompletableFuture sendPacket(WebSocketPacket packet) { + if (this._runner == null) { + if (delayPackets == null) delayPackets = new ArrayList<>(); + delayPackets.add(packet); + return CompletableFuture.completedFuture(RETCODE_DEAYSEND); + } CompletableFuture rs = this._runner.sendMessage(packet); if (_engine.logger.isLoggable(Level.FINEST) && packet != WebSocketPacket.DEFAULT_PING_PACKET) { _engine.logger.finest("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this); diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index aa2cade38..596e4892e 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -178,7 +178,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return; } sessionFuture.whenComplete((sessionid, ex) -> { - if (sessionid == null || ex != null) { + if ((sessionid == null && webSocket.delayPackets == null) || ex != null) { if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); response.finish(true); return; @@ -198,6 +198,21 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public void completed(Integer result, Void attachment) { HttpContext context = response.getContext(); + if (sessionid == null && webSocket.delayPackets != null) { + WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); + List delayPackets = webSocket.delayPackets; + webSocket.delayPackets = null; + CompletableFuture cf = null; + for (WebSocketPacket packet : delayPackets) { + if (cf == null) { + cf = temprunner.sendMessage(packet); + } else { + cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b); + } + } + cf.whenComplete((v, t) -> response.finish(true)); + return; + } CompletableFuture userFuture = webSocket.createUserid(); if (userFuture == null) { if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request); @@ -205,11 +220,26 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return; } userFuture.whenComplete((userid, ex2) -> { - if (userid == null || ex2 != null) { + if ((userid == null && webSocket.delayPackets == null) || ex2 != null) { if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); response.finish(true); return; } + if (userid == null && webSocket.delayPackets != null) { + WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel()); + List delayPackets = webSocket.delayPackets; + webSocket.delayPackets = null; + CompletableFuture cf = null; + for (WebSocketPacket packet : delayPackets) { + if (cf == null) { + cf = temprunner.sendMessage(packet); + } else { + cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b); + } + } + cf.whenComplete((v, t) -> response.finish(true)); + return; + } webSocket._userid = userid; if (single && !anyuser) { WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> {