From ad1d9f33d4e6d4e72d1f0c489493283abcfad005 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 7 Jan 2020 13:07:04 +0800 Subject: [PATCH] =?UTF-8?q?WebSocket=E5=A2=9E=E5=8A=A0getUserSet=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/net/http/WebSocketEngine.java | 5 +++++ src/org/redkale/net/http/WebSocketNode.java | 18 +++++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index ee06fc55a..8e03af683 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -452,6 +452,11 @@ public class WebSocketEngine { return (int) websockets2.values().stream().mapToInt(sublist -> sublist.size()).count(); } + @Comment("获取当前用户总数") + public Set getLocalUserSet() { + return single ? new LinkedHashSet<>(websockets.keySet()) : new LinkedHashSet<>(websockets2.keySet()); + } + @Comment("获取当前用户总数") public int getLocalUserSize() { return single ? websockets.size() : websockets2.size(); diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 87051f82f..0ccecfba0 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -11,7 +11,7 @@ import java.net.*; import java.util.*; import java.util.concurrent.*; import java.util.logging.*; -import java.util.stream.Stream; +import java.util.stream.*; import javax.annotation.*; import org.redkale.boot.*; import org.redkale.convert.*; @@ -208,6 +208,22 @@ public abstract class WebSocketNode { return rs; } + /** + * 获取在线用户总数 + * + * + * @return boolean + */ + public CompletableFuture> getUserSet() { + if (this.localEngine != null && this.sncpNodeAddresses == null) { + return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet().stream().map(x -> String.valueOf(x)).collect(Collectors.toList()))); + } + tryAcquireSemaphore(); + CompletableFuture> rs = this.sncpNodeAddresses.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(SOURCE_SNCP_USERID_PREFIX.length())).collect(Collectors.toList()))); + if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore()); + return rs; + } + /** * 判断指定用户是否WebSocket在线 *