增加WebSocket在onOpen和createUserid方法里返回无效用户信息前也可以发送信息

This commit is contained in:
Redkale
2017-12-20 08:54:54 +08:00
parent 5b1f820621
commit 9ff161c97d
2 changed files with 42 additions and 2 deletions

View File

@@ -64,6 +64,9 @@ public abstract class WebSocket<G extends Serializable, T> {
@Comment("WebSocket已离线") @Comment("WebSocket已离线")
public static final int RETCODE_WSOFFLINE = 1 << 8; //256 public static final int RETCODE_WSOFFLINE = 1 << 8; //256
@Comment("WebSocket将延迟发送")
public static final int RETCODE_DEAYSEND = 1 << 9; //512
WebSocketRunner _runner; //不可能为空 WebSocketRunner _runner; //不可能为空
WebSocketEngine _engine; //不可能为空 WebSocketEngine _engine; //不可能为空
@@ -90,6 +93,8 @@ public abstract class WebSocket<G extends Serializable, T> {
private Map<String, Object> attributes = new HashMap<>(); //非线程安全 private Map<String, Object> attributes = new HashMap<>(); //非线程安全
List<WebSocketPacket> delayPackets;
protected WebSocket() { protected WebSocket() {
} }
@@ -225,6 +230,11 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 0表示成功 非0表示错误码 * @return 0表示成功 非0表示错误码
*/ */
CompletableFuture<Integer> sendPacket(WebSocketPacket packet) { CompletableFuture<Integer> sendPacket(WebSocketPacket packet) {
if (this._runner == null) {
if (delayPackets == null) delayPackets = new ArrayList<>();
delayPackets.add(packet);
return CompletableFuture.completedFuture(RETCODE_DEAYSEND);
}
CompletableFuture<Integer> rs = this._runner.sendMessage(packet); CompletableFuture<Integer> rs = this._runner.sendMessage(packet);
if (_engine.logger.isLoggable(Level.FINEST) && packet != WebSocketPacket.DEFAULT_PING_PACKET) { if (_engine.logger.isLoggable(Level.FINEST) && packet != WebSocketPacket.DEFAULT_PING_PACKET) {
_engine.logger.finest("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this); _engine.logger.finest("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this);

View File

@@ -178,7 +178,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return; return;
} }
sessionFuture.whenComplete((sessionid, ex) -> { sessionFuture.whenComplete((sessionid, ex) -> {
if (sessionid == null || ex != null) { if ((sessionid == null && webSocket.delayPackets == null) || ex != null) {
if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex);
response.finish(true); response.finish(true);
return; return;
@@ -198,6 +198,21 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
HttpContext context = response.getContext(); HttpContext context = response.getContext();
if (sessionid == null && webSocket.delayPackets != null) {
WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
for (WebSocketPacket packet : delayPackets) {
if (cf == null) {
cf = temprunner.sendMessage(packet);
} else {
cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b);
}
}
cf.whenComplete((v, t) -> response.finish(true));
return;
}
CompletableFuture<Serializable> userFuture = webSocket.createUserid(); CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (userFuture == null) { if (userFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request); if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request);
@@ -205,11 +220,26 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return; return;
} }
userFuture.whenComplete((userid, ex2) -> { userFuture.whenComplete((userid, ex2) -> {
if (userid == null || ex2 != null) { if ((userid == null && webSocket.delayPackets == null) || ex2 != null) {
if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
response.finish(true); response.finish(true);
return; return;
} }
if (userid == null && webSocket.delayPackets != null) {
WebSocketRunner temprunner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.getChannel());
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
CompletableFuture<Integer> cf = null;
for (WebSocketPacket packet : delayPackets) {
if (cf == null) {
cf = temprunner.sendMessage(packet);
} else {
cf = cf.thenCombine(temprunner.sendMessage(packet), (a, b) -> a | b);
}
}
cf.whenComplete((v, t) -> response.finish(true));
return;
}
webSocket._userid = userid; webSocket._userid = userid;
if (single && !anyuser) { if (single && !anyuser) {
WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> { WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, ex) -> {