This commit is contained in:
Redkale
2019-08-28 16:15:28 +08:00
parent a495829a3c
commit e281cac3d3
2 changed files with 118 additions and 10 deletions

View File

@@ -251,17 +251,43 @@ public abstract class WebSocketNode {
*/
@Local
public CompletableFuture<Integer> forceCloseWebSocket(final Serializable userid) {
return forceCloseWebSocket(userid, (WebSocketUserAddress) null);
}
/**
* 强制关闭用户WebSocket
*
* @param userAddress WebSocketUserAddress
*
* @return int
*/
@Local
public CompletableFuture<Integer> forceCloseWebSocket(final WebSocketUserAddress userAddress) {
return forceCloseWebSocket(null, userAddress);
}
private CompletableFuture<Integer> forceCloseWebSocket(final Serializable userid, final WebSocketUserAddress userAddress) {
CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid));
if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userAddress == null ? userid : userAddress.userid()));
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture;
}
//远程节点关闭
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Collection<InetSocketAddress>> addrsFuture;
if (userAddress == null) {
tryAcquireSemaphore();
addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
} else {
Collection<InetSocketAddress> addrs = userAddress.sncpAddresses();
if (userAddress.sncpAddress() != null) {
if (addrs == null) addrs = new ArrayList<>();
addrs.add(userAddress.sncpAddress());
}
addrsFuture = CompletableFuture.completedFuture(addrs);
}
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
@@ -446,9 +472,9 @@ public abstract class WebSocketNode {
}
CompletableFuture<Integer> future = null;
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
Serializable[] us = en.getValue().toArray(new Serializable[en.getValue().size()]);
future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, us)
: future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, us), (a, b) -> a | b);
Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]);
future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids)
: future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
});
@@ -710,9 +736,9 @@ public abstract class WebSocketNode {
}
CompletableFuture<Integer> future = null;
for (Map.Entry<InetSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
Serializable[] us = en.getValue().toArray(new Serializable[en.getValue().size()]);
future = future == null ? sendOneAddrAction(en.getKey(), action, us)
: future.thenCombine(sendOneAddrAction(en.getKey(), action, us), (a, b) -> a | b);
Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]);
future = future == null ? sendOneAddrAction(en.getKey(), action, oneaddrUserids)
: future.thenCombine(sendOneAddrAction(en.getKey(), action, oneaddrUserids), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
});

View File

@@ -8,6 +8,7 @@ package org.redkale.net.http;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Collection;
import org.redkale.convert.json.JsonConvert;
/**
* userid 与 sncpaddress组合对象
@@ -24,4 +25,85 @@ public interface WebSocketUserAddress {
InetSocketAddress sncpAddress();
Collection<InetSocketAddress> sncpAddresses();
public static WebSocketUserAddress create(WebSocketUserAddress userAddress) {
return new SimpleWebSocketUserAddress(userAddress);
}
public static WebSocketUserAddress create(Serializable userid, InetSocketAddress sncpAddress) {
return new SimpleWebSocketUserAddress(userid, sncpAddress, null);
}
public static WebSocketUserAddress create(Serializable userid, Collection<InetSocketAddress> sncpAddresses) {
return new SimpleWebSocketUserAddress(userid, null, sncpAddresses);
}
public static class SimpleWebSocketUserAddress implements WebSocketUserAddress {
private Serializable userid;
private InetSocketAddress sncpAddress;
private Collection<InetSocketAddress> sncpAddresses;
public SimpleWebSocketUserAddress() {
}
public SimpleWebSocketUserAddress(Serializable userid, InetSocketAddress sncpAddress, Collection<InetSocketAddress> sncpAddresses) {
this.userid = userid;
this.sncpAddress = sncpAddress;
this.sncpAddresses = sncpAddresses;
}
public SimpleWebSocketUserAddress(WebSocketUserAddress userAddress) {
if (userAddress == null) return;
this.userid = userAddress.userid();
this.sncpAddress = userAddress.sncpAddress();
this.sncpAddresses = userAddress.sncpAddresses();
}
@Override
public Serializable userid() {
return userid;
}
@Override
public InetSocketAddress sncpAddress() {
return sncpAddress;
}
@Override
public Collection<InetSocketAddress> sncpAddresses() {
return sncpAddresses;
}
public Serializable getUserid() {
return userid;
}
public void setUserid(Serializable userid) {
this.userid = userid;
}
public InetSocketAddress getSncpAddress() {
return sncpAddress;
}
public void setSncpAddress(InetSocketAddress sncpAddress) {
this.sncpAddress = sncpAddress;
}
public Collection<InetSocketAddress> getSncpAddresses() {
return sncpAddresses;
}
public void setSncpAddresses(Collection<InetSocketAddress> sncpAddresses) {
this.sncpAddresses = sncpAddresses;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}
}