diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index a4f34a662..4bc7e31a4 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -52,9 +52,13 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl protected final int bufferCapacity; - private final Supplier bufferSupplier; + private final Supplier readBufferSupplier; - private final Consumer bufferConsumer; + private final Consumer readBufferConsumer; + + private final Supplier writeBufferSupplier; + + private final Consumer writeBufferConsumer; private ByteBufferWriter pipelineWriter; @@ -78,24 +82,34 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl //用于服务端的Socket, 等同于一直存在的readCompletionHandler ProtocolCodec protocolCodec; - protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, ObjectPool bufferPool, - SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { - this(client, ioGroup, ioThread, bufferCapacity, bufferPool, bufferPool, sslBuilder, sslContext, livingCounter, closedCounter); + protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, + final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { + this(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, + ioReadThread.getBufferSupplier(), ioReadThread.getBufferConsumer(), + ioWriteThread.getBufferSupplier(), ioWriteThread.getBufferConsumer(), + sslBuilder, sslContext, livingCounter, closedCounter); } - protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioThread, final int bufferCapacity, Supplier bufferSupplier, - Consumer bufferConsumer, SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { + protected AsyncConnection(boolean client, AsyncGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, final int bufferCapacity, + Supplier readBufferSupplier, Consumer readBufferConsumer, + Supplier writeBufferSupplier, Consumer writeBufferConsumer, + SSLBuilder sslBuilder, SSLContext sslContext, final LongAdder livingCounter, final LongAdder closedCounter) { Objects.requireNonNull(ioGroup); - Objects.requireNonNull(ioThread); - Objects.requireNonNull(bufferSupplier); - Objects.requireNonNull(bufferConsumer); + Objects.requireNonNull(ioReadThread); + Objects.requireNonNull(ioWriteThread); + Objects.requireNonNull(readBufferSupplier); + Objects.requireNonNull(readBufferConsumer); + Objects.requireNonNull(writeBufferSupplier); + Objects.requireNonNull(writeBufferConsumer); this.client = client; this.ioGroup = ioGroup; - this.ioReadThread = ioThread; - this.ioWriteThread = ioThread; + this.ioReadThread = ioReadThread; + this.ioWriteThread = ioWriteThread; this.bufferCapacity = bufferCapacity; - this.bufferSupplier = bufferSupplier; - this.bufferConsumer = bufferConsumer; + this.readBufferSupplier = readBufferSupplier; + this.readBufferConsumer = readBufferConsumer; + this.writeBufferSupplier = writeBufferSupplier; + this.writeBufferConsumer = writeBufferConsumer; this.livingCounter = livingCounter; this.closedCounter = closedCounter; if (client) { //client模式下无SSLBuilder @@ -114,19 +128,19 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } public Supplier getReadBufferSupplier() { - return this.bufferSupplier; + return this.readBufferSupplier; } public Consumer getReadBufferConsumer() { - return this.bufferConsumer; + return this.readBufferConsumer; } public Supplier getWriteBufferSupplier() { - return this.bufferSupplier; + return this.writeBufferSupplier; } public Consumer getWriteBufferConsumer() { - return this.bufferConsumer; + return this.writeBufferConsumer; } public final long getLastReadTime() { @@ -337,7 +351,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl }; write(buffer, null, newhandler); } else { - ByteBufferWriter writer = ByteBufferWriter.create(sslEngine == null ? bufferSupplier : () -> pollWriteSSLBuffer(), buffer); + ByteBufferWriter writer = ByteBufferWriter.create(sslEngine == null ? writeBufferSupplier : () -> pollWriteSSLBuffer(), buffer); writer.put(headerContent, headerOffset, headerLength); if (bodyLength > 0) { writer.put(bodyContent, bodyOffset, bodyLength); @@ -583,7 +597,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl this.readSSLHalfBuffer = null; return rs; } - return bufferSupplier.get(); + return readBufferSupplier.get(); } public ByteBuffer pollReadBuffer() { @@ -592,21 +606,21 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl this.readBuffer = null; return rs; } - return bufferSupplier.get(); + return readBufferSupplier.get(); } public void offerReadBuffer(ByteBuffer buffer) { if (buffer == null) { return; } - bufferConsumer.accept(buffer); + readBufferConsumer.accept(buffer); } public void offerReadBuffer(ByteBuffer... buffers) { if (buffers == null) { return; } - Consumer consumer = this.bufferConsumer; + Consumer consumer = this.readBufferConsumer; for (ByteBuffer buffer : buffers) { consumer.accept(buffer); } @@ -616,25 +630,25 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl if (buffer == null) { return; } - bufferConsumer.accept(buffer); + writeBufferConsumer.accept(buffer); } public void offerWriteBuffer(ByteBuffer... buffers) { if (buffers == null) { return; } - Consumer consumer = this.bufferConsumer; + Consumer consumer = this.writeBufferConsumer; for (ByteBuffer buffer : buffers) { consumer.accept(buffer); } } public ByteBuffer pollWriteSSLBuffer() { - return bufferSupplier.get(); + return writeBufferSupplier.get(); } public ByteBuffer pollWriteBuffer() { - return bufferSupplier.get(); + return writeBufferSupplier.get(); } public void dispose() {//同close, 只是去掉throws IOException @@ -674,7 +688,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } if (this.readBuffer != null) { - Consumer consumer = this.bufferConsumer; + Consumer consumer = this.readBufferConsumer; if (consumer != null) { consumer.accept(this.readBuffer); } diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 2b292b638..1cbfc4546 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; +import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import org.redkale.annotation.ResourceType; @@ -33,7 +34,11 @@ public class AsyncIOGroup extends AsyncGroup { private boolean skipClose; - AsyncIOThread[] ioThreads; + //必须与ioWriteThreads数量相同 + private AsyncIOThread[] ioReadThreads; + + //必须与ioReadThreads数量相同 + private AsyncIOThread[] ioWriteThreads; private AsyncIOThread connectThread; @@ -72,14 +77,23 @@ public class AsyncIOGroup extends AsyncGroup { public AsyncIOGroup(boolean client, String threadPrefixName0, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { this.bufferCapacity = bufferCapacity; final String threadPrefixName = threadPrefixName0 == null ? "Redkale-Client-IOThread" : threadPrefixName0; - this.ioThreads = new AsyncIOThread[Utility.cpus()]; + final int threads = Utility.cpus(); + this.ioReadThreads = new AsyncIOThread[threads]; + this.ioWriteThreads = new AsyncIOThread[threads]; try { - for (int i = 0; i < this.ioThreads.length; i++) { - ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), + for (int i = 0; i < threads; i++) { + ObjectPool unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); String name = threadPrefixName + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); - this.ioThreads[i] = client ? new ClientIOThread(name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) - : new AsyncIOThread(name, i, ioThreads.length, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); + this.ioReadThreads[i] = client ? new ClientIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool) + : new AsyncIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + if (client) { + this.ioReadThreads[i] = new ClientIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + this.ioWriteThreads[i] = this.ioReadThreads[i]; + } else { + this.ioReadThreads[i] = new AsyncIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + this.ioWriteThreads[i] = this.ioReadThreads[i]; + } } if (client) { ObjectPool unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), @@ -99,10 +113,6 @@ public class AsyncIOGroup extends AsyncGroup { }); } - public int size() { - return this.ioThreads.length; - } - @Override public AsyncGroup start() { if (started) { @@ -111,8 +121,11 @@ public class AsyncIOGroup extends AsyncGroup { if (closed) { throw new RuntimeException("group is closed"); } - for (AsyncIOThread thread : ioThreads) { - thread.start(); + for (int i = 0; i < this.ioReadThreads.length; i++) { + this.ioReadThreads[i].start(); + if (this.ioWriteThreads[i] != this.ioReadThreads[i]) { + this.ioWriteThreads[i].start(); + } } if (connectThread != null) { connectThread.start(); @@ -142,8 +155,11 @@ public class AsyncIOGroup extends AsyncGroup { if (closed) { return this; } - for (AsyncIOThread thread : ioThreads) { - thread.close(); + for (int i = 0; i < this.ioReadThreads.length; i++) { + this.ioReadThreads[i].close(); + if (this.ioWriteThreads[i] != this.ioReadThreads[i]) { + this.ioWriteThreads[i].close(); + } } if (connectThread != null) { connectThread.close(); @@ -165,8 +181,9 @@ public class AsyncIOGroup extends AsyncGroup { return connClosedCounter; } - public AsyncIOThread nextIOThread() { - return ioThreads[Math.abs(readIndex.getAndIncrement()) % ioThreads.length]; + public AsyncIOThread[] nextIOThreads() { + int i = Math.abs(readIndex.getAndIncrement()) % ioReadThreads.length; + return new AsyncIOThread[]{ioReadThreads[i], ioWriteThreads[i]}; } public AsyncIOThread connectThread() { @@ -202,32 +219,47 @@ public class AsyncIOGroup extends AsyncGroup { } } - @Override - public CompletableFuture createTCPClient(final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - SocketChannel channel; + //创建一个AsyncConnection对象,只给测试代码使用 + public AsyncConnection newTCPClientConnection() { try { - channel = SocketChannel.open(); - channel.configureBlocking(false); - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + return newTCPClientConnection(null); } catch (IOException e) { - return CompletableFuture.failedFuture(e); + throw new RuntimeException(e); } - AsyncIOThread ioThread = null; + } + + private AsyncNioTcpConnection newTCPClientConnection(final SocketAddress address) throws IOException { + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + + AsyncIOThread[] ioThreads = null; Thread currThread = Thread.currentThread(); if (currThread instanceof AsyncIOThread) { - for (AsyncIOThread thread : ioThreads) { - if (thread == currThread) { - ioThread = thread; + for (int i = 0; i < this.ioReadThreads.length; i++) { + if (this.ioReadThreads[i] == currThread || this.ioWriteThreads[i] == currThread) { + ioThreads = new AsyncIOThread[]{this.ioReadThreads[i], this.ioWriteThreads[i]}; break; } } } - if (ioThread == null) { - ioThread = nextIOThread(); + if (ioThreads == null) { + ioThreads = nextIOThreads(); + } + return new AsyncNioTcpConnection(true, this, ioThreads[0], ioThreads[1], connectThread, channel, null, null, address, connLivingCounter, connClosedCounter); + } + + @Override + public CompletableFuture createTCPClient(final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + Objects.requireNonNull(address); + AsyncNioTcpConnection conn; + try { + conn = newTCPClientConnection(address); + } catch (IOException e) { + return CompletableFuture.failedFuture(e); } - final AsyncNioTcpConnection conn = new AsyncNioTcpConnection(true, this, ioThread, connectThread, channel, null, null, address, connLivingCounter, connClosedCounter); final CompletableFuture future = new CompletableFuture<>(); conn.connect(address, null, new CompletionHandler() { @Override @@ -261,30 +293,41 @@ public class AsyncIOGroup extends AsyncGroup { return Utility.orTimeout(future, 30, TimeUnit.SECONDS); } - @Override - public CompletableFuture createUDPClient(final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - DatagramChannel channel; + //创建一个AsyncConnection对象,只给测试代码使用 + public AsyncConnection newUDPClientConnection() { try { - channel = DatagramChannel.open(); + return newUDPClientConnection(null); } catch (IOException e) { - CompletableFuture future = new CompletableFuture(); - future.completeExceptionally(e); - return future; + throw new RuntimeException(e); } - AsyncIOThread ioThread = null; + } + + private AsyncNioUdpConnection newUDPClientConnection(final SocketAddress address) throws IOException { + DatagramChannel channel = DatagramChannel.open(); + AsyncIOThread[] ioThreads = null; Thread currThread = Thread.currentThread(); if (currThread instanceof AsyncIOThread) { - for (AsyncIOThread thread : ioThreads) { - if (thread == currThread) { - ioThread = thread; + for (int i = 0; i < this.ioReadThreads.length; i++) { + if (this.ioReadThreads[i] == currThread || this.ioWriteThreads[i] == currThread) { + ioThreads = new AsyncIOThread[]{this.ioReadThreads[i], this.ioWriteThreads[i]}; break; } } } - if (ioThread == null) { - ioThread = nextIOThread(); + if (ioThreads == null) { + ioThreads = nextIOThreads(); + } + return new AsyncNioUdpConnection(true, this, ioThreads[0], ioThreads[1], connectThread, channel, null, null, address, connLivingCounter, connClosedCounter); + } + + @Override + public CompletableFuture createUDPClient(final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + AsyncNioUdpConnection conn; + try { + conn = newUDPClientConnection(address); + } catch (IOException e) { + return CompletableFuture.failedFuture(e); } - AsyncNioUdpConnection conn = new AsyncNioUdpConnection(true, this, ioThread, connectThread, channel, null, null, address, connLivingCounter, connClosedCounter); CompletableFuture future = new CompletableFuture(); conn.connect(address, null, new CompletionHandler() { @Override diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index ce3fc9083..9f93b3a74 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -73,7 +73,7 @@ public class AsyncIOThread extends WorkThread { * @param command */ @Override - public final void execute(Runnable command) { + public void execute(Runnable command) { commandQueue.offer(command); selector.wakeup(); } @@ -84,7 +84,7 @@ public class AsyncIOThread extends WorkThread { * @param commands */ @Override - public final void execute(Runnable... commands) { + public void execute(Runnable... commands) { for (Runnable command : commands) { commandQueue.offer(command); } @@ -97,7 +97,7 @@ public class AsyncIOThread extends WorkThread { * @param commands */ @Override - public final void execute(Collection commands) { + public void execute(Collection commands) { if (commands != null) { for (Runnable command : commands) { commandQueue.offer(command); diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index e00641ef1..9832e49b8 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -12,9 +12,9 @@ import java.nio.channels.*; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; -import java.util.function.*; +import java.util.function.Consumer; import javax.net.ssl.SSLContext; -import org.redkale.util.*; +import org.redkale.util.ByteBufferWriter; /** * @@ -99,15 +99,9 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey writeKey; - public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread, - final int bufferCapacity, ObjectPool bufferPool, SSLBuilder sslBuilder, SSLContext sslContext, LongAdder livingCounter, LongAdder closedCounter) { - super(client, ioGroup, ioThread, bufferCapacity, bufferPool, sslBuilder, sslContext, livingCounter, closedCounter); - this.connectThread = connectThread; - } - - public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread, - final int bufferCapacity, Supplier bufferSupplier, Consumer bufferConsumer, SSLBuilder sslBuilder, SSLContext sslContext, LongAdder livingCounter, LongAdder closedCounter) { - super(client, ioGroup, ioThread, bufferCapacity, bufferSupplier, bufferConsumer, sslBuilder, sslContext, livingCounter, closedCounter); + public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, + final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext, LongAdder livingCounter, LongAdder closedCounter) { + super(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter); this.connectThread = connectThread; } diff --git a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java index 33b6b5acf..948c7c84f 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java @@ -27,9 +27,9 @@ class AsyncNioTcpConnection extends AsyncNioConnection { private final SocketChannel channel; - public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread, + public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) { - super(client, ioGroup, ioThread, connectThread, ioGroup.bufferCapacity, ioThread.getBufferSupplier(), ioThread.getBufferConsumer(), sslBuilder, sslContext, livingCounter, closedCounter); + super(client, ioGroup, ioReadThread, ioWriteThread, connectThread, ioGroup.bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter); this.channel = ch; SocketAddress addr = addr0; if (addr == null) { @@ -40,7 +40,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection { } } this.remoteAddress = addr; - ioThread.connCounter.incrementAndGet(); + ioReadThread.connCounter.incrementAndGet(); } @Override diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index e2275331a..0a478e0e5 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -161,7 +161,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - AsyncIOThread readThread = ioGroup.nextIOThread(); + AsyncIOThread[] ioThreads = ioGroup.nextIOThreads(); LongAdder connCreateCounter = ioGroup.connCreateCounter; if (connCreateCounter != null) { connCreateCounter.increment(); @@ -170,7 +170,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { if (connLivingCounter != null) { connLivingCounter.increment(); } - AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, readThread, ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter); + AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioThreads[0], ioThreads[1], ioGroup.connectThread(), channel, context.getSSLBuilder(), context.getSSLContext(), null, connLivingCounter, ioGroup.connClosedCounter); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); conn.protocolCodec = codec; if (conn.sslEngine == null) { diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index ed8320749..12e6519d3 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -24,9 +24,9 @@ class AsyncNioUdpConnection extends AsyncNioConnection { private final DatagramChannel channel; - public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioThread, AsyncIOThread connectThread, DatagramChannel ch, + public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, AsyncIOThread connectThread, DatagramChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress addr0, LongAdder livingCounter, LongAdder closedCounter) { - super(client, ioGroup, ioThread, connectThread, ioGroup.bufferCapacity, ioThread.getBufferSupplier(), ioThread.getBufferConsumer(), sslBuilder, sslContext, livingCounter, closedCounter); + super(client, ioGroup, ioReadThread, ioWriteThread, connectThread, ioGroup.bufferCapacity, sslBuilder, sslContext, livingCounter, closedCounter); this.channel = ch; SocketAddress addr = addr0; if (addr == null) { diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index 8c7dcc550..778aa5d73 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -137,7 +137,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { } private void accept(SocketAddress address, ByteBuffer buffer) throws IOException { - AsyncIOThread readThread = ioGroup.nextIOThread(); + AsyncIOThread[] ioThreads = ioGroup.nextIOThreads(); LongAdder connCreateCounter = ioGroup.connCreateCounter; if (connCreateCounter != null) { connCreateCounter.increment(); @@ -146,7 +146,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { if (connLivingCounter != null) { connLivingCounter.increment(); } - AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, readThread, ioGroup.connectThread(), this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address, connLivingCounter, ioGroup.connClosedCounter); + AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioThreads[0], ioThreads[1], ioGroup.connectThread(), this.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address, connLivingCounter, ioGroup.connClosedCounter); ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn); conn.protocolCodec = codec; if (conn.sslEngine == null) {