diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 2786e07b1..3111172f5 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -369,6 +369,18 @@ public abstract class WebSocket { return broadcastMessage((Convert) null, message, true); } + /** + * 广播消息, 给所有人发消息 + * + * @param wsrange 过滤条件 + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Object message) { + return broadcastMessage((WebSocketRange) null, (Convert) null, message, true); + } + /** * 广播消息, 给所有人发消息 * @@ -381,6 +393,19 @@ public abstract class WebSocket { return broadcastMessage(convert, message, true); } + /** + * 广播消息, 给所有人发消息 + * + * @param wsrange 过滤条件 + * @param convert Convert + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message) { + return broadcastMessage((WebSocketRange) null, convert, message, true); + } + /** * 广播消息, 给所有人发消息 * @@ -393,6 +418,19 @@ public abstract class WebSocket { return broadcastMessage((Convert) null, message, last); } + /** + * 广播消息, 给所有人发消息 + * + * @param wsrange 过滤条件 + * @param message 消息内容 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Object message, final boolean last) { + return broadcastMessage(wsrange, (Convert) null, message, last); + } + /** * 广播消息, 给所有人发消息 * @@ -403,11 +441,25 @@ public abstract class WebSocket { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture broadcastMessage(final Convert convert, final Object message, final boolean last) { + return broadcastMessage((WebSocketRange) null, convert, message, last); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param wsrange 过滤条件 + * @param convert Convert + * @param message 消息内容 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message, final boolean last) { if (_engine.node == null) return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL); if (message instanceof CompletableFuture) { - return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(convert, json, last)); + return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(wsrange, convert, json, last)); } - CompletableFuture rs = _engine.node.broadcastMessage(convert, message, last); + CompletableFuture rs = _engine.node.broadcastMessage(wsrange, convert, message, last); if (_engine.logger.isLoggable(Level.FINEST)) _engine.logger.finest("broadcast send websocket message(" + message + ")"); return rs; } @@ -609,6 +661,17 @@ public abstract class WebSocket { */ protected abstract CompletableFuture createUserid(); + /** + * WebSocket.broadcastMessage时的过滤条件 + * + * @param wsrange 过滤条件 + * + * @return boolean + */ + protected boolean predicate(WebSocketRange wsrange) { + return true; + } + /** * WebSokcet连接成功后的回调方法 */ diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 232fcf048..7812de336 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -193,7 +193,13 @@ public class WebSocketEngine { @Comment("给所有连接用户发送消息") public CompletableFuture broadcastMessage(final Object message, final boolean last) { - return broadcastMessage(null, message, last); + return broadcastMessage((Predicate) null, message, last); + } + + @Comment("给指定WebSocket连接用户发送消息") + public CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Object message, final boolean last) { + Predicate predicate = wsrange == null ? null : (ws) -> ws.predicate(wsrange); + return broadcastMessage(predicate, message, last); } @Comment("给指定WebSocket连接用户发送消息") diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index e9dd690fd..5ceb7b1dd 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -80,7 +80,7 @@ public abstract class WebSocketNode { protected abstract CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable userid); - protected abstract CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last); + protected abstract CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last); protected abstract CompletableFuture connect(Serializable userid, InetSocketAddress addr); @@ -374,6 +374,18 @@ public abstract class WebSocketNode { return broadcastMessage((Convert) null, message, true); } + /** + * 广播消息, 给所有人发消息 + * + * @param wsrange 过滤条件 + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Object message) { + return broadcastMessage(wsrange, (Convert) null, message, true); + } + /** * 广播消息, 给所有人发消息 * @@ -386,6 +398,19 @@ public abstract class WebSocketNode { return broadcastMessage(convert, message, true); } + /** + * 广播消息, 给所有人发消息 + * + * @param wsrange 过滤条件 + * @param convert Convert + * @param message 消息内容 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message) { + return broadcastMessage(wsrange, convert, message, true); + } + /** * 广播消息, 给所有人发消息 * @@ -398,6 +423,19 @@ public abstract class WebSocketNode { return broadcastMessage((Convert) null, message, last); } + /** + * 广播消息, 给所有人发消息 + * + * @param wsrange 过滤条件 + * @param message 消息内容 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Object message, final boolean last) { + return broadcastMessage(wsrange, (Convert) null, message, last); + } + /** * 广播消息, 给所有人发消息 * @@ -408,13 +446,27 @@ public abstract class WebSocketNode { * @return 为0表示成功, 其他值表示部分发送异常 */ public final CompletableFuture broadcastMessage(final Convert convert, final Object message0, final boolean last) { - if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(convert, msg, last)); + return broadcastMessage((WebSocketRange) null, convert, message0, last); + } + + /** + * 广播消息, 给所有人发消息 + * + * @param wsrange 过滤条件 + * @param convert Convert + * @param message0 消息内容 + * @param last 是否最后一条 + * + * @return 为0表示成功, 其他值表示部分发送异常 + */ + public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message0, final boolean last) { + if (message0 instanceof CompletableFuture) return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(wsrange, convert, msg, last)); final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last)); if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式 - return this.localEngine.broadcastMessage(message, last); + return this.localEngine.broadcastMessage(wsrange, message, last); } final Object remoteMessage = formatRemoteMessage(message); - CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(message, last); + CompletableFuture localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(wsrange, message, last); CompletableFuture> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY); CompletableFuture remoteFuture = addrsFuture.thenCompose((Collection addrs) -> { if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message on " + addrs); @@ -422,8 +474,8 @@ public abstract class WebSocketNode { CompletableFuture future = null; for (InetSocketAddress addr : addrs) { if (addr == null || addr.equals(localSncpAddress)) continue; - future = future == null ? remoteNode.broadcastMessage(addr, remoteMessage, last) - : future.thenCombine(remoteNode.broadcastMessage(addr, remoteMessage, last), (a, b) -> a | b); + future = future == null ? remoteNode.broadcastMessage(addr, wsrange, remoteMessage, last) + : future.thenCombine(remoteNode.broadcastMessage(addr, wsrange, remoteMessage, last), (a, b) -> a | b); } return future == null ? CompletableFuture.completedFuture(0) : future; }); diff --git a/src/org/redkale/net/http/WebSocketRange.java b/src/org/redkale/net/http/WebSocketRange.java new file mode 100644 index 000000000..013ea9a41 --- /dev/null +++ b/src/org/redkale/net/http/WebSocketRange.java @@ -0,0 +1,41 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net.http; + +import java.io.Serializable; +import java.util.Map; + +/** + * WebSocket.broadcastMessage时的过滤条件 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class WebSocketRange implements Serializable { + + protected String wskey; + + protected Map attach; + + public String getWskey() { + return wskey; + } + + public void setWskey(String wskey) { + this.wskey = wskey; + } + + public Map getAttach() { + return attach; + } + + public void setAttach(Map attach) { + this.attach = attach; + } + +} diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index f3f8f88f0..532441ff3 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -53,9 +53,9 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress addr, Object message, boolean last) { + public CompletableFuture broadcastMessage(@RpcTargetAddress InetSocketAddress addr, final WebSocketRange wsrange, Object message, boolean last) { if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); - return this.localEngine.broadcastMessage(message, last); + return this.localEngine.broadcastMessage(wsrange, message, last); } /**