修改WebSocketNode.existsWebSocket的实现
This commit is contained in:
@@ -104,6 +104,8 @@ public abstract class WebSocketNode {
|
||||
|
||||
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress sncpAddr);
|
||||
|
||||
protected abstract CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress);
|
||||
|
||||
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress);
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
@@ -186,23 +188,6 @@ public abstract class WebSocketNode {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断指定用户是否WebSocket在线
|
||||
*
|
||||
* @param userid Serializable
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) {
|
||||
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
|
||||
}
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Boolean> rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid);
|
||||
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
|
||||
return rs;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取在线用户总数
|
||||
*
|
||||
@@ -219,6 +204,59 @@ public abstract class WebSocketNode {
|
||||
return rs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*
|
||||
* 判断指定用户是否WebSocket在线
|
||||
*
|
||||
* @param userid Serializable
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
private CompletableFuture<Boolean> existsWebSocket2(final Serializable userid) {
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) {
|
||||
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
|
||||
}
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Boolean> rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid);
|
||||
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
|
||||
return rs;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断指定用户是否WebSocket在线
|
||||
*
|
||||
* @param userid Serializable
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
@Local
|
||||
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
|
||||
CompletableFuture<Boolean> localFuture = null;
|
||||
if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
|
||||
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
|
||||
//没有CacheSource就不会有分布式节点
|
||||
return localFuture;
|
||||
}
|
||||
//远程节点关闭
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Boolean> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
|
||||
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false);
|
||||
CompletableFuture<Boolean> future = null;
|
||||
for (InetSocketAddress addr : addrs) {
|
||||
if (addr == null || addr.equals(localSncpAddress)) continue;
|
||||
future = future == null ? remoteNode.existsWebSocket(userid, addr)
|
||||
: future.thenCombine(remoteNode.existsWebSocket(userid, addr), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(false) : future;
|
||||
});
|
||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
||||
}
|
||||
|
||||
/**
|
||||
* 强制关闭用户WebSocket
|
||||
*
|
||||
|
||||
@@ -133,6 +133,21 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断用户是否有WebSocket
|
||||
*
|
||||
* @param userid Serializable
|
||||
* @param targetAddress InetSocketAddress
|
||||
*
|
||||
* @return 无返回值
|
||||
*/
|
||||
@Override
|
||||
public CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress) {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress);
|
||||
if (localEngine == null) return CompletableFuture.completedFuture(false);
|
||||
return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
|
||||
}
|
||||
|
||||
/**
|
||||
* 强制关闭用户的WebSocket
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user