diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 19693f92f..f07e71d9e 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -205,6 +205,16 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return ioWriteThread; } + public abstract AsyncConnection pipelineHandler(CompletionHandler handler); + + /** + * 快速发送 + * + * @see org.redkale.net.AsyncNioConnection#pipelineWrite(java.util.function.Consumer) + * @param consumers Consumer + */ + public abstract void pipelineWrite(Consumer consumer); + public abstract boolean isTCP(); public abstract boolean shutdownInput(); diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 52b6b5df4..a7a78dc73 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -138,6 +138,10 @@ public class AsyncIOThread extends WorkThread { selector.wakeup(); } + public final void wakeup() { + selector.wakeup(); + } + public Supplier getBufferSupplier() { return bufferSupplier; } diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 6e1e72706..0d8efb695 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -10,9 +10,12 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; import javax.net.ssl.SSLContext; +import org.redkale.util.ByteArray; import org.redkale.util.ByteBufferWriter; +import org.redkale.util.RedkaleException; /** * 详情见: https://redkale.org @@ -37,6 +40,13 @@ abstract class AsyncNioConnection extends AsyncConnection { protected CompletionHandler readCompletionHandler; protected SelectionKey readKey; + + // ------------------------------ pipeline写操作 ------------------------------------ + protected ByteArray pipelineWriteArray; + + protected Queue> pipelineWriteQueue; + + protected CompletionHandler pipelineWriteHandler; // -------------------------------- 写操作 -------------------------------------- protected byte[] writeByteTuple1Array; @@ -84,12 +94,6 @@ abstract class AsyncNioConnection extends AsyncConnection { return remoteAddress; } - // @Override - // public AsyncConnection fastHandler(CompletionHandler handler) { - // Objects.requireNonNull(handler); - // this.writeFastHandler = (CompletionHandler) handler; - // return this; - // } @Override protected void startHandshake(final Consumer callback) { ioReadThread.register(t -> super.startHandshake(callback)); @@ -158,6 +162,66 @@ abstract class AsyncNioConnection extends AsyncConnection { doRead(this.ioReadThread.inCurrThread()); } + @Override + public final AsyncConnection pipelineHandler(CompletionHandler handler) { + if (!clientMode) { + throw new RedkaleException("fast-writer only for client connection"); + } + this.pipelineWriteHandler = Objects.requireNonNull(handler); + this.pipelineWriteArray = new ByteArray(); + this.pipelineWriteQueue = new ConcurrentLinkedQueue<>(); + return this; + } + + @Override + public final void pipelineWrite(Consumer consumer) { + if (pipelineWriteHandler == null) { + throw new RedkaleException("fast-writer handler is null"); + } + this.pipelineWriteQueue.offer(consumer); + if (writeKey == null) { + this.ioWriteThread.register(this::pipelineWriteRegister); + } else { + this.writeCompletionHandler = this.pipelineWriteHandler; + writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); + this.ioWriteThread.wakeup(); + } + } + + private void pipelineWriteRegister(Selector selector) { + try { + if (writeKey == null) { + writeKey = keyFor(selector); + } + if (writeKey == null) { + writeKey = implRegister(selector, SelectionKey.OP_WRITE); + writeKey.attach(this); + } else { + writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); + } + // writeCompletionHandler必须赋值,不然会跳过doWrite + this.writeCompletionHandler = this.pipelineWriteHandler; + } catch (ClosedChannelException e) { + e.printStackTrace(); + this.pipelineWriteQueue.clear(); + handleWrite(0, e); + } + } + + private void pipelineWritePrepare() { + ByteArray array = this.pipelineWriteArray.clear(); + Consumer func; + while ((func = pipelineWriteQueue.poll()) != null) { + func.accept(array); + } + this.writePending = true; + this.writeAttachment = null; + this.writeByteTuple1Array = array.content(); + this.writeByteTuple1Offset = array.offset(); + this.writeByteTuple1Length = array.length(); + this.writeCompletionHandler = this.pipelineWriteHandler; + } + @Override public void write( byte[] headerContent, @@ -300,6 +364,10 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; boolean error = false; + // pipelineWrite + if (clientMode && pipelineWriteArray != null && writeByteBuffer == null && writeByteBuffers == null) { + pipelineWritePrepare(); + } int batchOffset = writeBuffersOffset; int batchLength = writeBuffersLength; while (hasRemain) { // 必须要将buffer写完为止 diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index c6102dec8..32a702ed9 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -94,7 +94,7 @@ public abstract class ClientConnection, R, P> client, AsyncConnection channel) { this.client = client; this.codec = createCodec(); - this.channel = channel.beforeCloseListener(this); // .fastHandler(writeHandler); + this.channel = channel.beforeCloseListener(this); // .pipelineHandler(writeHandler); this.writeBuffer = channel.pollWriteBuffer(); } @@ -150,7 +150,7 @@ public abstract class ClientConnection