增加 @RpcTargetTopic

This commit is contained in:
Redkale
2020-06-08 19:31:29 +08:00
parent 3eb3e0104d
commit 0e5479d55e
6 changed files with 232 additions and 232 deletions

View File

@@ -522,7 +522,7 @@ public abstract class WebSocket<G extends Serializable, T> {
*
* @return 地址列表
*/
public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
public CompletableFuture<Collection<WebSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
if (_engine.node == null) return CompletableFuture.completedFuture(null);
return _engine.node.getRpcNodeAddresses(userid);
}
@@ -536,7 +536,7 @@ public abstract class WebSocket<G extends Serializable, T> {
*
* @return 地址集合
*/
public CompletableFuture<Map<InetSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid) {
public CompletableFuture<Map<WebSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid) {
if (_engine.node == null) return CompletableFuture.completedFuture(null);
return _engine.node.getRpcNodeWebSocketAddresses(userid);
}

View File

@@ -42,6 +42,8 @@ public abstract class WebSocketNode {
@Resource(name = Application.RESNAME_SNCP_ADDR)
protected InetSocketAddress localSncpAddress; //为SncpServer的服务address
protected WebSocketAddress wsaddress;
protected String name;
//如果不是分布式(没有SNCP) 值为null
@@ -51,11 +53,11 @@ public abstract class WebSocketNode {
@Resource(name = "$_sendconvert")
protected Convert sendConvert;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合, key: groupid
//存放所有用户分布在节点上的队列信息,Set<WebSocketAddress> 为 sncpnode 的集合, key: groupid
//集合包含 localSncpAddress
//如果不是分布式(没有SNCP)sncpNodeAddresses 将不会被用到
//如果不是分布式(没有SNCP)source 将不会被用到
@Resource(name = "$")
protected CacheSource<InetSocketAddress> sncpNodeAddresses;
protected CacheSource<WebSocketAddress> source;
//当前节点的本地WebSocketEngine
protected WebSocketEngine localEngine;
@@ -69,14 +71,18 @@ public abstract class WebSocketNode {
public void init(AnyValue conf) {
this.tryAcquireSeconds = Integer.getInteger("WebSocketNode.tryAcquireSeconds", 12);
if (sncpNodeAddresses != null && "memory".equals(sncpNodeAddresses.getType())) {
sncpNodeAddresses.initValueType(InetSocketAddress.class);
if (source != null && "memory".equals(source.getType())) {
source.initValueType(WebSocketAddress.class);
}
if (localEngine != null) {
int wsthreads = localEngine.wsthreads;
if (wsthreads == 0) wsthreads = Runtime.getRuntime().availableProcessors() * 8;
if (wsthreads > 0) this.semaphore = new Semaphore(wsthreads);
}
String mqtopic = this.messageAgent == null ? null : this.messageAgent.generateWebSocketRespTopic(this);
if (mqtopic != null || this.localSncpAddress != null) {
this.wsaddress = new WebSocketAddress(mqtopic, localSncpAddress);
}
}
public void destroy(AnyValue conf) {
@@ -97,45 +103,45 @@ public abstract class WebSocketNode {
if (this.localEngine == null) return;
//关掉所有本地本地WebSocket
this.localEngine.getLocalWebSockets().forEach(g -> g.close());
if (sncpNodeAddresses != null && localSncpAddress != null) {
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, localSncpAddress);
if (source != null && wsaddress != null) {
source.removeSetItem(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class, this.wsaddress);
}
}
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids);
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids);
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
protected abstract CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids);
protected abstract CompletableFuture<Integer> sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids);
protected abstract CompletableFuture<Integer> broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action);
protected abstract CompletableFuture<Integer> broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action);
protected abstract CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr);
protected abstract CompletableFuture<Void> connect(Serializable userid, WebSocketAddress wsaddr);
protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr);
protected abstract CompletableFuture<Void> disconnect(Serializable userid, WebSocketAddress wsaddr);
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress sncpAddr);
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, WebSocketAddress wsaddr);
protected abstract CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
//--------------------------------------------------------------------------------
final CompletableFuture<Void> connect(final Serializable userid) {
if (logger.isLoggable(Level.FINEST)) logger.finest(localSncpAddress + " receive websocket connect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
return connect(userid, localSncpAddress);
if (logger.isLoggable(Level.FINEST)) logger.finest(this.wsaddress + " receive websocket connect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
return connect(userid, this.wsaddress);
}
final CompletableFuture<Void> disconnect(final Serializable userid) {
if (logger.isLoggable(Level.FINEST)) logger.finest(localSncpAddress + " receive websocket disconnect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
return disconnect(userid, localSncpAddress);
if (logger.isLoggable(Level.FINEST)) logger.finest(this.wsaddress + " receive websocket disconnect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
return disconnect(userid, this.wsaddress);
}
final CompletableFuture<Void> changeUserid(Serializable olduserid, final Serializable newuserid) {
if (logger.isLoggable(Level.FINEST)) logger.finest(localSncpAddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
return changeUserid(olduserid, newuserid, localSncpAddress);
if (logger.isLoggable(Level.FINEST)) logger.finest(this.wsaddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
return changeUserid(olduserid, newuserid, this.wsaddress);
}
public final String getName() {
@@ -147,15 +153,16 @@ public abstract class WebSocketNode {
* 获取目标地址 <br>
* 该方法仅供内部调用
*
* @param topic RpcTargetTopic
* @param targetAddress InetSocketAddress
* @param userid Serializable
*
* @return 客户端地址列表
*/
protected CompletableFuture<List<String>> remoteWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) {
protected CompletableFuture<List<String>> remoteWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) {
if (remoteNode == null) return CompletableFuture.completedFuture(null);
try {
return remoteNode.getWebSocketAddresses(targetAddress, userid);
return remoteNode.getWebSocketAddresses(topic, targetAddress, userid);
} catch (Exception e) {
logger.log(Level.WARNING, "remote " + targetAddress + " websocket getOnlineRemoteAddresses error", e);
return CompletableFuture.completedFuture(null);
@@ -164,21 +171,21 @@ public abstract class WebSocketNode {
/**
* 获取用户在线的SNCP节点地址列表不是分布式则返回元素数量为1且元素值为null的列表<br>
* InetSocketAddress 为 SNCP节点地址
* WebSocketAddress 为 SNCP节点地址
*
* @param userid Serializable
*
* @return 地址列表
*/
public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
if (this.sncpNodeAddresses != null) {
public CompletableFuture<Collection<WebSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
if (this.source != null) {
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
CompletableFuture<Collection<WebSocketAddress>> result = this.source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class);
if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore());
return result;
}
List<InetSocketAddress> rs = new ArrayList<>();
rs.add(this.localSncpAddress);
List<WebSocketAddress> rs = new ArrayList<>();
rs.add(this.wsaddress);
return CompletableFuture.completedFuture(rs);
}
@@ -191,14 +198,14 @@ public abstract class WebSocketNode {
*
* @return 地址集合
*/
public CompletableFuture<Map<InetSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid) {
CompletableFuture<Collection<InetSocketAddress>> sncpFuture = getRpcNodeAddresses(userid);
return sncpFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
public CompletableFuture<Map<WebSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid) {
CompletableFuture<Collection<WebSocketAddress>> sncpFuture = getRpcNodeAddresses(userid);
return sncpFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(new HashMap<>());
CompletableFuture<Map<InetSocketAddress, List<String>>> future = null;
for (final InetSocketAddress nodeAddress : addrs) {
CompletableFuture<Map<InetSocketAddress, List<String>>> mapFuture = getWebSocketAddresses(nodeAddress, userid)
CompletableFuture<Map<WebSocketAddress, List<String>>> future = null;
for (final WebSocketAddress nodeAddress : addrs) {
CompletableFuture<Map<WebSocketAddress, List<String>>> mapFuture = getWebSocketAddresses(nodeAddress.getTopic(), nodeAddress.getAddr(), userid)
.thenCompose((List<String> list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list)));
future = future == null ? mapFuture : future.thenCombine(mapFuture, (a, b) -> Utility.merge(a, b));
}
@@ -213,11 +220,11 @@ public abstract class WebSocketNode {
* @return boolean
*/
public CompletableFuture<Integer> getUserSize() {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
if (this.localEngine != null && this.source == null) {
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
}
tryAcquireSemaphore();
CompletableFuture<Integer> rs = this.sncpNodeAddresses.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> v.size());
CompletableFuture<Integer> rs = this.source.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> v.size());
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}
@@ -229,11 +236,11 @@ public abstract class WebSocketNode {
* @return boolean
*/
public CompletableFuture<Set<String>> getUserSet() {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
if (this.localEngine != null && this.source == null) {
return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet().stream().map(x -> String.valueOf(x)).collect(Collectors.toList())));
}
tryAcquireSemaphore();
CompletableFuture<Set<String>> rs = this.sncpNodeAddresses.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(SOURCE_SNCP_USERID_PREFIX.length())).collect(Collectors.toList())));
CompletableFuture<Set<String>> rs = this.source.queryKeysStartsWithAsync(SOURCE_SNCP_USERID_PREFIX).thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(SOURCE_SNCP_USERID_PREFIX.length())).collect(Collectors.toList())));
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}
@@ -250,23 +257,23 @@ public abstract class WebSocketNode {
if (userid instanceof WebSocketUserAddress) return existsWebSocket((WebSocketUserAddress) userid);
CompletableFuture<Boolean> localFuture = null;
if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture;
}
//远程节点关闭
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Boolean> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
CompletableFuture<Boolean> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
//if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false);
CompletableFuture<Boolean> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.existsWebSocket(userid, addr)
: future.thenCombine(remoteNode.existsWebSocket(userid, addr), (a, b) -> a | b);
for (WebSocketAddress addr : addrs) {
if (addr == null || addr.equals(wsaddress)) continue;
future = future == null ? remoteNode.existsWebSocket(userid, addr.getTopic(), addr.getAddr())
: future.thenCombine(remoteNode.existsWebSocket(userid, addr.getTopic(), addr.getAddr()), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(false) : future;
});
@@ -284,23 +291,23 @@ public abstract class WebSocketNode {
public CompletableFuture<Boolean> existsWebSocket(final WebSocketUserAddress userAddress) {
CompletableFuture<Boolean> localFuture = null;
if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userAddress.userid()));
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture;
}
Collection<InetSocketAddress> addrs = userAddress.sncpAddresses();
Collection<WebSocketAddress> addrs = userAddress.addresses();
if (addrs != null) addrs = new ArrayList<>(addrs); //不能修改参数内部值
if (userAddress.sncpAddress() != null) {
if (userAddress.address() != null) {
if (addrs == null) addrs = new ArrayList<>();
addrs.add(userAddress.sncpAddress());
addrs.add(userAddress.address());
}
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false);
CompletableFuture<Boolean> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.existsWebSocket(userAddress.userid(), addr)
: future.thenCombine(remoteNode.existsWebSocket(userAddress.userid(), addr), (a, b) -> a | b);
for (WebSocketAddress addr : addrs) {
if (addr == null || addr.equals(wsaddress)) continue;
future = future == null ? remoteNode.existsWebSocket(userAddress.userid(), addr.getTopic(), addr.getAddr())
: future.thenCombine(remoteNode.existsWebSocket(userAddress.userid(), addr.getTopic(), addr.getAddr()), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(false) : future;
}
@@ -332,35 +339,35 @@ public abstract class WebSocketNode {
private CompletableFuture<Integer> forceCloseWebSocket(final Serializable userid, final WebSocketUserAddress userAddress) {
CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userAddress == null ? userid : userAddress.userid()));
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture;
}
//远程节点关闭
CompletableFuture<Collection<InetSocketAddress>> addrsFuture;
CompletableFuture<Collection<WebSocketAddress>> addrsFuture;
if (userAddress == null) {
tryAcquireSemaphore();
addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
addrsFuture = source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
} else {
Collection<InetSocketAddress> addrs = userAddress.sncpAddresses();
Collection<WebSocketAddress> addrs = userAddress.addresses();
if (addrs != null) addrs = new ArrayList<>(addrs); //不能修改参数内部值
if (userAddress.sncpAddress() != null) {
if (userAddress.address() != null) {
if (addrs == null) addrs = new ArrayList<>();
addrs.add(userAddress.sncpAddress());
addrs.add(userAddress.address());
}
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
addrsFuture = CompletableFuture.completedFuture(addrs);
}
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.forceCloseWebSocket(userid, addr)
: future.thenCombine(remoteNode.forceCloseWebSocket(userid, addr), (a, b) -> a + b);
for (WebSocketAddress addr : addrs) {
if (addr == null || addr.equals(wsaddress)) continue;
future = future == null ? remoteNode.forceCloseWebSocket(userid, addr.getTopic(), addr.getAddr())
: future.thenCombine(remoteNode.forceCloseWebSocket(userid, addr.getTopic(), addr.getAddr()), (a, b) -> a + b);
}
return future == null ? CompletableFuture.completedFuture(0) : future;
});
@@ -511,7 +518,7 @@ public abstract class WebSocketNode {
}
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, userids));
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return this.localEngine.sendLocalMessage(message, last, userids);
}
final Object remoteMessage = formatRemoteMessage(message);
@@ -526,16 +533,16 @@ public abstract class WebSocketNode {
keyuser.put(keys[i], userids[i]);
}
tryAcquireSemaphore();
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(true, InetSocketAddress.class, keys);
CompletableFuture<Map<String, Collection<WebSocketAddress>>> addrsFuture = source.getCollectionMapAsync(true, WebSocketAddress.class, keys);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> {
rsfuture = addrsFuture.thenCompose((Map<String, Collection<WebSocketAddress>> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node ");
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
Map<InetSocketAddress, List<Serializable>> addrUsers = new HashMap<>();
Map<WebSocketAddress, List<Serializable>> addrUsers = new HashMap<>();
addrs.forEach((key, as) -> {
for (InetSocketAddress a : as) {
for (WebSocketAddress a : as) {
addrUsers.computeIfAbsent(a, k -> new ArrayList<>()).add(keyuser.get(key));
}
});
@@ -543,7 +550,7 @@ public abstract class WebSocketNode {
logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found message-addr-userids: " + addrUsers);
}
CompletableFuture<Integer> future = null;
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
for (Map.Entry<WebSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]);
future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids)
: future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids), (a, b) -> a | b);
@@ -570,17 +577,17 @@ public abstract class WebSocketNode {
if (useraddrs == null || useraddrs.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, useraddrs));
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return this.localEngine.sendLocalMessage(message, last, userAddressToUserids(useraddrs));
}
final Object remoteMessage = formatRemoteMessage(message);
final Map<InetSocketAddress, List<Serializable>> addrUsers = userAddressToAddrMap(useraddrs);
final Map<WebSocketAddress, List<Serializable>> addrUsers = userAddressToAddrMap(useraddrs);
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found message-addr-userids: " + addrUsers);
}
CompletableFuture<Integer> future = null;
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
for (Map.Entry<WebSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]);
future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids)
: future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids), (a, b) -> a | b);
@@ -596,7 +603,7 @@ public abstract class WebSocketNode {
}
CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) localFuture = localEngine.sendLocalMessage(message, last, userid);
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
@@ -604,40 +611,40 @@ public abstract class WebSocketNode {
//远程节点发送消息
final Object remoteMessage = formatRemoteMessage(message);
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + wsaddress + ") found userid:" + userid + " on " + addrs);
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid)
: future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b);
for (WebSocketAddress addr : addrs) {
if (addr == null || addr.equals(wsaddress)) continue;
future = future == null ? remoteNode.sendMessage(addr.getTopic(), addr.getAddr(), remoteMessage, last, userid)
: future.thenCombine(remoteNode.sendMessage(addr.getTopic(), addr.getAddr(), remoteMessage, last, userid), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
});
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
protected CompletableFuture<Integer> sendOneAddrMessage(final InetSocketAddress sncpAddr, final Object message, final boolean last, final Serializable... userids) {
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneAddrMessage(sncpAddr, msg, last, userids));
protected CompletableFuture<Integer> sendOneAddrMessage(final WebSocketAddress addr, final Object message, final boolean last, final Serializable... userids) {
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneAddrMessage(addr, msg, last, userids));
if (logger.isLoggable(Level.FINEST) && this.localEngine == null) { //只打印远程模式的
logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + sncpAddr + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + addr + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
}
if (Objects.equals(sncpAddr, this.localSncpAddress)) {
if (Objects.equals(addr, this.wsaddress)) {
return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalMessage(message, last, userids);
}
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
final Object remoteMessage = formatRemoteMessage(message);
return remoteNode.sendMessage(sncpAddr, remoteMessage, last, userids);
return remoteNode.sendMessage(addr.getTopic(), addr.getAddr(), remoteMessage, last, userids);
}
protected Serializable[] userAddressToUserids(WebSocketUserAddress... useraddrs) {
@@ -649,14 +656,14 @@ public abstract class WebSocketNode {
return set.toArray(new Serializable[set.size()]);
}
protected Map<InetSocketAddress, List<Serializable>> userAddressToAddrMap(WebSocketUserAddress... useraddrs) {
final Map<InetSocketAddress, List<Serializable>> addrUsers = new HashMap<>();
protected Map<WebSocketAddress, List<Serializable>> userAddressToAddrMap(WebSocketUserAddress... useraddrs) {
final Map<WebSocketAddress, List<Serializable>> addrUsers = new HashMap<>();
for (WebSocketUserAddress userAddress : useraddrs) {
if (userAddress.sncpAddress() != null) {
addrUsers.computeIfAbsent(userAddress.sncpAddress(), k -> new ArrayList<>()).add(userAddress.userid());
if (userAddress.address() != null) {
addrUsers.computeIfAbsent(userAddress.address(), k -> new ArrayList<>()).add(userAddress.userid());
}
if (userAddress.sncpAddresses() != null) {
for (InetSocketAddress addr : userAddress.sncpAddresses()) {
if (userAddress.addresses() != null) {
for (WebSocketAddress addr : userAddress.addresses()) {
if (addr != null) {
addrUsers.computeIfAbsent(addr, k -> new ArrayList<>()).add(userAddress.userid());
}
@@ -773,22 +780,22 @@ public abstract class WebSocketNode {
public CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message0, final boolean last) {
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(wsrange, convert, msg, last));
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return this.localEngine.broadcastLocalMessage(wsrange, message, last);
}
final Object remoteMessage = formatRemoteMessage(message);
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last);
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class);
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.broadcastMessage(addr, wsrange, remoteMessage, last)
: future.thenCombine(remoteNode.broadcastMessage(addr, wsrange, remoteMessage, last), (a, b) -> a | b);
for (WebSocketAddress addr : addrs) {
if (addr == null || addr.equals(wsaddress)) continue;
future = future == null ? remoteNode.broadcastMessage(addr.getTopic(), addr.getAddr(), wsrange, remoteMessage, last)
: future.thenCombine(remoteNode.broadcastMessage(addr.getTopic(), addr.getAddr(), wsrange, remoteMessage, last), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(0) : future;
});
@@ -804,21 +811,21 @@ public abstract class WebSocketNode {
*/
@Local
public CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return this.localEngine.broadcastLocalAction(action);
}
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalAction(action);
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class);
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast action (" + action + ") on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.broadcastAction(addr, action)
: future.thenCombine(remoteNode.broadcastAction(addr, action), (a, b) -> a | b);
for (WebSocketAddress addr : addrs) {
if (addr == null || addr.equals(wsaddress)) continue;
future = future == null ? remoteNode.broadcastAction(addr.getTopic(), addr.getAddr(), action)
: future.thenCombine(remoteNode.broadcastAction(addr.getTopic(), addr.getAddr(), action), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(0) : future;
});
@@ -844,7 +851,7 @@ public abstract class WebSocketNode {
}
return sendAction(action, useraddrs);
}
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return this.localEngine.sendLocalAction(action, userids);
}
CompletableFuture<Integer> rsfuture;
@@ -858,16 +865,16 @@ public abstract class WebSocketNode {
keyuser.put(keys[i], userids[i]);
}
tryAcquireSemaphore();
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(true, InetSocketAddress.class, keys);
CompletableFuture<Map<String, Collection<WebSocketAddress>>> addrsFuture = source.getCollectionMapAsync(true, WebSocketAddress.class, keys);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> {
rsfuture = addrsFuture.thenCompose((Map<String, Collection<WebSocketAddress>> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node ");
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
Map<InetSocketAddress, List<Serializable>> addrUsers = new HashMap<>();
Map<WebSocketAddress, List<Serializable>> addrUsers = new HashMap<>();
addrs.forEach((key, as) -> {
for (InetSocketAddress a : as) {
for (WebSocketAddress a : as) {
addrUsers.computeIfAbsent(a, k -> new ArrayList<>()).add(keyuser.get(key));
}
});
@@ -875,7 +882,7 @@ public abstract class WebSocketNode {
logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found action-userid-addrs: " + addrUsers);
}
CompletableFuture<Integer> future = null;
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
for (Map.Entry<WebSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]);
future = future == null ? sendOneAddrAction(en.getKey(), action, oneaddrUserids)
: future.thenCombine(sendOneAddrAction(en.getKey(), action, oneaddrUserids), (a, b) -> a | b);
@@ -898,16 +905,16 @@ public abstract class WebSocketNode {
@Local
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final WebSocketUserAddress... useraddrs) {
if (useraddrs == null || useraddrs.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return this.localEngine.sendLocalAction(action, userAddressToUserids(useraddrs));
}
final Map<InetSocketAddress, List<Serializable>> addrUsers = userAddressToAddrMap(useraddrs);
final Map<WebSocketAddress, List<Serializable>> addrUsers = userAddressToAddrMap(useraddrs);
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found action-userid-addrs: " + addrUsers);
}
CompletableFuture<Integer> future = null;
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
for (Map.Entry<WebSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]);
future = future == null ? sendOneAddrAction(en.getKey(), action, oneaddrUserids)
: future.thenCombine(sendOneAddrAction(en.getKey(), action, oneaddrUserids), (a, b) -> a | b);
@@ -921,45 +928,45 @@ public abstract class WebSocketNode {
}
CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) localFuture = localEngine.sendLocalAction(action, userid);
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
}
//远程节点发送操作
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
CompletableFuture<Collection<WebSocketAddress>> addrsFuture = source.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<WebSocketAddress> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
CompletableFuture<Integer> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.sendAction(addr, action, userid)
: future.thenCombine(remoteNode.sendAction(addr, action, userid), (a, b) -> a | b);
for (WebSocketAddress addr : addrs) {
if (addr == null || addr.equals(wsaddress)) continue;
future = future == null ? remoteNode.sendAction(addr.getTopic(), addr.getAddr(), action, userid)
: future.thenCombine(remoteNode.sendAction(addr.getTopic(), addr.getAddr(), action, userid), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
});
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
protected CompletableFuture<Integer> sendOneAddrAction(final InetSocketAddress sncpAddr, final WebSocketAction action, final Serializable... userids) {
protected CompletableFuture<Integer> sendOneAddrAction(final WebSocketAddress addr, final WebSocketAction action, final Serializable... userids) {
if (logger.isLoggable(Level.FINEST) && this.localEngine == null) { //只打印远程模式的
logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + sncpAddr + ", action:" + action + " from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + addr + ", action:" + action + " from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
}
if (Objects.equals(sncpAddr, this.localSncpAddress)) {
if (Objects.equals(addr, this.wsaddress)) {
return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalAction(action, userids);
}
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
return remoteNode.sendAction(sncpAddr, action, userids);
return remoteNode.sendAction(addr.getTopic(), addr.getAddr(), action, userids);
}
protected Object formatRemoteMessage(Object message) {

View File

@@ -20,76 +20,57 @@ public interface WebSocketUserAddress extends Serializable {
Serializable userid();
String mqtopic();
WebSocketAddress address();
Collection<String> mqtopics();
InetSocketAddress sncpAddress();
Collection<InetSocketAddress> sncpAddresses();
Collection<WebSocketAddress> addresses();
public static WebSocketUserAddress create(WebSocketUserAddress userAddress) {
return new SimpleWebSocketUserAddress(userAddress);
}
public static WebSocketUserAddress createTopic(Serializable userid, String mqtopic) {
return new SimpleWebSocketUserAddress(userid, mqtopic, null, null, null);
public static WebSocketUserAddress createTopic(Serializable userid, String mqtopic, InetSocketAddress sncpAddress) {
return new SimpleWebSocketUserAddress(userid, mqtopic, sncpAddress);
}
public static WebSocketUserAddress createTopic(Serializable userid, Collection<String> mqtopics) {
return new SimpleWebSocketUserAddress(userid, null, mqtopics, null, null);
public static WebSocketUserAddress create(Serializable userid, WebSocketAddress address) {
return new SimpleWebSocketUserAddress(userid, address);
}
public static WebSocketUserAddress create(Serializable userid, InetSocketAddress sncpAddress) {
return new SimpleWebSocketUserAddress(userid, null, null, sncpAddress, null);
}
public static WebSocketUserAddress create(Serializable userid, Collection<InetSocketAddress> sncpAddresses) {
return new SimpleWebSocketUserAddress(userid, null, null, null, sncpAddresses);
public static WebSocketUserAddress create(Serializable userid, Collection<WebSocketAddress> addresses) {
return new SimpleWebSocketUserAddress(userid, addresses);
}
public static class SimpleWebSocketUserAddress implements WebSocketUserAddress {
private Serializable userid;
private String mqtopic;
private WebSocketAddress address;
private Collection<String> mqtopics;
private InetSocketAddress sncpAddress;
private Collection<InetSocketAddress> sncpAddresses;
private Collection<WebSocketAddress> addresses;
public SimpleWebSocketUserAddress() {
}
public SimpleWebSocketUserAddress(Serializable userid, String mqtopic, InetSocketAddress sncpAddress) {
this.userid = userid;
this.mqtopic = mqtopic;
this.sncpAddress = sncpAddress;
this.address = new WebSocketAddress(mqtopic, sncpAddress);
}
public SimpleWebSocketUserAddress(Serializable userid, Collection<String> mqtopics, Collection<InetSocketAddress> sncpAddresses) {
public SimpleWebSocketUserAddress(Serializable userid, WebSocketAddress address) {
this.userid = userid;
this.mqtopics = mqtopics;
this.sncpAddresses = sncpAddresses;
this.address = address;
}
public SimpleWebSocketUserAddress(Serializable userid, String mqtopic, Collection<String> mqtopics, InetSocketAddress sncpAddress, Collection<InetSocketAddress> sncpAddresses) {
public SimpleWebSocketUserAddress(Serializable userid, Collection<WebSocketAddress> addresses) {
this.userid = userid;
this.mqtopic = mqtopic;
this.mqtopics = mqtopics;
this.sncpAddress = sncpAddress;
this.sncpAddresses = sncpAddresses;
this.addresses = addresses;
}
public SimpleWebSocketUserAddress(WebSocketUserAddress userAddress) {
if (userAddress == null) return;
this.userid = userAddress.userid();
this.mqtopic = userAddress.mqtopic();
this.mqtopics = userAddress.mqtopics();
this.sncpAddress = userAddress.sncpAddress();
this.sncpAddresses = userAddress.sncpAddresses();
this.address = userAddress.address();
this.addresses = userAddress.addresses();
}
@Override
@@ -98,23 +79,13 @@ public interface WebSocketUserAddress extends Serializable {
}
@Override
public String mqtopic() {
return mqtopic;
public WebSocketAddress address() {
return address;
}
@Override
public Collection<String> mqtopics() {
return mqtopics;
}
@Override
public InetSocketAddress sncpAddress() {
return sncpAddress;
}
@Override
public Collection<InetSocketAddress> sncpAddresses() {
return sncpAddresses;
public Collection<WebSocketAddress> addresses() {
return addresses;
}
public Serializable getUserid() {
@@ -125,36 +96,20 @@ public interface WebSocketUserAddress extends Serializable {
this.userid = userid;
}
public String getMqtopic() {
return mqtopic;
public WebSocketAddress getAddress() {
return address;
}
public void setMqtopic(String mqtopic) {
this.mqtopic = mqtopic;
public void setAddress(WebSocketAddress address) {
this.address = address;
}
public Collection<String> getMqtopics() {
return mqtopics;
public Collection<WebSocketAddress> getAddresses() {
return addresses;
}
public void setMqtopics(Collection<String> mqtopics) {
this.mqtopics = mqtopics;
}
public InetSocketAddress getSncpAddress() {
return sncpAddress;
}
public void setSncpAddress(InetSocketAddress sncpAddress) {
this.sncpAddress = sncpAddress;
}
public Collection<InetSocketAddress> getSncpAddresses() {
return sncpAddresses;
}
public void setSncpAddresses(Collection<InetSocketAddress> sncpAddresses) {
this.sncpAddresses = sncpAddresses;
public void setAddresses(Collection<WebSocketAddress> addresses) {
this.addresses = addresses;
}
@Override

View File

@@ -281,7 +281,9 @@ public final class SncpClient {
if (messageAgent != null) { //MQ模式
final byte[] reqbytes = writer.toArray();
fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength);
MessageRecord message = new MessageRecord(ConvertType.BSON, this.topic, null, reqbytes);
String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic;
if (targetTopic == null) targetTopic = this.topic;
MessageRecord message = new MessageRecord(ConvertType.BSON, targetTopic, null, reqbytes);
return messageAgent.sendRemoteSncp(null, message).thenApply(msg -> {
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
checkResult(seqid, action, buffer);
@@ -511,6 +513,8 @@ public final class SncpClient {
protected final int addressSourceParamIndex;
protected final int topicTargetParamIndex;
protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture
protected final Creator<? extends CompletableFuture> futureCreator;
@@ -526,6 +530,7 @@ public final class SncpClient {
this.paramClass = method.getParameterTypes();
this.method = method;
Annotation[][] anns = method.getParameterAnnotations();
int tpoicAddrIndex = -1;
int targetAddrIndex = -1;
int sourceAddrIndex = -1;
int handlerAttachIndex = -1;
@@ -559,6 +564,8 @@ public final class SncpClient {
targetAddrIndex = i;
} else if (ann.annotationType() == RpcSourceAddress.class && SocketAddress.class.isAssignableFrom(params[i])) {
sourceAddrIndex = i;
} else if (ann.annotationType() == RpcTargetTopic.class && String.class.isAssignableFrom(params[i])) {
tpoicAddrIndex = i;
}
}
for (Annotation ann : anns[i]) {
@@ -575,6 +582,7 @@ public final class SncpClient {
}
}
}
this.topicTargetParamIndex = tpoicAddrIndex;
this.addressTargetParamIndex = targetAddrIndex;
this.addressSourceParamIndex = sourceAddrIndex;
this.handlerFuncParamIndex = handlerFuncIndex;

View File

@@ -0,0 +1,28 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.service;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* SNCP协议中标记为目标topic参数, 该注解只能标记在类型为String的参数上。
*
*
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
@Inherited
@Documented
@Target({PARAMETER})
@Retention(RUNTIME)
public @interface RpcTargetTopic {
}

View File

@@ -41,8 +41,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public CompletableFuture<List<String>> getWebSocketAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteWebSocketAddresses(targetAddress, groupid);
public CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if ((topic == null || !topic.equals(wsaddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) return remoteWebSocketAddresses(topic, targetAddress, groupid);
if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
ExecutorService executor = null;
@@ -60,25 +60,25 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) {
public CompletableFuture<Integer> sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.sendLocalMessage(message, last, userids);
}
@Override
public CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) {
public CompletableFuture<Integer> broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastLocalMessage(wsrange, message, last);
}
@Override
public CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) {
public CompletableFuture<Integer> sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.sendLocalAction(action, userids);
}
@Override
public CompletableFuture<Integer> broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) {
public CompletableFuture<Integer> broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) {
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
return this.localEngine.broadcastLocalAction(action);
}
@@ -86,35 +86,35 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
/**
* 当用户连接到节点需要更新到CacheSource
*
* @param userid Serializable
* @param sncpAddr InetSocketAddress
* @param userid Serializable
* @param wsaddr WebSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
public CompletableFuture<Void> connect(Serializable userid, WebSocketAddress wsaddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class, sncpAddr));
CompletableFuture<Void> future = source.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
future = future.thenAccept((a) -> source.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, WebSocketAddress.class, wsaddr));
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + wsaddr);
return future;
}
/**
* 当用户从一个节点断掉了所有的连接需要从CacheSource中删除
*
* @param userid Serializable
* @param sncpAddr InetSocketAddress
* @param userid Serializable
* @param wsaddr WebSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr) {
public CompletableFuture<Void> disconnect(Serializable userid, WebSocketAddress wsaddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class, sncpAddr);
CompletableFuture<Void> future = source.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, WebSocketAddress.class, wsaddr);
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + wsaddr);
return future;
}
@@ -123,17 +123,17 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
*
* @param olduserid Serializable
* @param newuserid Serializable
* @param sncpAddr InetSocketAddress
* @param wsaddr WebSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) {
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, WebSocketAddress wsaddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, InetSocketAddress.class, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, InetSocketAddress.class, sncpAddr));
CompletableFuture<Void> future = source.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, WebSocketAddress.class, wsaddr);
future = future.thenAccept((a) -> source.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, WebSocketAddress.class, wsaddr));
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr);
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + wsaddr);
return future;
}
@@ -141,12 +141,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
* 判断用户是否有WebSocket
*
* @param userid Serializable
* @param topic RpcTargetTopic
* @param targetAddress InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress) {
public CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress);
if (localEngine == null) return CompletableFuture.completedFuture(false);
return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
@@ -156,12 +157,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
* 强制关闭用户的WebSocket
*
* @param userid Serializable
* @param topic RpcTargetTopic
* @param targetAddress InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress) {
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
//不能从sncpNodeAddresses中移除因为engine.forceCloseWebSocket 会调用到disconnect
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + targetAddress);
if (localEngine == null) return CompletableFuture.completedFuture(0);