From a542c03e8b3d947501018cb25d747cfa0574600c Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 8 Oct 2024 15:54:51 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4fastWrite?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 12 --- .../org/redkale/net/AsyncNioConnection.java | 73 ------------------- 2 files changed, 85 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index ea4c48669..19693f92f 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -83,14 +83,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { private Consumer beforeCloseListener; - // --------------------- clientMode: fast-write-start --------------------- - protected ByteArray fastWriteArray; - - protected Queue> fastWriteQueue; - - protected CompletionHandler fastWriteHandler; - // --------------------- clientMode: fast-write-end --------------------- - // 用于服务端的Socket, 等同于一直存在的readCompletionHandler ProtocolCodec protocolCodec; @@ -213,10 +205,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return ioWriteThread; } - public abstract AsyncConnection fastHandler(CompletionHandler handler); - - public abstract void fastWrite(Consumer... consumers); - public abstract boolean isTCP(); public abstract boolean shutdownInput(); diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index e64767a68..6e1e72706 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -10,12 +10,9 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; import javax.net.ssl.SSLContext; -import org.redkale.util.ByteArray; import org.redkale.util.ByteBufferWriter; -import org.redkale.util.RedkaleException; /** * 详情见: https://redkale.org @@ -40,13 +37,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected CompletionHandler readCompletionHandler; protected SelectionKey readKey; - - // ------------------------------ fast写操作 ------------------------------------ - protected ByteArray fastWriteArray; - - protected Queue> fastWriteQueue; - - protected CompletionHandler fastWriteHandler; // -------------------------------- 写操作 -------------------------------------- protected byte[] writeByteTuple1Array; @@ -168,62 +158,6 @@ abstract class AsyncNioConnection extends AsyncConnection { doRead(this.ioReadThread.inCurrThread()); } - @Override - public final AsyncConnection fastHandler(CompletionHandler handler) { - if (!clientMode) { - throw new RedkaleException("fast-writer only for client connection"); - } - this.fastWriteHandler = Objects.requireNonNull(handler); - this.fastWriteArray = new ByteArray(); - this.fastWriteQueue = new ConcurrentLinkedQueue<>(); - return this; - } - - @Override - public final void fastWrite(Consumer... consumers) { - if (fastWriteHandler == null) { - throw new RedkaleException("fast-writer handler is null"); - } - for (Consumer c : consumers) { - this.fastWriteQueue.offer(c); - } - this.ioWriteThread.register(this::fastWriteRegister); - } - - private void fastWriteRegister(Selector selector) { - try { - if (writeKey == null) { - writeKey = keyFor(selector); - } - if (writeKey == null) { - writeKey = implRegister(selector, SelectionKey.OP_WRITE); - writeKey.attach(this); - } else { - writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); - } - // writeCompletionHandler不赋值会跳过doWrite - this.writeCompletionHandler = this.fastWriteHandler; - } catch (ClosedChannelException e) { - e.printStackTrace(); - this.fastWriteQueue.clear(); - handleWrite(0, e); - } - } - - private void fastWritePrepare() { - ByteArray array = this.fastWriteArray.clear(); - Consumer func; - while ((func = fastWriteQueue.poll()) != null) { - func.accept(array); - } - this.writePending = true; - this.writeCompletionHandler = this.fastWriteHandler; - this.writeAttachment = null; - this.writeByteTuple1Array = array.content(); - this.writeByteTuple1Offset = array.offset(); - this.writeByteTuple1Length = array.length(); - } - @Override public void write( byte[] headerContent, @@ -366,10 +300,6 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; boolean error = false; - // fastWrite - if (clientMode && fastWriteArray != null && writeByteBuffer == null && writeByteBuffers == null) { - fastWritePrepare(); - } int batchOffset = writeBuffersOffset; int batchLength = writeBuffersLength; while (hasRemain) { // 必须要将buffer写完为止 @@ -406,9 +336,6 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writeByteTuple2Offset = 0; this.writeByteTuple2Length = 0; } - if (this.fastWriteArray != null) { - this.fastWriteArray.clear(); - } } int writeCount; if (writeByteBuffer != null) {