This commit is contained in:
@@ -319,88 +319,88 @@ public abstract class WebSocketNode {
|
||||
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param message 消息内容
|
||||
* @param userids Stream
|
||||
* @param message 消息内容
|
||||
* @param useridOrAddrs Stream
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public final CompletableFuture<Integer> sendMessage(Object message, final Stream<? extends Serializable> userids) {
|
||||
return sendMessage((Convert) null, message, true, userids);
|
||||
public final CompletableFuture<Integer> sendMessage(Object message, final Stream<? extends Serializable> useridOrAddrs) {
|
||||
return sendMessage((Convert) null, message, true, useridOrAddrs);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param message 消息内容
|
||||
* @param userids Serializable[]
|
||||
* @param message 消息内容
|
||||
* @param useridOrAddrs Serializable[]
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public final CompletableFuture<Integer> sendMessage(Object message, final Serializable... userids) {
|
||||
return sendMessage((Convert) null, message, true, userids);
|
||||
public final CompletableFuture<Integer> sendMessage(Object message, final Serializable... useridOrAddrs) {
|
||||
return sendMessage((Convert) null, message, true, useridOrAddrs);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param convert Convert
|
||||
* @param message 消息内容
|
||||
* @param userids Stream
|
||||
* @param convert Convert
|
||||
* @param message 消息内容
|
||||
* @param useridOrAddrs Stream
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public final CompletableFuture<Integer> sendMessage(final Convert convert, Object message, final Stream<? extends Serializable> userids) {
|
||||
return sendMessage(convert, message, true, userids);
|
||||
public final CompletableFuture<Integer> sendMessage(final Convert convert, Object message, final Stream<? extends Serializable> useridOrAddrs) {
|
||||
return sendMessage(convert, message, true, useridOrAddrs);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param convert Convert
|
||||
* @param message 消息内容
|
||||
* @param userids Serializable[]
|
||||
* @param convert Convert
|
||||
* @param message 消息内容
|
||||
* @param useridOrAddrs Serializable[]
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public final CompletableFuture<Integer> sendMessage(final Convert convert, Object message, final Serializable... userids) {
|
||||
return sendMessage(convert, message, true, userids);
|
||||
public final CompletableFuture<Integer> sendMessage(final Convert convert, Object message, final Serializable... useridOrAddrs) {
|
||||
return sendMessage(convert, message, true, useridOrAddrs);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param message 消息内容
|
||||
* @param last 是否最后一条
|
||||
* @param userids Stream
|
||||
* @param message 消息内容
|
||||
* @param last 是否最后一条
|
||||
* @param useridOrAddrs Stream
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Stream<? extends Serializable> userids) {
|
||||
return sendMessage((Convert) null, message, last, userids);
|
||||
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Stream<? extends Serializable> useridOrAddrs) {
|
||||
return sendMessage((Convert) null, message, last, useridOrAddrs);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param message 消息内容
|
||||
* @param last 是否最后一条
|
||||
* @param userids Serializable[]
|
||||
* @param message 消息内容
|
||||
* @param last 是否最后一条
|
||||
* @param useridOrAddrs Serializable[]
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
|
||||
return sendMessage((Convert) null, message, last, userids);
|
||||
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... useridOrAddrs) {
|
||||
return sendMessage((Convert) null, message, last, useridOrAddrs);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -438,6 +438,13 @@ public abstract class WebSocketNode {
|
||||
@Local
|
||||
public CompletableFuture<Integer> sendMessage(final Convert convert, final Object message0, final boolean last, final Serializable... userids) {
|
||||
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
if (userids[0] instanceof WebSocketUserAddress) {
|
||||
WebSocketUserAddress[] useraddrs = new WebSocketUserAddress[userids.length];
|
||||
for (int i = 0; i < useraddrs.length; i++) {
|
||||
useraddrs[i] = (WebSocketUserAddress) userids[i];
|
||||
}
|
||||
return sendMessage(convert, message0, last, useraddrs);
|
||||
}
|
||||
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, userids));
|
||||
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
@@ -483,6 +490,41 @@ public abstract class WebSocketNode {
|
||||
return rsfuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : rsfuture;
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param convert Convert
|
||||
* @param message0 消息内容
|
||||
* @param last 是否最后一条
|
||||
* @param useraddrs WebSocketUserAddress[]
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public CompletableFuture<Integer> sendMessage(final Convert convert, final Object message0, final boolean last, final WebSocketUserAddress... useraddrs) {
|
||||
if (useraddrs == null || useraddrs.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, useraddrs));
|
||||
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
return this.localEngine.sendLocalMessage(message, last, userAddressToUserids(useraddrs));
|
||||
}
|
||||
|
||||
final Object remoteMessage = formatRemoteMessage(message);
|
||||
final Map<InetSocketAddress, List<Serializable>> addrUsers = userAddressToAddrMap(useraddrs);
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found message-addr-userids: " + addrUsers);
|
||||
}
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
|
||||
Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]);
|
||||
future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids)
|
||||
: future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
|
||||
}
|
||||
|
||||
protected CompletableFuture<Integer> sendOneUserMessage(final Object message, final boolean last, final Serializable userid) {
|
||||
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneUserMessage(msg, last, userid));
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
@@ -534,6 +576,32 @@ public abstract class WebSocketNode {
|
||||
return remoteNode.sendMessage(sncpAddr, remoteMessage, last, userids);
|
||||
}
|
||||
|
||||
protected Serializable[] userAddressToUserids(WebSocketUserAddress... useraddrs) {
|
||||
if (useraddrs == null || useraddrs.length == 1) return new Serializable[0];
|
||||
Set<Serializable> set = new HashSet<>();
|
||||
for (WebSocketUserAddress userAddress : useraddrs) {
|
||||
set.add(userAddress.userid());
|
||||
}
|
||||
return set.toArray(new Serializable[set.size()]);
|
||||
}
|
||||
|
||||
protected Map<InetSocketAddress, List<Serializable>> userAddressToAddrMap(WebSocketUserAddress... useraddrs) {
|
||||
final Map<InetSocketAddress, List<Serializable>> addrUsers = new HashMap<>();
|
||||
for (WebSocketUserAddress userAddress : useraddrs) {
|
||||
if (userAddress.sncpAddress() != null) {
|
||||
addrUsers.computeIfAbsent(userAddress.sncpAddress(), k -> new ArrayList<>()).add(userAddress.userid());
|
||||
}
|
||||
if (userAddress.sncpAddresses() != null) {
|
||||
for (InetSocketAddress addr : userAddress.sncpAddresses()) {
|
||||
if (addr != null) {
|
||||
addrUsers.computeIfAbsent(addr, k -> new ArrayList<>()).add(userAddress.userid());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return addrUsers;
|
||||
}
|
||||
|
||||
/**
|
||||
* 广播消息, 给所有人发消息
|
||||
*
|
||||
@@ -705,6 +773,13 @@ public abstract class WebSocketNode {
|
||||
@Local
|
||||
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Serializable... userids) {
|
||||
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
if (userids[0] instanceof WebSocketUserAddress) {
|
||||
WebSocketUserAddress[] useraddrs = new WebSocketUserAddress[userids.length];
|
||||
for (int i = 0; i < useraddrs.length; i++) {
|
||||
useraddrs[i] = (WebSocketUserAddress) userids[i];
|
||||
}
|
||||
return sendAction(action, useraddrs);
|
||||
}
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
return this.localEngine.sendLocalAction(action, userids);
|
||||
}
|
||||
@@ -747,6 +822,35 @@ public abstract class WebSocketNode {
|
||||
return rsfuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : rsfuture;
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定用户发送操作,先发送本地连接,再发送远程连接 <br>
|
||||
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||
*
|
||||
* @param action 操作参数
|
||||
* @param useraddrs WebSocketUserAddress[]
|
||||
*
|
||||
* @return 为0表示成功, 其他值表示部分发送异常
|
||||
*/
|
||||
@Local
|
||||
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final WebSocketUserAddress... useraddrs) {
|
||||
if (useraddrs == null || useraddrs.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
return this.localEngine.sendLocalAction(action, userAddressToUserids(useraddrs));
|
||||
}
|
||||
|
||||
final Map<InetSocketAddress, List<Serializable>> addrUsers = userAddressToAddrMap(useraddrs);
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found action-userid-addrs: " + addrUsers);
|
||||
}
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
|
||||
Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]);
|
||||
future = future == null ? sendOneAddrAction(en.getKey(), action, oneaddrUserids)
|
||||
: future.thenCombine(sendOneAddrAction(en.getKey(), action, oneaddrUserids), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
}
|
||||
|
||||
protected CompletableFuture<Integer> sendOneUserAction(final WebSocketAction action, final Serializable userid) {
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket want send action {userid:" + userid + ", action:" + action + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
|
||||
|
||||
@@ -13,12 +13,10 @@ import org.redkale.convert.json.JsonConvert;
|
||||
/**
|
||||
* userid 与 sncpaddress组合对象
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public interface WebSocketUserAddress {
|
||||
public interface WebSocketUserAddress extends Serializable {
|
||||
|
||||
Serializable userid();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user