WebSocketEngine.broadcastMessage 增加 Predicate<WebSocket> 参数

This commit is contained in:
Redkale
2017-07-01 15:38:47 +08:00
parent 941d09cde2
commit 76df1108d7

View File

@@ -10,6 +10,7 @@ import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.Predicate;
import java.util.logging.*;
import java.util.stream.*;
import org.redkale.convert.Convert;
@@ -128,6 +129,10 @@ public final class WebSocketEngine {
}
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
return broadcastMessage(null, message, last);
}
public CompletableFuture<Integer> broadcastMessage(final Predicate<WebSocket> predicate, final Object message, final boolean last) {
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(json, last));
}
@@ -140,11 +145,13 @@ public final class WebSocketEngine {
CompletableFuture<Integer> future = null;
if (single) {
for (WebSocket websocket : websockets.values()) {
if (predicate != null && !predicate.test(websocket)) continue;
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
} else {
for (List<WebSocket> list : websockets2.values()) {
for (WebSocket websocket : list) {
if (predicate != null && !predicate.test(websocket)) continue;
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
}