AsyncNioConnection优化fastWrite
This commit is contained in:
@@ -11,6 +11,7 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import org.redkale.util.*;
|
||||
@@ -28,6 +29,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
|
||||
protected SocketAddress remoteAddress;
|
||||
|
||||
protected final AtomicLong fastWriteCount = new AtomicLong();
|
||||
|
||||
protected final Queue<byte[]> fastWriteQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
//-------------------------------- 连操作 --------------------------------------
|
||||
protected Object connectAttachment;
|
||||
|
||||
@@ -80,10 +85,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
|
||||
protected SelectionKey writeKey;
|
||||
|
||||
//-------------------------- 用于客户端的Socket --------------------------
|
||||
//用于客户端的Socket
|
||||
protected final Queue<byte[]> clientModeWriteQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
|
||||
AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
|
||||
super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext);
|
||||
@@ -297,7 +298,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
Objects.requireNonNull(data);
|
||||
Objects.requireNonNull(handler);
|
||||
this.writePending = true;
|
||||
this.clientModeWriteQueue.offer(data);
|
||||
this.fastWriteQueue.offer(data);
|
||||
this.fastWriteCount.incrementAndGet();
|
||||
this.writeCompletionHandler = (CompletionHandler) handler;
|
||||
this.writeAttachment = attachment;
|
||||
try {
|
||||
@@ -370,13 +372,13 @@ abstract class AsyncNioConnection extends AsyncConnection {
|
||||
boolean hasRemain = true;
|
||||
boolean writeCompleted = true;
|
||||
|
||||
if (clientMode && writeByteTuple1Array == null && !clientModeWriteQueue.isEmpty()) {
|
||||
if (writeByteTuple1Array == null && fastWriteCount.get() > 0) {
|
||||
byte[] bs = null;
|
||||
byte[] item;
|
||||
while ((item = clientModeWriteQueue.poll()) != null) {
|
||||
while ((item = fastWriteQueue.poll()) != null) {
|
||||
fastWriteCount.decrementAndGet();
|
||||
bs = Utility.append(bs, item);
|
||||
}
|
||||
this.writePending = true;
|
||||
this.writeByteTuple1Array = bs;
|
||||
this.writeByteTuple1Offset = 0;
|
||||
this.writeByteTuple1Length = bs == null ? 0 : bs.length;
|
||||
|
||||
Reference in New Issue
Block a user