This commit is contained in:
Redkale
2017-12-23 10:39:46 +08:00
parent 5b32a91874
commit 0f545923b2
2 changed files with 77 additions and 49 deletions

View File

@@ -433,6 +433,10 @@ class WebSocketRunner implements Runnable {
return futureResult;
}
public boolean isClosed() {
return closed;
}
public void closeRunner(int code) {
if (closed) return;
synchronized (this) {

View File

@@ -195,11 +195,72 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes));
response.sendBody((ByteBuffer) null, null, new CompletionHandler<Integer, Void>() {
WebSocketRunner temprunner = null;
@Override
public void completed(Integer result, Void attachment) {
HttpContext context = response.getContext();
if (webSocket.delayPackets != null) {
WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
Runnable createUseridHandler = () -> {
CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (userFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request);
response.finish(true);
return;
}
userFuture.whenComplete((userid, ex2) -> {
if ((userid == null && webSocket.delayPackets == null) || ex2 != null) {
if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
response.finish(true);
return;
}
Runnable runHandler = () -> {
temprunner = null;
webSocket._userid = userid;
if (single && !anyuser) {
WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> {
if (rs) webSocket.onSingleRepeatConnect();
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
});
} else {
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
}
};
if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
for (WebSocketPacket packet : delayPackets) {
if (cf == null) {
cf = temprunner.sendMessage(packet);
} else {
cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b);
}
}
cf.whenComplete((Integer v, Throwable t) -> {
if (userid == null || t != null || (temprunner != null && temprunner.isClosed())) {
if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
response.finish(true);
} else {
runHandler.run();
}
});
} else {
runHandler.run();
}
});
};
if (webSocket.delayPackets != null) { //存在待发送的消息
if (temprunner == null) temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
@@ -210,54 +271,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b);
}
}
cf.whenComplete((v, t) -> response.finish(true));
if (sessionid == null) return;
}
CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (userFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request);
response.finish(true);
return;
}
userFuture.whenComplete((userid, ex2) -> {
if ((userid == null && webSocket.delayPackets == null) || ex2 != null) {
if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
response.finish(true);
return;
}
if (webSocket.delayPackets != null) {
WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
for (WebSocketPacket packet : delayPackets) {
if (cf == null) {
cf = temprunner.sendMessage(packet);
} else {
cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b);
}
}
cf.whenComplete((v, t) -> response.finish(true));
if (userid == null) return;
}
webSocket._userid = userid;
if (single && !anyuser) {
WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> {
if (rs) webSocket.onSingleRepeatConnect();
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
webSocket._runner = runner;
context.runAsync(runner);
cf.whenComplete((Integer v, Throwable t) -> {
if (sessionid == null || t != null || (temprunner != null && temprunner.isClosed())) {
if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
response.finish(true);
});
} else {
WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
webSocket._runner = runner;
context.runAsync(runner);
response.finish(true);
}
});
} else {
createUseridHandler.run();
}
});
} else {
createUseridHandler.run();
}
}
@Override