fastWrite

This commit is contained in:
redkale
2024-10-08 09:14:01 +08:00
parent 3d8c2f1fc5
commit 845a1e97ff
3 changed files with 45 additions and 46 deletions

View File

@@ -10,7 +10,6 @@ import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
@@ -214,6 +213,10 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
return ioWriteThread;
}
public abstract AsyncConnection fastHandler(CompletionHandler handler);
public abstract void fastWrite(Consumer<ByteArray>... consumers);
public abstract boolean isTCP();
public abstract boolean shutdownInput();
@@ -257,30 +260,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
protected abstract <A> void writeImpl(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
// --------------------- fast-write-start ---------------------
public final AsyncConnection fastHandler(CompletionHandler handler) {
if (!clientMode) {
throw new RedkaleException("fast-writer only for client connection");
}
this.fastWriteHandler = Objects.requireNonNull(handler);
this.fastWriteArray = new ByteArray();
this.fastWriteQueue = new ConcurrentLinkedQueue<>();
return this;
}
public final void fastWrite(Consumer<ByteArray>... consumers) {
if (fastWriteHandler == null) {
throw new RedkaleException("fast-writer handler is null");
}
for (Consumer<ByteArray> c : consumers) {
this.fastWriteQueue.offer(c);
}
this.ioWriteThread.fastWrite(this);
}
protected abstract void fastPrepareInIOThread(Object selector);
// --------------------- fast-write-end ---------------------
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {
read(handler);
}

View File

@@ -33,9 +33,6 @@ public class AsyncIOThread extends WorkThread {
private final Consumer<ByteBuffer> bufferConsumer;
// 应用于clientMode模式
private final Queue<AsyncConnection> fastQueue = new ConcurrentLinkedQueue<>();
private final Queue<Runnable> commandQueue = new ConcurrentLinkedQueue<>();
private final Queue<Consumer<Selector>> registerQueue = new ConcurrentLinkedQueue<>();
@@ -141,11 +138,6 @@ public class AsyncIOThread extends WorkThread {
selector.wakeup();
}
public final void fastWrite(AsyncConnection conn) {
fastQueue.offer(Objects.requireNonNull(conn));
selector.wakeup();
}
public Supplier<ByteBuffer> getBufferSupplier() {
return bufferSupplier;
}
@@ -161,11 +153,6 @@ public class AsyncIOThread extends WorkThread {
final Queue<Consumer<Selector>> registers = this.registerQueue;
while (!isClosed()) {
try {
AsyncConnection fastConn;
while ((fastConn = fastQueue.poll()) != null) {
fastConn.fastPrepareInIOThread(selector);
}
Consumer<Selector> register;
while ((register = registers.poll()) != null) {
try {

View File

@@ -10,10 +10,12 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import javax.net.ssl.SSLContext;
import org.redkale.util.ByteArray;
import org.redkale.util.ByteBufferWriter;
import org.redkale.util.RedkaleException;
/**
* 详情见: https://redkale.org
@@ -39,6 +41,12 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey readKey;
// ------------------------------ fast写操作 ------------------------------------
protected ByteArray fastWriteArray;
protected Queue<Consumer<ByteArray>> fastWriteQueue;
protected CompletionHandler fastWriteHandler;
// -------------------------------- 写操作 --------------------------------------
protected byte[] writeByteTuple1Array;
@@ -160,7 +168,29 @@ abstract class AsyncNioConnection extends AsyncConnection {
doRead(this.ioReadThread.inCurrThread());
}
private void writeRegister(Selector selector) {
@Override
public final AsyncConnection fastHandler(CompletionHandler handler) {
if (!clientMode) {
throw new RedkaleException("fast-writer only for client connection");
}
this.fastWriteHandler = Objects.requireNonNull(handler);
this.fastWriteArray = new ByteArray();
this.fastWriteQueue = new ConcurrentLinkedQueue<>();
return this;
}
@Override
public final void fastWrite(Consumer<ByteArray>... consumers) {
if (fastWriteHandler == null) {
throw new RedkaleException("fast-writer handler is null");
}
for (Consumer<ByteArray> c : consumers) {
this.fastWriteQueue.offer(c);
}
this.ioWriteThread.register(this::fastWriteRegister);
}
private void fastWriteRegister(Selector selector) {
try {
if (writeKey == null) {
writeKey = keyFor(selector);
@@ -171,17 +201,17 @@ abstract class AsyncNioConnection extends AsyncConnection {
} else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
}
// writeCompletionHandler不赋值会跳过doWrite
this.writeCompletionHandler = this.fastWriteHandler;
} catch (ClosedChannelException e) {
e.printStackTrace();
this.fastWriteQueue.clear();
handleWrite(0, e);
}
}
@Override
protected void fastPrepareInIOThread(Object selector) {
ByteArray array = this.fastWriteArray;
if (!this.writePending) {
array.clear();
}
private void fastWritePrepare() {
ByteArray array = this.fastWriteArray.clear();
Consumer<ByteArray> func;
while ((func = fastWriteQueue.poll()) != null) {
func.accept(array);
@@ -192,7 +222,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeByteTuple1Array = array.content();
this.writeByteTuple1Offset = array.offset();
this.writeByteTuple1Length = array.length();
writeRegister((Selector) selector);
}
@Override
@@ -337,6 +366,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
boolean hasRemain = true;
boolean writeCompleted = true;
boolean error = false;
// fastWrite
if (clientMode && fastWriteArray != null && writeByteBuffer == null && writeByteBuffers == null) {
fastWritePrepare();
}
int batchOffset = writeBuffersOffset;
int batchLength = writeBuffersLength;
while (hasRemain) { // 必须要将buffer写完为止