This commit is contained in:
Redkale
2018-08-18 11:27:50 +08:00
parent e039b0e9f6
commit 333ba0f162
2 changed files with 93 additions and 31 deletions

View File

@@ -22,17 +22,17 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
public class TcpNioAsyncConnection extends AsyncConnection { public class TcpNioAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds; protected int readTimeoutSeconds;
private int writeTimeoutSeconds; protected int writeTimeoutSeconds;
private final Selector selector; protected final Selector selector;
private SelectionKey key; protected SelectionKey key;
private final SocketChannel channel; protected final SocketChannel channel;
private final SocketAddress remoteAddress; protected final SocketAddress remoteAddress;
ByteBuffer readBuffer; ByteBuffer readBuffer;
@@ -362,4 +362,5 @@ public class TcpNioAsyncConnection extends AsyncConnection {
public final boolean isTCP() { public final boolean isTCP() {
return true; return true;
} }
} }

View File

@@ -10,7 +10,7 @@ import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.*;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
/** /**
@@ -27,9 +27,9 @@ public class TcpNioProtocolServer extends ProtocolServer {
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
private NIOThreadWorker[] workers; private NioThreadWorker[] workers;
private NIOThreadWorker currWorker; private NioThreadWorker currWorker;
private boolean running; private boolean running;
@@ -82,11 +82,11 @@ public class TcpNioProtocolServer extends ProtocolServer {
@Override @Override
public void accept() throws IOException { public void accept() throws IOException {
this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
final CountDownLatch cdl = new CountDownLatch(1);
this.running = true; this.running = true;
this.workers = new NIOThreadWorker[Runtime.getRuntime().availableProcessors()]; this.workers = new NioThreadWorker[Runtime.getRuntime().availableProcessors()];
final CountDownLatch wkcdl = new CountDownLatch(workers.length);
for (int i = 0; i < workers.length; i++) { for (int i = 0; i < workers.length; i++) {
workers[i] = new NIOThreadWorker(); workers[i] = new NioThreadWorker(wkcdl, i + 1, workers.length);
workers[i].setDaemon(true); workers[i].setDaemon(true);
workers[i].start(); workers[i].start();
} }
@@ -95,6 +95,12 @@ public class TcpNioProtocolServer extends ProtocolServer {
} }
workers[workers.length - 1].next = workers[0]; workers[workers.length - 1].next = workers[0];
currWorker = workers[0]; currWorker = workers[0];
try {
wkcdl.await(3, TimeUnit.SECONDS);
} catch (Exception e) {
throw new IOException(e);
}
final CountDownLatch cdl = new CountDownLatch(1);
new Thread() { new Thread() {
@Override @Override
public void run() { public void run() {
@@ -111,12 +117,6 @@ public class TcpNioProtocolServer extends ProtocolServer {
if (key.isAcceptable()) { if (key.isAcceptable()) {
try { try {
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
createCounter.incrementAndGet(); createCounter.incrementAndGet();
livingCounter.incrementAndGet(); livingCounter.incrementAndGet();
currWorker.addChannel(channel); currWorker.addChannel(channel);
@@ -134,45 +134,91 @@ public class TcpNioProtocolServer extends ProtocolServer {
} }
}.start(); }.start();
try { try {
cdl.await(); cdl.await(3, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); throw new IOException(e);
} }
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (!this.running) return; if (!this.running) return;
this.running = false;
serverChannel.close(); serverChannel.close();
acceptSelector.close(); acceptSelector.close();
for (NIOThreadWorker worker : workers) { for (NioThreadWorker worker : workers) {
worker.interrupt(); worker.interrupt();
} }
this.running = false;
} }
class NIOThreadWorker extends Thread { class NioThreadWorker extends Thread {
final Selector selector; final Selector selector;
NIOThreadWorker next; final CountDownLatch cdl;
public NIOThreadWorker() { private final Queue<TcpNioAsyncConnection> connected;
private final CopyOnWriteArrayList<TcpNioAsyncConnection> done;
protected volatile Thread ownerThread;
NioThreadWorker next;
public NioThreadWorker(final CountDownLatch cdl, int idx, int count) {
this.cdl = cdl;
String idxstr = "000000" + idx;
this.setName("NioThreadWorker:" + context.getServerAddress().getPort() + "-" + idxstr.substring(idxstr.length() - ("" + count).length()));
try { try {
this.selector = Selector.open(); this.selector = Selector.open();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.connected = new ArrayBlockingQueue<>(1000000);
this.done = new CopyOnWriteArrayList<>();
} }
public void addChannel(SocketChannel channel) throws IOException { public boolean addChannel(SocketChannel channel) throws IOException {
AsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); TcpNioAsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
context.runAsync(new PrepareRunner(context, conn, null, null)); return connected.add(conn);
}
protected void processConnected() {
TcpNioAsyncConnection schannel;
try {
while ((schannel = connected.poll()) != null) {
SocketChannel channel = schannel.channel;
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
channel.register(selector, SelectionKey.OP_READ).attach(schannel);
}
} catch (IOException e) {
// do nothing
}
synchronized (done) {
for (TcpNioAsyncConnection conn : done) {
if (conn.key != null && conn.key.isValid()) {
conn.key.interestOps(SelectionKey.OP_WRITE);
}
}
done.clear();
}
}
public boolean isSameThread() {
return this.ownerThread == Thread.currentThread();
} }
@Override @Override
public void run() { public void run() {
this.ownerThread = Thread.currentThread();
if (cdl != null) cdl.countDown();
while (running) { while (running) {
processConnected();
try { try {
selector.select(50); selector.select(50);
} catch (IOException e) { } catch (IOException e) {
@@ -207,13 +253,28 @@ public class TcpNioProtocolServer extends ProtocolServer {
return; return;
} }
if (conn == null) return; if (conn == null) return;
if (key.isWritable()) { if (key.isReadable()) {
if (conn.writeHandler != null) writeOP(key, socket, conn);
} else if (key.isReadable()) {
if (conn.readHandler != null) readOP(key, socket, conn); if (conn.readHandler != null) readOP(key, socket, conn);
} else if (key.isWritable()) {
if (conn.writeHandler != null) writeOP(key, socket, conn);
} }
} }
private void closeOP(SelectionKey key) {
if (key == null) return;
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
try {
if (key.isValid()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.close();
key.attach(null);
key.cancel();
}
} catch (IOException e) {
}
conn.dispose();
}
private void readOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) { private void readOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) {
final CompletionHandler handler = conn.removeReadHandler(); final CompletionHandler handler = conn.removeReadHandler();
final ByteBuffer buffer = conn.removeReadBuffer(); final ByteBuffer buffer = conn.removeReadBuffer();