diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 2aef4516f..d7e62fb36 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -80,8 +80,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl //用于服务端的Socket, 等同于一直存在的readCompletionHandler ProtocolCodec protocolCodec; - protected AsyncConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, - final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { + protected AsyncConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, + AsyncIOThread ioWriteThread, int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { Objects.requireNonNull(ioGroup); Objects.requireNonNull(ioReadThread); Objects.requireNonNull(ioWriteThread); @@ -94,8 +94,8 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl this.readBufferConsumer = ioReadThread.getBufferConsumer(); this.writeBufferSupplier = ioWriteThread.getBufferSupplier(); this.writeBufferConsumer = ioWriteThread.getBufferConsumer(); - this.livingCounter = livingCounter; - this.closedCounter = closedCounter; + this.livingCounter = ioGroup.connLivingCounter; + this.closedCounter = ioGroup.connClosedCounter; if (client) { //client模式下无SSLBuilder if (sslContext != null) { if (sslBuilder != null) { diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 45fce2e53..3aa34526d 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -41,7 +41,7 @@ public class AsyncIOGroup extends AsyncGroup { //必须与ioReadThreads数量相同 final AsyncIOThread[] ioWriteThreads; - private AsyncIOThread connectThread; + final AsyncIOThread connectThread; final int bufferCapacity; @@ -52,10 +52,10 @@ public class AsyncIOGroup extends AsyncGroup { //创建数 final LongAdder connCreateCounter = new LongAdder(); - //关闭数 + //在线数 final LongAdder connLivingCounter = new LongAdder(); - //在线数 + //关闭数 final LongAdder connClosedCounter = new LongAdder(); private ScheduledThreadPoolExecutor timeoutExecutor; @@ -100,6 +100,8 @@ public class AsyncIOGroup extends AsyncGroup { safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); this.connectThread = client ? new ClientReadIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) : new AsyncIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); + } else { + this.connectThread = null; } } catch (IOException e) { throw new RuntimeException(e); @@ -246,7 +248,7 @@ public class AsyncIOGroup extends AsyncGroup { if (ioThreads == null) { ioThreads = nextIOThreads(); } - return new AsyncNioTcpConnection(true, this, ioThreads[0], ioThreads[1], connectThread, channel, null, null, address, connLivingCounter, connClosedCounter); + return new AsyncNioTcpConnection(true, this, ioThreads[0], ioThreads[1], channel, null, null, address); } @Override @@ -315,7 +317,7 @@ public class AsyncIOGroup extends AsyncGroup { if (ioThreads == null) { ioThreads = nextIOThreads(); } - return new AsyncNioUdpConnection(true, this, ioThreads[0], ioThreads[1], connectThread, channel, null, null, address, connLivingCounter, connClosedCounter); + return new AsyncNioUdpConnection(true, this, ioThreads[0], ioThreads[1], channel, null, null, address); } @Override diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 696c62f09..015f34310 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -11,7 +11,6 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Objects; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; import javax.net.ssl.SSLContext; import org.redkale.util.ByteBufferWriter; @@ -97,10 +96,10 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey writeKey; - public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, - final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext, LongAdder livingCounter, LongAdder closedCounter) { - super(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter); - this.connectThread = connectThread; + public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, + AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { + super(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); + this.connectThread = ioGroup.connectThread; } @Override diff --git a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java index 948c7c84f..9a3436af4 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java @@ -10,7 +10,6 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; -import java.util.concurrent.atomic.LongAdder; import javax.net.ssl.SSLContext; import org.redkale.util.ByteBufferReader; @@ -27,11 +26,11 @@ class AsyncNioTcpConnection extends AsyncNioConnection { private final SocketChannel channel; - public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, - SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) { - super(client, ioGroup, ioReadThread, ioWriteThread, connectThread, ioGroup.bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter); + public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, + AsyncIOThread ioWriteThread, SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) { + super(client, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext); this.channel = ch; - SocketAddress addr = addr0; + SocketAddress addr = address; if (addr == null) { try { addr = ch.getRemoteAddress(); diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index d291e9d98..778287f15 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -168,15 +168,9 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - LongAdder connCreateCounter = ioGroup.connCreateCounter; - if (connCreateCounter != null) { - connCreateCounter.increment(); - } - LongAdder connLivingCounter = ioGroup.connLivingCounter; - if (connLivingCounter != null) { - connLivingCounter.increment(); - } - AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioReadThread, ioWriteThread, ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter); + ioGroup.connCreateCounter.increment(); + ioGroup.connLivingCounter.increment(); + AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioReadThread, ioWriteThread, channel, context.getSSLBuilder(), context.getSSLContext(), null); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); conn.protocolCodec = codec; if (conn.sslEngine == null) { diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index 12e6519d3..640222b19 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -10,7 +10,6 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; -import java.util.concurrent.atomic.LongAdder; import javax.net.ssl.SSLContext; /** @@ -24,11 +23,11 @@ class AsyncNioUdpConnection extends AsyncNioConnection { private final DatagramChannel channel; - public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, DatagramChannel ch, - SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) { - super(client, ioGroup, ioReadThread, ioWriteThread, connectThread, ioGroup.bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter); + public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, + AsyncIOThread ioWriteThread, DatagramChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) { + super(client, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext); this.channel = ch; - SocketAddress addr = addr0; + SocketAddress addr = address; if (addr == null) { try { addr = ch.getRemoteAddress(); diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index fdee2b11c..64bab2aa8 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -121,12 +121,19 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { @Override public void run() { + final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads; + final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads; + int threads = ioReadThreads.length; + int threadIndex = -1; while (!closed) { final ByteBuffer buffer = unsafeBufferPool.get(); try { SocketAddress address = serverChannel.receive(buffer); buffer.flip(); - accept(address, buffer); + if (++threadIndex >= threads) { + threadIndex = 0; + } + accept(address, buffer, ioReadThreads[threadIndex], ioWriteThreads[threadIndex]); } catch (Throwable t) { unsafeBufferPool.accept(buffer); } @@ -136,17 +143,10 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { this.acceptThread.start(); } - private void accept(SocketAddress address, ByteBuffer buffer) throws IOException { - AsyncIOThread[] ioThreads = ioGroup.nextIOThreads(); - LongAdder connCreateCounter = ioGroup.connCreateCounter; - if (connCreateCounter != null) { - connCreateCounter.increment(); - } - LongAdder connLivingCounter = ioGroup.connLivingCounter; - if (connLivingCounter != null) { - connLivingCounter.increment(); - } - AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioThreads[0], ioThreads[1], ioGroup.connectThread(), this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address, connLivingCounter, ioGroup.connClosedCounter); + private void accept(SocketAddress address, ByteBuffer buffer, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException { + ioGroup.connCreateCounter.increment(); + ioGroup.connLivingCounter.increment(); + AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioReadThread, ioWriteThread, this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); conn.protocolCodec = codec; if (conn.sslEngine == null) {