diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 2b6718570..22e18e652 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -34,6 +34,10 @@ class WebSocketRunner implements Runnable { protected final boolean mergemsg; + protected final Semaphore writeSemaphore = new Semaphore(1); + + protected final LinkedBlockingQueue writeQueue = new LinkedBlockingQueue(); + volatile boolean closed = false; FrameType currSeriesMergeFrameType; @@ -228,9 +232,7 @@ class WebSocketRunner implements Runnable { try { ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(webSocket._channel.getBufferSupplier(), webSocket._channel.getBufferConsumer(), webSocket._engine.cryptor); //if (debug) context.getLogger().log(Level.FINEST, "wsrunner.sending websocket message: " + packet); - - this.lastSendTime = System.currentTimeMillis(); - webSocket._channel.write(buffers, buffers, new CompletionHandler() { + CompletionHandler handler = new CompletionHandler() { private CompletableFuture future = futureResult; @@ -285,7 +287,14 @@ class WebSocketRunner implements Runnable { } } - }); + }; + this.lastSendTime = System.currentTimeMillis(); + if (writeSemaphore.tryAcquire()) { + webSocket._channel.write(buffers, buffers, handler); + } else { + writeQueue.add(new WriteEntry(buffers, handler)); + } + } catch (Exception t) { futureResult.complete(RETCODE_SENDEXCEPTION); closeRunner(RETCODE_SENDEXCEPTION, "websocket send message failed on channel.write"); @@ -294,7 +303,14 @@ class WebSocketRunner implements Runnable { } } - return futureResult; + return futureResult.whenComplete((r, t) -> { + WriteEntry entry = writeQueue.poll(); + if (entry != null) { + webSocket._channel.write(entry.writeBuffers, entry.writeBuffers, entry.writeHandler); + } else { + writeSemaphore.release(); + } + }); } public boolean isClosed() { @@ -314,4 +330,16 @@ class WebSocketRunner implements Runnable { } } + private static class WriteEntry { + + ByteBuffer[] writeBuffers; + + CompletionHandler writeHandler; + + public WriteEntry(ByteBuffer[] writeBuffers, CompletionHandler writeHandler) { + this.writeBuffers = writeBuffers; + this.writeHandler = writeHandler; + } + + } }