From 0712b71b71dde95d4ebbef70f7a024fc6bfcbbf6 Mon Sep 17 00:00:00 2001 From: Redkale Date: Sat, 14 Jan 2023 11:17:58 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=BC=E6=A8=BA=E5=AF=B2AsyncNioTcpProtocolS?= =?UTF-8?q?erver?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/AsyncIOGroup.java | 4 ++-- .../org/redkale/net/AsyncNioTcpProtocolServer.java | 14 ++++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 187fc0bad..45fce2e53 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -36,10 +36,10 @@ public class AsyncIOGroup extends AsyncGroup { private boolean skipClose; //必须与ioWriteThreads数量相同 - private AsyncIOThread[] ioReadThreads; + final AsyncIOThread[] ioReadThreads; //必须与ioReadThreads数量相同 - private AsyncIOThread[] ioWriteThreads; + final AsyncIOThread[] ioWriteThreads; private AsyncIOThread connectThread; diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index 55731e4c5..d291e9d98 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -129,6 +129,10 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { @Override public void run() { + final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads; + final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads; + int threads = ioReadThreads.length; + int threadIndex = -1; while (!closed) { try { int count = selector.select(); @@ -141,7 +145,10 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) { - accept(key); + if (++threadIndex >= threads) { + threadIndex = 0; + } + accept(key, ioReadThreads[threadIndex], ioWriteThreads[threadIndex]); } } } catch (Throwable t) { @@ -153,7 +160,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { this.acceptThread.start(); } - private void accept(SelectionKey key) throws IOException { + private void accept(SelectionKey key, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException { SocketChannel channel = this.serverChannel.accept(); channel.configureBlocking(false); channel.setOption(StandardSocketOptions.TCP_NODELAY, true); @@ -161,7 +168,6 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - AsyncIOThread[] ioThreads = ioGroup.nextIOThreads(); LongAdder connCreateCounter = ioGroup.connCreateCounter; if (connCreateCounter != null) { connCreateCounter.increment(); @@ -170,7 +176,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { if (connLivingCounter != null) { connLivingCounter.increment(); } - AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioThreads[0], ioThreads[1], ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter); + AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioReadThread, ioWriteThread, ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); conn.protocolCodec = codec; if (conn.sslEngine == null) {