From 65b4e66677add48ce18ee62401f0878d638fec3b Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 9 Nov 2024 08:37:51 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4pipelineWrite?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 39 ++++--- .../org/redkale/net/AsyncNioConnection.java | 36 ------ .../java/org/redkale/net/PipelinePacket.java | 104 ------------------ 3 files changed, 23 insertions(+), 156 deletions(-) delete mode 100644 src/main/java/org/redkale/net/PipelinePacket.java diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 381f6bd98..877f25bc3 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -213,14 +213,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { writeLock.unlock(); } - /** - * 快速发送 - * - * @see org.redkale.net.AsyncNioConnection#pipelineWrite(org.redkale.net.PipelinePacket...) - * @param packets PipelinePacket[] - */ - public abstract void pipelineWrite(PipelinePacket... packets); - public abstract boolean isTCP(); public abstract boolean shutdownInput(); @@ -300,16 +292,31 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + /** + * src写完才会回调 + * + * @see #lockWrite() + * @see #unlockWrite() + * @param array 内容 + * @param handler 回调函数 + */ public final void write(ByteTuple array, CompletionHandler handler) { write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, handler); } + /** + * src写完才会回调 + * + * @see #lockWrite() + * @see #unlockWrite() + * @param buffer 内容 + * @param handler 回调函数 + */ public final void write(ByteBuffer buffer, CompletionHandler handler) { write(buffer, null, handler); } - // src写完才会回调 - final void write(ByteBuffer src, A attachment, CompletionHandler handler) { + void write(ByteBuffer src, A attachment, CompletionHandler handler) { if (sslEngine == null) { writeImpl(src, attachment, handler); } else { @@ -328,7 +335,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - final void write( + void write( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { if (sslEngine == null) { writeImpl(srcs, offset, length, attachment, handler); @@ -348,15 +355,15 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - final void write(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { + void write(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { write(srcs, 0, srcs.length, attachment, handler); } - final void write(byte[] bytes, CompletionHandler handler) { + void write(byte[] bytes, CompletionHandler handler) { write(bytes, 0, bytes.length, (byte[]) null, 0, 0, handler); } - final void write(byte[] bytes, int offset, int length, CompletionHandler handler) { + void write(byte[] bytes, int offset, int length, CompletionHandler handler) { write(bytes, offset, length, (byte[]) null, 0, 0, handler); } @@ -505,11 +512,11 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return writer != null && writer.position() > 0; } - public final void writePipeline(CompletionHandler handler) { + void writePipeline(CompletionHandler handler) { writePipeline(null, handler); } - public void writePipeline(A attachment, CompletionHandler handler) { + void writePipeline(A attachment, CompletionHandler handler) { ByteBufferWriter writer = this.pipelineWriter; this.pipelineWriter = null; if (writer == null) { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 8eee4ae69..b0bd94d84 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -10,7 +10,6 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.function.Consumer; import javax.net.ssl.SSLContext; import org.redkale.util.ByteBufferWriter; @@ -39,9 +38,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey readKey; - // ------------------------------ pipeline写操作 ------------------------------------ - protected Queue pipelineWriteQueue; - // -------------------------------- 写操作 -------------------------------------- protected byte[] writeByteTuple1Array; @@ -72,7 +68,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey writeKey; - // protected CompletionHandler writeFastHandler; public AsyncNioConnection( boolean clientMode, AsyncIOGroup ioGroup, @@ -157,37 +152,6 @@ abstract class AsyncNioConnection extends AsyncConnection { doRead(this.ioReadThread.inCurrThread()); } - @Override - public final void pipelineWrite(PipelinePacket... packets) { - if (pipelineWriteQueue == null) { - lockWrite(); - try { - if (pipelineWriteQueue == null) { - pipelineWriteQueue = new ConcurrentLinkedDeque<>(); - } - } finally { - unlockWrite(); - } - } - for (PipelinePacket packet : packets) { - this.pipelineWriteQueue.offer(packet); - } - this.ioWriteThread.execute(this::pipelineDoWrite); - } - - private void pipelineDoWrite() { - PipelinePacket packet; - while ((packet = pipelineWriteQueue.poll()) != null) { - this.writePending = true; - this.writeByteTuple1Array = packet.tupleBytes; - this.writeByteTuple1Offset = packet.tupleOffset; - this.writeByteTuple1Length = packet.tupleLength; - this.writeCompletionHandler = packet.handler; - this.writeAttachment = packet.attach; - doWrite(); - } - } - @Override public void write( byte[] headerContent, diff --git a/src/main/java/org/redkale/net/PipelinePacket.java b/src/main/java/org/redkale/net/PipelinePacket.java deleted file mode 100644 index 3751fd81b..000000000 --- a/src/main/java/org/redkale/net/PipelinePacket.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (c) 2016-2116 Redkale - * All rights reserved. - */ -package org.redkale.net; - -import java.nio.channels.CompletionHandler; -import org.redkale.convert.ConvertColumn; -import org.redkale.util.ByteTuple; - -/** - * pipelineWrite写入包 - * - * @author zhangjx - * @since 2.8.0 - */ -public class PipelinePacket { - - @ConvertColumn(index = 1) - protected byte[] tupleBytes; - - @ConvertColumn(index = 2) - protected int tupleOffset; - - @ConvertColumn(index = 3) - protected int tupleLength; - - @ConvertColumn(index = 4) - protected CompletionHandler handler; - - @ConvertColumn(index = 5) - protected Object attach; - - public PipelinePacket() {} - - public PipelinePacket(ByteTuple data, CompletionHandler handler) { - this(data, handler, null); - } - - public PipelinePacket(ByteTuple data, CompletionHandler handler, Object attach) { - this(data.content(), data.offset(), data.length(), handler, attach); - } - - public PipelinePacket(byte[] tupleBytes, CompletionHandler handler) { - this(tupleBytes, 0, tupleBytes.length, handler, null); - } - - public PipelinePacket(byte[] tupleBytes, CompletionHandler handler, Object attach) { - this(tupleBytes, 0, tupleBytes.length, handler, attach); - } - - public PipelinePacket(byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler handler) { - this(tupleBytes, tupleOffset, tupleLength, handler, null); - } - - public PipelinePacket( - byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler handler, Object attach) { - this.tupleBytes = tupleBytes; - this.tupleOffset = tupleOffset; - this.tupleLength = tupleLength; - this.handler = handler; - this.attach = attach; - } - - public byte[] getTupleBytes() { - return tupleBytes; - } - - public void setTupleBytes(byte[] tupleBytes) { - this.tupleBytes = tupleBytes; - } - - public int getTupleOffset() { - return tupleOffset; - } - - public void setTupleOffset(int tupleOffset) { - this.tupleOffset = tupleOffset; - } - - public int getTupleLength() { - return tupleLength; - } - - public void setTupleLength(int tupleLength) { - this.tupleLength = tupleLength; - } - - public CompletionHandler getHandler() { - return handler; - } - - public void setHandler(CompletionHandler handler) { - this.handler = handler; - } - - public Object getAttach() { - return attach; - } - - public void setAttach(Object attach) { - this.attach = attach; - } -}