This commit is contained in:
Redkale
2020-08-19 23:03:35 +08:00
parent 60903c844e
commit 72155e31dd

View File

@@ -57,7 +57,7 @@ public abstract class WebSocketNode {
//集合包含 localSncpAddress
//如果不是分布式(没有SNCP)source 将不会被用到
@Resource(name = "$")
protected CacheSource<WebSocketAddress> source;
protected CacheSource source;
//当前节点的本地WebSocketEngine
protected WebSocketEngine localEngine;
@@ -224,7 +224,8 @@ public abstract class WebSocketNode {
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
}
tryAcquireSemaphore();
CompletableFuture<Integer> rs = this.source.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> v.size());
CompletableFuture<List<String>> listFuture = this.source.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX);
CompletableFuture<Integer> rs = listFuture.thenApply(v -> v.size());
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}
@@ -240,7 +241,8 @@ public abstract class WebSocketNode {
return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet().stream().map(x -> String.valueOf(x)).collect(Collectors.toList())));
}
tryAcquireSemaphore();
CompletableFuture<Set<String>> rs = this.source.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(SOURCE_SNCP_USERID_PREFIX.length())).collect(Collectors.toList())));
CompletableFuture<List<String>> listFuture = this.source.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX);
CompletableFuture<Set<String>> rs = listFuture.thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(SOURCE_SNCP_USERID_PREFIX.length())).collect(Collectors.toList())));
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}