From dee2002cf3578028ebf60571fc767b20bef52586 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 22 May 2017 12:43:09 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocket.java | 6 +- .../redkale/net/http/WebSocketServlet.java | 84 +++++++++++-------- .../test/websocket/ChatWebSocketServlet.java | 5 +- .../test/websocket/VideoWebSocketServlet.java | 10 +-- 4 files changed, 60 insertions(+), 45 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 787d1e693..c04d0b230 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -503,8 +503,8 @@ public abstract class WebSocket { * * @return sessionid */ - public Serializable onOpen(final HttpRequest request) { - return request.getSessionid(false); + public CompletableFuture onOpen(final HttpRequest request) { + return CompletableFuture.completedFuture(request.getSessionid(false)); } /** @@ -512,7 +512,7 @@ public abstract class WebSocket { * * @return groupid */ - protected abstract Serializable createGroupid(); + protected abstract CompletableFuture createGroupid(); /** * 标记为WebSocketBinary才需要重写此方法 diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 6beef6625..459918d24 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -11,6 +11,7 @@ import java.net.*; import java.nio.*; import java.security.*; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.logging.*; import javax.annotation.*; import org.redkale.convert.json.JsonConvert; @@ -117,7 +118,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl response.finish(true); return; } - String key = request.getHeader("Sec-WebSocket-Key"); + final String key = request.getHeader("Sec-WebSocket-Key"); if (key == null) { if (debug) logger.finest("WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request); response.finish(true); @@ -129,47 +130,60 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket._jsonConvert = jsonConvert; webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddr = request.getRemoteAddr(); - Serializable sessionid = webSocket.onOpen(request); - if (sessionid == null) { + CompletableFuture sessionFuture = webSocket.onOpen(request); + if (sessionFuture == null) { if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request); response.finish(true); return; } - webSocket._sessionid = sessionid; - request.setKeepAlive(true); - byte[] bytes = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes(); - synchronized (digest) { - bytes = digest.digest(bytes); - } - key = Base64.getEncoder().encodeToString(bytes); - response.setStatus(101); - response.setHeader("Connection", "Upgrade"); - response.addHeader("Upgrade", "websocket"); - response.addHeader("Sec-WebSocket-Accept", key); - response.sendBody((ByteBuffer) null, null, new AsyncHandler() { + sessionFuture.whenComplete((sessionid, ex) -> { + if (sessionid == null || ex != null) { + if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); + response.finish(true); + return; + } + webSocket._sessionid = sessionid; + request.setKeepAlive(true); + byte[] bytes = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes(); + synchronized (digest) { + bytes = digest.digest(bytes); + } + response.setStatus(101); + response.setHeader("Connection", "Upgrade"); + response.addHeader("Upgrade", "websocket"); + response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes)); + response.sendBody((ByteBuffer) null, null, new AsyncHandler() { - @Override - public void completed(Integer result, Void attachment) { - HttpContext context = response.getContext(); - Serializable groupid = webSocket.createGroupid(); - if (groupid == null) { - if (debug) logger.finest("WebSocket connect abort, Create groupid abort. request = " + request); - response.finish(true); - return; + @Override + public void completed(Integer result, Void attachment) { + HttpContext context = response.getContext(); + CompletableFuture groupFuture = webSocket.createGroupid(); + if (groupFuture == null) { + if (debug) logger.finest("WebSocket connect abort, Create groupid abort. request = " + request); + response.finish(true); + return; + } + groupFuture.whenComplete((groupid, ex) -> { + if (groupid == null || ex != null) { + if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create groupid abort. request = " + request, ex); + response.finish(true); + return; + } + webSocket._groupid = groupid; + WebSocketServlet.this.node.localEngine.add(webSocket); + WebSocketRunner runner = new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary); + webSocket._runner = runner; + context.runAsync(runner); + response.finish(true); + }); } - webSocket._groupid = groupid; - WebSocketServlet.this.node.localEngine.add(webSocket); - WebSocketRunner runner = new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary); - webSocket._runner = runner; - context.runAsync(runner); - response.finish(true); - } - @Override - public void failed(Throwable exc, Void attachment) { - logger.log(Level.FINEST, "WebSocket connect abort, Response send abort. request = " + request, exc); - response.finish(true); - } + @Override + public void failed(Throwable exc, Void attachment) { + logger.log(Level.FINEST, "WebSocket connect abort, Response send abort. request = " + request, exc); + response.finish(true); + } + }); }); } diff --git a/test/org/redkale/test/websocket/ChatWebSocketServlet.java b/test/org/redkale/test/websocket/ChatWebSocketServlet.java index d53588f74..19b4d150d 100644 --- a/test/org/redkale/test/websocket/ChatWebSocketServlet.java +++ b/test/org/redkale/test/websocket/ChatWebSocketServlet.java @@ -9,6 +9,7 @@ import org.redkale.net.http.WebServlet; import org.redkale.net.http.WebSocketServlet; import org.redkale.net.http.WebSocket; import java.io.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.*; import org.redkale.convert.json.JsonConvert; import org.redkale.util.Utility; @@ -63,8 +64,8 @@ public class ChatWebSocketServlet extends WebSocketServlet { } @Override - protected Serializable createGroupid() { - return ""; + protected CompletableFuture createGroupid() { + return CompletableFuture.completedFuture("2"); } }; diff --git a/test/org/redkale/test/websocket/VideoWebSocketServlet.java b/test/org/redkale/test/websocket/VideoWebSocketServlet.java index 32d042107..22b63684a 100644 --- a/test/org/redkale/test/websocket/VideoWebSocketServlet.java +++ b/test/org/redkale/test/websocket/VideoWebSocketServlet.java @@ -14,7 +14,7 @@ import org.redkale.util.TypeToken; import org.redkale.util.AnyValue; import java.io.*; import java.util.*; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.*; /** * @@ -52,7 +52,7 @@ public class VideoWebSocketServlet extends WebSocketServlet { private boolean repeat = false; @Override - public String onOpen(final HttpRequest request) { + public CompletableFuture onOpen(final HttpRequest request) { String uri = request.getRequestURI(); int pos = uri.indexOf("/listen/"); uri = uri.substring(pos + "/listen/".length()); @@ -61,7 +61,7 @@ public class VideoWebSocketServlet extends WebSocketServlet { String sessionid = Long.toString(System.nanoTime()); if (uri.indexOf('\'') >= 0 || uri.indexOf('"') >= 0) return null; if (!repeat) sessionid = uri; - return sessionid; + return CompletableFuture.completedFuture(sessionid); } @Override @@ -104,8 +104,8 @@ public class VideoWebSocketServlet extends WebSocketServlet { } @Override - protected Serializable createGroupid() { - return ""; + protected CompletableFuture createGroupid() { + return CompletableFuture.completedFuture("2"); } }; return socket;