优化WebSocket给多个userid发消息的实现
This commit is contained in:
@@ -148,6 +148,14 @@ public class Context {
|
||||
executor.execute(r);
|
||||
}
|
||||
|
||||
public int getCorePoolSize() {
|
||||
return executor.getCorePoolSize();
|
||||
}
|
||||
|
||||
public ThreadFactory getThreadFactory() {
|
||||
return executor.getThreadFactory();
|
||||
}
|
||||
|
||||
public int getBufferCapacity() {
|
||||
return bufferCapacity;
|
||||
}
|
||||
|
||||
@@ -540,7 +540,7 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
*/
|
||||
public CompletableFuture<Void> changeUserid(final G newuserid) {
|
||||
if (newuserid == null) throw new NullPointerException("newuserid is null");
|
||||
return _engine.changeUserid(this, newuserid);
|
||||
return _engine.changeLocalUserid(this, newuserid);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -127,7 +127,7 @@ public class WebSocketEngine {
|
||||
}
|
||||
|
||||
@Comment("添加WebSocket")
|
||||
void add(WebSocket socket) {
|
||||
void addLocal(WebSocket socket) {
|
||||
if (single) {
|
||||
currconns.incrementAndGet();
|
||||
websockets.put(socket._userid, socket);
|
||||
@@ -144,7 +144,7 @@ public class WebSocketEngine {
|
||||
}
|
||||
|
||||
@Comment("从WebSocketEngine删除指定WebSocket")
|
||||
CompletableFuture<Void> removeThenClose(WebSocket socket) {
|
||||
CompletableFuture<Void> removeLocalThenClose(WebSocket socket) {
|
||||
Serializable userid = socket._userid;
|
||||
if (single) {
|
||||
currconns.decrementAndGet();
|
||||
@@ -165,7 +165,7 @@ public class WebSocketEngine {
|
||||
}
|
||||
|
||||
@Comment("更改WebSocket的userid")
|
||||
CompletableFuture<Void> changeUserid(WebSocket socket, final Serializable newuserid) {
|
||||
CompletableFuture<Void> changeLocalUserid(WebSocket socket, final Serializable newuserid) {
|
||||
if (newuserid == null) throw new NullPointerException("newuserid is null");
|
||||
final Serializable olduserid = socket._userid;
|
||||
socket._userid = newuserid;
|
||||
@@ -207,20 +207,20 @@ public class WebSocketEngine {
|
||||
}
|
||||
|
||||
@Comment("给所有连接用户发送消息")
|
||||
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
|
||||
return broadcastMessage((Predicate) null, message, last);
|
||||
public CompletableFuture<Integer> broadcastLocalMessage(final Object message, final boolean last) {
|
||||
return WebSocketEngine.this.broadcastLocalMessage((Predicate) null, message, last);
|
||||
}
|
||||
|
||||
@Comment("给指定WebSocket连接用户发送消息")
|
||||
public CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Object message, final boolean last) {
|
||||
public CompletableFuture<Integer> broadcastLocalMessage(final WebSocketRange wsrange, final Object message, final boolean last) {
|
||||
Predicate<WebSocket> predicate = wsrange == null ? null : (ws) -> ws.predicate(wsrange);
|
||||
return broadcastMessage(predicate, message, last);
|
||||
return WebSocketEngine.this.broadcastLocalMessage(predicate, message, last);
|
||||
}
|
||||
|
||||
@Comment("给指定WebSocket连接用户发送消息")
|
||||
public CompletableFuture<Integer> broadcastMessage(final Predicate<WebSocket> predicate, final Object message, final boolean last) {
|
||||
public CompletableFuture<Integer> broadcastLocalMessage(final Predicate<WebSocket> predicate, final Object message, final boolean last) {
|
||||
if (message instanceof CompletableFuture) {
|
||||
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(predicate, json, last));
|
||||
return ((CompletableFuture) message).thenCompose((json) -> WebSocketEngine.this.broadcastLocalMessage(predicate, json, last));
|
||||
}
|
||||
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
|
||||
if (more) {
|
||||
@@ -265,19 +265,19 @@ public class WebSocketEngine {
|
||||
}
|
||||
|
||||
@Comment("给指定用户组发送消息")
|
||||
public CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Stream<? extends Serializable> userids) {
|
||||
public CompletableFuture<Integer> sendLocalMessage(final Object message, final boolean last, final Stream<? extends Serializable> userids) {
|
||||
Object[] array = userids.toArray();
|
||||
Serializable[] ss = new Serializable[array.length];
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
ss[i] = (Serializable) array[i];
|
||||
}
|
||||
return sendMessage(message, last, ss);
|
||||
return WebSocketEngine.this.sendLocalMessage(message, last, ss);
|
||||
}
|
||||
|
||||
@Comment("给指定用户组发送消息")
|
||||
public CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
|
||||
public CompletableFuture<Integer> sendLocalMessage(final Object message, final boolean last, final Serializable... userids) {
|
||||
if (message instanceof CompletableFuture) {
|
||||
return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids));
|
||||
return ((CompletableFuture) message).thenCompose((json) -> WebSocketEngine.this.sendLocalMessage(json, last, userids));
|
||||
}
|
||||
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null) && userids.length > 1;
|
||||
if (more) {
|
||||
@@ -326,7 +326,7 @@ public class WebSocketEngine {
|
||||
}
|
||||
|
||||
@Comment("给指定WebSocket连接用户发起操作指令")
|
||||
public CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
|
||||
public CompletableFuture<Integer> broadcastLocalAction(final WebSocketAction action) {
|
||||
CompletableFuture<Integer> future = null;
|
||||
if (single) {
|
||||
for (WebSocket websocket : websockets.values()) {
|
||||
@@ -343,17 +343,17 @@ public class WebSocketEngine {
|
||||
}
|
||||
|
||||
@Comment("给指定用户组发送操作")
|
||||
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Stream<? extends Serializable> userids) {
|
||||
public CompletableFuture<Integer> sendLocalAction(final WebSocketAction action, final Stream<? extends Serializable> userids) {
|
||||
Object[] array = userids.toArray();
|
||||
Serializable[] ss = new Serializable[array.length];
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
ss[i] = (Serializable) array[i];
|
||||
}
|
||||
return sendAction(action, ss);
|
||||
return WebSocketEngine.this.sendLocalAction(action, ss);
|
||||
}
|
||||
|
||||
@Comment("给指定用户组发送操作")
|
||||
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Serializable... userids) {
|
||||
public CompletableFuture<Integer> sendLocalAction(final WebSocketAction action, final Serializable... userids) {
|
||||
CompletableFuture<Integer> future = null;
|
||||
if (single) {
|
||||
for (Serializable userid : userids) {
|
||||
|
||||
@@ -90,11 +90,11 @@ public abstract class WebSocketNode {
|
||||
|
||||
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
|
||||
|
||||
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable userid);
|
||||
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids);
|
||||
|
||||
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
|
||||
|
||||
protected abstract CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable userid);
|
||||
protected abstract CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids);
|
||||
|
||||
protected abstract CompletableFuture<Integer> broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action);
|
||||
|
||||
@@ -204,25 +204,6 @@ public abstract class WebSocketNode {
|
||||
return rs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*
|
||||
* 判断指定用户是否WebSocket在线
|
||||
*
|
||||
* @param userid Serializable
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
private CompletableFuture<Boolean> existsWebSocket2(final Serializable userid) {
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) {
|
||||
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
|
||||
}
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Boolean> rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid);
|
||||
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
|
||||
return rs;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断指定用户是否WebSocket在线
|
||||
*
|
||||
@@ -429,15 +410,97 @@ public abstract class WebSocketNode {
|
||||
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, userids));
|
||||
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
return this.localEngine.sendMessage(message, last, userids);
|
||||
return this.localEngine.sendLocalMessage(message, last, userids);
|
||||
}
|
||||
final Object remoteMessage = formatRemoteMessage(message);
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (Serializable userid : userids) {
|
||||
future = future == null ? sendOneMessage(remoteMessage, last, userid)
|
||||
: future.thenCombine(sendOneMessage(remoteMessage, last, userid), (a, b) -> a | b);
|
||||
CompletableFuture<Integer> rsfuture;
|
||||
if (userids.length == 1) {
|
||||
rsfuture = sendOneUserMessage(remoteMessage, last, userids[0]);
|
||||
} else {
|
||||
String[] keys = new String[userids.length];
|
||||
final Map<String, Serializable> keyuser = new HashMap<>();
|
||||
for (int i = 0; i < userids.length; i++) {
|
||||
keys[i] = SOURCE_SNCP_USERID_PREFIX + userids[i];
|
||||
keyuser.put(keys[i], userids[i]);
|
||||
}
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(InetSocketAddress.class, keys);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> {
|
||||
if (addrs == null || addrs.isEmpty()) {
|
||||
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node ");
|
||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
}
|
||||
Map<InetSocketAddress, List<Serializable>> addrUsers = new HashMap<>();
|
||||
addrs.forEach((key, as) -> {
|
||||
for (InetSocketAddress a : as) {
|
||||
addrUsers.computeIfAbsent(a, k -> new ArrayList<>()).add(keyuser.get(key));
|
||||
}
|
||||
});
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found message-addr-userids: " + addrUsers);
|
||||
}
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
|
||||
Serializable[] us = en.getValue().toArray(new Serializable[en.getValue().size()]);
|
||||
future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, us)
|
||||
: future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, us), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
});
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
return rsfuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : rsfuture;
|
||||
}
|
||||
|
||||
protected CompletableFuture<Integer> sendOneUserMessage(final Object message, final boolean last, final Serializable userid) {
|
||||
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneUserMessage(msg, last, userid));
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket want send message {userid:" + userid + ", content:'" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : JsonConvert.root().convertTo(message)) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
|
||||
}
|
||||
CompletableFuture<Integer> localFuture = null;
|
||||
if (this.localEngine != null) localFuture = localEngine.sendLocalMessage(message, last, userid);
|
||||
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
|
||||
//没有CacheSource就不会有分布式节点
|
||||
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
|
||||
}
|
||||
//远程节点发送消息
|
||||
final Object remoteMessage = formatRemoteMessage(message);
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (addrs == null || addrs.isEmpty()) {
|
||||
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
|
||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
}
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (InetSocketAddress addr : addrs) {
|
||||
if (addr == null || addr.equals(localSncpAddress)) continue;
|
||||
future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid)
|
||||
: future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
});
|
||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
||||
}
|
||||
|
||||
protected CompletableFuture<Integer> sendOneAddrMessage(final InetSocketAddress sncpAddr, final Object message, final boolean last, final Serializable... userids) {
|
||||
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneAddrMessage(sncpAddr, msg, last, userids));
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + sncpAddr + ", content:'" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : JsonConvert.root().convertTo(message)) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
|
||||
}
|
||||
if (Objects.equals(sncpAddr, this.localSncpAddress)) {
|
||||
return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalMessage(message, last, userids);
|
||||
}
|
||||
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
|
||||
//没有CacheSource就不会有分布式节点
|
||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
}
|
||||
final Object remoteMessage = formatRemoteMessage(message);
|
||||
return remoteNode.sendMessage(sncpAddr, remoteMessage, last, userids);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -548,10 +611,10 @@ public abstract class WebSocketNode {
|
||||
if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(wsrange, convert, msg, last));
|
||||
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
return this.localEngine.broadcastMessage(wsrange, message, last);
|
||||
return this.localEngine.broadcastLocalMessage(wsrange, message, last);
|
||||
}
|
||||
final Object remoteMessage = formatRemoteMessage(message);
|
||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(wsrange, message, last);
|
||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalMessage(wsrange, message, last);
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
@@ -569,40 +632,6 @@ public abstract class WebSocketNode {
|
||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
||||
}
|
||||
|
||||
protected CompletableFuture<Integer> sendOneMessage(final Object message, final boolean last, final Serializable userid) {
|
||||
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneMessage(msg, last, userid));
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket want send message {userid:" + userid + ", content:'" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : JsonConvert.root().convertTo(message)) + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
|
||||
}
|
||||
CompletableFuture<Integer> localFuture = null;
|
||||
if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid);
|
||||
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
|
||||
//没有CacheSource就不会有分布式节点
|
||||
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
|
||||
}
|
||||
//远程节点发送消息
|
||||
final Object remoteMessage = formatRemoteMessage(message);
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
|
||||
if (addrs == null || addrs.isEmpty()) {
|
||||
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
|
||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
}
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket(localaddr=" + localSncpAddress + ") found userid:" + userid + " on " + addrs);
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (InetSocketAddress addr : addrs) {
|
||||
if (addr == null || addr.equals(localSncpAddress)) continue;
|
||||
future = future == null ? remoteNode.sendMessage(addr, remoteMessage, last, userid)
|
||||
: future.thenCombine(remoteNode.sendMessage(addr, remoteMessage, last, userid), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
});
|
||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
||||
}
|
||||
|
||||
/**
|
||||
* 广播操作, 给所有人发操作
|
||||
*
|
||||
@@ -613,9 +642,9 @@ public abstract class WebSocketNode {
|
||||
@Local
|
||||
public CompletableFuture<Integer> broadcastAction(final WebSocketAction action) {
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
return this.localEngine.broadcastAction(action);
|
||||
return this.localEngine.broadcastLocalAction(action);
|
||||
}
|
||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastAction(action);
|
||||
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastLocalAction(action);
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY, InetSocketAddress.class);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
@@ -646,21 +675,53 @@ public abstract class WebSocketNode {
|
||||
public CompletableFuture<Integer> sendAction(final WebSocketAction action, final Serializable... userids) {
|
||||
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||
return this.localEngine.sendAction(action, userids);
|
||||
return this.localEngine.sendLocalAction(action, userids);
|
||||
}
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (Serializable userid : userids) {
|
||||
future = future == null ? sendOneAction(action, userid) : future.thenCombine(sendOneAction(action, userid), (a, b) -> a | b);
|
||||
CompletableFuture<Integer> rsfuture;
|
||||
if (userids.length == 1) {
|
||||
rsfuture = sendOneUserAction(action, userids[0]);
|
||||
} else {
|
||||
String[] keys = new String[userids.length];
|
||||
final Map<String, Serializable> keyuser = new HashMap<>();
|
||||
for (int i = 0; i < userids.length; i++) {
|
||||
keys[i] = SOURCE_SNCP_USERID_PREFIX + userids[i];
|
||||
keyuser.put(keys[i], userids[i]);
|
||||
}
|
||||
tryAcquireSemaphore();
|
||||
CompletableFuture<Map<String, Collection<InetSocketAddress>>> addrsFuture = sncpNodeAddresses.getCollectionMapAsync(InetSocketAddress.class, keys);
|
||||
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
|
||||
rsfuture = addrsFuture.thenCompose((Map<String, Collection<InetSocketAddress>> addrs) -> {
|
||||
if (addrs == null || addrs.isEmpty()) {
|
||||
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userids:" + JsonConvert.root().convertTo(userids) + " on any node ");
|
||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
}
|
||||
Map<InetSocketAddress, List<Serializable>> addrUsers = new HashMap<>();
|
||||
addrs.forEach((key, as) -> {
|
||||
for (InetSocketAddress a : as) {
|
||||
addrUsers.computeIfAbsent(a, k -> new ArrayList<>()).add(keyuser.get(key));
|
||||
}
|
||||
});
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found action-userid-addrs: " + addrUsers);
|
||||
}
|
||||
CompletableFuture<Integer> future = null;
|
||||
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
|
||||
Serializable[] us = en.getValue().toArray(new Serializable[en.getValue().size()]);
|
||||
future = future == null ? sendOneAddrAction(en.getKey(), action, us)
|
||||
: future.thenCombine(sendOneAddrAction(en.getKey(), action, us), (a, b) -> a | b);
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
});
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
return rsfuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : rsfuture;
|
||||
}
|
||||
|
||||
protected CompletableFuture<Integer> sendOneAction(final WebSocketAction action, final Serializable userid) {
|
||||
protected CompletableFuture<Integer> sendOneUserAction(final WebSocketAction action, final Serializable userid) {
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket want send action {userid:" + userid + ", action:" + action + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
|
||||
}
|
||||
CompletableFuture<Integer> localFuture = null;
|
||||
if (this.localEngine != null) localFuture = localEngine.sendAction(action, userid);
|
||||
if (this.localEngine != null) localFuture = localEngine.sendLocalAction(action, userid);
|
||||
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
|
||||
//没有CacheSource就不会有分布式节点
|
||||
@@ -687,6 +748,21 @@ public abstract class WebSocketNode {
|
||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
|
||||
}
|
||||
|
||||
protected CompletableFuture<Integer> sendOneAddrAction(final InetSocketAddress sncpAddr, final WebSocketAction action, final Serializable... userids) {
|
||||
if (logger.isLoggable(Level.FINEST)) {
|
||||
logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + sncpAddr + ", action:" + action + " from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
|
||||
}
|
||||
if (Objects.equals(sncpAddr, this.localSncpAddress)) {
|
||||
return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalAction(action, userids);
|
||||
}
|
||||
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
|
||||
//没有CacheSource就不会有分布式节点
|
||||
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
}
|
||||
return remoteNode.sendAction(sncpAddr, action, userids);
|
||||
}
|
||||
|
||||
protected Object formatRemoteMessage(Object message) {
|
||||
if (message instanceof WebSocketPacket) return message;
|
||||
if (message instanceof byte[]) return message;
|
||||
|
||||
@@ -294,7 +294,7 @@ class WebSocketRunner implements Runnable {
|
||||
if (closed) return null;
|
||||
closed = true;
|
||||
channel.dispose();
|
||||
CompletableFuture<Void> future = engine.removeThenClose(webSocket);
|
||||
CompletableFuture<Void> future = engine.removeLocalThenClose(webSocket);
|
||||
webSocket.onClose(code, reason);
|
||||
return future;
|
||||
}
|
||||
|
||||
@@ -255,7 +255,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
|
||||
CompletableFuture<Boolean> rcFuture = webSocket.onSingleRepeatConnect();
|
||||
Consumer<Boolean> task = (oldkilled) -> {
|
||||
if (oldkilled) {
|
||||
WebSocketServlet.this.node.localEngine.add(webSocket);
|
||||
WebSocketServlet.this.node.localEngine.addLocal(webSocket);
|
||||
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
|
||||
webSocket._runner = runner;
|
||||
context.runAsync(runner);
|
||||
@@ -276,7 +276,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
|
||||
});
|
||||
}
|
||||
} else {
|
||||
WebSocketServlet.this.node.localEngine.add(webSocket);
|
||||
WebSocketServlet.this.node.localEngine.addLocal(webSocket);
|
||||
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
|
||||
webSocket._runner = runner;
|
||||
context.runAsync(runner);
|
||||
@@ -284,7 +284,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
|
||||
}
|
||||
});
|
||||
} else {
|
||||
WebSocketServlet.this.node.localEngine.add(webSocket);
|
||||
WebSocketServlet.this.node.localEngine.addLocal(webSocket);
|
||||
WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel());
|
||||
webSocket._runner = runner;
|
||||
context.runAsync(runner);
|
||||
|
||||
@@ -56,27 +56,27 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable userid) {
|
||||
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) {
|
||||
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
return this.localEngine.sendMessage(message, last, userid);
|
||||
return this.localEngine.sendLocalMessage(message, last, userids);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) {
|
||||
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
return this.localEngine.broadcastMessage(wsrange, message, last);
|
||||
return this.localEngine.broadcastLocalMessage(wsrange, message, last);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable userid) {
|
||||
public CompletableFuture<Integer> sendAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) {
|
||||
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
return this.localEngine.sendAction(action, userid);
|
||||
return this.localEngine.sendLocalAction(action, userids);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> broadcastAction(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) {
|
||||
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
return this.localEngine.broadcastAction(action);
|
||||
return this.localEngine.broadcastLocalAction(action);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user