From f88db8abf976daf0358cba1f7f068b6b5d364b52 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 25 Jun 2020 23:33:57 +0800 Subject: [PATCH] --- src/org/redkale/net/ProtocolServer.java | 2 +- .../net/{nio => }/TcpNioAsyncConnection.java | 13 ++++--- .../net/{nio => }/TcpNioProtocolServer.java | 35 +++++++++++++++++-- .../redkale/net/nio/NioCompletionHandler.java | 4 +-- src/org/redkale/net/nio/NioThread.java | 9 +++-- src/org/redkale/net/nio/NioThreadGroup.java | 33 +++++++++++++++-- 6 files changed, 81 insertions(+), 15 deletions(-) rename src/org/redkale/net/{nio => }/TcpNioAsyncConnection.java (95%) rename src/org/redkale/net/{nio => }/TcpNioProtocolServer.java (65%) diff --git a/src/org/redkale/net/ProtocolServer.java b/src/org/redkale/net/ProtocolServer.java index 4bdd071a7..b8c5b9f53 100644 --- a/src/org/redkale/net/ProtocolServer.java +++ b/src/org/redkale/net/ProtocolServer.java @@ -73,7 +73,7 @@ public abstract class ProtocolServer { } else if ("aio".equalsIgnoreCase(netimpl)) { return new TcpAioProtocolServer(context); } else if ("nio".equalsIgnoreCase(netimpl)) { - return null;// return new TcpNioProtocolServer(context); + return new TcpNioProtocolServer(context); } } else if ("UDP".equalsIgnoreCase(protocol)) { if (netimpl == null || netimpl.isEmpty()) { diff --git a/src/org/redkale/net/nio/TcpNioAsyncConnection.java b/src/org/redkale/net/TcpNioAsyncConnection.java similarity index 95% rename from src/org/redkale/net/nio/TcpNioAsyncConnection.java rename to src/org/redkale/net/TcpNioAsyncConnection.java index 8728f8327..c2eb11f0d 100644 --- a/src/org/redkale/net/nio/TcpNioAsyncConnection.java +++ b/src/org/redkale/net/TcpNioAsyncConnection.java @@ -3,7 +3,7 @@ * To change this template file, choose Tools | Templates * and open the template in the editor. */ -package org.redkale.net.nio; +package org.redkale.net; import java.io.IOException; import java.net.*; @@ -15,6 +15,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.*; import javax.net.ssl.SSLContext; import org.redkale.net.AsyncConnection; +import org.redkale.net.nio.NioCompletionHandler; +import org.redkale.net.nio.NioThread; +import org.redkale.net.nio.NioThreadGroup; import org.redkale.util.ObjectPool; /** @@ -26,7 +29,7 @@ import org.redkale.util.ObjectPool; * * @since 2.1.0 */ -class TcpNioAsyncConnection extends AsyncConnection { +public class TcpNioAsyncConnection extends AsyncConnection { private int readTimeoutSeconds; @@ -304,7 +307,7 @@ class TcpNioAsyncConnection extends AsyncConnection { doWrite(); } - void doConnect() { + public void doConnect() { try { boolean connected = channel.isConnectionPending(); if (connected || channel.connect(remoteAddress)) { @@ -372,7 +375,7 @@ class TcpNioAsyncConnection extends AsyncConnection { this.connectPending = false;//必须放最后 } - void doRead() { + public void doRead() { try { final boolean invokeDirect = this.ioThread.inSameThread(); int totalCount = 0; @@ -440,7 +443,7 @@ class TcpNioAsyncConnection extends AsyncConnection { this.readPending = false; //必须放最后 } - void doWrite() { + public void doWrite() { try { final boolean invokeDirect = this.ioThread.inSameThread(); int totalCount = 0; diff --git a/src/org/redkale/net/nio/TcpNioProtocolServer.java b/src/org/redkale/net/TcpNioProtocolServer.java similarity index 65% rename from src/org/redkale/net/nio/TcpNioProtocolServer.java rename to src/org/redkale/net/TcpNioProtocolServer.java index 6d524d797..eba97271a 100644 --- a/src/org/redkale/net/nio/TcpNioProtocolServer.java +++ b/src/org/redkale/net/TcpNioProtocolServer.java @@ -3,14 +3,17 @@ * To change this template file, choose Tools | Templates * and open the template in the editor. */ -package org.redkale.net.nio; +package org.redkale.net; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; import org.redkale.net.*; -import org.redkale.util.AnyValue; +import org.redkale.net.nio.*; +import org.redkale.util.*; /** * @@ -23,6 +26,10 @@ import org.redkale.util.AnyValue; */ public class TcpNioProtocolServer extends ProtocolServer { + private ObjectPool bufferPool; + + private ObjectPool responsePool; + private ServerSocketChannel serverChannel; private Selector selector; @@ -78,6 +85,18 @@ public class TcpNioProtocolServer extends ProtocolServer { @Override public void accept(Server server) throws IOException { this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT); + + AtomicLong createBufferCounter = new AtomicLong(); + AtomicLong cycleBufferCounter = new AtomicLong(); + this.bufferPool = server.createBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); + AtomicLong createResponseCounter = new AtomicLong(); + AtomicLong cycleResponseCounter = new AtomicLong(); + this.responsePool = server.createResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); + this.responsePool.setCreator(server.createResponseCreator(bufferPool, responsePool)); + + this.ioGroup = new NioThreadGroup(Runtime.getRuntime().availableProcessors(), context.executor, bufferPool); + this.ioGroup.start(); + this.acceptThread = new Thread() { @Override public void run() { @@ -88,7 +107,6 @@ public class TcpNioProtocolServer extends ProtocolServer { Set keys = selector.selectedKeys(); Iterator it = keys.iterator(); while (it.hasNext()) { - // 获取事件 SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) accept(key); @@ -105,13 +123,24 @@ public class TcpNioProtocolServer extends ProtocolServer { private void accept(SelectionKey key) throws IOException { SocketChannel channel = this.serverChannel.accept(); channel.configureBlocking(false); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + NioThread ioThread = ioGroup.nextThread(); + AsyncConnection conn = new TcpNioAsyncConnection(ioGroup, ioThread, context.executor, bufferPool, channel, context.getSSLContext(), null, livingCounter, closedCounter); + new PrepareRunner(context, responsePool, conn, null, null).run(); } @Override public void close() throws IOException { if (this.closed) return; this.closed = true; + this.selector.wakeup(); + this.ioGroup.close(); this.serverChannel.close(); + this.selector.close(); } } diff --git a/src/org/redkale/net/nio/NioCompletionHandler.java b/src/org/redkale/net/nio/NioCompletionHandler.java index b51859aa1..7a6c8fb78 100644 --- a/src/org/redkale/net/nio/NioCompletionHandler.java +++ b/src/org/redkale/net/nio/NioCompletionHandler.java @@ -17,13 +17,13 @@ import java.util.concurrent.*; * * @since 2.1.0 */ -class NioCompletionHandler implements CompletionHandler, Runnable { +public class NioCompletionHandler implements CompletionHandler, Runnable { private final CompletionHandler handler; private final A attachment; - ScheduledFuture timeoutFuture; + public ScheduledFuture timeoutFuture; public NioCompletionHandler(CompletionHandler handler, A attachment) { this.handler = handler; diff --git a/src/org/redkale/net/nio/NioThread.java b/src/org/redkale/net/nio/NioThread.java index 44e88ba14..10308bfde 100644 --- a/src/org/redkale/net/nio/NioThread.java +++ b/src/org/redkale/net/nio/NioThread.java @@ -10,6 +10,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.function.Consumer; +import org.redkale.net.TcpNioAsyncConnection; import org.redkale.util.*; /** @@ -22,7 +23,7 @@ import org.redkale.util.*; * * @since 2.1.0 */ -class NioThread extends Thread { +public class NioThread extends Thread { final Selector selector; @@ -44,7 +45,7 @@ class NioThread extends Thread { this.setDaemon(true); } - void register(Consumer consumer) { + public void register(Consumer consumer) { registers.offer(consumer); selector.wakeup(); } @@ -89,4 +90,8 @@ class NioThread extends Thread { return this.localThread == thread; } + public void close() { + this.closed = true; + this.interrupt(); + } } diff --git a/src/org/redkale/net/nio/NioThreadGroup.java b/src/org/redkale/net/nio/NioThreadGroup.java index 7756424b4..2715b1490 100644 --- a/src/org/redkale/net/nio/NioThreadGroup.java +++ b/src/org/redkale/net/nio/NioThreadGroup.java @@ -5,8 +5,12 @@ */ package org.redkale.net.nio; -import java.nio.channels.SelectionKey; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import org.redkale.util.ObjectPool; /** * 协议处理的IO线程组 @@ -18,12 +22,37 @@ import java.util.concurrent.*; * * @since 2.1.0 */ -class NioThreadGroup { +public class NioThreadGroup { private NioThread[] ioThreads; + private final AtomicInteger index = new AtomicInteger(); + private ScheduledThreadPoolExecutor timeoutExecutor; + public NioThreadGroup(int threads, ExecutorService executor, ObjectPool bufferPool) throws IOException { + this.ioThreads = new NioThread[Math.max(threads, 1)]; + for (int i = 0; i < ioThreads.length; i++) { + this.ioThreads[i] = new NioThread(Selector.open(), executor, bufferPool); + } + } + + public void start() { + for (int i = 0; i < ioThreads.length; i++) { + this.ioThreads[i].start(); + } + } + + public void close() { + for (int i = 0; i < ioThreads.length; i++) { + this.ioThreads[i].close(); + } + } + + public NioThread nextThread() { + return ioThreads[Math.abs(index.getAndIncrement()) % ioThreads.length]; + } + public ScheduledFuture scheduleTimeout(Runnable callable, long delay, TimeUnit unit) { return timeoutExecutor.schedule(callable, delay, unit); }