diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 07e8787b3..1c107f7ec 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -10,6 +10,7 @@ import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.function.Predicate; import java.util.logging.*; import java.util.stream.*; import org.redkale.convert.Convert; @@ -128,6 +129,10 @@ public final class WebSocketEngine { } public CompletableFuture broadcastMessage(final Object message, final boolean last) { + return broadcastMessage(null, message, last); + } + + public CompletableFuture broadcastMessage(final Predicate predicate, final Object message, final boolean last) { if (message instanceof CompletableFuture) { return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(json, last)); } @@ -140,11 +145,13 @@ public final class WebSocketEngine { CompletableFuture future = null; if (single) { for (WebSocket websocket : websockets.values()) { + if (predicate != null && !predicate.test(websocket)) continue; future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } } else { for (List list : websockets2.values()) { for (WebSocket websocket : list) { + if (predicate != null && !predicate.test(websocket)) continue; future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } }