diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index a87a9a3bc..7ff87c3f5 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -39,16 +39,16 @@ public class WebSocketRunner implements Runnable { private ByteBuffer writeBuffer; - protected boolean closed = false; + protected volatile boolean closed = false; private AtomicBoolean writing = new AtomicBoolean(); private final Coder coder = new Coder(); - private final BlockingQueue queue = new ArrayBlockingQueue(1024); + private final BlockingQueue queue = new ArrayBlockingQueue(1024); private final boolean wsbinary; - + protected long lastSendTime; public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel, final boolean wsbinary) { @@ -168,23 +168,27 @@ public class WebSocketRunner implements Runnable { final byte[] bytes = coder.encode(packet); if (debug) context.getLogger().log(Level.FINEST, "send web socket message's length = " + bytes.length); this.lastSendTime = System.currentTimeMillis(); + final CompletableFuture futureResult = new CompletableFuture<>(); if (writing.getAndSet(true)) { - queue.add(bytes); - return CompletableFuture.completedFuture(0); + queue.add(new QueueEntry(futureResult, bytes)); + return futureResult; } - if (writeBuffer == null) return CompletableFuture.completedFuture(RETCODE_ILLEGALBUFFER); + ByteBuffer localWriteBuffer = writeBuffer; + if (localWriteBuffer == null) return CompletableFuture.completedFuture(RETCODE_ILLEGALBUFFER); ByteBuffer sendBuffer; - if (bytes.length <= writeBuffer.capacity()) { - writeBuffer.clear(); - writeBuffer.put(bytes); - writeBuffer.flip(); - sendBuffer = writeBuffer; + if (bytes.length <= localWriteBuffer.capacity()) { + localWriteBuffer.clear(); + localWriteBuffer.put(bytes); + localWriteBuffer.flip(); + sendBuffer = localWriteBuffer; } else { sendBuffer = ByteBuffer.wrap(bytes); } try { channel.write(sendBuffer, sendBuffer, new CompletionHandler() { + private CompletableFuture future = futureResult; + @Override public void completed(Integer result, ByteBuffer attachment) { if (attachment == null || closed) return; @@ -194,21 +198,26 @@ public class WebSocketRunner implements Runnable { channel.write(attachment, attachment, this); return; } - byte[] bs = queue.poll(); - if (bs != null && writeBuffer != null) { - ByteBuffer sendBuffer; - if (bs.length <= writeBuffer.capacity()) { - writeBuffer.clear(); - writeBuffer.put(bs); - writeBuffer.flip(); - sendBuffer = writeBuffer; - } else { - sendBuffer = ByteBuffer.wrap(bs); - } - channel.write(sendBuffer, sendBuffer, this); - return; + if (future != null) { + future.complete(0); + future = null; } - } catch (NullPointerException e) { + QueueEntry entry = queue.poll(); + ByteBuffer localWriteBuffer = writeBuffer; + if (entry == null || localWriteBuffer == null) return; //没有数据了 + future = entry.future; + byte[] bs = entry.bytes; + ByteBuffer sendBuffer; + if (bs.length <= localWriteBuffer.capacity()) { + localWriteBuffer.clear(); + localWriteBuffer.put(bs); + localWriteBuffer.flip(); + sendBuffer = localWriteBuffer; + } else { + sendBuffer = ByteBuffer.wrap(bs); + } + channel.write(sendBuffer, sendBuffer, this); + } catch (Exception e) { closeRunner(); context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e); @@ -225,13 +234,13 @@ public class WebSocketRunner implements Runnable { } } }); - return CompletableFuture.completedFuture(0); } catch (Exception t) { writing.set(false); closeRunner(); context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t); - return CompletableFuture.completedFuture(RETCODE_SENDEXCEPTION); + futureResult.complete(RETCODE_SENDEXCEPTION); } + return futureResult; } public void closeRunner() { @@ -252,6 +261,19 @@ public class WebSocketRunner implements Runnable { } } + private static final class QueueEntry { + + public final CompletableFuture future; + + public final byte[] bytes; + + public QueueEntry(CompletableFuture future, byte[] bytes) { + this.future = future; + this.bytes = bytes; + } + + } + private static final class Masker { public static final int MASK_SIZE = 4;