From ca93aceafc0c0fb159b728cab361467441693b1d Mon Sep 17 00:00:00 2001 From: redkale Date: Fri, 20 Oct 2023 08:17:12 +0800 Subject: [PATCH] =?UTF-8?q?AsyncConnection=E5=86=99=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E6=97=B6=E9=9C=80=E8=A6=81=E7=BB=88=E7=BB=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/AsyncNioConnection.java | 83 +++++++++---------- 1 file changed, 40 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 993fe15b4..f3333aa68 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -11,7 +11,6 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.net.ssl.SSLContext; import org.redkale.util.ByteBufferWriter; @@ -29,8 +28,7 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SocketAddress remoteAddress; - protected final AtomicLong fastWriteCount = new AtomicLong(); - + //protected final AtomicLong fastWriteCount = new AtomicLong(); protected final Queue fastWriteQueue = new ConcurrentLinkedQueue<>(); //-------------------------------- 连操作 -------------------------------------- @@ -86,7 +84,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey writeKey; // protected CompletionHandler writeFastHandler; - public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); @@ -123,7 +120,6 @@ abstract class AsyncNioConnection extends AsyncConnection { // this.writeFastHandler = (CompletionHandler) handler; // return this; // } - @Override protected void startHandshake(final Consumer callback) { ioReadThread.register(t -> super.startHandshake(callback)); @@ -337,7 +333,6 @@ abstract class AsyncNioConnection extends AsyncConnection { // handleWrite(0, e); // } // } - public void doRead(boolean direct) { try { this.readTime = System.currentTimeMillis(); @@ -384,36 +379,37 @@ abstract class AsyncNioConnection extends AsyncConnection { int totalCount = 0; boolean hasRemain = true; boolean writeCompleted = true; - if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && fastWriteCount.get() > 0) { - final ByteBuffer buffer = pollWriteBuffer(); - ByteBufferWriter writer = null; - byte[] item; - while ((item = fastWriteQueue.poll()) != null) { - fastWriteCount.decrementAndGet(); - if (writer != null) { - writer.put(item); - } else if (buffer.remaining() >= item.length) { - buffer.put(item); - } else { - writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); - writer.put(item); - } - } - this.writeBuffersOffset = 0; - if (writer == null) { - this.writeByteBuffer = buffer.flip(); - this.writeBuffersLength = 0; - } else { - this.writeByteBuffers = writer.toBuffers(); - this.writeBuffersLength = this.writeByteBuffers.length; - } - this.writeByteTuple1Array = null; - this.writeByteTuple1Offset = 0; - this.writeByteTuple1Length = 0; - this.writeByteTuple2Array = null; - this.writeByteTuple2Offset = 0; - this.writeByteTuple2Length = 0; - } + boolean error = false; +// if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && fastWriteCount.get() > 0) { +// final ByteBuffer buffer = pollWriteBuffer(); +// ByteBufferWriter writer = null; +// byte[] item; +// while ((item = fastWriteQueue.poll()) != null) { +// fastWriteCount.decrementAndGet(); +// if (writer != null) { +// writer.put(item); +// } else if (buffer.remaining() >= item.length) { +// buffer.put(item); +// } else { +// writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); +// writer.put(item); +// } +// } +// this.writeBuffersOffset = 0; +// if (writer == null) { +// this.writeByteBuffer = buffer.flip(); +// this.writeBuffersLength = 0; +// } else { +// this.writeByteBuffers = writer.toBuffers(); +// this.writeBuffersLength = this.writeByteBuffers.length; +// } +// this.writeByteTuple1Array = null; +// this.writeByteTuple1Offset = 0; +// this.writeByteTuple1Length = 0; +// this.writeByteTuple2Array = null; +// this.writeByteTuple2Offset = 0; +// this.writeByteTuple2Length = 0; +// } int batchOffset = writeBuffersOffset; int batchLength = writeBuffersLength; @@ -485,9 +481,8 @@ abstract class AsyncNioConnection extends AsyncConnection { } break; } else if (writeCount < 0) { - if (totalCount == 0) { - totalCount = writeCount; - } + error = true; + totalCount = writeCount; break; } else { totalCount += writeCount; @@ -497,11 +492,13 @@ abstract class AsyncNioConnection extends AsyncConnection { } } - if (writeCompleted && (totalCount != 0 || !hasRemain)) { + if (error) { + handleWrite(totalCount, new ClosedChannelException()); + } else if (writeCompleted && (totalCount != 0 || !hasRemain)) { handleWrite(this.writeTotal + totalCount, null); - if (fastWriteCount.get() > 0) { - doWrite(); - } +// if (fastWriteCount.get() > 0) { +// doWrite(); +// } } else if (writeKey == null) { ioWriteThread.register(selector -> { try {