From 0c8b0f5e1914e6464cc13325e9b1f5b7c8fed7c5 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 25 Jun 2020 20:55:17 +0800 Subject: [PATCH] --- src/org/redkale/net/nio/NioThread.java | 26 +++----- .../redkale/net/nio/TcpNioProtocolServer.java | 64 +++++++++++++++++-- 2 files changed, 66 insertions(+), 24 deletions(-) diff --git a/src/org/redkale/net/nio/NioThread.java b/src/org/redkale/net/nio/NioThread.java index 8ec770cbe..44e88ba14 100644 --- a/src/org/redkale/net/nio/NioThread.java +++ b/src/org/redkale/net/nio/NioThread.java @@ -64,23 +64,15 @@ class NioThread extends Thread { Iterator it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); - try { - if (key.isAcceptable()) { - TcpNioProtocolServer sc = (TcpNioProtocolServer) key.attachment(); - sc.doAccept(); - continue; - } - TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); - if (key.isWritable()) { - key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); - conn.doWrite(); - } else if (key.isReadable()) { - conn.doRead(); - } else if (key.isConnectable()) { - conn.doConnect(); - } - } finally { - it.remove(); + it.remove(); + TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); + if (key.isWritable()) { + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + conn.doWrite(); + } else if (key.isReadable()) { + conn.doRead(); + } else if (key.isConnectable()) { + conn.doConnect(); } } } catch (Exception ex) { diff --git a/src/org/redkale/net/nio/TcpNioProtocolServer.java b/src/org/redkale/net/nio/TcpNioProtocolServer.java index 93f2b06a2..6d524d797 100644 --- a/src/org/redkale/net/nio/TcpNioProtocolServer.java +++ b/src/org/redkale/net/nio/TcpNioProtocolServer.java @@ -7,8 +7,8 @@ package org.redkale.net.nio; import java.io.IOException; import java.net.*; -import java.nio.channels.ServerSocketChannel; -import java.util.Set; +import java.nio.channels.*; +import java.util.*; import org.redkale.net.*; import org.redkale.util.AnyValue; @@ -25,15 +25,39 @@ public class TcpNioProtocolServer extends ProtocolServer { private ServerSocketChannel serverChannel; + private Selector selector; + private NioThreadGroup ioGroup; + private Thread acceptThread; + + private boolean closed; + public TcpNioProtocolServer(Context context) { super(context); } @Override public void open(AnyValue config) throws IOException { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + this.serverChannel = ServerSocketChannel.open(); + this.serverChannel.configureBlocking(false); + this.selector = Selector.open(); + final Set> options = this.serverChannel.supportedOptions(); + if (options.contains(StandardSocketOptions.TCP_NODELAY)) { + this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); + } + if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) { + this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + } + if (options.contains(StandardSocketOptions.SO_REUSEADDR)) { + this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + } + if (options.contains(StandardSocketOptions.SO_RCVBUF)) { + this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + } + if (options.contains(StandardSocketOptions.SO_SNDBUF)) { + this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + } } @Override @@ -53,15 +77,41 @@ public class TcpNioProtocolServer extends ProtocolServer { @Override public void accept(Server server) throws IOException { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT); + this.acceptThread = new Thread() { + @Override + public void run() { + while (!closed) { + try { + int count = selector.select(); + if (count == 0) continue; + Set keys = selector.selectedKeys(); + Iterator it = keys.iterator(); + while (it.hasNext()) { + // 获取事件 + SelectionKey key = it.next(); + it.remove(); + if (key.isAcceptable()) accept(key); + } + } catch (Throwable t) { + t.printStackTrace(); + } + } + } + }; + this.acceptThread.start(); + } + + private void accept(SelectionKey key) throws IOException { + SocketChannel channel = this.serverChannel.accept(); + channel.configureBlocking(false); } @Override public void close() throws IOException { + if (this.closed) return; + this.closed = true; this.serverChannel.close(); } - void doAccept() { - - } }