diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index b2673d524..56276e1ec 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -429,6 +429,18 @@ public abstract class WebSocket { return _engine.node.getRpcNodeWebSocketAddresses(userid); } + /** + * 更改本WebSocket的userid + * + * @param newuserid 新用户ID,不能为null + * + * @return CompletableFuture + */ + public CompletableFuture changeUserid(final G newuserid) { + if (newuserid == null) throw new NullPointerException("newuserid is null"); + return _engine.changeUserid(this, newuserid); + } + /** * 强制关闭用户的所有WebSocket * diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 0c1b6deb6..751a0cb06 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -149,6 +149,31 @@ public class WebSocketEngine { } } + @Comment("更改WebSocket的userid") + CompletableFuture changeUserid(WebSocket socket, final Serializable newuserid) { + if (newuserid == null) throw new NullPointerException("newuserid is null"); + final Serializable olduserid = socket._userid; + socket._userid = newuserid; + if (single) { + websockets.remove(olduserid); + websockets.put(newuserid, socket); + } else { //非线程安全, 在常规场景中无需锁 + List oldlist = websockets2.get(olduserid); + if (oldlist != null) { + oldlist.remove(socket); + if (oldlist.isEmpty()) websockets2.remove(olduserid); + } + List newlist = websockets2.get(newuserid); + if (newlist == null) { + newlist = new CopyOnWriteArrayList<>(); + websockets2.put(newuserid, newlist); + } + newlist.add(socket); + } + if (node != null) return node.changeUserid(olduserid, newuserid); + return CompletableFuture.completedFuture(null); + } + @Comment("强制关闭本地用户的WebSocket") public int forceCloseLocalWebSocket(Serializable userid) { if (single) { diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 03ef81815..f61215c13 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -61,7 +61,7 @@ public abstract class WebSocketNode { protected WebSocketEngine localEngine; public void init(AnyValue conf) { - if(sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class); + if (sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class); } public void destroy(AnyValue conf) { @@ -86,6 +86,8 @@ public abstract class WebSocketNode { protected abstract CompletableFuture disconnect(Serializable userid, InetSocketAddress addr); + protected abstract CompletableFuture changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress addr); + protected abstract CompletableFuture forceCloseWebSocket(Serializable userid, InetSocketAddress addr); //-------------------------------------------------------------------------------- @@ -99,6 +101,11 @@ public abstract class WebSocketNode { return disconnect(userid, localSncpAddress); } + final CompletableFuture changeUserid(Serializable olduserid, final Serializable newuserid) { + if (logger.isLoggable(Level.FINEST)) logger.finest(localSncpAddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ")."); + return changeUserid(olduserid, newuserid, localSncpAddress); + } + //-------------------------------------------------------------------------------- /** * 获取目标地址
diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index 10f0f1fdd..f3f8f88f0 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -61,7 +61,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { /** * 当用户连接到节点,需要更新到CacheSource * - * @param userid String + * @param userid Serializable * @param sncpAddr InetSocketAddress * * @return 无返回值 @@ -78,7 +78,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { /** * 当用户从一个节点断掉了所有的连接,需要从CacheSource中删除 * - * @param userid String + * @param userid Serializable * @param sncpAddr InetSocketAddress * * @return 无返回值 @@ -91,10 +91,27 @@ public class WebSocketNodeService extends WebSocketNode implements Service { return future; } + /** + * 更改用户ID,需要更新到CacheSource + * + * @param olduserid Serializable + * @param newuserid Serializable + * @param sncpAddr InetSocketAddress + * + * @return 无返回值 + */ + @Override + public CompletableFuture changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) { + CompletableFuture future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, sncpAddr); + future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, sncpAddr)); + if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr); + return future; + } + /** * 强制关闭用户的WebSocket * - * @param userid String + * @param userid Serializable * @param sncpAddr InetSocketAddress * * @return 无返回值