diff --git a/src/org/redkale/net/Context.java b/src/org/redkale/net/Context.java index 6cafbffc0..a2bd447bc 100644 --- a/src/org/redkale/net/Context.java +++ b/src/org/redkale/net/Context.java @@ -127,6 +127,13 @@ public class Context { bufferPool.offer(buffer); } + public void offerBuffer(ByteBuffer... buffers) { + if (buffers == null) return; + for (ByteBuffer buffer : buffers) { + bufferPool.offer(buffer); + } + } + public Logger getLogger() { return logger; } diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index 83a13b482..dc6eed82a 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -35,6 +35,9 @@ public final class WebSocketEngine { //当前WebSocket对应的Node protected final WebSocketNode node; + //HttpContext + protected final HttpContext context; + //在线用户ID对应的WebSocket组,当WebSocketGroup内没有WebSocket会从containers删掉 private final Map containers = new ConcurrentHashMap<>(); @@ -47,8 +50,9 @@ public final class WebSocketEngine { //FINEST日志级别 protected final boolean finest; - protected WebSocketEngine(String engineid, WebSocketNode node, Logger logger) { + protected WebSocketEngine(String engineid, HttpContext context, WebSocketNode node, Logger logger) { this.engineid = engineid; + this.context = context; this.node = node; this.logger = logger; this.index = sequence.getAndIncrement(); @@ -78,7 +82,7 @@ public final class WebSocketEngine { void add(WebSocket socket) { //非线程安全, 在常规场景中无需锁 WebSocketGroup group = containers.get(socket._groupid); if (group == null) { - group = new WebSocketGroup(socket._groupid); + group = new WebSocketGroup(context, socket._groupid); containers.putIfAbsent(socket._groupid, group); if (node != null) node.connect(socket._groupid); } diff --git a/src/org/redkale/net/http/WebSocketGroup.java b/src/org/redkale/net/http/WebSocketGroup.java index 07fa622f7..176e662c9 100644 --- a/src/org/redkale/net/http/WebSocketGroup.java +++ b/src/org/redkale/net/http/WebSocketGroup.java @@ -21,13 +21,16 @@ public final class WebSocketGroup { private final Serializable groupid; + private final HttpContext context; + private WebSocket recentWebSocket; private final List list = new CopyOnWriteArrayList<>(); private final Map attributes = new HashMap<>(); - WebSocketGroup(Serializable groupid) { + WebSocketGroup(HttpContext context, Serializable groupid) { + this.context = context; this.groupid = groupid; } @@ -91,12 +94,22 @@ public final class WebSocketGroup { } } + final CompletableFuture send(boolean recent, final WebSocketPacket packet) { + if (recent) { + return recentWebSocket.send(packet); + } else { + return sendEach(packet); + } + } + public final CompletableFuture sendEach(Object message) { return sendEach(message, true); } - public final CompletableFuture sendEach(WebSocketPacket packet) { + public final CompletableFuture sendEach(final WebSocketPacket packet) { CompletableFuture future = null; + final boolean more = packet.sendBuffers == null && list.size() > 1; + if (more) packet.setSendBuffers(packet.encode(context.getBufferSupplier())); for (WebSocket s : list) { if (future == null) { future = s.sendPacket(packet); @@ -104,6 +117,7 @@ public final class WebSocketGroup { future = future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b); } } + if (more && future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers)); return future == null ? CompletableFuture.completedFuture(0) : future; } diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index cc63cd748..ef529636e 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -96,7 +96,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service - this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger); + this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", context, this.node, logger); this.node.init(conf); this.node.localEngine.init(conf); }