This commit is contained in:
Redkale
2017-11-13 09:39:51 +08:00
parent 6fddd8b53b
commit 97f43a4d8d
2 changed files with 5 additions and 5 deletions

View File

@@ -32,7 +32,7 @@ public abstract class WebSocketNode {
public static final String SOURCE_SNCP_USERID_PREFIX = "sncpws_uid:";
@Comment("存储当前SNCP节点列表的key")
public static final String SOURCE_SNCP_NODES_KEY = "sncpws_nodes";
public static final String SOURCE_SNCP_ADDRS_KEY = "sncpws_addrs";
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@@ -64,7 +64,7 @@ public abstract class WebSocketNode {
//关掉所有本地本地WebSocket
this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid()));
if (sncpNodeAddresses != null && localSncpAddress != null) {
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_NODES_KEY, localSncpAddress);
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, localSncpAddress);
}
}
@@ -175,7 +175,7 @@ public abstract class WebSocketNode {
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
}
return this.sncpNodeAddresses.getKeySizeAsync().thenCompose(count -> {
return sncpNodeAddresses.existsAsync(SOURCE_SNCP_NODES_KEY).thenApply(exists -> exists ? (count - 1) : count);
return sncpNodeAddresses.existsAsync(SOURCE_SNCP_ADDRS_KEY).thenApply(exists -> exists ? (count - 1) : count);
});
}
@@ -398,7 +398,7 @@ public abstract class WebSocketNode {
return this.localEngine.broadcastMessage(message, last);
}
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(message, last);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_NODES_KEY);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY);
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);

View File

@@ -69,7 +69,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_NODES_KEY, sncpAddr));
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr));
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
return future;
}