From 5da88a7916a22aecf6260c7ee5dafd18c57a179e Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 21 Oct 2024 12:41:17 +0800 Subject: [PATCH] PipelinePacket --- .../java/org/redkale/net/AsyncConnection.java | 10 +- .../java/org/redkale/net/AsyncIOThread.java | 4 - .../org/redkale/net/AsyncNioConnection.java | 86 ++++---------- .../java/org/redkale/net/PipelinePacket.java | 105 ++++++++++++++++++ 4 files changed, 132 insertions(+), 73 deletions(-) create mode 100644 src/main/java/org/redkale/net/PipelinePacket.java diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index acc9e1599..f642f95a4 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -65,7 +65,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { private Consumer writeBufferConsumer; - private final ReentrantLock pipelineLock = new ReentrantLock(); + final ReentrantLock pipelineLock = new ReentrantLock(); private ByteBufferWriter pipelineWriter; @@ -205,15 +205,13 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return ioWriteThread; } - public abstract AsyncConnection pipelineHandler(CompletionHandler handler); - /** * 快速发送 * - * @see org.redkale.net.AsyncNioConnection#pipelineWrite(java.util.function.Consumer) - * @param consumer Consumer + * @see org.redkale.net.AsyncNioConnection#pipelineWrite(org.redkale.net.PipelinePacket) + * @param packet PipelinePacket */ - public abstract void pipelineWrite(Consumer consumer); + public abstract void pipelineWrite(PipelinePacket packet); public abstract boolean isTCP(); diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index a7a78dc73..52b6b5df4 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -138,10 +138,6 @@ public class AsyncIOThread extends WorkThread { selector.wakeup(); } - public final void wakeup() { - selector.wakeup(); - } - public Supplier getBufferSupplier() { return bufferSupplier; } diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 0d8efb695..b9afb449a 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -10,12 +10,10 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.function.Consumer; import javax.net.ssl.SSLContext; -import org.redkale.util.ByteArray; import org.redkale.util.ByteBufferWriter; -import org.redkale.util.RedkaleException; /** * 详情见: https://redkale.org @@ -42,11 +40,8 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey readKey; // ------------------------------ pipeline写操作 ------------------------------------ - protected ByteArray pipelineWriteArray; + protected Queue pipelineWriteQueue; - protected Queue> pipelineWriteQueue; - - protected CompletionHandler pipelineWriteHandler; // -------------------------------- 写操作 -------------------------------------- protected byte[] writeByteTuple1Array; @@ -163,63 +158,32 @@ abstract class AsyncNioConnection extends AsyncConnection { } @Override - public final AsyncConnection pipelineHandler(CompletionHandler handler) { - if (!clientMode) { - throw new RedkaleException("fast-writer only for client connection"); - } - this.pipelineWriteHandler = Objects.requireNonNull(handler); - this.pipelineWriteArray = new ByteArray(); - this.pipelineWriteQueue = new ConcurrentLinkedQueue<>(); - return this; - } - - @Override - public final void pipelineWrite(Consumer consumer) { - if (pipelineWriteHandler == null) { - throw new RedkaleException("fast-writer handler is null"); - } - this.pipelineWriteQueue.offer(consumer); - if (writeKey == null) { - this.ioWriteThread.register(this::pipelineWriteRegister); - } else { - this.writeCompletionHandler = this.pipelineWriteHandler; - writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); - this.ioWriteThread.wakeup(); - } - } - - private void pipelineWriteRegister(Selector selector) { - try { - if (writeKey == null) { - writeKey = keyFor(selector); + public final void pipelineWrite(PipelinePacket packet) { + if (pipelineWriteQueue == null) { + pipelineLock.lock(); + try { + if (pipelineWriteQueue == null) { + pipelineWriteQueue = new ConcurrentLinkedDeque<>(); + } + } finally { + pipelineLock.unlock(); } - if (writeKey == null) { - writeKey = implRegister(selector, SelectionKey.OP_WRITE); - writeKey.attach(this); - } else { - writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); - } - // writeCompletionHandler必须赋值,不然会跳过doWrite - this.writeCompletionHandler = this.pipelineWriteHandler; - } catch (ClosedChannelException e) { - e.printStackTrace(); - this.pipelineWriteQueue.clear(); - handleWrite(0, e); } + this.pipelineWriteQueue.offer(packet); + this.ioWriteThread.execute(this::pipelineDoWrite); } - private void pipelineWritePrepare() { - ByteArray array = this.pipelineWriteArray.clear(); - Consumer func; - while ((func = pipelineWriteQueue.poll()) != null) { - func.accept(array); + private void pipelineDoWrite() { + PipelinePacket packet; + while ((packet = pipelineWriteQueue.poll()) != null) { + this.writePending = true; + this.writeByteTuple1Array = packet.tupleBytes; + this.writeByteTuple1Offset = packet.tupleOffset; + this.writeByteTuple1Length = packet.tupleLength; + this.writeCompletionHandler = packet.handler; + this.writeAttachment = packet.attach; + doWrite(); } - this.writePending = true; - this.writeAttachment = null; - this.writeByteTuple1Array = array.content(); - this.writeByteTuple1Offset = array.offset(); - this.writeByteTuple1Length = array.length(); - this.writeCompletionHandler = this.pipelineWriteHandler; } @Override @@ -364,10 +328,6 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; boolean error = false; - // pipelineWrite - if (clientMode && pipelineWriteArray != null && writeByteBuffer == null && writeByteBuffers == null) { - pipelineWritePrepare(); - } int batchOffset = writeBuffersOffset; int batchLength = writeBuffersLength; while (hasRemain) { // 必须要将buffer写完为止 diff --git a/src/main/java/org/redkale/net/PipelinePacket.java b/src/main/java/org/redkale/net/PipelinePacket.java new file mode 100644 index 000000000..6de6d063f --- /dev/null +++ b/src/main/java/org/redkale/net/PipelinePacket.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2016-2116 Redkale + * All rights reserved. + */ +package org.redkale.net; + +import java.nio.channels.CompletionHandler; +import org.redkale.convert.ConvertColumn; +import org.redkale.util.ByteTuple; + +/** + * pipelineWrite写入包 + * + * @author zhangjx + * @param attachment类型 + * @since 2.8.0 + */ +public class PipelinePacket { + + @ConvertColumn(index = 1) + protected byte[] tupleBytes; + + @ConvertColumn(index = 2) + protected int tupleOffset; + + @ConvertColumn(index = 3) + protected int tupleLength; + + @ConvertColumn(index = 4) + protected CompletionHandler handler; + + @ConvertColumn(index = 5) + protected T attach; + + public PipelinePacket() {} + + public PipelinePacket(ByteTuple data, CompletionHandler handler) { + this(data, handler, null); + } + + public PipelinePacket(ByteTuple data, CompletionHandler handler, T attach) { + this(data.content(), data.offset(), data.length(), handler, attach); + } + + public PipelinePacket(byte[] tupleBytes, CompletionHandler handler) { + this(tupleBytes, 0, tupleBytes.length, handler, null); + } + + public PipelinePacket(byte[] tupleBytes, CompletionHandler handler, T attach) { + this(tupleBytes, 0, tupleBytes.length, handler, attach); + } + + public PipelinePacket(byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler handler) { + this(tupleBytes, tupleOffset, tupleLength, handler, null); + } + + public PipelinePacket( + byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler handler, T attach) { + this.tupleBytes = tupleBytes; + this.tupleOffset = tupleOffset; + this.tupleLength = tupleLength; + this.handler = handler; + this.attach = attach; + } + + public byte[] getTupleBytes() { + return tupleBytes; + } + + public void setTupleBytes(byte[] tupleBytes) { + this.tupleBytes = tupleBytes; + } + + public int getTupleOffset() { + return tupleOffset; + } + + public void setTupleOffset(int tupleOffset) { + this.tupleOffset = tupleOffset; + } + + public int getTupleLength() { + return tupleLength; + } + + public void setTupleLength(int tupleLength) { + this.tupleLength = tupleLength; + } + + public CompletionHandler getHandler() { + return handler; + } + + public void setHandler(CompletionHandler handler) { + this.handler = handler; + } + + public T getAttach() { + return attach; + } + + public void setAttach(T attach) { + this.attach = attach; + } +}