AsyncConnection新增fastWrite功能
This commit is contained in:
@@ -10,6 +10,7 @@ import java.net.*;
|
|||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.*;
|
import java.nio.channels.*;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.function.*;
|
import java.util.function.*;
|
||||||
@@ -83,6 +84,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
|
|
||||||
private Consumer<AsyncConnection> beforeCloseListener;
|
private Consumer<AsyncConnection> beforeCloseListener;
|
||||||
|
|
||||||
|
// --------------------- fast-write-start ---------------------
|
||||||
|
protected ByteArray fastWriteArray;
|
||||||
|
|
||||||
|
protected Queue<Consumer<ByteArray>> fastWriteQueue;
|
||||||
|
|
||||||
|
protected CompletionHandler fastWriteHandler;
|
||||||
|
// --------------------- fast-write-end ---------------------
|
||||||
|
|
||||||
// 用于服务端的Socket, 等同于一直存在的readCompletionHandler
|
// 用于服务端的Socket, 等同于一直存在的readCompletionHandler
|
||||||
ProtocolCodec protocolCodec;
|
ProtocolCodec protocolCodec;
|
||||||
|
|
||||||
@@ -219,9 +228,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
|
|
||||||
public abstract SocketAddress getLocalAddress();
|
public abstract SocketAddress getLocalAddress();
|
||||||
|
|
||||||
// public abstract <A> AsyncConnection fastHandler(CompletionHandler<Integer, ? super A> handler);
|
|
||||||
//
|
|
||||||
// public abstract <A> void fastWrite(byte[] data);
|
|
||||||
protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler);
|
protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler);
|
||||||
|
|
||||||
protected abstract void readImpl(CompletionHandler<Integer, ByteBuffer> handler);
|
protected abstract void readImpl(CompletionHandler<Integer, ByteBuffer> handler);
|
||||||
@@ -251,6 +257,30 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
protected abstract <A> void writeImpl(
|
protected abstract <A> void writeImpl(
|
||||||
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
|
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
|
||||||
|
|
||||||
|
// --------------------- fast-write-start ---------------------
|
||||||
|
public final AsyncConnection fastHandler(CompletionHandler handler) {
|
||||||
|
if (!clientMode) {
|
||||||
|
throw new RedkaleException("fast-writer only for client connection");
|
||||||
|
}
|
||||||
|
this.fastWriteHandler = Objects.requireNonNull(handler);
|
||||||
|
this.fastWriteArray = new ByteArray();
|
||||||
|
this.fastWriteQueue = new ConcurrentLinkedQueue<>();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final void fastWrite(Consumer<ByteArray>... consumers) {
|
||||||
|
if (fastWriteHandler == null) {
|
||||||
|
throw new RedkaleException("fast-writer handler is null");
|
||||||
|
}
|
||||||
|
for (Consumer<ByteArray> c : consumers) {
|
||||||
|
this.fastWriteQueue.add(c);
|
||||||
|
}
|
||||||
|
this.ioWriteThread.fastWrite(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void fastPrepare(Object selector);
|
||||||
|
// --------------------- fast-write-end ---------------------
|
||||||
|
|
||||||
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {
|
protected void startRead(CompletionHandler<Integer, ByteBuffer> handler) {
|
||||||
read(handler);
|
read(handler);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,6 +33,8 @@ public class AsyncIOThread extends WorkThread {
|
|||||||
|
|
||||||
private final Consumer<ByteBuffer> bufferConsumer;
|
private final Consumer<ByteBuffer> bufferConsumer;
|
||||||
|
|
||||||
|
private final Queue<AsyncConnection> fastQueue = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
private final Queue<Runnable> commandQueue = new ConcurrentLinkedQueue<>();
|
private final Queue<Runnable> commandQueue = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
private final Queue<Consumer<Selector>> registerQueue = new ConcurrentLinkedQueue<>();
|
private final Queue<Consumer<Selector>> registerQueue = new ConcurrentLinkedQueue<>();
|
||||||
@@ -138,6 +140,11 @@ public class AsyncIOThread extends WorkThread {
|
|||||||
selector.wakeup();
|
selector.wakeup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public final void fastWrite(AsyncConnection conn) {
|
||||||
|
fastQueue.add(Objects.requireNonNull(conn));
|
||||||
|
selector.wakeup();
|
||||||
|
}
|
||||||
|
|
||||||
public Supplier<ByteBuffer> getBufferSupplier() {
|
public Supplier<ByteBuffer> getBufferSupplier() {
|
||||||
return bufferSupplier;
|
return bufferSupplier;
|
||||||
}
|
}
|
||||||
@@ -153,6 +160,11 @@ public class AsyncIOThread extends WorkThread {
|
|||||||
final Queue<Consumer<Selector>> registers = this.registerQueue;
|
final Queue<Consumer<Selector>> registers = this.registerQueue;
|
||||||
while (!isClosed()) {
|
while (!isClosed()) {
|
||||||
try {
|
try {
|
||||||
|
AsyncConnection fastConn;
|
||||||
|
while ((fastConn = fastQueue.poll()) != null) {
|
||||||
|
fastConn.fastPrepare(selector);
|
||||||
|
}
|
||||||
|
|
||||||
Consumer<Selector> register;
|
Consumer<Selector> register;
|
||||||
while ((register = registers.poll()) != null) {
|
while ((register = registers.poll()) != null) {
|
||||||
try {
|
try {
|
||||||
@@ -163,6 +175,7 @@ public class AsyncIOThread extends WorkThread {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Runnable command;
|
Runnable command;
|
||||||
while ((command = commands.poll()) != null) {
|
while ((command = commands.poll()) != null) {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -10,9 +10,9 @@ import java.net.SocketAddress;
|
|||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.*;
|
import java.nio.channels.*;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
|
import org.redkale.util.ByteArray;
|
||||||
import org.redkale.util.ByteBufferWriter;
|
import org.redkale.util.ByteBufferWriter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -25,9 +25,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
|
|
||||||
protected SocketAddress remoteAddress;
|
protected SocketAddress remoteAddress;
|
||||||
|
|
||||||
// protected final AtomicLong fastWriteCount = new AtomicLong();
|
|
||||||
protected final Queue<byte[]> fastWriteQueue = new ConcurrentLinkedQueue<>();
|
|
||||||
|
|
||||||
// -------------------------------- 连操作 --------------------------------------
|
// -------------------------------- 连操作 --------------------------------------
|
||||||
protected Object connectAttachment;
|
protected Object connectAttachment;
|
||||||
|
|
||||||
@@ -163,6 +160,41 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
doRead(this.ioReadThread.inCurrThread());
|
doRead(this.ioReadThread.inCurrThread());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void writeRegister(Selector selector) {
|
||||||
|
try {
|
||||||
|
if (writeKey == null) {
|
||||||
|
writeKey = keyFor(selector);
|
||||||
|
}
|
||||||
|
if (writeKey == null) {
|
||||||
|
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
|
||||||
|
writeKey.attach(this);
|
||||||
|
} else {
|
||||||
|
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
|
||||||
|
}
|
||||||
|
} catch (ClosedChannelException e) {
|
||||||
|
handleWrite(0, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void fastPrepare(Object selector) {
|
||||||
|
if (this.writePending) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ByteArray array = this.fastWriteArray.clear();
|
||||||
|
Consumer<ByteArray> func;
|
||||||
|
while ((func = fastWriteQueue.poll()) != null) {
|
||||||
|
func.accept(array);
|
||||||
|
}
|
||||||
|
this.writePending = true;
|
||||||
|
this.writeCompletionHandler = this.fastWriteHandler;
|
||||||
|
this.writeAttachment = null;
|
||||||
|
this.writeByteTuple1Array = array.content();
|
||||||
|
this.writeByteTuple1Offset = array.offset();
|
||||||
|
this.writeByteTuple1Length = array.length();
|
||||||
|
writeRegister((Selector) selector);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(
|
public void write(
|
||||||
byte[] headerContent,
|
byte[] headerContent,
|
||||||
@@ -261,44 +293,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
doWrite();
|
doWrite();
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Override
|
|
||||||
// public <A> void fastWrite(byte[] data) {
|
|
||||||
// CompletionHandler<Integer, ? super A> handler = this.writeFastHandler;
|
|
||||||
// Objects.requireNonNull(data);
|
|
||||||
// Objects.requireNonNull(handler, "fastHandler is null");
|
|
||||||
// if (!this.isConnected()) {
|
|
||||||
// handler.failed(new NotYetConnectedException(), null);
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// this.writePending = true;
|
|
||||||
// this.fastWriteQueue.offer(data);
|
|
||||||
// this.fastWriteCount.incrementAndGet();
|
|
||||||
// this.writeCompletionHandler = (CompletionHandler) handler;
|
|
||||||
// this.writeAttachment = null;
|
|
||||||
// try {
|
|
||||||
// if (writeKey == null) {
|
|
||||||
// ioWriteThread.register(selector -> {
|
|
||||||
// try {
|
|
||||||
// if (writeKey == null) {
|
|
||||||
// writeKey = keyFor(selector);
|
|
||||||
// }
|
|
||||||
// if (writeKey == null) {
|
|
||||||
// writeKey = implRegister(selector, SelectionKey.OP_WRITE);
|
|
||||||
// writeKey.attach(this);
|
|
||||||
// } else {
|
|
||||||
// writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
|
|
||||||
// }
|
|
||||||
// } catch (ClosedChannelException e) {
|
|
||||||
// handleWrite(0, e);
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// } else {
|
|
||||||
// ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE);
|
|
||||||
// }
|
|
||||||
// } catch (Exception e) {
|
|
||||||
// handleWrite(0, e);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
public void doRead(boolean direct) {
|
public void doRead(boolean direct) {
|
||||||
try {
|
try {
|
||||||
this.readTime = System.currentTimeMillis();
|
this.readTime = System.currentTimeMillis();
|
||||||
@@ -343,38 +337,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
boolean hasRemain = true;
|
boolean hasRemain = true;
|
||||||
boolean writeCompleted = true;
|
boolean writeCompleted = true;
|
||||||
boolean error = false;
|
boolean error = false;
|
||||||
// if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null &&
|
|
||||||
// fastWriteCount.get() > 0) {
|
|
||||||
// final ByteBuffer buffer = pollWriteBuffer();
|
|
||||||
// ByteBufferWriter writer = null;
|
|
||||||
// byte[] item;
|
|
||||||
// while ((item = fastWriteQueue.poll()) != null) {
|
|
||||||
// fastWriteCount.decrementAndGet();
|
|
||||||
// if (writer != null) {
|
|
||||||
// writer.put(item);
|
|
||||||
// } else if (buffer.remaining() >= item.length) {
|
|
||||||
// buffer.put(item);
|
|
||||||
// } else {
|
|
||||||
// writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
|
|
||||||
// writer.put(item);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// this.writeBuffersOffset = 0;
|
|
||||||
// if (writer == null) {
|
|
||||||
// this.writeByteBuffer = buffer.flip();
|
|
||||||
// this.writeBuffersLength = 0;
|
|
||||||
// } else {
|
|
||||||
// this.writeByteBuffers = writer.toBuffers();
|
|
||||||
// this.writeBuffersLength = this.writeByteBuffers.length;
|
|
||||||
// }
|
|
||||||
// this.writeByteTuple1Array = null;
|
|
||||||
// this.writeByteTuple1Offset = 0;
|
|
||||||
// this.writeByteTuple1Length = 0;
|
|
||||||
// this.writeByteTuple2Array = null;
|
|
||||||
// this.writeByteTuple2Offset = 0;
|
|
||||||
// this.writeByteTuple2Length = 0;
|
|
||||||
// }
|
|
||||||
|
|
||||||
int batchOffset = writeBuffersOffset;
|
int batchOffset = writeBuffersOffset;
|
||||||
int batchLength = writeBuffersLength;
|
int batchLength = writeBuffersLength;
|
||||||
while (hasRemain) { // 必须要将buffer写完为止
|
while (hasRemain) { // 必须要将buffer写完为止
|
||||||
|
|||||||
@@ -46,17 +46,16 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
|
|||||||
|
|
||||||
protected final AsyncConnection channel;
|
protected final AsyncConnection channel;
|
||||||
|
|
||||||
protected final CompletionHandler<Integer, ClientConnection> writeHandler =
|
protected final CompletionHandler<Integer, Object> writeHandler = new CompletionHandler<Integer, Object>() {
|
||||||
new CompletionHandler<Integer, ClientConnection>() {
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer result, ClientConnection attachment) {
|
public void completed(Integer result, Object attachment) {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable exc, ClientConnection attachment) {
|
public void failed(Throwable exc, Object attachment) {
|
||||||
attachment.dispose(exc);
|
dispose(exc);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user