diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index f4cb4c3e4..87b87fcc7 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -251,17 +251,43 @@ public abstract class WebSocketNode { */ @Local public CompletableFuture forceCloseWebSocket(final Serializable userid) { + return forceCloseWebSocket(userid, (WebSocketUserAddress) null); + } + + /** + * 强制关闭用户WebSocket + * + * @param userAddress WebSocketUserAddress + * + * @return int + */ + @Local + public CompletableFuture forceCloseWebSocket(final WebSocketUserAddress userAddress) { + return forceCloseWebSocket(null, userAddress); + } + + private CompletableFuture forceCloseWebSocket(final Serializable userid, final WebSocketUserAddress userAddress) { CompletableFuture localFuture = null; - if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid)); + if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userAddress == null ? userid : userAddress.userid())); if (this.sncpNodeAddresses == null || this.remoteNode == null) { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null"); //没有CacheSource就不会有分布式节点 return localFuture; } //远程节点关闭 - tryAcquireSemaphore(); - CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); - if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); + CompletableFuture> addrsFuture; + if (userAddress == null) { + tryAcquireSemaphore(); + addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class); + if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore()); + } else { + Collection addrs = userAddress.sncpAddresses(); + if (userAddress.sncpAddress() != null) { + if (addrs == null) addrs = new ArrayList<>(); + addrs.add(userAddress.sncpAddress()); + } + addrsFuture = CompletableFuture.completedFuture(addrs); + } CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs); if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0); @@ -446,9 +472,9 @@ public abstract class WebSocketNode { } CompletableFuture future = null; for (Map.Entry> en : addrUsers.entrySet()) { - Serializable[] us = en.getValue().toArray(new Serializable[en.getValue().size()]); - future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, us) - : future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, us), (a, b) -> a | b); + Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]); + future = future == null ? sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids) + : future.thenCombine(sendOneAddrMessage(en.getKey(), remoteMessage, last, oneaddrUserids), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; }); @@ -710,9 +736,9 @@ public abstract class WebSocketNode { } CompletableFuture future = null; for (Map.Entry> en : addrUsers.entrySet()) { - Serializable[] us = en.getValue().toArray(new Serializable[en.getValue().size()]); - future = future == null ? sendOneAddrAction(en.getKey(), action, us) - : future.thenCombine(sendOneAddrAction(en.getKey(), action, us), (a, b) -> a | b); + Serializable[] oneaddrUserids = en.getValue().toArray(new Serializable[en.getValue().size()]); + future = future == null ? sendOneAddrAction(en.getKey(), action, oneaddrUserids) + : future.thenCombine(sendOneAddrAction(en.getKey(), action, oneaddrUserids), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; }); diff --git a/src/org/redkale/net/http/WebSocketUserAddress.java b/src/org/redkale/net/http/WebSocketUserAddress.java index c988efff5..85adc2896 100644 --- a/src/org/redkale/net/http/WebSocketUserAddress.java +++ b/src/org/redkale/net/http/WebSocketUserAddress.java @@ -8,6 +8,7 @@ package org.redkale.net.http; import java.io.Serializable; import java.net.InetSocketAddress; import java.util.Collection; +import org.redkale.convert.json.JsonConvert; /** * userid 与 sncpaddress组合对象 @@ -24,4 +25,85 @@ public interface WebSocketUserAddress { InetSocketAddress sncpAddress(); Collection sncpAddresses(); + + public static WebSocketUserAddress create(WebSocketUserAddress userAddress) { + return new SimpleWebSocketUserAddress(userAddress); + } + + public static WebSocketUserAddress create(Serializable userid, InetSocketAddress sncpAddress) { + return new SimpleWebSocketUserAddress(userid, sncpAddress, null); + } + + public static WebSocketUserAddress create(Serializable userid, Collection sncpAddresses) { + return new SimpleWebSocketUserAddress(userid, null, sncpAddresses); + } + + public static class SimpleWebSocketUserAddress implements WebSocketUserAddress { + + private Serializable userid; + + private InetSocketAddress sncpAddress; + + private Collection sncpAddresses; + + public SimpleWebSocketUserAddress() { + } + + public SimpleWebSocketUserAddress(Serializable userid, InetSocketAddress sncpAddress, Collection sncpAddresses) { + this.userid = userid; + this.sncpAddress = sncpAddress; + this.sncpAddresses = sncpAddresses; + } + + public SimpleWebSocketUserAddress(WebSocketUserAddress userAddress) { + if (userAddress == null) return; + this.userid = userAddress.userid(); + this.sncpAddress = userAddress.sncpAddress(); + this.sncpAddresses = userAddress.sncpAddresses(); + } + + @Override + public Serializable userid() { + return userid; + } + + @Override + public InetSocketAddress sncpAddress() { + return sncpAddress; + } + + @Override + public Collection sncpAddresses() { + return sncpAddresses; + } + + public Serializable getUserid() { + return userid; + } + + public void setUserid(Serializable userid) { + this.userid = userid; + } + + public InetSocketAddress getSncpAddress() { + return sncpAddress; + } + + public void setSncpAddress(InetSocketAddress sncpAddress) { + this.sncpAddress = sncpAddress; + } + + public Collection getSncpAddresses() { + return sncpAddresses; + } + + public void setSncpAddresses(Collection sncpAddresses) { + this.sncpAddresses = sncpAddresses; + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } + } }