移除fastWrite
This commit is contained in:
@@ -83,14 +83,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
|
|
||||||
private Consumer<AsyncConnection> beforeCloseListener;
|
private Consumer<AsyncConnection> beforeCloseListener;
|
||||||
|
|
||||||
// --------------------- clientMode: fast-write-start ---------------------
|
|
||||||
protected ByteArray fastWriteArray;
|
|
||||||
|
|
||||||
protected Queue<Consumer<ByteArray>> fastWriteQueue;
|
|
||||||
|
|
||||||
protected CompletionHandler fastWriteHandler;
|
|
||||||
// --------------------- clientMode: fast-write-end ---------------------
|
|
||||||
|
|
||||||
// 用于服务端的Socket, 等同于一直存在的readCompletionHandler
|
// 用于服务端的Socket, 等同于一直存在的readCompletionHandler
|
||||||
ProtocolCodec protocolCodec;
|
ProtocolCodec protocolCodec;
|
||||||
|
|
||||||
@@ -213,10 +205,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
|
|||||||
return ioWriteThread;
|
return ioWriteThread;
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract AsyncConnection fastHandler(CompletionHandler handler);
|
|
||||||
|
|
||||||
public abstract void fastWrite(Consumer<ByteArray>... consumers);
|
|
||||||
|
|
||||||
public abstract boolean isTCP();
|
public abstract boolean isTCP();
|
||||||
|
|
||||||
public abstract boolean shutdownInput();
|
public abstract boolean shutdownInput();
|
||||||
|
|||||||
@@ -10,12 +10,9 @@ import java.net.SocketAddress;
|
|||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.*;
|
import java.nio.channels.*;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import org.redkale.util.ByteArray;
|
|
||||||
import org.redkale.util.ByteBufferWriter;
|
import org.redkale.util.ByteBufferWriter;
|
||||||
import org.redkale.util.RedkaleException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 详情见: https://redkale.org
|
* 详情见: https://redkale.org
|
||||||
@@ -40,13 +37,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
protected CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
|
protected CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
|
||||||
|
|
||||||
protected SelectionKey readKey;
|
protected SelectionKey readKey;
|
||||||
|
|
||||||
// ------------------------------ fast写操作 ------------------------------------
|
|
||||||
protected ByteArray fastWriteArray;
|
|
||||||
|
|
||||||
protected Queue<Consumer<ByteArray>> fastWriteQueue;
|
|
||||||
|
|
||||||
protected CompletionHandler fastWriteHandler;
|
|
||||||
// -------------------------------- 写操作 --------------------------------------
|
// -------------------------------- 写操作 --------------------------------------
|
||||||
protected byte[] writeByteTuple1Array;
|
protected byte[] writeByteTuple1Array;
|
||||||
|
|
||||||
@@ -168,62 +158,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
doRead(this.ioReadThread.inCurrThread());
|
doRead(this.ioReadThread.inCurrThread());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public final AsyncConnection fastHandler(CompletionHandler handler) {
|
|
||||||
if (!clientMode) {
|
|
||||||
throw new RedkaleException("fast-writer only for client connection");
|
|
||||||
}
|
|
||||||
this.fastWriteHandler = Objects.requireNonNull(handler);
|
|
||||||
this.fastWriteArray = new ByteArray();
|
|
||||||
this.fastWriteQueue = new ConcurrentLinkedQueue<>();
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public final void fastWrite(Consumer<ByteArray>... consumers) {
|
|
||||||
if (fastWriteHandler == null) {
|
|
||||||
throw new RedkaleException("fast-writer handler is null");
|
|
||||||
}
|
|
||||||
for (Consumer<ByteArray> c : consumers) {
|
|
||||||
this.fastWriteQueue.offer(c);
|
|
||||||
}
|
|
||||||
this.ioWriteThread.register(this::fastWriteRegister);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void fastWriteRegister(Selector selector) {
|
|
||||||
try {
|
|
||||||
if (writeKey == null) {
|
|
||||||
writeKey = keyFor(selector);
|
|
||||||
}
|
|
||||||
if (writeKey == null) {
|
|
||||||
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
|
|
||||||
writeKey.attach(this);
|
|
||||||
} else {
|
|
||||||
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
|
|
||||||
}
|
|
||||||
// writeCompletionHandler不赋值会跳过doWrite
|
|
||||||
this.writeCompletionHandler = this.fastWriteHandler;
|
|
||||||
} catch (ClosedChannelException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
this.fastWriteQueue.clear();
|
|
||||||
handleWrite(0, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void fastWritePrepare() {
|
|
||||||
ByteArray array = this.fastWriteArray.clear();
|
|
||||||
Consumer<ByteArray> func;
|
|
||||||
while ((func = fastWriteQueue.poll()) != null) {
|
|
||||||
func.accept(array);
|
|
||||||
}
|
|
||||||
this.writePending = true;
|
|
||||||
this.writeCompletionHandler = this.fastWriteHandler;
|
|
||||||
this.writeAttachment = null;
|
|
||||||
this.writeByteTuple1Array = array.content();
|
|
||||||
this.writeByteTuple1Offset = array.offset();
|
|
||||||
this.writeByteTuple1Length = array.length();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(
|
public void write(
|
||||||
byte[] headerContent,
|
byte[] headerContent,
|
||||||
@@ -366,10 +300,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
boolean hasRemain = true;
|
boolean hasRemain = true;
|
||||||
boolean writeCompleted = true;
|
boolean writeCompleted = true;
|
||||||
boolean error = false;
|
boolean error = false;
|
||||||
// fastWrite
|
|
||||||
if (clientMode && fastWriteArray != null && writeByteBuffer == null && writeByteBuffers == null) {
|
|
||||||
fastWritePrepare();
|
|
||||||
}
|
|
||||||
int batchOffset = writeBuffersOffset;
|
int batchOffset = writeBuffersOffset;
|
||||||
int batchLength = writeBuffersLength;
|
int batchLength = writeBuffersLength;
|
||||||
while (hasRemain) { // 必须要将buffer写完为止
|
while (hasRemain) { // 必须要将buffer写完为止
|
||||||
@@ -406,9 +336,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
|||||||
this.writeByteTuple2Offset = 0;
|
this.writeByteTuple2Offset = 0;
|
||||||
this.writeByteTuple2Length = 0;
|
this.writeByteTuple2Length = 0;
|
||||||
}
|
}
|
||||||
if (this.fastWriteArray != null) {
|
|
||||||
this.fastWriteArray.clear();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
int writeCount;
|
int writeCount;
|
||||||
if (writeByteBuffer != null) {
|
if (writeByteBuffer != null) {
|
||||||
|
|||||||
Reference in New Issue
Block a user