From c80bb876a355f1aecab0e297e2dab173442a5546 Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 4 Jul 2023 22:30:06 +0800 Subject: [PATCH] =?UTF-8?q?AsyncNioConnection=E4=BC=98=E5=8C=96fastWrite?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/AsyncNioConnection.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 05c5630da..f72917a6d 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -11,6 +11,7 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.net.ssl.SSLContext; import org.redkale.util.*; @@ -28,6 +29,10 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SocketAddress remoteAddress; + protected final AtomicLong fastWriteCount = new AtomicLong(); + + protected final Queue fastWriteQueue = new ConcurrentLinkedQueue<>(); + //-------------------------------- 连操作 -------------------------------------- protected Object connectAttachment; @@ -80,10 +85,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey writeKey; - //-------------------------- 用于客户端的Socket -------------------------- - //用于客户端的Socket - protected final Queue clientModeWriteQueue = new ConcurrentLinkedQueue<>(); - public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); @@ -297,7 +298,8 @@ abstract class AsyncNioConnection extends AsyncConnection { Objects.requireNonNull(data); Objects.requireNonNull(handler); this.writePending = true; - this.clientModeWriteQueue.offer(data); + this.fastWriteQueue.offer(data); + this.fastWriteCount.incrementAndGet(); this.writeCompletionHandler = (CompletionHandler) handler; this.writeAttachment = attachment; try { @@ -370,13 +372,13 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; - if (clientMode && writeByteTuple1Array == null && !clientModeWriteQueue.isEmpty()) { + if (writeByteTuple1Array == null && fastWriteCount.get() > 0) { byte[] bs = null; byte[] item; - while ((item = clientModeWriteQueue.poll()) != null) { + while ((item = fastWriteQueue.poll()) != null) { + fastWriteCount.decrementAndGet(); bs = Utility.append(bs, item); } - this.writePending = true; this.writeByteTuple1Array = bs; this.writeByteTuple1Offset = 0; this.writeByteTuple1Length = bs == null ? 0 : bs.length;