diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java
index 04e596d1a..fbc4377dc 100644
--- a/src/main/java/org/redkale/net/http/WebSocket.java
+++ b/src/main/java/org/redkale/net/http/WebSocket.java
@@ -5,20 +5,20 @@
*/
package org.redkale.net.http;
-import org.redkale.net.http.WebSocketPacket.FrameType;
-import java.io.*;
+import java.io.Serializable;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.CompletableFuture;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.Stream;
import java.util.zip.*;
+import org.redkale.annotation.Comment;
import org.redkale.convert.Convert;
import org.redkale.net.AsyncConnection;
-import org.redkale.annotation.Comment;
+import org.redkale.net.http.WebSocketPacket.FrameType;
/**
*
@@ -507,7 +507,7 @@ public abstract class WebSocket {
*
* @return 地址列表
*/
- public CompletableFuture> getRpcNodeAddresses(final Serializable userid) {
+ public CompletableFuture> getRpcNodeAddresses(final Serializable userid) {
if (_engine.node == null) return CompletableFuture.completedFuture(null);
return _engine.node.getRpcNodeAddresses(userid);
}
diff --git a/src/main/java/org/redkale/net/http/WebSocketNode.java b/src/main/java/org/redkale/net/http/WebSocketNode.java
index 604a366ad..ae07e5969 100644
--- a/src/main/java/org/redkale/net/http/WebSocketNode.java
+++ b/src/main/java/org/redkale/net/http/WebSocketNode.java
@@ -86,7 +86,7 @@ public abstract class WebSocketNode {
this.wsNodeAddress = new WebSocketAddress(mqtopic, new InetSocketAddress("127.0.0.1", 27));
}
if (source != null && wsNodeAddress != null) {
- source.appendSetItem(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress);
+ source.sadd(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress);
}
}
@@ -109,7 +109,7 @@ public abstract class WebSocketNode {
//关掉所有本地本地WebSocket
this.localEngine.getLocalWebSockets().forEach(g -> g.close());
if (source != null && wsNodeAddress != null) {
- source.removeSetItem(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress);
+ source.srem(WS_SOURCE_KEY_NODES, WebSocketAddress.class, this.wsNodeAddress);
}
}
@@ -184,14 +184,14 @@ public abstract class WebSocketNode {
*
* @return 地址列表
*/
- public CompletableFuture> getRpcNodeAddresses(final Serializable userid) {
+ public CompletableFuture> getRpcNodeAddresses(final Serializable userid) {
if (this.source != null) {
tryAcquireSemaphore();
- CompletableFuture> result = this.source.getCollectionAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
+ CompletableFuture> result = this.source.smembersAsync(WS_SOURCE_KEY_USERID_PREFIX + userid, WebSocketAddress.class);
if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore());
return result;
}
- List rs = new ArrayList<>();
+ Set rs = new LinkedHashSet<>();
rs.add(this.wsNodeAddress);
return CompletableFuture.completedFuture(rs);
}
@@ -206,7 +206,7 @@ public abstract class WebSocketNode {
* @return 地址集合
*/
public CompletableFuture