From 8416826827eaedb3e7e5c36adf429842eb4a47d4 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Wed, 9 May 2018 15:53:34 +0800 Subject: [PATCH] --- src/org/redkale/net/AsyncConnection.java | 171 +++++++++--- src/org/redkale/net/ProtocolServer.java | 330 +++++++++++++++-------- 2 files changed, 356 insertions(+), 145 deletions(-) diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 241213873..d9db00dec 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -219,20 +219,44 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return future; } - private static class NIOTCPAsyncConnection extends AsyncConnection { + static class NIOTCPAsyncConnection extends AsyncConnection { private int readTimeoutSeconds; private int writeTimeoutSeconds; + private final Selector selector; + + private SelectionKey key; + private final SocketChannel channel; private final SocketAddress remoteAddress; + ByteBuffer readBuffer; + + Object readAttachment; + + CompletionHandler readHandler; + + ByteBuffer writeOneBuffer; + + ByteBuffer[] writeBuffers; + + int writeOffset; + + int writeLength; + + Object writeAttachment; + + CompletionHandler writeHandler; + public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr0, + final Selector selector, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { this.channel = ch; + this.selector = selector; this.readTimeoutSeconds = readTimeoutSeconds0; this.writeTimeoutSeconds = writeTimeoutSeconds0; SocketAddress addr = addr0; @@ -282,25 +306,64 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - try { - int rs = (int) channel.write(srcs, offset, length); - this.writetime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (Exception e) { - if (handler != null) handler.failed(e, attachment); - } + void completeRead(int rs) { + Object attach = this.readAttachment; + CompletionHandler handler = this.readHandler; + this.readBuffer = null; + this.readAttachment = null; + this.readHandler = null; + handler.completed(rs, attach); + } + + void faileRead(Throwable t) { + Object attach = this.readAttachment; + CompletionHandler handler = this.readHandler; + this.readBuffer = null; + this.readAttachment = null; + this.readHandler = null; + handler.failed(t, attach); + } + + void completeWrite(int rs) { + Object attach = this.writeAttachment; + CompletionHandler handler = this.writeHandler; + this.writeOneBuffer = null; + this.writeBuffers = null; + this.writeOffset = 0; + this.writeLength = 0; + this.writeAttachment = null; + this.writeHandler = null; + handler.completed(rs, attach); + } + + void faileWrite(Throwable t) { + Object attach = this.writeAttachment; + CompletionHandler handler = this.writeHandler; + this.writeOneBuffer = null; + this.writeBuffers = null; + this.writeOffset = 0; + this.writeLength = 0; + this.writeAttachment = null; + this.writeHandler = null; + handler.failed(t, attach); } @Override public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + if (this.readHandler != null) throw new RuntimeException("pending read"); try { - int rs = channel.read(dst); - this.readtime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); + this.readBuffer = dst; + this.readAttachment = attachment; + this.readHandler = handler; + if (key == null) { + key = channel.register(selector, SelectionKey.OP_READ); + key.attach(this); + } else { + key.interestOps(SelectionKey.OP_READ); + } + selector.wakeup(); } catch (Exception e) { - if (handler != null) handler.failed(e, attachment); + faileRead(e); } } @@ -311,41 +374,83 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl @Override public Future read(ByteBuffer dst) { + CompletableFuture future = new CompletableFuture(); + read(dst, null, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { + future.complete(result); + } + + @Override + public void failed(Throwable exc, Void attachment) { + future.completeExceptionally(exc); + } + }); + return future; + } + + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + if (this.writeHandler != null) throw new RuntimeException("pending write"); try { - int rs = channel.read(dst); - this.readtime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); + this.writeBuffers = srcs; + this.writeOffset = offset; + this.writeLength = length; + this.writeAttachment = attachment; + this.writeHandler = handler; + if (key == null) { + key = channel.register(selector, SelectionKey.OP_WRITE); + key.attach(this); + } else { + key.interestOps(SelectionKey.OP_WRITE); + } + selector.wakeup(); } catch (Exception e) { - throw new RuntimeException(e); + faileWrite(e); } } @Override public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + if (this.writeHandler != null) throw new RuntimeException("pending write"); try { - int rs = channel.write(src); - this.writetime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); + this.writeOneBuffer = src; + this.writeAttachment = attachment; + this.writeHandler = handler; + if (key == null) { + key = channel.register(selector, SelectionKey.OP_WRITE); + key.attach(this); + } else { + key.interestOps(SelectionKey.OP_WRITE); + } + selector.wakeup(); } catch (Exception e) { - if (handler != null) handler.failed(e, attachment); + faileWrite(e); } } @Override public Future write(ByteBuffer src) { - try { - int rs = channel.read(src); - this.writetime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); - } catch (Exception e) { - throw new RuntimeException(e); - } + CompletableFuture future = new CompletableFuture(); + write(src, null, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { + future.complete(result); + } + + @Override + public void failed(Throwable exc, Void attachment) { + future.completeExceptionally(exc); + } + }); + return future; } @Override public final void close() throws IOException { super.close(); channel.close(); + key.cancel(); } @Override @@ -359,15 +464,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } - public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, + public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { - return new NIOTCPAsyncConnection(ch, addr, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); } - public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, + public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new NIOTCPAsyncConnection(ch, addr, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); } private static class BIOUDPAsyncConnection extends AsyncConnection { diff --git a/src/org/redkale/net/ProtocolServer.java b/src/org/redkale/net/ProtocolServer.java index 1698fdfba..ebc725f29 100644 --- a/src/org/redkale/net/ProtocolServer.java +++ b/src/org/redkale/net/ProtocolServer.java @@ -12,6 +12,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import org.redkale.net.AsyncConnection.NIOTCPAsyncConnection; import org.redkale.util.AnyValue; /** @@ -97,7 +98,7 @@ public abstract class ProtocolServer { return supportTcpKeepAlive; } - private static final class ProtocolUDPServer extends ProtocolServer { + static final class ProtocolUDPServer extends ProtocolServer { private boolean running; @@ -184,117 +185,7 @@ public abstract class ProtocolServer { } } - private static final class ProtocolNIOTCPServer extends ProtocolServer { - - private final Context context; - - private Selector acceptSelector; - - private Selector readSelector; - - private Selector writeSelector; - - private ServerSocketChannel serverChannel; - - private boolean running; - - public ProtocolNIOTCPServer(Context context) { - this.context = context; - } - - @Override - public void open(AnyValue config) throws IOException { - acceptSelector = Selector.open(); - readSelector = Selector.open(); - writeSelector = Selector.open(); - this.serverChannel = ServerSocketChannel.open(); - serverChannel.configureBlocking(false); - ServerSocket socket = serverChannel.socket(); - socket.setReceiveBufferSize(16 * 1024); - socket.setReuseAddress(true); - } - - @Override - public void bind(SocketAddress local, int backlog) throws IOException { - this.serverChannel.bind(local, backlog); - } - - @Override - public Set> supportedOptions() { - return this.serverChannel.supportedOptions(); - } - - @Override - public void setOption(SocketOption name, T value) throws IOException { - this.serverChannel.setOption(name, value); - } - - @Override - public void accept() throws IOException { - this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); - final int readTimeoutSeconds = this.context.readTimeoutSeconds; - final int writeTimeoutSeconds = this.context.writeTimeoutSeconds; - final CountDownLatch cdl = new CountDownLatch(1); - this.running = true; - new Thread() { - @Override - public void run() { - cdl.countDown(); - while (running) { - try { - acceptSelector.select(); - Set selectedKeys = acceptSelector.selectedKeys(); - synchronized (selectedKeys) { - Iterator iter = selectedKeys.iterator(); - while (iter.hasNext()) { - SelectionKey key = (SelectionKey) iter.next(); - iter.remove(); - if (key.isAcceptable()) { - try { - SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - channel.configureBlocking(false); - channel.register(readSelector, SelectionKey.OP_READ); - channel.register(writeSelector, SelectionKey.OP_WRITE); - createCounter.incrementAndGet(); - livingCounter.incrementAndGet(); - AsyncConnection conn = AsyncConnection.create(channel, null, readTimeoutSeconds, writeTimeoutSeconds); - context.runAsync(new PrepareRunner(context, conn, null, null)); - } catch (IOException io) { - io.printStackTrace(); - } - } - } - } - } catch (Throwable t) { - t.printStackTrace(); - } - } - } - }.start(); - try { - cdl.await(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public void close() throws IOException { - this.running = false; - serverChannel.close(); - acceptSelector.close(); - readSelector.close(); - writeSelector.close(); - } - - } - - private static final class ProtocolAIOTCPServer extends ProtocolServer { + static final class ProtocolAIOTCPServer extends ProtocolServer { private final Context context; @@ -399,4 +290,219 @@ public abstract class ProtocolServer { } + static final class ProtocolNIOTCPServer extends ProtocolServer { + + private final Context context; + + private Selector acceptSelector; + + private ServerSocketChannel serverChannel; + + private NIOThreadWorker[] workers; + + private NIOThreadWorker currWorker; + + private boolean running; + + public ProtocolNIOTCPServer(Context context) { + this.context = context; + } + + @Override + public void open(AnyValue config) throws IOException { + acceptSelector = Selector.open(); + this.serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(false); + ServerSocket socket = serverChannel.socket(); + socket.setReceiveBufferSize(16 * 1024); + socket.setReuseAddress(true); + } + + @Override + public void bind(SocketAddress local, int backlog) throws IOException { + this.serverChannel.bind(local, backlog); + } + + @Override + public Set> supportedOptions() { + return this.serverChannel.supportedOptions(); + } + + @Override + public void setOption(SocketOption name, T value) throws IOException { + this.serverChannel.setOption(name, value); + } + + @Override + public void accept() throws IOException { + this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); + final CountDownLatch cdl = new CountDownLatch(1); + this.running = true; + this.workers = new NIOThreadWorker[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < workers.length; i++) { + workers[i] = new NIOThreadWorker(); + workers[i].setDaemon(true); + workers[i].start(); + } + for (int i = 0; i < workers.length - 1; i++) { //构成环形 + workers[i].next = workers[i + 1]; + } + workers[workers.length - 1].next = workers[0]; + currWorker = workers[0]; + new Thread() { + @Override + public void run() { + cdl.countDown(); + while (running) { + try { + acceptSelector.select(); + Set selectedKeys = acceptSelector.selectedKeys(); + synchronized (selectedKeys) { + Iterator iter = selectedKeys.iterator(); + while (iter.hasNext()) { + SelectionKey key = (SelectionKey) iter.next(); + iter.remove(); + if (key.isAcceptable()) { + try { + SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); + channel.configureBlocking(false); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + createCounter.incrementAndGet(); + livingCounter.incrementAndGet(); + currWorker.addChannel(channel); + currWorker = currWorker.next; + } catch (IOException io) { + io.printStackTrace(); + } + } + } + } + } catch (Throwable t) { + t.printStackTrace(); + } + } + } + }.start(); + try { + cdl.await(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + if (!this.running) return; + this.running = false; + serverChannel.close(); + acceptSelector.close(); + for (NIOThreadWorker worker : workers) { + worker.interrupt(); + } + } + + class NIOThreadWorker extends Thread { + + final Selector selector; + + NIOThreadWorker next; + + public NIOThreadWorker() { + try { + this.selector = Selector.open(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void addChannel(SocketChannel channel) throws IOException { + AsyncConnection conn = AsyncConnection.create(channel, null, this.selector, 0, 0); + context.runAsync(new PrepareRunner(context, conn, null, null)); + } + + @Override + public void run() { + while (running) { + try { + selector.select(50); + } catch (IOException e) { + e.printStackTrace(); + } + try { + Set selectedKeys = selector.selectedKeys(); + synchronized (selectedKeys) { + Iterator iter = selectedKeys.iterator(); + while (iter.hasNext()) { + SelectionKey key = (SelectionKey) iter.next(); + iter.remove(); + processKey(key); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private void processKey(SelectionKey key) { + if (key == null || !key.isValid()) return; + SocketChannel socket = (SocketChannel) key.channel(); + NIOTCPAsyncConnection conn = (NIOTCPAsyncConnection) key.attachment(); + if (key.isReadable()) { + if (conn.readHandler != null) readOP(key, socket, conn); + } else if (key.isWritable()) { + if (conn.writeHandler != null) writeOP(key, socket, conn); + } + } + + private void readOP(SelectionKey key, SocketChannel socket, NIOTCPAsyncConnection conn) { + try { + final int rs = socket.read(conn.readBuffer); + key.interestOps(SelectionKey.OP_CONNECT); + if (rs <= 0) return; + context.runAsync(() -> conn.completeRead(rs)); + } catch (Throwable t) { + context.runAsync(() -> conn.faileRead(t)); + } + } + + private void writeOP(SelectionKey key, SocketChannel socket, NIOTCPAsyncConnection conn) { + try { + int rs = 0; + if (conn.writeOneBuffer == null) { + final ByteBuffer[] buffers = conn.writeBuffers; + int offset = conn.writeOffset; + int length = conn.writeLength; + for (;;) { + rs += socket.write(buffers, offset, length); + boolean over = true; + int end = offset + length; + for (int i = offset; i < end; i++) { + if (buffers[i].hasRemaining()) { + over = false; + length -= i - offset; + offset = i; + } + } + if (over) break; + } + } else { + final ByteBuffer buffer = conn.writeOneBuffer; + while (buffer.hasRemaining()) rs += socket.write(buffer); + } + key.interestOps(SelectionKey.OP_CONNECT); + final int rs0 = rs; + context.runAsync(() -> conn.completeWrite(rs0)); + } catch (Throwable t) { + context.runAsync(() -> conn.faileWrite(t)); + } + } + + } + } + }