From f3475341a0f6c78746fd4ac1443f092471595bab Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 27 Jun 2023 09:10:24 +0800 Subject: [PATCH] =?UTF-8?q?AsyncConnection=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 28 ++- .../java/org/redkale/net/AsyncIOThread.java | 4 +- .../org/redkale/net/AsyncNioConnection.java | 182 ++++++++---------- src/main/java/org/redkale/net/Response.java | 4 +- 4 files changed, 97 insertions(+), 121 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index c16fdb25d..86d861a2f 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -352,38 +352,35 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } public final void write(byte[] bytes, CompletionHandler handler) { - write(bytes, 0, bytes.length, null, 0, 0, null, null, handler); + write(bytes, 0, bytes.length, null, 0, 0, handler); } public final void write(ByteTuple array, CompletionHandler handler) { - write(array.content(), array.offset(), array.length(), null, 0, 0, null, null, handler); + write(array.content(), array.offset(), array.length(), null, 0, 0, handler); } public final void write(ByteTuple array, A attachment, CompletionHandler handler) { - write(array.content(), array.offset(), array.length(), null, 0, 0, null, null, attachment, handler); + write(array.content(), array.offset(), array.length(), null, 0, 0, attachment, handler); } public final void write(byte[] bytes, int offset, int length, CompletionHandler handler) { - write(bytes, offset, length, null, 0, 0, null, null, handler); + write(bytes, offset, length, null, 0, 0, handler); } public final void write(ByteTuple header, ByteTuple body, CompletionHandler handler) { - write(header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length(), null, null, handler); + write(header.content(), header.offset(), header.length(), body == null ? null : body.content(), body == null ? 0 : body.offset(), body == null ? 0 : body.length(), handler); } - public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, CompletionHandler handler) { - write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, null, handler); + public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, CompletionHandler handler) { + write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, null, handler); } - public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, Object handlerAttachment, CompletionHandler handler) { + public void write(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Object handlerAttachment, CompletionHandler handler) { final ByteBuffer buffer = sslEngine == null ? pollWriteBuffer() : pollWriteSSLBuffer(); if (buffer.remaining() >= headerLength + bodyLength) { buffer.put(headerContent, headerOffset, headerLength); if (bodyLength > 0) { buffer.put(bodyContent, bodyOffset, bodyLength); - if (bodyCallback != null) { - bodyCallback.accept(bodyAttachment); - } } buffer.flip(); CompletionHandler newHandler = new CompletionHandler() { @@ -405,9 +402,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { writer.put(headerContent, headerOffset, headerLength); if (bodyLength > 0) { writer.put(bodyContent, bodyOffset, bodyLength); - if (bodyCallback != null) { - bodyCallback.accept(bodyAttachment); - } } final ByteBuffer[] buffers = writer.toBuffers(); CompletionHandler newHandler = new CompletionHandler() { @@ -486,11 +480,11 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - public final void writeInIOThread(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, Consumer bodyCallback, Object bodyAttachment, CompletionHandler handler) { + public final void writeInIOThread(byte[] headerContent, int headerOffset, int headerLength, byte[] bodyContent, int bodyOffset, int bodyLength, CompletionHandler handler) { if (inCurrWriteThread()) { - write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, handler); + write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, handler); } else { - executeWrite(() -> write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, handler)); + executeWrite(() -> write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, handler)); } } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 92bb92aa2..8a7374be9 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -184,7 +184,7 @@ public class AsyncIOThread extends WorkThread { conn.doConnect(); } else if (conn.writeCompletionHandler != null && key.isWritable()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); - conn.doWrite(true); + conn.doWrite(); } else if (conn.readCompletionHandler != null && key.isReadable()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); conn.doRead(true); @@ -195,7 +195,7 @@ public class AsyncIOThread extends WorkThread { conn.doRead(true); } else if (conn.writeCompletionHandler != null && key.isWritable()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); - conn.doWrite(true); + conn.doWrite(); } else if (key.isConnectable()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); conn.doConnect(); diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index c33ca6207..e52960aed 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -63,10 +63,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected int writeByteTuple2Length; - protected Consumer writeByteTuple2Callback; - - protected Object writeByteTuple2Attachment; - //写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeOffset、writeLength有值 protected ByteBuffer writeByteBuffer; @@ -188,11 +184,10 @@ abstract class AsyncNioConnection extends AsyncConnection { @Override public void write(byte[] headerContent, int headerOffset, int headerLength, - byte[] bodyContent, int bodyOffset, int bodyLength, - Consumer bodyCallback, Object bodyAttachment, CompletionHandler handler) { + byte[] bodyContent, int bodyOffset, int bodyLength, CompletionHandler handler) { if (sslEngine != null) { - super.write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, bodyCallback, bodyAttachment, handler); + super.write(headerContent, headerOffset, headerLength, bodyContent, bodyOffset, bodyLength, handler); return; } Objects.requireNonNull(headerContent); @@ -212,8 +207,6 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writeByteTuple2Array = bodyContent; this.writeByteTuple2Offset = bodyOffset; this.writeByteTuple2Length = bodyLength; - this.writeByteTuple2Callback = bodyCallback; - this.writeByteTuple2Attachment = bodyAttachment; this.writeAttachment = null; if (this.writeTimeoutSeconds > 0) { AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler; @@ -225,7 +218,7 @@ abstract class AsyncNioConnection extends AsyncConnection { newHandler.handler(handler, null); // new AsyncNioCompletionHandler(handler, null); this.writeCompletionHandler = newHandler; } - doWrite(true); //如果不是true,则bodyCallback的执行可能会切换线程 + doWrite(); //如果不是true,则bodyCallback的执行可能会切换线程 } @Override @@ -251,7 +244,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } else { this.writeCompletionHandler = (CompletionHandler) handler; } - doWrite(true); + doWrite(); } @Override @@ -279,7 +272,7 @@ abstract class AsyncNioConnection extends AsyncConnection { } else { this.writeCompletionHandler = (CompletionHandler) handler; } - doWrite(true); + doWrite(); } public void doRead(boolean direct) { @@ -315,105 +308,94 @@ abstract class AsyncNioConnection extends AsyncConnection { } } - public void doWrite(boolean direct) { + public void doWrite() { try { this.writeTime = System.currentTimeMillis(); int totalCount = 0; boolean hasRemain = true; boolean writeCompleted = true; - if (direct) { - int batchOffset = writeOffset; - int batchLength = writeLength; - while (hasRemain) { //必须要将buffer写完为止 - if (writeByteTuple1Array != null) { - final ByteBuffer buffer = pollWriteBuffer(); - if (buffer.remaining() >= writeByteTuple1Length + writeByteTuple2Length) { - buffer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length); - if (writeByteTuple2Length > 0) { - buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); - if (writeByteTuple2Callback != null) { - writeByteTuple2Callback.accept(writeByteTuple2Attachment); - } - } - buffer.flip(); - writeByteBuffer = buffer; - writeByteTuple1Array = null; - writeByteTuple1Offset = 0; - writeByteTuple1Length = 0; - writeByteTuple2Array = null; - writeByteTuple2Offset = 0; - writeByteTuple2Length = 0; - writeByteTuple2Callback = null; - writeByteTuple2Attachment = null; - } else { - ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); - writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length); - if (writeByteTuple2Length > 0) { - writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); - if (writeByteTuple2Callback != null) { - writeByteTuple2Callback.accept(writeByteTuple2Attachment); - } - } - final ByteBuffer[] buffers = writer.toBuffers(); - writeByteBuffers = buffers; - writeOffset = 0; - writeLength = buffers.length; - batchOffset = writeOffset; - batchLength = writeLength; - writeByteTuple1Array = null; - writeByteTuple1Offset = 0; - writeByteTuple1Length = 0; - writeByteTuple2Array = null; - writeByteTuple2Offset = 0; - writeByteTuple2Length = 0; - writeByteTuple2Callback = null; - writeByteTuple2Attachment = null; - } - if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) { - if (writeByteBuffer == null) { - this.writeTimeoutCompletionHandler.buffers(writeByteBuffers); - } else { - this.writeTimeoutCompletionHandler.buffer(writeByteBuffer); - } - } - } - int writeCount; - if (writeByteBuffer != null) { - writeCount = implWrite(writeByteBuffer); - hasRemain = writeByteBuffer.hasRemaining(); - } else { - writeCount = implWrite(writeByteBuffers, batchOffset, batchLength); - boolean remain = false; - for (int i = 0; i < batchLength; i++) { - if (writeByteBuffers[batchOffset + i].hasRemaining()) { - remain = true; - batchOffset += i; - batchLength -= i; - break; - } - } - hasRemain = remain; - } - if (writeCount == 0) { - if (hasRemain) { - //writeCompleted = false; - //writeTotal = totalCount; - continue; //要全部输出完才返回 + int batchOffset = writeOffset; + int batchLength = writeLength; + while (hasRemain) { //必须要将buffer写完为止 + if (writeByteTuple1Array != null) { + final ByteBuffer buffer = pollWriteBuffer(); + if (buffer.remaining() >= writeByteTuple1Length + writeByteTuple2Length) { + buffer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length); + if (writeByteTuple2Length > 0) { + buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); } - break; - } else if (writeCount < 0) { - if (totalCount == 0) { - totalCount = writeCount; - } - break; + buffer.flip(); + writeByteBuffer = buffer; + writeByteTuple1Array = null; + writeByteTuple1Offset = 0; + writeByteTuple1Length = 0; + writeByteTuple2Array = null; + writeByteTuple2Offset = 0; + writeByteTuple2Length = 0; } else { - totalCount += writeCount; + ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); + writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length); + if (writeByteTuple2Length > 0) { + writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); + } + final ByteBuffer[] buffers = writer.toBuffers(); + writeByteBuffers = buffers; + writeOffset = 0; + writeLength = buffers.length; + batchOffset = writeOffset; + batchLength = writeLength; + writeByteTuple1Array = null; + writeByteTuple1Offset = 0; + writeByteTuple1Length = 0; + writeByteTuple2Array = null; + writeByteTuple2Offset = 0; + writeByteTuple2Length = 0; } - if (!hasRemain) { - break; + if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) { + if (writeByteBuffer == null) { + this.writeTimeoutCompletionHandler.buffers(writeByteBuffers); + } else { + this.writeTimeoutCompletionHandler.buffer(writeByteBuffer); + } } } + int writeCount; + if (writeByteBuffer != null) { + writeCount = implWrite(writeByteBuffer); + hasRemain = writeByteBuffer.hasRemaining(); + } else { + writeCount = implWrite(writeByteBuffers, batchOffset, batchLength); + boolean remain = false; + for (int i = 0; i < batchLength; i++) { + if (writeByteBuffers[batchOffset + i].hasRemaining()) { + remain = true; + batchOffset += i; + batchLength -= i; + break; + } + } + hasRemain = remain; + } + + if (writeCount == 0) { + if (hasRemain) { + //writeCompleted = false; + //writeTotal = totalCount; + continue; //要全部输出完才返回 + } + break; + } else if (writeCount < 0) { + if (totalCount == 0) { + totalCount = writeCount; + } + break; + } else { + totalCount += writeCount; + } + if (!hasRemain) { + break; + } } if (writeCompleted && (totalCount != 0 || !hasRemain)) { diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index 033d3a6dd..b26842773 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -367,7 +367,7 @@ public abstract class Response> { } } - public void finish(boolean kill, final byte[] bs1, int offset1, int length1, final byte[] bs2, int offset2, int length2, Consumer callback, A attachment) { + public void finish(boolean kill, final byte[] bs1, int offset1, int length1, final byte[] bs2, int offset2, int length2) { if (kill) { refuseAlive(); } @@ -384,7 +384,7 @@ public abstract class Response> { this.channel.appendPipeline(request.pipelineIndex, request.pipelineCount, bs1, offset1, length1, bs2, offset2, length2); this.channel.writePipelineInIOThread(this.finishBytesIOThreadHandler); } else { - this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, callback, attachment, finishBytesIOThreadHandler); + this.channel.writeInIOThread(bs1, offset1, length1, bs2, offset2, length2, finishBytesIOThreadHandler); } }