AsyncConnection写异常时需要终结

This commit is contained in:
redkale
2023-10-20 08:17:12 +08:00
parent 3d8dcc0026
commit ca93aceafc

View File

@@ -11,7 +11,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.util.ByteBufferWriter; import org.redkale.util.ByteBufferWriter;
@@ -29,8 +28,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SocketAddress remoteAddress; protected SocketAddress remoteAddress;
protected final AtomicLong fastWriteCount = new AtomicLong(); //protected final AtomicLong fastWriteCount = new AtomicLong();
protected final Queue<byte[]> fastWriteQueue = new ConcurrentLinkedQueue<>(); protected final Queue<byte[]> fastWriteQueue = new ConcurrentLinkedQueue<>();
//-------------------------------- 连操作 -------------------------------------- //-------------------------------- 连操作 --------------------------------------
@@ -86,7 +84,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey writeKey; protected SelectionKey writeKey;
// protected CompletionHandler<Integer, Object> writeFastHandler; // protected CompletionHandler<Integer, Object> writeFastHandler;
public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext);
@@ -123,7 +120,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
// this.writeFastHandler = (CompletionHandler) handler; // this.writeFastHandler = (CompletionHandler) handler;
// return this; // return this;
// } // }
@Override @Override
protected void startHandshake(final Consumer<Throwable> callback) { protected void startHandshake(final Consumer<Throwable> callback) {
ioReadThread.register(t -> super.startHandshake(callback)); ioReadThread.register(t -> super.startHandshake(callback));
@@ -337,7 +333,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
// handleWrite(0, e); // handleWrite(0, e);
// } // }
// } // }
public void doRead(boolean direct) { public void doRead(boolean direct) {
try { try {
this.readTime = System.currentTimeMillis(); this.readTime = System.currentTimeMillis();
@@ -384,36 +379,37 @@ abstract class AsyncNioConnection extends AsyncConnection {
int totalCount = 0; int totalCount = 0;
boolean hasRemain = true; boolean hasRemain = true;
boolean writeCompleted = true; boolean writeCompleted = true;
if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && fastWriteCount.get() > 0) { boolean error = false;
final ByteBuffer buffer = pollWriteBuffer(); // if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && fastWriteCount.get() > 0) {
ByteBufferWriter writer = null; // final ByteBuffer buffer = pollWriteBuffer();
byte[] item; // ByteBufferWriter writer = null;
while ((item = fastWriteQueue.poll()) != null) { // byte[] item;
fastWriteCount.decrementAndGet(); // while ((item = fastWriteQueue.poll()) != null) {
if (writer != null) { // fastWriteCount.decrementAndGet();
writer.put(item); // if (writer != null) {
} else if (buffer.remaining() >= item.length) { // writer.put(item);
buffer.put(item); // } else if (buffer.remaining() >= item.length) {
} else { // buffer.put(item);
writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); // } else {
writer.put(item); // writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
} // writer.put(item);
} // }
this.writeBuffersOffset = 0; // }
if (writer == null) { // this.writeBuffersOffset = 0;
this.writeByteBuffer = buffer.flip(); // if (writer == null) {
this.writeBuffersLength = 0; // this.writeByteBuffer = buffer.flip();
} else { // this.writeBuffersLength = 0;
this.writeByteBuffers = writer.toBuffers(); // } else {
this.writeBuffersLength = this.writeByteBuffers.length; // this.writeByteBuffers = writer.toBuffers();
} // this.writeBuffersLength = this.writeByteBuffers.length;
this.writeByteTuple1Array = null; // }
this.writeByteTuple1Offset = 0; // this.writeByteTuple1Array = null;
this.writeByteTuple1Length = 0; // this.writeByteTuple1Offset = 0;
this.writeByteTuple2Array = null; // this.writeByteTuple1Length = 0;
this.writeByteTuple2Offset = 0; // this.writeByteTuple2Array = null;
this.writeByteTuple2Length = 0; // this.writeByteTuple2Offset = 0;
} // this.writeByteTuple2Length = 0;
// }
int batchOffset = writeBuffersOffset; int batchOffset = writeBuffersOffset;
int batchLength = writeBuffersLength; int batchLength = writeBuffersLength;
@@ -485,9 +481,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
break; break;
} else if (writeCount < 0) { } else if (writeCount < 0) {
if (totalCount == 0) { error = true;
totalCount = writeCount; totalCount = writeCount;
}
break; break;
} else { } else {
totalCount += writeCount; totalCount += writeCount;
@@ -497,11 +492,13 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
} }
if (writeCompleted && (totalCount != 0 || !hasRemain)) { if (error) {
handleWrite(totalCount, new ClosedChannelException());
} else if (writeCompleted && (totalCount != 0 || !hasRemain)) {
handleWrite(this.writeTotal + totalCount, null); handleWrite(this.writeTotal + totalCount, null);
if (fastWriteCount.get() > 0) { // if (fastWriteCount.get() > 0) {
doWrite(); // doWrite();
} // }
} else if (writeKey == null) { } else if (writeKey == null) {
ioWriteThread.register(selector -> { ioWriteThread.register(selector -> {
try { try {