This commit is contained in:
Redkale
2017-05-26 10:18:55 +08:00
parent 4d0a16d35d
commit ceeb924d4d
2 changed files with 28 additions and 11 deletions

View File

@@ -11,6 +11,7 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
import org.redkale.convert.json.JsonConvert;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import org.redkale.util.*; import org.redkale.util.*;
@@ -38,6 +39,9 @@ public final class WebSocketEngine {
//HttpContext //HttpContext
protected final HttpContext context; protected final HttpContext context;
//JsonConvert
protected final JsonConvert convert;
//在线用户ID对应的WebSocket组当WebSocketGroup内没有WebSocket会从containers删掉 //在线用户ID对应的WebSocket组当WebSocketGroup内没有WebSocket会从containers删掉
private final Map<Serializable, WebSocketGroup> containers = new ConcurrentHashMap<>(); private final Map<Serializable, WebSocketGroup> containers = new ConcurrentHashMap<>();
@@ -53,6 +57,7 @@ public final class WebSocketEngine {
protected WebSocketEngine(String engineid, HttpContext context, WebSocketNode node, Logger logger) { protected WebSocketEngine(String engineid, HttpContext context, WebSocketNode node, Logger logger) {
this.engineid = engineid; this.engineid = engineid;
this.context = context; this.context = context;
this.convert = context.getJsonConvert();
this.node = node; this.node = node;
this.logger = logger; this.logger = logger;
this.index = sequence.getAndIncrement(); this.index = sequence.getAndIncrement();
@@ -106,13 +111,29 @@ public final class WebSocketEngine {
if (message instanceof CompletableFuture) { if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> sendMessage(recent, json, last, groupids)); return ((CompletableFuture) message).thenCompose((json) -> sendMessage(recent, json, last, groupids));
} }
CompletableFuture<Integer> future = null; final boolean more = !(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null || groupids.length > 1;
for (Serializable groupid : groupids) { if (more) {
WebSocketGroup group = getWebSocketGroup(groupid); final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message
if (group == null) continue; : ((message == null || message instanceof CharSequence || message instanceof byte[])
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b); ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.convert, message, last));
packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
CompletableFuture<Integer> future = null;
for (Serializable groupid : groupids) {
WebSocketGroup group = getWebSocketGroup(groupid);
if (group == null) continue;
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b);
}
if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} else {
CompletableFuture<Integer> future = null;
for (Serializable groupid : groupids) {
WebSocketGroup group = getWebSocketGroup(groupid);
if (group == null) continue;
future = future == null ? group.send(recent, message, last) : future.thenCombine(group.send(recent, message, last), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} }
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
} }
Collection<WebSocketGroup> getWebSocketGroups() { Collection<WebSocketGroup> getWebSocketGroups() {

View File

@@ -111,11 +111,7 @@ public final class WebSocketGroup {
final boolean more = packet.sendBuffers == null && list.size() > 1; final boolean more = packet.sendBuffers == null && list.size() > 1;
if (more) packet.setSendBuffers(packet.encode(context.getBufferSupplier())); if (more) packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
for (WebSocket s : list) { for (WebSocket s : list) {
if (future == null) { future = future == null ? s.sendPacket(packet) : future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b);
future = s.sendPacket(packet);
} else {
future = future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b);
}
} }
if (more && future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); if (more && future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
return future == null ? CompletableFuture.completedFuture(0) : future; return future == null ? CompletableFuture.completedFuture(0) : future;