diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 148b16aa8..ea4c48669 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -10,7 +10,6 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; @@ -214,6 +213,10 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return ioWriteThread; } + public abstract AsyncConnection fastHandler(CompletionHandler handler); + + public abstract void fastWrite(Consumer... consumers); + public abstract boolean isTCP(); public abstract boolean shutdownInput(); @@ -257,30 +260,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { protected abstract void writeImpl( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); - // --------------------- fast-write-start --------------------- - public final AsyncConnection fastHandler(CompletionHandler handler) { - if (!clientMode) { - throw new RedkaleException("fast-writer only for client connection"); - } - this.fastWriteHandler = Objects.requireNonNull(handler); - this.fastWriteArray = new ByteArray(); - this.fastWriteQueue = new ConcurrentLinkedQueue<>(); - return this; - } - - public final void fastWrite(Consumer... consumers) { - if (fastWriteHandler == null) { - throw new RedkaleException("fast-writer handler is null"); - } - for (Consumer c : consumers) { - this.fastWriteQueue.offer(c); - } - this.ioWriteThread.fastWrite(this); - } - - protected abstract void fastPrepareInIOThread(Object selector); - // --------------------- fast-write-end --------------------- - protected void startRead(CompletionHandler handler) { read(handler); } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 202bcf5f6..52b6b5df4 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -33,9 +33,6 @@ public class AsyncIOThread extends WorkThread { private final Consumer bufferConsumer; - // 应用于clientMode模式 - private final Queue fastQueue = new ConcurrentLinkedQueue<>(); - private final Queue commandQueue = new ConcurrentLinkedQueue<>(); private final Queue> registerQueue = new ConcurrentLinkedQueue<>(); @@ -141,11 +138,6 @@ public class AsyncIOThread extends WorkThread { selector.wakeup(); } - public final void fastWrite(AsyncConnection conn) { - fastQueue.offer(Objects.requireNonNull(conn)); - selector.wakeup(); - } - public Supplier getBufferSupplier() { return bufferSupplier; } @@ -161,11 +153,6 @@ public class AsyncIOThread extends WorkThread { final Queue> registers = this.registerQueue; while (!isClosed()) { try { - AsyncConnection fastConn; - while ((fastConn = fastQueue.poll()) != null) { - fastConn.fastPrepareInIOThread(selector); - } - Consumer register; while ((register = registers.poll()) != null) { try { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 545e0f4f9..e64767a68 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -10,10 +10,12 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; import javax.net.ssl.SSLContext; import org.redkale.util.ByteArray; import org.redkale.util.ByteBufferWriter; +import org.redkale.util.RedkaleException; /** * 详情见: https://redkale.org @@ -39,6 +41,12 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey readKey; + // ------------------------------ fast写操作 ------------------------------------ + protected ByteArray fastWriteArray; + + protected Queue> fastWriteQueue; + + protected CompletionHandler fastWriteHandler; // -------------------------------- 写操作 -------------------------------------- protected byte[] writeByteTuple1Array; @@ -160,7 +168,29 @@ abstract class AsyncNioConnection extends AsyncConnection { doRead(this.ioReadThread.inCurrThread()); } - private void writeRegister(Selector selector) { + @Override + public final AsyncConnection fastHandler(CompletionHandler handler) { + if (!clientMode) { + throw new RedkaleException("fast-writer only for client connection"); + } + this.fastWriteHandler = Objects.requireNonNull(handler); + this.fastWriteArray = new ByteArray(); + this.fastWriteQueue = new ConcurrentLinkedQueue<>(); + return this; + } + + @Override + public final void fastWrite(Consumer... consumers) { + if (fastWriteHandler == null) { + throw new RedkaleException("fast-writer handler is null"); + } + for (Consumer c : consumers) { + this.fastWriteQueue.offer(c); + } + this.ioWriteThread.register(this::fastWriteRegister); + } + + private void fastWriteRegister(Selector selector) { try { if (writeKey == null) { writeKey = keyFor(selector); @@ -171,17 +201,17 @@ abstract class AsyncNioConnection extends AsyncConnection { } else { writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); } + // writeCompletionHandler不赋值会跳过doWrite + this.writeCompletionHandler = this.fastWriteHandler; } catch (ClosedChannelException e) { + e.printStackTrace(); + this.fastWriteQueue.clear(); handleWrite(0, e); } } - @Override - protected void fastPrepareInIOThread(Object selector) { - ByteArray array = this.fastWriteArray; - if (!this.writePending) { - array.clear(); - } + private void fastWritePrepare() { + ByteArray array = this.fastWriteArray.clear(); Consumer func; while ((func = fastWriteQueue.poll()) != null) { func.accept(array); @@ -192,7 +222,6 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writeByteTuple1Array = array.content(); this.writeByteTuple1Offset = array.offset(); this.writeByteTuple1Length = array.length(); - writeRegister((Selector) selector); } @Override @@ -337,6 +366,10 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; boolean error = false; + // fastWrite + if (clientMode && fastWriteArray != null && writeByteBuffer == null && writeByteBuffers == null) { + fastWritePrepare(); + } int batchOffset = writeBuffersOffset; int batchLength = writeBuffersLength; while (hasRemain) { // 必须要将buffer写完为止