This commit is contained in:
Redkale
2017-05-22 15:27:53 +08:00
parent 52d559ea4a
commit 9e6840f5cb
2 changed files with 35 additions and 29 deletions

View File

@@ -34,14 +34,15 @@ public abstract class WebSocketNode {
@Resource(name = Application.RESNAME_SNCP_ADDR)
protected InetSocketAddress localSncpAddress; //为SncpServer的服务address
//如果不是分布式(没有SNCP) 值为null
@RpcRemote
protected WebSocketNode remoteNode;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合, key: groupid
//包含 localSncpAddress
//如果不是分布式(没有SNCP)sncpAddressNodes 将不会被用到
//集合包含 localSncpAddress
//如果不是分布式(没有SNCP)sncpNodeAddresses 将不会被用到
@Resource(name = "$")
protected CacheSource<Serializable, InetSocketAddress> sncpAddressNodes;
protected CacheSource<Serializable, InetSocketAddress> sncpNodeAddresses;
//当前节点的本地WebSocketEngine
protected WebSocketEngine localEngine;
@@ -80,7 +81,7 @@ public abstract class WebSocketNode {
//--------------------------------------------------------------------------------
/**
* 获取目标地址 <br>
* 该方法只能被内部调用
* 该方法仅供内部调用
*
* @param targetAddress
* @param groupid
@@ -98,48 +99,53 @@ public abstract class WebSocketNode {
}
/**
* 获取用户在线的SNCP节点地址列表不是分布式则返回列表<br>
* 获取用户在线的SNCP节点地址列表不是分布式则返回元素数量为1且元素值为null的列表<br>
* InetSocketAddress 为 SNCP节点地址
*
* @param groupid groupid
*
* @return 地址列表
*/
public CompletableFuture<Collection<InetSocketAddress>> getSncpAddresses(final Serializable groupid) {
if (this.sncpAddressNodes != null) return this.sncpAddressNodes.getCollectionAsync(groupid);
public CompletableFuture<Collection<InetSocketAddress>> getSncpNodeAddresses(final Serializable groupid) {
if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(groupid);
List<InetSocketAddress> rs = new ArrayList<>();
if (this.localSncpAddress != null) rs.add(this.localSncpAddress);
rs.add(this.localSncpAddress);
return CompletableFuture.completedFuture(rs);
}
/**
* 获取在线用户的详细连接信息 <br>
* Map.key 为 SNCP节点地址
* Map.key 为 SNCP节点地址, 含值为null的key表示没有分布式
* Map.value 为 用户客户端的IP
*
* @param groupid groupid
*
* @return 地址集合
*/
//异步待优化
public CompletableFuture<Map<InetSocketAddress, List<String>>> getSncpNodeWebSocketAddresses(final Serializable groupid) {
final CompletableFuture<Map<InetSocketAddress, List<String>>> rs = new CompletableFuture<>();
CompletableFuture<Collection<InetSocketAddress>> nodesFuture = getSncpAddresses(groupid);
if (nodesFuture == null) return CompletableFuture.completedFuture(null);
nodesFuture.whenComplete((nodes, e) -> {
if (e != null) {
rs.completeExceptionally(e);
} else {
final Map<InetSocketAddress, List<String>> map = new HashMap();
for (final InetSocketAddress nodeAddress : nodes) {
List<String> list = getWebSocketAddresses(nodeAddress, groupid).join();
if (list == null) list = new ArrayList();
map.put(nodeAddress, list);
CompletableFuture<Collection<InetSocketAddress>> sncpFuture = getSncpNodeAddresses(groupid);
return sncpFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(new HashMap<>());
CompletableFuture<Map<InetSocketAddress, List<String>>> future = null;
for (final InetSocketAddress nodeAddress : addrs) {
CompletableFuture<Map<InetSocketAddress, List<String>>> mapFuture = getWebSocketAddresses(nodeAddress, groupid)
.thenCompose((List<String> list) -> {
Map<InetSocketAddress, List<String>> map = new HashMap<>();
map.put(nodeAddress, list);
return CompletableFuture.completedFuture(map);
});
if (future == null) {
future = mapFuture;
} else {
future = future.thenCombine(mapFuture, (a, b) -> {
a.putAll(b);
return a;
});
}
rs.complete(map);
}
return future == null ? CompletableFuture.completedFuture(new HashMap<>()) : future;
});
return rs;
}
//--------------------------------------------------------------------------------
@@ -184,19 +190,19 @@ public abstract class WebSocketNode {
if (finest) logger.finest("websocket want send recent message success");
return localFuture;
}
if (this.sncpAddressNodes == null || this.remoteNode == null) {
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (finest) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
}
//远程节点发送消息
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpAddressNodes.getCollectionAsync(groupid);
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(groupid);
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr.equals(localSncpAddress)) continue;
if (addr == null || addr.equals(localSncpAddress)) continue;
if (future == null) {
future = remoteNode.sendMessage(addr, groupid, recent, message, last);
} else {

View File

@@ -67,7 +67,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
*/
@Override
public CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpAddressNodes.appendSetItemAsync(groupid, sncpAddr);
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(groupid, sncpAddr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + sncpAddr);
return future;
}
@@ -82,7 +82,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
*/
@Override
public CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpAddressNodes.removeSetItemAsync(groupid, sncpAddr);
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(groupid, sncpAddr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + sncpAddr);
return future;
}