diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 382850277..83b4a3b43 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -40,7 +40,7 @@ class WebSocketRunner implements Runnable { volatile boolean closed = false; - private AtomicBoolean writing = new AtomicBoolean(); + private final AtomicBoolean writing = new AtomicBoolean(); private final BlockingQueue queue = new ArrayBlockingQueue(1024); @@ -213,20 +213,17 @@ class WebSocketRunner implements Runnable { public CompletableFuture sendMessage(WebSocketPacket packet) { if (packet == null) return CompletableFuture.completedFuture(RETCODE_SEND_ILLPACKET); if (closed) return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED); - boolean debug = true; - //System.out.println("推送消息"); - //if (debug) context.getLogger().log(Level.FINEST, "send web socket message: " + packet); + boolean debug = context.getLogger().isLoggable(Level.FINEST); + //System.out.println("推送消息"); final CompletableFuture futureResult = new CompletableFuture<>(); - if (writing.getAndSet(true)) { - QueueEntry qe = new QueueEntry(futureResult, packet); - queue.add(qe); - if (!writing.get()) { //防止刚好CompletionHandler进程在poll之后获取null正准备writing.set(false)时进入queue.add(qe)。导致本消息不能发送 - queue.remove(qe); - return sendMessage(packet); + synchronized (writing) { + if (writing.getAndSet(true)) { + queue.add(new QueueEntry(futureResult, packet)); + return futureResult; } - return futureResult; } ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier()); + if (debug) context.getLogger().log(Level.FINEST, "sending websocket message: " + packet); try { this.lastSendTime = System.currentTimeMillis(); channel.write(buffers, buffers, new CompletionHandler() { @@ -268,24 +265,17 @@ class WebSocketRunner implements Runnable { } } } - QueueEntry entry = queue.poll(); + QueueEntry entry = null; + synchronized (writing) { + entry = queue.poll(); + if (entry == null) writing.set(false); + } if (entry != null) { future = entry.future; ByteBuffer[] buffers = entry.packet.sendBuffers != null ? entry.packet.duplicateSendBuffers() : entry.packet.encode(context.getBufferSupplier()); lastSendTime = System.currentTimeMillis(); + if (debug) context.getLogger().log(Level.FINEST, "sending websocket message: " + entry.packet); channel.write(buffers, buffers, this); - } else { - writing.set(false); - if (writing.get()) { //刚好进入writing.getAndSet(true)后 - entry = queue.poll(); - if (entry != null) { - writing.set(true); - future = entry.future; - ByteBuffer[] buffers = entry.packet.sendBuffers != null ? entry.packet.duplicateSendBuffers() : entry.packet.encode(context.getBufferSupplier()); - lastSendTime = System.currentTimeMillis(); - channel.write(buffers, buffers, this); - } - } } } catch (Exception e) { context.getLogger().log(Level.WARNING, "WebSocket sendMessage abort on rewrite, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);