浼樺寲AsyncNioTcpProtocolServer

This commit is contained in:
Redkale
2023-01-14 11:17:58 +08:00
parent 52b9fd8326
commit 0712b71b71
2 changed files with 12 additions and 6 deletions

View File

@@ -36,10 +36,10 @@ public class AsyncIOGroup extends AsyncGroup {
private boolean skipClose;
//必须与ioWriteThreads数量相同
private AsyncIOThread[] ioReadThreads;
final AsyncIOThread[] ioReadThreads;
//必须与ioReadThreads数量相同
private AsyncIOThread[] ioWriteThreads;
final AsyncIOThread[] ioWriteThreads;
private AsyncIOThread connectThread;

View File

@@ -129,6 +129,10 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
@Override
public void run() {
final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads;
final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads;
int threads = ioReadThreads.length;
int threadIndex = -1;
while (!closed) {
try {
int count = selector.select();
@@ -141,7 +145,10 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
SelectionKey key = it.next();
it.remove();
if (key.isAcceptable()) {
accept(key);
if (++threadIndex >= threads) {
threadIndex = 0;
}
accept(key, ioReadThreads[threadIndex], ioWriteThreads[threadIndex]);
}
}
} catch (Throwable t) {
@@ -153,7 +160,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
this.acceptThread.start();
}
private void accept(SelectionKey key) throws IOException {
private void accept(SelectionKey key, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException {
SocketChannel channel = this.serverChannel.accept();
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
@@ -161,7 +168,6 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
AsyncIOThread[] ioThreads = ioGroup.nextIOThreads();
LongAdder connCreateCounter = ioGroup.connCreateCounter;
if (connCreateCounter != null) {
connCreateCounter.increment();
@@ -170,7 +176,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
if (connLivingCounter != null) {
connLivingCounter.increment();
}
AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioThreads[0], ioThreads[1], ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter);
AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioReadThread, ioWriteThread, ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter);
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
conn.protocolCodec = codec;
if (conn.sslEngine == null) {