AsyncConnection优化

This commit is contained in:
redkale
2023-06-27 09:10:24 +08:00
parent 87899d514b
commit f3475341a0
4 changed files with 97 additions and 121 deletions

View File

@@ -352,38 +352,35 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
public final void write(byte[] bytes, CompletionHandler<Integer, Void> handler) {
write(bytes, 0, bytes.length, null, 0, 0, null, null, handler);
write(bytes, 0, bytes.length, null, 0, 0, handler);
}
public final void write(ByteTuple array, CompletionHandler<Integer, Void> handler) {
write(array.content(), array.offset(), array.length(), null, 0, 0, null, null, handler);
write(array.content(), array.offset(), array.length(), null, 0, 0, handler);
}
public final <A> void write(ByteTuple array, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(array.content(), array.offset(), array.length(), null, 0, 0, null, null, attachment, handler);
write(array.content(), array.offset(), array.length(), null, 0, 0, attachment, handler);
}
public final void write(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) {
write(bytes, offset, length, null, 0, 0, null, null, handler);
write(bytes, offset, length, null, 0, 0, handler);
}
public final void write(ByteTuple header, ByteTuple body, CompletionHandler<Integer, Void> handler) {
write(header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length(), null, null, handler);
write(header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length(), handler);
}
public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, CompletionHandler<Integer, Void> handler) {
write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, null, handler);
public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, CompletionHandler<Integer, Void> handler) {
write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, null, handler);
}
public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, Object handlerAttachment, CompletionHandler handler) {
public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Object handlerAttachment, CompletionHandler handler) {
final ByteBuffer buffer = sslEngine == null ? pollWriteBuffer() : pollWriteSSLBuffer();
if (buffer.remaining() >= headerLength + bodyLength) {
buffer.put(headerContent, headerOffset, headerLength);
if (bodyLength > 0) {
buffer.put(bodyContent, bodyOffset, bodyLength);
if (bodyCallback != null) {
bodyCallback.accept(bodyAttachment);
}
}
buffer.flip();
CompletionHandler<Integer, Object> newHandler = new CompletionHandler<Integer, Object>() {
@@ -405,9 +402,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
writer.put(headerContent, headerOffset, headerLength);
if (bodyLength > 0) {
writer.put(bodyContent, bodyOffset, bodyLength);
if (bodyCallback != null) {
bodyCallback.accept(bodyAttachment);
}
}
final ByteBuffer[] buffers = writer.toBuffers();
CompletionHandler<Integer, Object> newHandler = new CompletionHandler<Integer, Object>() {
@@ -486,11 +480,11 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
}
}
public final void writeInIOThread(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, CompletionHandler<Integer, Void> handler) {
public final void writeInIOThread(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, CompletionHandler<Integer, Void> handler) {
if (inCurrWriteThread()) {
write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, handler);
write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, handler);
} else {
executeWrite(() -> write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, handler));
executeWrite(() -> write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, handler));
}
}

View File

@@ -184,7 +184,7 @@ public class AsyncIOThread extends WorkThread {
conn.doConnect();
} else if (conn.writeCompletionHandler != null && key.isWritable()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
conn.doWrite(true);
conn.doWrite();
} else if (conn.readCompletionHandler != null && key.isReadable()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
conn.doRead(true);
@@ -195,7 +195,7 @@ public class AsyncIOThread extends WorkThread {
conn.doRead(true);
} else if (conn.writeCompletionHandler != null && key.isWritable()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
conn.doWrite(true);
conn.doWrite();
} else if (key.isConnectable()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
conn.doConnect();

View File

@@ -63,10 +63,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected int writeByteTuple2Length;
protected Consumer writeByteTuple2Callback;
protected Object writeByteTuple2Attachment;
//写操作, 二选一要么writeByteBuffer有值要么writeByteBuffers、writeOffset、writeLength有值
protected ByteBuffer writeByteBuffer;
@@ -188,11 +184,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
@Override
public void write(byte[] headerContent, int headerOffset, int headerLength,
byte[] bodyContent, int bodyOffset, int bodyLength,
Consumer bodyCallback, Object bodyAttachment, CompletionHandler<Integer, Void> handler) {
byte[] bodyContent, int bodyOffset, int bodyLength, CompletionHandler<Integer, Void> handler) {
if (sslEngine != null) {
super.write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, handler);
super.write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, handler);
return;
}
Objects.requireNonNull(headerContent);
@@ -212,8 +207,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeByteTuple2Array = bodyContent;
this.writeByteTuple2Offset = bodyOffset;
this.writeByteTuple2Length = bodyLength;
this.writeByteTuple2Callback = bodyCallback;
this.writeByteTuple2Attachment = bodyAttachment;
this.writeAttachment = null;
if (this.writeTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
@@ -225,7 +218,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null);
this.writeCompletionHandler = newHandler;
}
doWrite(true); //如果不是true则bodyCallback的执行可能会切换线程
doWrite(); //如果不是true则bodyCallback的执行可能会切换线程
}
@Override
@@ -251,7 +244,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
doWrite(true);
doWrite();
}
@Override
@@ -279,7 +272,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
doWrite(true);
doWrite();
}
public void doRead(boolean direct) {
@@ -315,105 +308,94 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
}
public void doWrite(boolean direct) {
public void doWrite() {
try {
this.writeTime = System.currentTimeMillis();
int totalCount = 0;
boolean hasRemain = true;
boolean writeCompleted = true;
if (direct) {
int batchOffset = writeOffset;
int batchLength = writeLength;
while (hasRemain) { //必须要将buffer写完为止
if (writeByteTuple1Array != null) {
final ByteBuffer buffer = pollWriteBuffer();
if (buffer.remaining() >= writeByteTuple1Length + writeByteTuple2Length) {
buffer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length);
if (writeByteTuple2Length > 0) {
buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
if (writeByteTuple2Callback != null) {
writeByteTuple2Callback.accept(writeByteTuple2Attachment);
}
}
buffer.flip();
writeByteBuffer = buffer;
writeByteTuple1Array = null;
writeByteTuple1Offset = 0;
writeByteTuple1Length = 0;
writeByteTuple2Array = null;
writeByteTuple2Offset = 0;
writeByteTuple2Length = 0;
writeByteTuple2Callback = null;
writeByteTuple2Attachment = null;
} else {
ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length);
if (writeByteTuple2Length > 0) {
writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
if (writeByteTuple2Callback != null) {
writeByteTuple2Callback.accept(writeByteTuple2Attachment);
}
}
final ByteBuffer[] buffers = writer.toBuffers();
writeByteBuffers = buffers;
writeOffset = 0;
writeLength = buffers.length;
batchOffset = writeOffset;
batchLength = writeLength;
writeByteTuple1Array = null;
writeByteTuple1Offset = 0;
writeByteTuple1Length = 0;
writeByteTuple2Array = null;
writeByteTuple2Offset = 0;
writeByteTuple2Length = 0;
writeByteTuple2Callback = null;
writeByteTuple2Attachment = null;
}
if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) {
if (writeByteBuffer == null) {
this.writeTimeoutCompletionHandler.buffers(writeByteBuffers);
} else {
this.writeTimeoutCompletionHandler.buffer(writeByteBuffer);
}
}
}
int writeCount;
if (writeByteBuffer != null) {
writeCount = implWrite(writeByteBuffer);
hasRemain = writeByteBuffer.hasRemaining();
} else {
writeCount = implWrite(writeByteBuffers, batchOffset, batchLength);
boolean remain = false;
for (int i = 0; i < batchLength; i++) {
if (writeByteBuffers[batchOffset + i].hasRemaining()) {
remain = true;
batchOffset += i;
batchLength -= i;
break;
}
}
hasRemain = remain;
}
if (writeCount == 0) {
if (hasRemain) {
//writeCompleted = false;
//writeTotal = totalCount;
continue; //要全部输出完才返回
int batchOffset = writeOffset;
int batchLength = writeLength;
while (hasRemain) { //必须要将buffer写完为止
if (writeByteTuple1Array != null) {
final ByteBuffer buffer = pollWriteBuffer();
if (buffer.remaining() >= writeByteTuple1Length + writeByteTuple2Length) {
buffer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length);
if (writeByteTuple2Length > 0) {
buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
}
break;
} else if (writeCount < 0) {
if (totalCount == 0) {
totalCount = writeCount;
}
break;
buffer.flip();
writeByteBuffer = buffer;
writeByteTuple1Array = null;
writeByteTuple1Offset = 0;
writeByteTuple1Length = 0;
writeByteTuple2Array = null;
writeByteTuple2Offset = 0;
writeByteTuple2Length = 0;
} else {
totalCount += writeCount;
ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length);
if (writeByteTuple2Length > 0) {
writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
}
final ByteBuffer[] buffers = writer.toBuffers();
writeByteBuffers = buffers;
writeOffset = 0;
writeLength = buffers.length;
batchOffset = writeOffset;
batchLength = writeLength;
writeByteTuple1Array = null;
writeByteTuple1Offset = 0;
writeByteTuple1Length = 0;
writeByteTuple2Array = null;
writeByteTuple2Offset = 0;
writeByteTuple2Length = 0;
}
if (!hasRemain) {
break;
if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) {
if (writeByteBuffer == null) {
this.writeTimeoutCompletionHandler.buffers(writeByteBuffers);
} else {
this.writeTimeoutCompletionHandler.buffer(writeByteBuffer);
}
}
}
int writeCount;
if (writeByteBuffer != null) {
writeCount = implWrite(writeByteBuffer);
hasRemain = writeByteBuffer.hasRemaining();
} else {
writeCount = implWrite(writeByteBuffers, batchOffset, batchLength);
boolean remain = false;
for (int i = 0; i < batchLength; i++) {
if (writeByteBuffers[batchOffset + i].hasRemaining()) {
remain = true;
batchOffset += i;
batchLength -= i;
break;
}
}
hasRemain = remain;
}
if (writeCount == 0) {
if (hasRemain) {
//writeCompleted = false;
//writeTotal = totalCount;
continue; //要全部输出完才返回
}
break;
} else if (writeCount < 0) {
if (totalCount == 0) {
totalCount = writeCount;
}
break;
} else {
totalCount += writeCount;
}
if (!hasRemain) {
break;
}
}
if (writeCompleted && (totalCount != 0 || !hasRemain)) {

View File

@@ -367,7 +367,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
}
}
public <A> void finish(boolean kill, final byte[] bs1, int offset1, int length1, final byte[] bs2, int offset2, int length2, Consumer<A> callback, A attachment) {
public <A> void finish(boolean kill, final byte[] bs1, int offset1, int length1, final byte[] bs2, int offset2, int length2) {
if (kill) {
refuseAlive();
}
@@ -384,7 +384,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2);
this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler);
} else {
this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesIOThreadHandler);
this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, finishBytesIOThreadHandler);
}
}