This commit is contained in:
Redkale
2020-06-17 22:49:27 +08:00
parent a11127ea58
commit 9def35e2e1

View File

@@ -34,7 +34,7 @@ public abstract class WebSocketNode {
public static final String SOURCE_SNCP_USERID_PREFIX = "sncpws_uid:";
@Comment("存储当前SNCP节点列表的key")
public static final String SOURCE_SNCP_ADDRS_KEY = "sncpws_addrs";
public static final String SOURCE_SNCP_NODES_KEY = "sncpws_nodes";
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@@ -104,7 +104,7 @@ public abstract class WebSocketNode {
//关掉所有本地本地WebSocket
this.localEngine.getLocalWebSockets().forEach(g -> g.close());
if (source != null && wsaddress != null) {
source.removeSetItem(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class, this.wsaddress);
source.removeSetItem(SOURCE_SNCP_NODES_KEY, WebSocketAddress.class, this.wsaddress);
}
}
@@ -785,7 +785,7 @@ public abstract class WebSocketNode {
final Object remoteMessage = formatRemoteMessage(message);
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last);
tryAcquireSemaphore();
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class);
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_NODES_KEY, WebSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs);
@@ -815,7 +815,7 @@ public abstract class WebSocketNode {
}
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalAction(action);
tryAcquireSemaphore();
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class);
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_NODES_KEY, WebSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs);