AsyncNioTcpProtocolServer优化

This commit is contained in:
Redkale
2023-01-14 11:36:07 +08:00
parent 0712b71b71
commit 6c8b482f65
7 changed files with 38 additions and 45 deletions

View File

@@ -80,8 +80,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
//用于服务端的Socket, 等同于一直存在的readCompletionHandler //用于服务端的Socket, 等同于一直存在的readCompletionHandler
ProtocolCodec protocolCodec; ProtocolCodec protocolCodec;
protected AsyncConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, protected AsyncConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { AsyncIOThread ioWriteThread, int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
Objects.requireNonNull(ioGroup); Objects.requireNonNull(ioGroup);
Objects.requireNonNull(ioReadThread); Objects.requireNonNull(ioReadThread);
Objects.requireNonNull(ioWriteThread); Objects.requireNonNull(ioWriteThread);
@@ -94,8 +94,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
this.readBufferConsumer = ioReadThread.getBufferConsumer(); this.readBufferConsumer = ioReadThread.getBufferConsumer();
this.writeBufferSupplier = ioWriteThread.getBufferSupplier(); this.writeBufferSupplier = ioWriteThread.getBufferSupplier();
this.writeBufferConsumer = ioWriteThread.getBufferConsumer(); this.writeBufferConsumer = ioWriteThread.getBufferConsumer();
this.livingCounter = livingCounter; this.livingCounter = ioGroup.connLivingCounter;
this.closedCounter = closedCounter; this.closedCounter = ioGroup.connClosedCounter;
if (client) { //client模式下无SSLBuilder if (client) { //client模式下无SSLBuilder
if (sslContext != null) { if (sslContext != null) {
if (sslBuilder != null) { if (sslBuilder != null) {

View File

@@ -41,7 +41,7 @@ public class AsyncIOGroup extends AsyncGroup {
//必须与ioReadThreads数量相同 //必须与ioReadThreads数量相同
final AsyncIOThread[] ioWriteThreads; final AsyncIOThread[] ioWriteThreads;
private AsyncIOThread connectThread; final AsyncIOThread connectThread;
final int bufferCapacity; final int bufferCapacity;
@@ -52,10 +52,10 @@ public class AsyncIOGroup extends AsyncGroup {
//创建数 //创建数
final LongAdder connCreateCounter = new LongAdder(); final LongAdder connCreateCounter = new LongAdder();
//关闭 //在线
final LongAdder connLivingCounter = new LongAdder(); final LongAdder connLivingCounter = new LongAdder();
//在线 //关闭
final LongAdder connClosedCounter = new LongAdder(); final LongAdder connClosedCounter = new LongAdder();
private ScheduledThreadPoolExecutor timeoutExecutor; private ScheduledThreadPoolExecutor timeoutExecutor;
@@ -100,6 +100,8 @@ public class AsyncIOGroup extends AsyncGroup {
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
this.connectThread = client ? new ClientReadIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) this.connectThread = client ? new ClientReadIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool)
: new AsyncIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); : new AsyncIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool);
} else {
this.connectThread = null;
} }
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@@ -246,7 +248,7 @@ public class AsyncIOGroup extends AsyncGroup {
if (ioThreads == null) { if (ioThreads == null) {
ioThreads = nextIOThreads(); ioThreads = nextIOThreads();
} }
return new AsyncNioTcpConnection(true, this, ioThreads[0], ioThreads[1], connectThread, channel, null, null, address, connLivingCounter, connClosedCounter); return new AsyncNioTcpConnection(true, this, ioThreads[0], ioThreads[1], channel, null, null, address);
} }
@Override @Override
@@ -315,7 +317,7 @@ public class AsyncIOGroup extends AsyncGroup {
if (ioThreads == null) { if (ioThreads == null) {
ioThreads = nextIOThreads(); ioThreads = nextIOThreads();
} }
return new AsyncNioUdpConnection(true, this, ioThreads[0], ioThreads[1], connectThread, channel, null, null, address, connLivingCounter, connClosedCounter); return new AsyncNioUdpConnection(true, this, ioThreads[0], ioThreads[1], channel, null, null, address);
} }
@Override @Override

View File

@@ -11,7 +11,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer; import java.util.function.Consumer;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.util.ByteBufferWriter; import org.redkale.util.ByteBufferWriter;
@@ -97,10 +96,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey writeKey; protected SelectionKey writeKey;
public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext, LongAdder livingCounter, LongAdder closedCounter) { AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
super(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter); super(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext);
this.connectThread = connectThread; this.connectThread = ioGroup.connectThread;
} }
@Override @Override

View File

@@ -10,7 +10,6 @@ import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.util.ByteBufferReader; import org.redkale.util.ByteBufferReader;
@@ -27,11 +26,11 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
private final SocketChannel channel; private final SocketChannel channel;
public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) { AsyncIOThread ioWriteThread, SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) {
super(client, ioGroup, ioReadThread, ioWriteThread, connectThread, ioGroup.bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter); super(client, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext);
this.channel = ch; this.channel = ch;
SocketAddress addr = addr0; SocketAddress addr = address;
if (addr == null) { if (addr == null) {
try { try {
addr = ch.getRemoteAddress(); addr = ch.getRemoteAddress();

View File

@@ -168,15 +168,9 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
LongAdder connCreateCounter = ioGroup.connCreateCounter; ioGroup.connCreateCounter.increment();
if (connCreateCounter != null) { ioGroup.connLivingCounter.increment();
connCreateCounter.increment(); AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioReadThread, ioWriteThread, channel, context.getSSLBuilder(), context.getSSLContext(), null);
}
LongAdder connLivingCounter = ioGroup.connLivingCounter;
if (connLivingCounter != null) {
connLivingCounter.increment();
}
AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioReadThread, ioWriteThread, ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter);
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
conn.protocolCodec = codec; conn.protocolCodec = codec;
if (conn.sslEngine == null) { if (conn.sslEngine == null) {

View File

@@ -10,7 +10,6 @@ import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
/** /**
@@ -24,11 +23,11 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
private final DatagramChannel channel; private final DatagramChannel channel;
public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, DatagramChannel ch, public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) { AsyncIOThread ioWriteThread, DatagramChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) {
super(client, ioGroup, ioReadThread, ioWriteThread, connectThread, ioGroup.bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter); super(client, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext);
this.channel = ch; this.channel = ch;
SocketAddress addr = addr0; SocketAddress addr = address;
if (addr == null) { if (addr == null) {
try { try {
addr = ch.getRemoteAddress(); addr = ch.getRemoteAddress();

View File

@@ -121,12 +121,19 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
@Override @Override
public void run() { public void run() {
final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads;
final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads;
int threads = ioReadThreads.length;
int threadIndex = -1;
while (!closed) { while (!closed) {
final ByteBuffer buffer = unsafeBufferPool.get(); final ByteBuffer buffer = unsafeBufferPool.get();
try { try {
SocketAddress address = serverChannel.receive(buffer); SocketAddress address = serverChannel.receive(buffer);
buffer.flip(); buffer.flip();
accept(address, buffer); if (++threadIndex >= threads) {
threadIndex = 0;
}
accept(address, buffer, ioReadThreads[threadIndex], ioWriteThreads[threadIndex]);
} catch (Throwable t) { } catch (Throwable t) {
unsafeBufferPool.accept(buffer); unsafeBufferPool.accept(buffer);
} }
@@ -136,17 +143,10 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
this.acceptThread.start(); this.acceptThread.start();
} }
private void accept(SocketAddress address, ByteBuffer buffer) throws IOException { private void accept(SocketAddress address, ByteBuffer buffer, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException {
AsyncIOThread[] ioThreads = ioGroup.nextIOThreads(); ioGroup.connCreateCounter.increment();
LongAdder connCreateCounter = ioGroup.connCreateCounter; ioGroup.connLivingCounter.increment();
if (connCreateCounter != null) { AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioReadThread, ioWriteThread, this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address);
connCreateCounter.increment();
}
LongAdder connLivingCounter = ioGroup.connLivingCounter;
if (connLivingCounter != null) {
connLivingCounter.increment();
}
AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioThreads[0], ioThreads[1], ioGroup.connectThread(), this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address, connLivingCounter, ioGroup.connClosedCounter);
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
conn.protocolCodec = codec; conn.protocolCodec = codec;
if (conn.sslEngine == null) { if (conn.sslEngine == null) {