This commit is contained in:
Redkale
2017-05-22 12:43:09 +08:00
parent d9a318bba8
commit dee2002cf3
4 changed files with 60 additions and 45 deletions

View File

@@ -503,8 +503,8 @@ public abstract class WebSocket<T> {
* *
* @return sessionid * @return sessionid
*/ */
public Serializable onOpen(final HttpRequest request) { public CompletableFuture<Serializable> onOpen(final HttpRequest request) {
return request.getSessionid(false); return CompletableFuture.completedFuture(request.getSessionid(false));
} }
/** /**
@@ -512,7 +512,7 @@ public abstract class WebSocket<T> {
* *
* @return groupid * @return groupid
*/ */
protected abstract Serializable createGroupid(); protected abstract CompletableFuture<Serializable> createGroupid();
/** /**
* 标记为WebSocketBinary才需要重写此方法 * 标记为WebSocketBinary才需要重写此方法

View File

@@ -11,6 +11,7 @@ import java.net.*;
import java.nio.*; import java.nio.*;
import java.security.*; import java.security.*;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.logging.*; import java.util.logging.*;
import javax.annotation.*; import javax.annotation.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
@@ -117,7 +118,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
response.finish(true); response.finish(true);
return; return;
} }
String key = request.getHeader("Sec-WebSocket-Key"); final String key = request.getHeader("Sec-WebSocket-Key");
if (key == null) { if (key == null) {
if (debug) logger.finest("WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request); if (debug) logger.finest("WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request);
response.finish(true); response.finish(true);
@@ -129,47 +130,60 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket._jsonConvert = jsonConvert; webSocket._jsonConvert = jsonConvert;
webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr(); webSocket._remoteAddr = request.getRemoteAddr();
Serializable sessionid = webSocket.onOpen(request); CompletableFuture<Serializable> sessionFuture = webSocket.onOpen(request);
if (sessionid == null) { if (sessionFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request); if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request);
response.finish(true); response.finish(true);
return; return;
} }
webSocket._sessionid = sessionid; sessionFuture.whenComplete((sessionid, ex) -> {
request.setKeepAlive(true); if (sessionid == null || ex != null) {
byte[] bytes = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes(); if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex);
synchronized (digest) { response.finish(true);
bytes = digest.digest(bytes); return;
} }
key = Base64.getEncoder().encodeToString(bytes); webSocket._sessionid = sessionid;
response.setStatus(101); request.setKeepAlive(true);
response.setHeader("Connection", "Upgrade"); byte[] bytes = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes();
response.addHeader("Upgrade", "websocket"); synchronized (digest) {
response.addHeader("Sec-WebSocket-Accept", key); bytes = digest.digest(bytes);
response.sendBody((ByteBuffer) null, null, new AsyncHandler<Integer, Void>() { }
response.setStatus(101);
response.setHeader("Connection", "Upgrade");
response.addHeader("Upgrade", "websocket");
response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes));
response.sendBody((ByteBuffer) null, null, new AsyncHandler<Integer, Void>() {
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
HttpContext context = response.getContext(); HttpContext context = response.getContext();
Serializable groupid = webSocket.createGroupid(); CompletableFuture<Serializable> groupFuture = webSocket.createGroupid();
if (groupid == null) { if (groupFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Create groupid abort. request = " + request); if (debug) logger.finest("WebSocket connect abort, Create groupid abort. request = " + request);
response.finish(true); response.finish(true);
return; return;
}
groupFuture.whenComplete((groupid, ex) -> {
if (groupid == null || ex != null) {
if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create groupid abort. request = " + request, ex);
response.finish(true);
return;
}
webSocket._groupid = groupid;
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary);
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
});
} }
webSocket._groupid = groupid;
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary);
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
}
@Override @Override
public void failed(Throwable exc, Void attachment) { public void failed(Throwable exc, Void attachment) {
logger.log(Level.FINEST, "WebSocket connect abort, Response send abort. request = " + request, exc); logger.log(Level.FINEST, "WebSocket connect abort, Response send abort. request = " + request, exc);
response.finish(true); response.finish(true);
} }
});
}); });
} }

View File

@@ -9,6 +9,7 @@ import org.redkale.net.http.WebServlet;
import org.redkale.net.http.WebSocketServlet; import org.redkale.net.http.WebSocketServlet;
import org.redkale.net.http.WebSocket; import org.redkale.net.http.WebSocket;
import java.io.*; import java.io.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Utility; import org.redkale.util.Utility;
@@ -63,8 +64,8 @@ public class ChatWebSocketServlet extends WebSocketServlet {
} }
@Override @Override
protected Serializable createGroupid() { protected CompletableFuture<Serializable> createGroupid() {
return ""; return CompletableFuture.completedFuture("2");
} }
}; };

View File

@@ -14,7 +14,7 @@ import org.redkale.util.TypeToken;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.*;
/** /**
* *
@@ -52,7 +52,7 @@ public class VideoWebSocketServlet extends WebSocketServlet {
private boolean repeat = false; private boolean repeat = false;
@Override @Override
public String onOpen(final HttpRequest request) { public CompletableFuture<String> onOpen(final HttpRequest request) {
String uri = request.getRequestURI(); String uri = request.getRequestURI();
int pos = uri.indexOf("/listen/"); int pos = uri.indexOf("/listen/");
uri = uri.substring(pos + "/listen/".length()); uri = uri.substring(pos + "/listen/".length());
@@ -61,7 +61,7 @@ public class VideoWebSocketServlet extends WebSocketServlet {
String sessionid = Long.toString(System.nanoTime()); String sessionid = Long.toString(System.nanoTime());
if (uri.indexOf('\'') >= 0 || uri.indexOf('"') >= 0) return null; if (uri.indexOf('\'') >= 0 || uri.indexOf('"') >= 0) return null;
if (!repeat) sessionid = uri; if (!repeat) sessionid = uri;
return sessionid; return CompletableFuture.completedFuture(sessionid);
} }
@Override @Override
@@ -104,8 +104,8 @@ public class VideoWebSocketServlet extends WebSocketServlet {
} }
@Override @Override
protected Serializable createGroupid() { protected CompletableFuture<Serializable> createGroupid() {
return ""; return CompletableFuture.completedFuture("2");
} }
}; };
return socket; return socket;