优化ClientWriteIOThread

This commit is contained in:
Redkale
2023-01-09 16:37:22 +08:00
parent bb4482c08c
commit d603915d2b
4 changed files with 38 additions and 36 deletions

View File

@@ -320,7 +320,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
buffer.flip();
CompletionHandler<Integer, Void> newhandler = new CompletionHandler<Integer, Void>() {
CompletionHandler<Integer, Void> newHandler = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
offerWriteBuffer(buffer);
@@ -333,7 +333,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
handler.failed(exc, attachment);
}
};
write(buffer, null, newhandler);
write(buffer, null, newHandler);
} else {
ByteBufferWriter writer = ByteBufferWriter.create(sslEngine == null ? writeBufferSupplier : () -> pollWriteSSLBuffer(), buffer);
writer.put(headerContent, headerOffset, headerLength);
@@ -344,7 +344,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
final ByteBuffer[] buffers = writer.toBuffers();
CompletionHandler<Integer, Void> newhandler = new CompletionHandler<Integer, Void>() {
CompletionHandler<Integer, Void> newHandler = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
offerWriteBuffer(buffers);
@@ -357,7 +357,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
handler.failed(exc, attachment);
}
};
write(buffers, null, newhandler);
write(buffers, null, newHandler);
}
}
@@ -384,7 +384,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
handler.completed(0, attachment);
} else {
ByteBuffer[] srcs = writer.toBuffers();
CompletionHandler<Integer, ? super A> newhandler = new CompletionHandler<Integer, A>() {
CompletionHandler<Integer, ? super A> newHandler = new CompletionHandler<Integer, A>() {
@Override
public void completed(Integer result, A attachment) {
offerWriteBuffer(srcs);
@@ -398,9 +398,9 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
};
if (srcs.length == 1) {
write(srcs[0], attachment, newhandler);
write(srcs[0], attachment, newHandler);
} else {
write(srcs, attachment, newhandler);
write(srcs, attachment, newHandler);
}
}
}

View File

@@ -176,7 +176,6 @@ public class AsyncIOThread extends WorkThread {
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
conn.doRead(true);
} else if (conn.writeCompletionHandler != null && key.isWritable()) {
conn.currWriteInvoker = 0;
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
conn.doWrite(true);
}
@@ -186,7 +185,6 @@ public class AsyncIOThread extends WorkThread {
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); //不放开这行在CompletableFuture时容易ReadPending
conn.doRead(true);
} else if (conn.writeCompletionHandler != null && key.isWritable()) {
conn.currWriteInvoker = 0;
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
conn.doWrite(true);
} else if (key.isConnectable()) {

View File

@@ -62,8 +62,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected int writeTimeoutSeconds;
int currWriteInvoker;
protected byte[] writeByteTuple1Array;
protected int writeByteTuple1Offset;
@@ -154,10 +152,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
this.readPending = true;
if (this.readTimeoutSeconds > 0) {
AsyncNioCompletionHandler newhandler = this.readTimeoutCompletionHandler;
newhandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer);
this.readCompletionHandler = newhandler;
newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.readTimeoutSeconds, TimeUnit.SECONDS);
AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler;
newHandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer);
this.readCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.readCompletionHandler = handler;
}
@@ -196,14 +194,14 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeByteTuple2Attachment = bodyAttachment;
this.writeAttachment = null;
if (this.writeTimeoutSeconds > 0) {
AsyncNioCompletionHandler newhandler = this.writeTimeoutCompletionHandler;
newhandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null);
this.writeCompletionHandler = newhandler;
newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null);
this.writeCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
} else {
AsyncNioCompletionHandler newhandler = this.writeTimeoutCompletionHandler;
newhandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null);
this.writeCompletionHandler = newhandler;
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null);
this.writeCompletionHandler = newHandler;
}
doWrite(true); //如果不是true则bodyCallback的执行可能会切换线程
}
@@ -224,14 +222,14 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeByteBuffer = src;
this.writeAttachment = attachment;
if (this.writeTimeoutSeconds > 0) {
AsyncNioCompletionHandler newhandler = this.writeTimeoutCompletionHandler;
newhandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment);
this.writeCompletionHandler = newhandler;
newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
newHandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment);
this.writeCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
doWrite(true); // || !client || currWriteInvoker < MAX_INVOKER_ONSTACK // !client && ioThread.workExecutor == null
doWrite(true);
}
@Override
@@ -252,14 +250,14 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeLength = length;
this.writeAttachment = attachment;
if (this.writeTimeoutSeconds > 0) {
AsyncNioCompletionHandler newhandler = this.writeTimeoutCompletionHandler;
newhandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment);
this.writeCompletionHandler = newhandler;
newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
newHandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment);
this.writeCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.writeTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.writeCompletionHandler = (CompletionHandler) handler;
}
doWrite(true); // || !client || currWriteInvoker < MAX_INVOKER_ONSTACK // !client && ioThread.workExecutor == null
doWrite(true);
}
public void doRead(boolean direct) {
@@ -303,9 +301,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
int totalCount = 0;
boolean hasRemain = true;
boolean writeCompleted = true;
if (invokeDirect) {
currWriteInvoker++;
}
while (invokeDirect && hasRemain) { //必须要将buffer写完为止
if (writeByteTuple1Array != null) {
final ByteBuffer buffer = pollWriteBuffer();

View File

@@ -28,6 +28,8 @@ public class ClientWriteIOThread extends ClientIOThread {
@Override
public void run() {
final ByteBuffer buffer = getBufferSupplier().get();
final int capacity = buffer.capacity();
while (!isClosed()) {
ClientEntity entity;
try {
@@ -44,7 +46,14 @@ public class ClientWriteIOThread extends ClientIOThread {
ByteArray rw = conn.writeArray;
rw.clear();
request.accept(conn, rw);
conn.channel.write(rw, conn.writeHandler);
if (rw.length() <= capacity) {
buffer.clear();
buffer.put(rw.content(), 0, rw.length());
buffer.flip();
conn.channel.write(buffer, null, conn.writeHandler);
} else {
conn.channel.write(rw, conn.writeHandler);
}
}
} catch (InterruptedException e) {
}