AsyncConnection优化
This commit is contained in:
@@ -14,7 +14,7 @@ import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import org.redkale.util.*;
|
||||
import org.redkale.util.ByteBufferWriter;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -68,14 +68,14 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
|
||||
protected int writeByteTuple2Length;
|
||||
|
||||
//写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeOffset、writeLength有值
|
||||
//写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeBuffersOffset、writeBuffersLength有值
|
||||
protected ByteBuffer writeByteBuffer;
|
||||
|
||||
protected ByteBuffer[] writeByteBuffers;
|
||||
|
||||
protected int writeOffset;
|
||||
protected int writeBuffersOffset;
|
||||
|
||||
protected int writeLength;
|
||||
protected int writeBuffersLength;
|
||||
|
||||
protected int writeTotal;
|
||||
|
||||
@@ -154,9 +154,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
ioReadThread.register(selector -> {
|
||||
try {
|
||||
if (readKey == null) {
|
||||
SelectionKey oldKey = keyFor(selector);
|
||||
int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps());
|
||||
readKey = implRegister(selector, ops);
|
||||
readKey = keyFor(selector);
|
||||
}
|
||||
if (readKey == null) {
|
||||
readKey = implRegister(selector, SelectionKey.OP_READ);
|
||||
readKey.attach(this);
|
||||
} else {
|
||||
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
|
||||
@@ -275,8 +276,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
}
|
||||
this.writePending = true;
|
||||
this.writeByteBuffers = srcs;
|
||||
this.writeOffset = offset;
|
||||
this.writeLength = length;
|
||||
this.writeBuffersOffset = offset;
|
||||
this.writeBuffersLength = length;
|
||||
this.writeAttachment = attachment;
|
||||
if (this.writeTimeoutSeconds > 0) {
|
||||
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
|
||||
@@ -307,9 +308,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
ioWriteThread.register(selector -> {
|
||||
try {
|
||||
if (writeKey == null) {
|
||||
SelectionKey oldKey = keyFor(selector);
|
||||
int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps());
|
||||
writeKey = implRegister(selector, ops);
|
||||
writeKey = keyFor(selector);
|
||||
}
|
||||
if (writeKey == null) {
|
||||
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
|
||||
writeKey.attach(this);
|
||||
} else {
|
||||
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
|
||||
@@ -346,9 +348,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
ioReadThread.register(selector -> {
|
||||
try {
|
||||
if (readKey == null) {
|
||||
SelectionKey oldKey = keyFor(selector);
|
||||
int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps());
|
||||
readKey = implRegister(selector, ops);
|
||||
readKey = keyFor(selector);
|
||||
}
|
||||
if (readKey == null) {
|
||||
readKey = implRegister(selector, SelectionKey.OP_READ);
|
||||
readKey.attach(this);
|
||||
} else {
|
||||
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
|
||||
@@ -372,25 +375,39 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
boolean hasRemain = true;
|
||||
boolean writeCompleted = true;
|
||||
|
||||
if (writeByteTuple1Array == null && fastWriteCount.get() > 0) {
|
||||
byte[] bs = null;
|
||||
if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && fastWriteCount.get() > 0) {
|
||||
final ByteBuffer buffer = pollWriteBuffer();
|
||||
ByteBufferWriter writer = null;
|
||||
byte[] item;
|
||||
while ((item = fastWriteQueue.poll()) != null) {
|
||||
fastWriteCount.decrementAndGet();
|
||||
bs = Utility.append(bs, item);
|
||||
if (writer != null) {
|
||||
writer.put(item);
|
||||
} else if (buffer.remaining() >= item.length) {
|
||||
buffer.put(item);
|
||||
} else {
|
||||
writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
|
||||
writer.put(item);
|
||||
}
|
||||
}
|
||||
this.writeByteTuple1Array = bs;
|
||||
this.writeBuffersOffset = 0;
|
||||
if (writer == null) {
|
||||
this.writeByteBuffer = buffer.flip();
|
||||
this.writeBuffersLength = 0;
|
||||
} else {
|
||||
this.writeByteBuffers = writer.toBuffers();
|
||||
this.writeBuffersLength = this.writeByteBuffers.length;
|
||||
}
|
||||
this.writeByteTuple1Array = null;
|
||||
this.writeByteTuple1Offset = 0;
|
||||
this.writeByteTuple1Length = bs == null ? 0 : bs.length;
|
||||
this.writeByteTuple1Length = 0;
|
||||
this.writeByteTuple2Array = null;
|
||||
this.writeByteTuple2Offset = 0;
|
||||
this.writeByteTuple2Length = 0;
|
||||
this.writeOffset = 0;
|
||||
this.writeLength = this.writeByteTuple1Length;
|
||||
}
|
||||
|
||||
int batchOffset = writeOffset;
|
||||
int batchLength = writeLength;
|
||||
int batchOffset = writeBuffersOffset;
|
||||
int batchLength = writeBuffersLength;
|
||||
while (hasRemain) { //必须要将buffer写完为止
|
||||
if (writeByteTuple1Array != null) {
|
||||
final ByteBuffer buffer = pollWriteBuffer();
|
||||
@@ -399,14 +416,13 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
if (writeByteTuple2Length > 0) {
|
||||
buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
|
||||
}
|
||||
buffer.flip();
|
||||
writeByteBuffer = buffer;
|
||||
writeByteTuple1Array = null;
|
||||
writeByteTuple1Offset = 0;
|
||||
writeByteTuple1Length = 0;
|
||||
writeByteTuple2Array = null;
|
||||
writeByteTuple2Offset = 0;
|
||||
writeByteTuple2Length = 0;
|
||||
this.writeByteBuffer = buffer.flip();
|
||||
this.writeByteTuple1Array = null;
|
||||
this.writeByteTuple1Offset = 0;
|
||||
this.writeByteTuple1Length = 0;
|
||||
this.writeByteTuple2Array = null;
|
||||
this.writeByteTuple2Offset = 0;
|
||||
this.writeByteTuple2Length = 0;
|
||||
} else {
|
||||
ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
|
||||
writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length);
|
||||
@@ -414,17 +430,17 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
|
||||
}
|
||||
final ByteBuffer[] buffers = writer.toBuffers();
|
||||
writeByteBuffers = buffers;
|
||||
writeOffset = 0;
|
||||
writeLength = buffers.length;
|
||||
batchOffset = writeOffset;
|
||||
batchLength = writeLength;
|
||||
writeByteTuple1Array = null;
|
||||
writeByteTuple1Offset = 0;
|
||||
writeByteTuple1Length = 0;
|
||||
writeByteTuple2Array = null;
|
||||
writeByteTuple2Offset = 0;
|
||||
writeByteTuple2Length = 0;
|
||||
this.writeByteBuffers = buffers;
|
||||
this.writeBuffersOffset = 0;
|
||||
this.writeBuffersLength = buffers.length;
|
||||
batchOffset = writeBuffersOffset;
|
||||
batchLength = writeBuffersLength;
|
||||
this.writeByteTuple1Array = null;
|
||||
this.writeByteTuple1Offset = 0;
|
||||
this.writeByteTuple1Length = 0;
|
||||
this.writeByteTuple2Array = null;
|
||||
this.writeByteTuple2Offset = 0;
|
||||
this.writeByteTuple2Length = 0;
|
||||
}
|
||||
if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) {
|
||||
if (writeByteBuffer == null) {
|
||||
@@ -473,14 +489,15 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
}
|
||||
|
||||
if (writeCompleted && (totalCount != 0 || !hasRemain)) {
|
||||
handleWrite(writeTotal + totalCount, null);
|
||||
handleWrite(this.writeTotal + totalCount, null);
|
||||
} else if (writeKey == null) {
|
||||
ioWriteThread.register(selector -> {
|
||||
try {
|
||||
if (writeKey == null) {
|
||||
SelectionKey oldKey = keyFor(selector);
|
||||
int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps());
|
||||
writeKey = implRegister(selector, ops);
|
||||
writeKey = keyFor(selector);
|
||||
}
|
||||
if (writeKey == null) {
|
||||
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
|
||||
writeKey.attach(this);
|
||||
} else {
|
||||
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
|
||||
@@ -549,8 +566,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
this.writeAttachment = null;
|
||||
this.writeByteBuffer = null;
|
||||
this.writeByteBuffers = null;
|
||||
this.writeOffset = 0;
|
||||
this.writeLength = 0;
|
||||
this.writeBuffersOffset = 0;
|
||||
this.writeBuffersLength = 0;
|
||||
this.writeTotal = 0;
|
||||
this.writePending = false; //必须放最后
|
||||
|
||||
|
||||
Reference in New Issue
Block a user