From f073c9a0bc5e41f4fe07678e878a4f7f99de8a7a Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 16 Apr 2020 21:08:00 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocketEngine.java | 10 ++++---- src/org/redkale/net/http/WebSocketPacket.java | 23 +++++++------------ src/org/redkale/net/http/WebSocketRunner.java | 3 ++- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index a3f9292eb..75a74d99f 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -244,7 +244,7 @@ public class WebSocketEngine { if (bufferSupplier == null) { bufferSupplier = websocket.getBufferSupplier(); bufferConsumer = websocket.getBufferConsumer(); - packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor)); + packet.encodePacket(bufferSupplier, bufferConsumer, cryptor); } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } @@ -255,7 +255,7 @@ public class WebSocketEngine { if (bufferSupplier == null) { bufferSupplier = websocket.getBufferSupplier(); bufferConsumer = websocket.getBufferConsumer(); - packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor)); + packet.encodePacket(bufferSupplier, bufferConsumer, cryptor); } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } @@ -312,7 +312,7 @@ public class WebSocketEngine { final WebSocketPacket packet = (message instanceof WebSocketPacket) ? (WebSocketPacket) message : ((message == null || message instanceof CharSequence || message instanceof byte[]) ? new WebSocketPacket((Serializable) message, last) : new WebSocketPacket(this.sendConvert, false, message, last)); - //packet.setSendBuffers(packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor)); + //packet.encode(context.getBufferSupplier(), context.getBufferConsumer(), cryptor); CompletableFuture future = null; if (single) { for (Serializable userid : userids) { @@ -321,7 +321,7 @@ public class WebSocketEngine { if (bufferSupplier == null) { bufferSupplier = websocket.getBufferSupplier(); bufferConsumer = websocket.getBufferConsumer(); - packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor)); + packet.encodePacket(bufferSupplier, bufferConsumer, cryptor); } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } @@ -333,7 +333,7 @@ public class WebSocketEngine { if (bufferSupplier == null) { bufferSupplier = websocket.getBufferSupplier(); bufferConsumer = websocket.getBufferConsumer(); - packet.setSendBuffers(packet.encodePacket(bufferSupplier, bufferConsumer, cryptor)); + packet.encodePacket(bufferSupplier, bufferConsumer, cryptor); } future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b); } diff --git a/src/org/redkale/net/http/WebSocketPacket.java b/src/org/redkale/net/http/WebSocketPacket.java index 35a1317f6..12f9426df 100644 --- a/src/org/redkale/net/http/WebSocketPacket.java +++ b/src/org/redkale/net/http/WebSocketPacket.java @@ -153,16 +153,6 @@ public final class WebSocketPacket { if (mapconvable && !(json instanceof Object[])) throw new IllegalArgumentException(); } - WebSocketPacket(ByteBuffer[] sendBuffers, FrameType type, boolean fin) { - this.type = type; - this.last = fin; - this.setSendBuffers(sendBuffers); - } - - void setSendBuffers(ByteBuffer[] sendBuffers) { - this.sendBuffers = sendBuffers; - } - ByteBuffer[] duplicateSendBuffers() { ByteBuffer[] rs = new ByteBuffer[this.sendBuffers.length]; for (int i = 0; i < this.sendBuffers.length; i++) { @@ -228,7 +218,7 @@ public final class WebSocketPacket { * * @return ByteBuffer[] */ - ByteBuffer[] encodePacket(final Supplier supplier, final Consumer consumer, final Cryptor cryptor) { + void encodePacket(final Supplier supplier, final Consumer consumer, final Cryptor cryptor) { final byte opcode = (byte) (this.type.getValue() | 0x80); if (this.sendConvert != null) { Supplier newsupplier = new Supplier() { @@ -268,7 +258,8 @@ public final class WebSocketPacket { firstbuf.put(1, (byte) 0x7F); //127 firstbuf.putInt(2, contentLength); } - return buffers; + this.sendBuffers = buffers; + return; } ByteBuffer buffer = supplier.get(); //确保ByteBuffer的capacity不能小于128 @@ -299,7 +290,8 @@ public final class WebSocketPacket { buffer.put((byte) len); buffer.put(content); buffer.flip(); - return new ByteBuffer[]{buffer}; + this.sendBuffers = new ByteBuffer[]{buffer}; + return; } if (len <= 0xFFFF) { // 65535 buffer.put(opcode); @@ -315,7 +307,8 @@ public final class WebSocketPacket { if (pend <= 0) { buffer.put(content); buffer.flip(); - return new ByteBuffer[]{buffer}; + this.sendBuffers = new ByteBuffer[]{buffer}; + return; } buffer.put(content, 0, buffer.remaining()); buffer.flip(); @@ -330,7 +323,7 @@ public final class WebSocketPacket { start += capacity; pend -= capacity; } - return buffers; + this.sendBuffers = buffers; } // public static void main(String[] args) throws Throwable { diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 350d43453..f47a84ba9 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -230,7 +230,8 @@ class WebSocketRunner implements Runnable { //System.out.println("推送消息"); final CompletableFuture futureResult = new CompletableFuture<>(); try { - ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encodePacket(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor); + if (packet.sendBuffers == null) packet.encodePacket(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor); + ByteBuffer[] buffers = packet.duplicateSendBuffers(); //if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet); CompletionHandler handler = new CompletionHandler() {