diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index a43ce173a..142024819 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -11,6 +11,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.logging.*; +import org.redkale.convert.json.JsonConvert; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import org.redkale.util.*; @@ -38,6 +39,9 @@ public final class WebSocketEngine { //HttpContext protected final HttpContext context; + //JsonConvert + protected final JsonConvert convert; + //在线用户ID对应的WebSocket组,当WebSocketGroup内没有WebSocket会从containers删掉 private final Map containers = new ConcurrentHashMap<>(); @@ -53,6 +57,7 @@ public final class WebSocketEngine { protected WebSocketEngine(String engineid, HttpContext context, WebSocketNode node, Logger logger) { this.engineid = engineid; this.context = context; + this.convert = context.getJsonConvert(); this.node = node; this.logger = logger; this.index = sequence.getAndIncrement(); @@ -106,13 +111,29 @@ public final class WebSocketEngine { if (message instanceof CompletableFuture) { return ((CompletableFuture) message).thenCompose((json) -> sendMessage(recent, json, last, groupids)); } - CompletableFuture future = null; - for (Serializable groupid : groupids) { - WebSocketGroup group = getWebSocketGroup(groupid); - if (group == null) continue; - future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + final boolean more = !(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null || groupids.length > 1; + if (more) { + final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message + : ((message == null || message instanceof CharSequence || message instanceof byte[]) + ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last)); + packet.setSendBuffers(packet.encode(context.getBufferSupplier())); + CompletableFuture future = null; + for (Serializable groupid : groupids) { + WebSocketGroup group = getWebSocketGroup(groupid); + if (group == null) continue; + future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + } + if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; + } else { + CompletableFuture future = null; + for (Serializable groupid : groupids) { + WebSocketGroup group = getWebSocketGroup(groupid); + if (group == null) continue; + future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); + } + return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } - return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future; } Collection getWebSocketGroups() { diff --git a/src/org/redkale/net/http/WebSocketGroup.java b/src/org/redkale/net/http/WebSocketGroup.java index 176e662c9..c58f4e88d 100644 --- a/src/org/redkale/net/http/WebSocketGroup.java +++ b/src/org/redkale/net/http/WebSocketGroup.java @@ -111,11 +111,7 @@ public final class WebSocketGroup { final boolean more = packet.sendBuffers == null && list.size() > 1; if (more) packet.setSendBuffers(packet.encode(context.getBufferSupplier())); for (WebSocket s : list) { - if (future == null) { - future = s.sendPacket(packet); - } else { - future = future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b); - } + future = future == null ? s.sendPacket(packet) : future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b); } if (more && future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); return future == null ? CompletableFuture.completedFuture(0) : future;