From 333ba0f16256b0a25109f89f5ad1f0f11e5804da Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sat, 18 Aug 2018 11:27:50 +0800 Subject: [PATCH] --- .../redkale/net/TcpNioAsyncConnection.java | 13 +- src/org/redkale/net/TcpNioProtocolServer.java | 111 ++++++++++++++---- 2 files changed, 93 insertions(+), 31 deletions(-) diff --git a/src/org/redkale/net/TcpNioAsyncConnection.java b/src/org/redkale/net/TcpNioAsyncConnection.java index 5f5c776de..d075a8ea3 100644 --- a/src/org/redkale/net/TcpNioAsyncConnection.java +++ b/src/org/redkale/net/TcpNioAsyncConnection.java @@ -22,17 +22,17 @@ import java.util.concurrent.atomic.AtomicLong; */ public class TcpNioAsyncConnection extends AsyncConnection { - private int readTimeoutSeconds; + protected int readTimeoutSeconds; - private int writeTimeoutSeconds; + protected int writeTimeoutSeconds; - private final Selector selector; + protected final Selector selector; - private SelectionKey key; + protected SelectionKey key; - private final SocketChannel channel; + protected final SocketChannel channel; - private final SocketAddress remoteAddress; + protected final SocketAddress remoteAddress; ByteBuffer readBuffer; @@ -362,4 +362,5 @@ public class TcpNioAsyncConnection extends AsyncConnection { public final boolean isTCP() { return true; } + } diff --git a/src/org/redkale/net/TcpNioProtocolServer.java b/src/org/redkale/net/TcpNioProtocolServer.java index 9cf54fa4e..05f7380a0 100644 --- a/src/org/redkale/net/TcpNioProtocolServer.java +++ b/src/org/redkale/net/TcpNioProtocolServer.java @@ -10,7 +10,7 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.*; import org.redkale.util.AnyValue; /** @@ -27,9 +27,9 @@ public class TcpNioProtocolServer extends ProtocolServer { private ServerSocketChannel serverChannel; - private NIOThreadWorker[] workers; + private NioThreadWorker[] workers; - private NIOThreadWorker currWorker; + private NioThreadWorker currWorker; private boolean running; @@ -82,11 +82,11 @@ public class TcpNioProtocolServer extends ProtocolServer { @Override public void accept() throws IOException { this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); - final CountDownLatch cdl = new CountDownLatch(1); this.running = true; - this.workers = new NIOThreadWorker[Runtime.getRuntime().availableProcessors()]; + this.workers = new NioThreadWorker[Runtime.getRuntime().availableProcessors()]; + final CountDownLatch wkcdl = new CountDownLatch(workers.length); for (int i = 0; i < workers.length; i++) { - workers[i] = new NIOThreadWorker(); + workers[i] = new NioThreadWorker(wkcdl, i + 1, workers.length); workers[i].setDaemon(true); workers[i].start(); } @@ -95,6 +95,12 @@ public class TcpNioProtocolServer extends ProtocolServer { } workers[workers.length - 1].next = workers[0]; currWorker = workers[0]; + try { + wkcdl.await(3, TimeUnit.SECONDS); + } catch (Exception e) { + throw new IOException(e); + } + final CountDownLatch cdl = new CountDownLatch(1); new Thread() { @Override public void run() { @@ -111,12 +117,6 @@ public class TcpNioProtocolServer extends ProtocolServer { if (key.isAcceptable()) { try { SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); - channel.configureBlocking(false); - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); createCounter.incrementAndGet(); livingCounter.incrementAndGet(); currWorker.addChannel(channel); @@ -134,45 +134,91 @@ public class TcpNioProtocolServer extends ProtocolServer { } }.start(); try { - cdl.await(); + cdl.await(3, TimeUnit.SECONDS); } catch (Exception e) { - e.printStackTrace(); + throw new IOException(e); } } @Override public void close() throws IOException { if (!this.running) return; - this.running = false; serverChannel.close(); acceptSelector.close(); - for (NIOThreadWorker worker : workers) { + for (NioThreadWorker worker : workers) { worker.interrupt(); } + this.running = false; } - class NIOThreadWorker extends Thread { + class NioThreadWorker extends Thread { final Selector selector; - NIOThreadWorker next; + final CountDownLatch cdl; - public NIOThreadWorker() { + private final Queue connected; + + private final CopyOnWriteArrayList done; + + protected volatile Thread ownerThread; + + NioThreadWorker next; + + public NioThreadWorker(final CountDownLatch cdl, int idx, int count) { + this.cdl = cdl; + String idxstr = "000000" + idx; + this.setName("NioThreadWorker:" + context.getServerAddress().getPort() + "-" + idxstr.substring(idxstr.length() - ("" + count).length())); try { this.selector = Selector.open(); } catch (IOException e) { throw new RuntimeException(e); } + this.connected = new ArrayBlockingQueue<>(1000000); + this.done = new CopyOnWriteArrayList<>(); } - public void addChannel(SocketChannel channel) throws IOException { - AsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); - context.runAsync(new PrepareRunner(context, conn, null, null)); + public boolean addChannel(SocketChannel channel) throws IOException { + TcpNioAsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + return connected.add(conn); + } + + protected void processConnected() { + TcpNioAsyncConnection schannel; + try { + while ((schannel = connected.poll()) != null) { + SocketChannel channel = schannel.channel; + channel.configureBlocking(false); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + channel.register(selector, SelectionKey.OP_READ).attach(schannel); + } + } catch (IOException e) { + // do nothing + } + synchronized (done) { + for (TcpNioAsyncConnection conn : done) { + if (conn.key != null && conn.key.isValid()) { + conn.key.interestOps(SelectionKey.OP_WRITE); + } + } + done.clear(); + } + } + + public boolean isSameThread() { + return this.ownerThread == Thread.currentThread(); } @Override public void run() { + this.ownerThread = Thread.currentThread(); + if (cdl != null) cdl.countDown(); while (running) { + processConnected(); try { selector.select(50); } catch (IOException e) { @@ -207,13 +253,28 @@ public class TcpNioProtocolServer extends ProtocolServer { return; } if (conn == null) return; - if (key.isWritable()) { - if (conn.writeHandler != null) writeOP(key, socket, conn); - } else if (key.isReadable()) { + if (key.isReadable()) { if (conn.readHandler != null) readOP(key, socket, conn); + } else if (key.isWritable()) { + if (conn.writeHandler != null) writeOP(key, socket, conn); } } + private void closeOP(SelectionKey key) { + if (key == null) return; + TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); + try { + if (key.isValid()) { + SocketChannel socketChannel = (SocketChannel) key.channel(); + socketChannel.close(); + key.attach(null); + key.cancel(); + } + } catch (IOException e) { + } + conn.dispose(); + } + private void readOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) { final CompletionHandler handler = conn.removeReadHandler(); final ByteBuffer buffer = conn.removeReadBuffer();