From 0427a76b553f1caee7c3cfb9fa26dc795bb5e2b7 Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 7 Oct 2024 20:17:21 +0800 Subject: [PATCH] =?UTF-8?q?AsyncConnection=E6=96=B0=E5=A2=9EfastWrite?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 36 +++++- .../java/org/redkale/net/AsyncIOThread.java | 13 +++ .../org/redkale/net/AsyncNioConnection.java | 110 ++++++------------ .../redkale/net/client/ClientConnection.java | 21 ++-- 4 files changed, 92 insertions(+), 88 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 347194412..d83369e5a 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -10,6 +10,7 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; @@ -83,6 +84,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { private Consumer beforeCloseListener; + // --------------------- fast-write-start --------------------- + protected ByteArray fastWriteArray; + + protected Queue> fastWriteQueue; + + protected CompletionHandler fastWriteHandler; + // --------------------- fast-write-end --------------------- + // 用于服务端的Socket, 等同于一直存在的readCompletionHandler ProtocolCodec protocolCodec; @@ -219,9 +228,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public abstract SocketAddress getLocalAddress(); - // public abstract AsyncConnection fastHandler(CompletionHandler handler); - // - // public abstract void fastWrite(byte[] data); protected abstract void readRegisterImpl(CompletionHandler handler); protected abstract void readImpl(CompletionHandler handler); @@ -251,6 +257,30 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { protected abstract void writeImpl( ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + // --------------------- fast-write-start --------------------- + public final AsyncConnection fastHandler(CompletionHandler handler) { + if (!clientMode) { + throw new RedkaleException("fast-writer only for client connection"); + } + this.fastWriteHandler = Objects.requireNonNull(handler); + this.fastWriteArray = new ByteArray(); + this.fastWriteQueue = new ConcurrentLinkedQueue<>(); + return this; + } + + public final void fastWrite(Consumer... consumers) { + if (fastWriteHandler == null) { + throw new RedkaleException("fast-writer handler is null"); + } + for (Consumer c : consumers) { + this.fastWriteQueue.add(c); + } + this.ioWriteThread.fastWrite(this); + } + + protected abstract void fastPrepare(Object selector); + // --------------------- fast-write-end --------------------- + protected void startRead(CompletionHandler handler) { read(handler); } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 867f8fcbb..adc6cfff3 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -33,6 +33,8 @@ public class AsyncIOThread extends WorkThread { private final Consumer bufferConsumer; + private final Queue fastQueue = new ConcurrentLinkedQueue<>(); + private final Queue commandQueue = new ConcurrentLinkedQueue<>(); private final Queue> registerQueue = new ConcurrentLinkedQueue<>(); @@ -138,6 +140,11 @@ public class AsyncIOThread extends WorkThread { selector.wakeup(); } + public final void fastWrite(AsyncConnection conn) { + fastQueue.add(Objects.requireNonNull(conn)); + selector.wakeup(); + } + public Supplier getBufferSupplier() { return bufferSupplier; } @@ -153,6 +160,11 @@ public class AsyncIOThread extends WorkThread { final Queue> registers = this.registerQueue; while (!isClosed()) { try { + AsyncConnection fastConn; + while ((fastConn = fastQueue.poll()) != null) { + fastConn.fastPrepare(selector); + } + Consumer register; while ((register = registers.poll()) != null) { try { @@ -163,6 +175,7 @@ public class AsyncIOThread extends WorkThread { } } } + Runnable command; while ((command = commands.poll()) != null) { try { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index f1d68d48e..23dad4526 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -10,9 +10,9 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.*; import java.util.function.Consumer; import javax.net.ssl.SSLContext; +import org.redkale.util.ByteArray; import org.redkale.util.ByteBufferWriter; /** @@ -25,9 +25,6 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SocketAddress remoteAddress; - // protected final AtomicLong fastWriteCount = new AtomicLong(); - protected final Queue fastWriteQueue = new ConcurrentLinkedQueue<>(); - // -------------------------------- 连操作 -------------------------------------- protected Object connectAttachment; @@ -163,6 +160,41 @@ abstract class AsyncNioConnection extends AsyncConnection { doRead(this.ioReadThread.inCurrThread()); } + private void writeRegister(Selector selector) { + try { + if (writeKey == null) { + writeKey = keyFor(selector); + } + if (writeKey == null) { + writeKey = implRegister(selector, SelectionKey.OP_WRITE); + writeKey.attach(this); + } else { + writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); + } + } catch (ClosedChannelException e) { + handleWrite(0, e); + } + } + + @Override + protected void fastPrepare(Object selector) { + if (this.writePending) { + return; + } + ByteArray array = this.fastWriteArray.clear(); + Consumer func; + while ((func = fastWriteQueue.poll()) != null) { + func.accept(array); + } + this.writePending = true; + this.writeCompletionHandler = this.fastWriteHandler; + this.writeAttachment = null; + this.writeByteTuple1Array = array.content(); + this.writeByteTuple1Offset = array.offset(); + this.writeByteTuple1Length = array.length(); + writeRegister((Selector) selector); + } + @Override public void write( byte[] headerContent, @@ -261,44 +293,6 @@ abstract class AsyncNioConnection extends AsyncConnection { doWrite(); } - // @Override - // public void fastWrite(byte[] data) { - // CompletionHandler handler = this.writeFastHandler; - // Objects.requireNonNull(data); - // Objects.requireNonNull(handler, "fastHandler is null"); - // if (!this.isConnected()) { - // handler.failed(new NotYetConnectedException(), null); - // return; - // } - // this.writePending = true; - // this.fastWriteQueue.offer(data); - // this.fastWriteCount.incrementAndGet(); - // this.writeCompletionHandler = (CompletionHandler) handler; - // this.writeAttachment = null; - // try { - // if (writeKey == null) { - // ioWriteThread.register(selector -> { - // try { - // if (writeKey == null) { - // writeKey = keyFor(selector); - // } - // if (writeKey == null) { - // writeKey = implRegister(selector, SelectionKey.OP_WRITE); - // writeKey.attach(this); - // } else { - // writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); - // } - // } catch (ClosedChannelException e) { - // handleWrite(0, e); - // } - // }); - // } else { - // ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE); - // } - // } catch (Exception e) { - // handleWrite(0, e); - // } - // } public void doRead(boolean direct) { try { this.readTime = System.currentTimeMillis(); @@ -343,38 +337,6 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; boolean error = false; - // if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && - // fastWriteCount.get() > 0) { - // final ByteBuffer buffer = pollWriteBuffer(); - // ByteBufferWriter writer = null; - // byte[] item; - // while ((item = fastWriteQueue.poll()) != null) { - // fastWriteCount.decrementAndGet(); - // if (writer != null) { - // writer.put(item); - // } else if (buffer.remaining() >= item.length) { - // buffer.put(item); - // } else { - // writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); - // writer.put(item); - // } - // } - // this.writeBuffersOffset = 0; - // if (writer == null) { - // this.writeByteBuffer = buffer.flip(); - // this.writeBuffersLength = 0; - // } else { - // this.writeByteBuffers = writer.toBuffers(); - // this.writeBuffersLength = this.writeByteBuffers.length; - // } - // this.writeByteTuple1Array = null; - // this.writeByteTuple1Offset = 0; - // this.writeByteTuple1Length = 0; - // this.writeByteTuple2Array = null; - // this.writeByteTuple2Offset = 0; - // this.writeByteTuple2Length = 0; - // } - int batchOffset = writeBuffersOffset; int batchLength = writeBuffersLength; while (hasRemain) { // 必须要将buffer写完为止 diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 171c49c73..1b5293ca2 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -46,19 +46,18 @@ public abstract class ClientConnection writeHandler = - new CompletionHandler() { + protected final CompletionHandler writeHandler = new CompletionHandler() { - @Override - public void completed(Integer result, ClientConnection attachment) { - // do nothing - } + @Override + public void completed(Integer result, Object attachment) { + // do nothing + } - @Override - public void failed(Throwable exc, ClientConnection attachment) { - attachment.dispose(exc); - } - }; + @Override + public void failed(Throwable exc, Object attachment) { + dispose(exc); + } + }; @Nonnull protected LongAdder respWaitingCounter;