From c64763cb35fa028dadc36e27c44ff261b8b933a7 Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 5 Jul 2023 06:38:17 +0800 Subject: [PATCH] =?UTF-8?q?AsyncConnection=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/AsyncNioConnection.java | 115 ++++++++++-------- 1 file changed, 66 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index f72917a6d..d56eff441 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -14,7 +14,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.net.ssl.SSLContext; -import org.redkale.util.*; +import org.redkale.util.ByteBufferWriter; /** * @@ -68,14 +68,14 @@ abstract class AsyncNioConnection extends AsyncConnection { protected int writeByteTuple2Length; - //写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeOffset、writeLength有值 + //写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeBuffersOffset、writeBuffersLength有值 protected ByteBuffer writeByteBuffer; protected ByteBuffer[] writeByteBuffers; - protected int writeOffset; + protected int writeBuffersOffset; - protected int writeLength; + protected int writeBuffersLength; protected int writeTotal; @@ -154,9 +154,10 @@ abstract class AsyncNioConnection extends AsyncConnection { ioReadThread.register(selector -> { try { if (readKey == null) { - SelectionKey oldKey = keyFor(selector); - int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps()); - readKey = implRegister(selector, ops); + readKey = keyFor(selector); + } + if (readKey == null) { + readKey = implRegister(selector, SelectionKey.OP_READ); readKey.attach(this); } else { readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); @@ -275,8 +276,8 @@ abstract class AsyncNioConnection extends AsyncConnection { } this.writePending = true; this.writeByteBuffers = srcs; - this.writeOffset = offset; - this.writeLength = length; + this.writeBuffersOffset = offset; + this.writeBuffersLength = length; this.writeAttachment = attachment; if (this.writeTimeoutSeconds > 0) { AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler; @@ -307,9 +308,10 @@ abstract class AsyncNioConnection extends AsyncConnection { ioWriteThread.register(selector -> { try { if (writeKey == null) { - SelectionKey oldKey = keyFor(selector); - int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps()); - writeKey = implRegister(selector, ops); + writeKey = keyFor(selector); + } + if (writeKey == null) { + writeKey = implRegister(selector, SelectionKey.OP_WRITE); writeKey.attach(this); } else { writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); @@ -346,9 +348,10 @@ abstract class AsyncNioConnection extends AsyncConnection { ioReadThread.register(selector -> { try { if (readKey == null) { - SelectionKey oldKey = keyFor(selector); - int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps()); - readKey = implRegister(selector, ops); + readKey = keyFor(selector); + } + if (readKey == null) { + readKey = implRegister(selector, SelectionKey.OP_READ); readKey.attach(this); } else { readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); @@ -372,25 +375,39 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; - if (writeByteTuple1Array == null && fastWriteCount.get() > 0) { - byte[] bs = null; + if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && fastWriteCount.get() > 0) { + final ByteBuffer buffer = pollWriteBuffer(); + ByteBufferWriter writer = null; byte[] item; while ((item = fastWriteQueue.poll()) != null) { fastWriteCount.decrementAndGet(); - bs = Utility.append(bs, item); + if (writer != null) { + writer.put(item); + } else if (buffer.remaining() >= item.length) { + buffer.put(item); + } else { + writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); + writer.put(item); + } } - this.writeByteTuple1Array = bs; + this.writeBuffersOffset = 0; + if (writer == null) { + this.writeByteBuffer = buffer.flip(); + this.writeBuffersLength = 0; + } else { + this.writeByteBuffers = writer.toBuffers(); + this.writeBuffersLength = this.writeByteBuffers.length; + } + this.writeByteTuple1Array = null; this.writeByteTuple1Offset = 0; - this.writeByteTuple1Length = bs == null ? 0 : bs.length; + this.writeByteTuple1Length = 0; this.writeByteTuple2Array = null; this.writeByteTuple2Offset = 0; this.writeByteTuple2Length = 0; - this.writeOffset = 0; - this.writeLength = this.writeByteTuple1Length; } - int batchOffset = writeOffset; - int batchLength = writeLength; + int batchOffset = writeBuffersOffset; + int batchLength = writeBuffersLength; while (hasRemain) { //必须要将buffer写完为止 if (writeByteTuple1Array != null) { final ByteBuffer buffer = pollWriteBuffer(); @@ -399,14 +416,13 @@ abstract class AsyncNioConnection extends AsyncConnection { if (writeByteTuple2Length > 0) { buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); } - buffer.flip(); - writeByteBuffer = buffer; - writeByteTuple1Array = null; - writeByteTuple1Offset = 0; - writeByteTuple1Length = 0; - writeByteTuple2Array = null; - writeByteTuple2Offset = 0; - writeByteTuple2Length = 0; + this.writeByteBuffer = buffer.flip(); + this.writeByteTuple1Array = null; + this.writeByteTuple1Offset = 0; + this.writeByteTuple1Length = 0; + this.writeByteTuple2Array = null; + this.writeByteTuple2Offset = 0; + this.writeByteTuple2Length = 0; } else { ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length); @@ -414,17 +430,17 @@ abstract class AsyncNioConnection extends AsyncConnection { writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); } final ByteBuffer[] buffers = writer.toBuffers(); - writeByteBuffers = buffers; - writeOffset = 0; - writeLength = buffers.length; - batchOffset = writeOffset; - batchLength = writeLength; - writeByteTuple1Array = null; - writeByteTuple1Offset = 0; - writeByteTuple1Length = 0; - writeByteTuple2Array = null; - writeByteTuple2Offset = 0; - writeByteTuple2Length = 0; + this.writeByteBuffers = buffers; + this.writeBuffersOffset = 0; + this.writeBuffersLength = buffers.length; + batchOffset = writeBuffersOffset; + batchLength = writeBuffersLength; + this.writeByteTuple1Array = null; + this.writeByteTuple1Offset = 0; + this.writeByteTuple1Length = 0; + this.writeByteTuple2Array = null; + this.writeByteTuple2Offset = 0; + this.writeByteTuple2Length = 0; } if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) { if (writeByteBuffer == null) { @@ -473,14 +489,15 @@ abstract class AsyncNioConnection extends AsyncConnection { } if (writeCompleted && (totalCount != 0 || !hasRemain)) { - handleWrite(writeTotal + totalCount, null); + handleWrite(this.writeTotal + totalCount, null); } else if (writeKey == null) { ioWriteThread.register(selector -> { try { if (writeKey == null) { - SelectionKey oldKey = keyFor(selector); - int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps()); - writeKey = implRegister(selector, ops); + writeKey = keyFor(selector); + } + if (writeKey == null) { + writeKey = implRegister(selector, SelectionKey.OP_WRITE); writeKey.attach(this); } else { writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); @@ -549,8 +566,8 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writeAttachment = null; this.writeByteBuffer = null; this.writeByteBuffers = null; - this.writeOffset = 0; - this.writeLength = 0; + this.writeBuffersOffset = 0; + this.writeBuffersLength = 0; this.writeTotal = 0; this.writePending = false; //必须放最后