From d603915d2ba28334f617cf2b04c689c9f233e16a Mon Sep 17 00:00:00 2001 From: Redkale Date: Mon, 9 Jan 2023 16:37:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ClientWriteIOThread?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 14 +++--- .../java/org/redkale/net/AsyncIOThread.java | 2 - .../org/redkale/net/AsyncNioConnection.java | 47 +++++++++---------- .../net/client/ClientWriteIOThread.java | 11 ++++- 4 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 3b8f451a3..e9857ae4f 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -320,7 +320,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } buffer.flip(); - CompletionHandler newhandler = new CompletionHandler() { + CompletionHandler newHandler = new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { offerWriteBuffer(buffer); @@ -333,7 +333,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl handler.failed(exc, attachment); } }; - write(buffer, null, newhandler); + write(buffer, null, newHandler); } else { ByteBufferWriter writer = ByteBufferWriter.create(sslEngine == null ? writeBufferSupplier : () -> pollWriteSSLBuffer(), buffer); writer.put(headerContent, headerOffset, headerLength); @@ -344,7 +344,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } final ByteBuffer[] buffers = writer.toBuffers(); - CompletionHandler newhandler = new CompletionHandler() { + CompletionHandler newHandler = new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { offerWriteBuffer(buffers); @@ -357,7 +357,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl handler.failed(exc, attachment); } }; - write(buffers, null, newhandler); + write(buffers, null, newHandler); } } @@ -384,7 +384,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl handler.completed(0, attachment); } else { ByteBuffer[] srcs = writer.toBuffers(); - CompletionHandler newhandler = new CompletionHandler() { + CompletionHandler newHandler = new CompletionHandler() { @Override public void completed(Integer result, A attachment) { offerWriteBuffer(srcs); @@ -398,9 +398,9 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } }; if (srcs.length == 1) { - write(srcs[0], attachment, newhandler); + write(srcs[0], attachment, newHandler); } else { - write(srcs, attachment, newhandler); + write(srcs, attachment, newHandler); } } } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index f4a692120..369933dab 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -176,7 +176,6 @@ public class AsyncIOThread extends WorkThread { key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); conn.doRead(true); } else if (conn.writeCompletionHandler != null && key.isWritable()) { - conn.currWriteInvoker = 0; key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); conn.doWrite(true); } @@ -186,7 +185,6 @@ public class AsyncIOThread extends WorkThread { key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); //不放开这行,在CompletableFuture时容易ReadPending conn.doRead(true); } else if (conn.writeCompletionHandler != null && key.isWritable()) { - conn.currWriteInvoker = 0; key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); conn.doWrite(true); } else if (key.isConnectable()) { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 95f75726b..022367a2b 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -62,8 +62,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected int writeTimeoutSeconds; - int currWriteInvoker; - protected byte[] writeByteTuple1Array; protected int writeByteTuple1Offset; @@ -154,10 +152,10 @@ abstract class AsyncNioConnection extends AsyncConnection { } this.readPending = true; if (this.readTimeoutSeconds > 0) { - AsyncNioCompletionHandler newhandler = this.readTimeoutCompletionHandler; - newhandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer); - this.readCompletionHandler = newhandler; - newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.readTimeoutSeconds, TimeUnit.SECONDS); + AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler; + newHandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer); + this.readCompletionHandler = newHandler; + newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS); } else { this.readCompletionHandler = handler; } @@ -196,14 +194,14 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writeByteTuple2Attachment = bodyAttachment; this.writeAttachment = null; if (this.writeTimeoutSeconds > 0) { - AsyncNioCompletionHandler newhandler = this.writeTimeoutCompletionHandler; - newhandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null); - this.writeCompletionHandler = newhandler; - newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS); + AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler; + newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null); + this.writeCompletionHandler = newHandler; + newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.writeTimeoutSeconds, TimeUnit.SECONDS); } else { - AsyncNioCompletionHandler newhandler = this.writeTimeoutCompletionHandler; - newhandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null); - this.writeCompletionHandler = newhandler; + AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler; + newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null); + this.writeCompletionHandler = newHandler; } doWrite(true); //如果不是true,则bodyCallback的执行可能会切换线程 } @@ -224,14 +222,14 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writeByteBuffer = src; this.writeAttachment = attachment; if (this.writeTimeoutSeconds > 0) { - AsyncNioCompletionHandler newhandler = this.writeTimeoutCompletionHandler; - newhandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment); - this.writeCompletionHandler = newhandler; - newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS); + AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler; + newHandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment); + this.writeCompletionHandler = newHandler; + newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.writeTimeoutSeconds, TimeUnit.SECONDS); } else { this.writeCompletionHandler = (CompletionHandler) handler; } - doWrite(true); // || !client || currWriteInvoker < MAX_INVOKER_ONSTACK // !client && ioThread.workExecutor == null + doWrite(true); } @Override @@ -252,14 +250,14 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writeLength = length; this.writeAttachment = attachment; if (this.writeTimeoutSeconds > 0) { - AsyncNioCompletionHandler newhandler = this.writeTimeoutCompletionHandler; - newhandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment); - this.writeCompletionHandler = newhandler; - newhandler.timeoutFuture = ioGroup.scheduleTimeout(newhandler, this.writeTimeoutSeconds, TimeUnit.SECONDS); + AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler; + newHandler.handler(handler, attachment); // new AsyncNioCompletionHandler(handler, attachment); + this.writeCompletionHandler = newHandler; + newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.writeTimeoutSeconds, TimeUnit.SECONDS); } else { this.writeCompletionHandler = (CompletionHandler) handler; } - doWrite(true); // || !client || currWriteInvoker < MAX_INVOKER_ONSTACK // !client && ioThread.workExecutor == null + doWrite(true); } public void doRead(boolean direct) { @@ -303,9 +301,6 @@ abstract class AsyncNioConnection extends AsyncConnection { int totalCount = 0; boolean hasRemain = true; boolean writeCompleted = true; - if (invokeDirect) { - currWriteInvoker++; - } while (invokeDirect && hasRemain) { //必须要将buffer写完为止 if (writeByteTuple1Array != null) { final ByteBuffer buffer = pollWriteBuffer(); diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index 2fa341a85..c07a7643f 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -28,6 +28,8 @@ public class ClientWriteIOThread extends ClientIOThread { @Override public void run() { + final ByteBuffer buffer = getBufferSupplier().get(); + final int capacity = buffer.capacity(); while (!isClosed()) { ClientEntity entity; try { @@ -44,7 +46,14 @@ public class ClientWriteIOThread extends ClientIOThread { ByteArray rw = conn.writeArray; rw.clear(); request.accept(conn, rw); - conn.channel.write(rw, conn.writeHandler); + if (rw.length() <= capacity) { + buffer.clear(); + buffer.put(rw.content(), 0, rw.length()); + buffer.flip(); + conn.channel.write(buffer, null, conn.writeHandler); + } else { + conn.channel.write(rw, conn.writeHandler); + } } } catch (InterruptedException e) { }