diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index ee06fc55a..8e03af683 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -452,6 +452,11 @@ public class WebSocketEngine { return (int) websockets2.values().stream().mapToInt(sublist -> sublist.size()).count(); } + @Comment("获取当前用户总数") + public Set getLocalUserSet() { + return single ? new LinkedHashSet<>(websockets.keySet()) : new LinkedHashSet<>(websockets2.keySet()); + } + @Comment("获取当前用户总数") public int getLocalUserSize() { return single ? websockets.size() : websockets2.size(); diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 87051f82f..0ccecfba0 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -11,7 +11,7 @@ import java.net.*; import java.util.*; import java.util.concurrent.*; import java.util.logging.*; -import java.util.stream.Stream; +import java.util.stream.*; import javax.annotation.*; import org.redkale.boot.*; import org.redkale.convert.*; @@ -208,6 +208,22 @@ public abstract class WebSocketNode { return rs; } + /** + * 获取在线用户总数 + * + * + * @return boolean + */ + public CompletableFuture> getUserSet() { + if (this.localEngine != null && this.sncpNodeAddresses == null) { + return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet().stream().map(x -> String.valueOf(x)).collect(Collectors.toList()))); + } + tryAcquireSemaphore(); + CompletableFuture> rs = this.sncpNodeAddresses.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(SOURCE_SNCP_USERID_PREFIX.length())).collect(Collectors.toList()))); + if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); + return rs; + } + /** * 判断指定用户是否WebSocket在线 *