This commit is contained in:
Redkale
2018-05-09 15:53:34 +08:00
parent 0a296ee857
commit 8416826827
2 changed files with 356 additions and 145 deletions

View File

@@ -219,20 +219,44 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return future;
}
private static class NIOTCPAsyncConnection extends AsyncConnection {
static class NIOTCPAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final Selector selector;
private SelectionKey key;
private final SocketChannel channel;
private final SocketAddress remoteAddress;
ByteBuffer readBuffer;
Object readAttachment;
CompletionHandler readHandler;
ByteBuffer writeOneBuffer;
ByteBuffer[] writeBuffers;
int writeOffset;
int writeLength;
Object writeAttachment;
CompletionHandler writeHandler;
public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr0,
final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.selector = selector;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
@@ -282,25 +306,64 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = (int) channel.write(srcs, offset, length);
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (Exception e) {
if (handler != null) handler.failed(e, attachment);
}
void completeRead(int rs) {
Object attach = this.readAttachment;
CompletionHandler handler = this.readHandler;
this.readBuffer = null;
this.readAttachment = null;
this.readHandler = null;
handler.completed(rs, attach);
}
void faileRead(Throwable t) {
Object attach = this.readAttachment;
CompletionHandler handler = this.readHandler;
this.readBuffer = null;
this.readAttachment = null;
this.readHandler = null;
handler.failed(t, attach);
}
void completeWrite(int rs) {
Object attach = this.writeAttachment;
CompletionHandler handler = this.writeHandler;
this.writeOneBuffer = null;
this.writeBuffers = null;
this.writeOffset = 0;
this.writeLength = 0;
this.writeAttachment = null;
this.writeHandler = null;
handler.completed(rs, attach);
}
void faileWrite(Throwable t) {
Object attach = this.writeAttachment;
CompletionHandler handler = this.writeHandler;
this.writeOneBuffer = null;
this.writeBuffers = null;
this.writeOffset = 0;
this.writeLength = 0;
this.writeAttachment = null;
this.writeHandler = null;
handler.failed(t, attach);
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.readHandler != null) throw new RuntimeException("pending read");
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
this.readBuffer = dst;
this.readAttachment = attachment;
this.readHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_READ);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_READ);
}
selector.wakeup();
} catch (Exception e) {
if (handler != null) handler.failed(e, attachment);
faileRead(e);
}
}
@@ -311,41 +374,83 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
@Override
public Future<Integer> read(ByteBuffer dst) {
CompletableFuture future = new CompletableFuture();
read(dst, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
future.complete(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(exc);
}
});
return future;
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.writeHandler != null) throw new RuntimeException("pending write");
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
this.writeBuffers = srcs;
this.writeOffset = offset;
this.writeLength = length;
this.writeAttachment = attachment;
this.writeHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_WRITE);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_WRITE);
}
selector.wakeup();
} catch (Exception e) {
throw new RuntimeException(e);
faileWrite(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.writeHandler != null) throw new RuntimeException("pending write");
try {
int rs = channel.write(src);
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
this.writeOneBuffer = src;
this.writeAttachment = attachment;
this.writeHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_WRITE);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_WRITE);
}
selector.wakeup();
} catch (Exception e) {
if (handler != null) handler.failed(e, attachment);
faileWrite(e);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = channel.read(src);
this.writetime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (Exception e) {
throw new RuntimeException(e);
}
CompletableFuture future = new CompletableFuture();
write(src, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
future.complete(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(exc);
}
});
return future;
}
@Override
public final void close() throws IOException {
super.close();
channel.close();
key.cancel();
}
@Override
@@ -359,15 +464,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr,
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new NIOTCPAsyncConnection(ch, addr, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr,
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new NIOTCPAsyncConnection(ch, addr, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
private static class BIOUDPAsyncConnection extends AsyncConnection {

View File

@@ -12,6 +12,7 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import org.redkale.net.AsyncConnection.NIOTCPAsyncConnection;
import org.redkale.util.AnyValue;
/**
@@ -97,7 +98,7 @@ public abstract class ProtocolServer {
return supportTcpKeepAlive;
}
private static final class ProtocolUDPServer extends ProtocolServer {
static final class ProtocolUDPServer extends ProtocolServer {
private boolean running;
@@ -184,117 +185,7 @@ public abstract class ProtocolServer {
}
}
private static final class ProtocolNIOTCPServer extends ProtocolServer {
private final Context context;
private Selector acceptSelector;
private Selector readSelector;
private Selector writeSelector;
private ServerSocketChannel serverChannel;
private boolean running;
public ProtocolNIOTCPServer(Context context) {
this.context = context;
}
@Override
public void open(AnyValue config) throws IOException {
acceptSelector = Selector.open();
readSelector = Selector.open();
writeSelector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket socket = serverChannel.socket();
socket.setReceiveBufferSize(16 * 1024);
socket.setReuseAddress(true);
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local, backlog);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public void accept() throws IOException {
this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
final int readTimeoutSeconds = this.context.readTimeoutSeconds;
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;
final CountDownLatch cdl = new CountDownLatch(1);
this.running = true;
new Thread() {
@Override
public void run() {
cdl.countDown();
while (running) {
try {
acceptSelector.select();
Set<SelectionKey> selectedKeys = acceptSelector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
if (key.isAcceptable()) {
try {
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
channel.configureBlocking(false);
channel.register(readSelector, SelectionKey.OP_READ);
channel.register(writeSelector, SelectionKey.OP_WRITE);
createCounter.incrementAndGet();
livingCounter.incrementAndGet();
AsyncConnection conn = AsyncConnection.create(channel, null, readTimeoutSeconds, writeTimeoutSeconds);
context.runAsync(new PrepareRunner(context, conn, null, null));
} catch (IOException io) {
io.printStackTrace();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}.start();
try {
cdl.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
this.running = false;
serverChannel.close();
acceptSelector.close();
readSelector.close();
writeSelector.close();
}
}
private static final class ProtocolAIOTCPServer extends ProtocolServer {
static final class ProtocolAIOTCPServer extends ProtocolServer {
private final Context context;
@@ -399,4 +290,219 @@ public abstract class ProtocolServer {
}
static final class ProtocolNIOTCPServer extends ProtocolServer {
private final Context context;
private Selector acceptSelector;
private ServerSocketChannel serverChannel;
private NIOThreadWorker[] workers;
private NIOThreadWorker currWorker;
private boolean running;
public ProtocolNIOTCPServer(Context context) {
this.context = context;
}
@Override
public void open(AnyValue config) throws IOException {
acceptSelector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket socket = serverChannel.socket();
socket.setReceiveBufferSize(16 * 1024);
socket.setReuseAddress(true);
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local, backlog);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public void accept() throws IOException {
this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
final CountDownLatch cdl = new CountDownLatch(1);
this.running = true;
this.workers = new NIOThreadWorker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new NIOThreadWorker();
workers[i].setDaemon(true);
workers[i].start();
}
for (int i = 0; i < workers.length - 1; i++) { //构成环形
workers[i].next = workers[i + 1];
}
workers[workers.length - 1].next = workers[0];
currWorker = workers[0];
new Thread() {
@Override
public void run() {
cdl.countDown();
while (running) {
try {
acceptSelector.select();
Set<SelectionKey> selectedKeys = acceptSelector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
if (key.isAcceptable()) {
try {
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
createCounter.incrementAndGet();
livingCounter.incrementAndGet();
currWorker.addChannel(channel);
currWorker = currWorker.next;
} catch (IOException io) {
io.printStackTrace();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}.start();
try {
cdl.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
if (!this.running) return;
this.running = false;
serverChannel.close();
acceptSelector.close();
for (NIOThreadWorker worker : workers) {
worker.interrupt();
}
}
class NIOThreadWorker extends Thread {
final Selector selector;
NIOThreadWorker next;
public NIOThreadWorker() {
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void addChannel(SocketChannel channel) throws IOException {
AsyncConnection conn = AsyncConnection.create(channel, null, this.selector, 0, 0);
context.runAsync(new PrepareRunner(context, conn, null, null));
}
@Override
public void run() {
while (running) {
try {
selector.select(50);
} catch (IOException e) {
e.printStackTrace();
}
try {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
processKey(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void processKey(SelectionKey key) {
if (key == null || !key.isValid()) return;
SocketChannel socket = (SocketChannel) key.channel();
NIOTCPAsyncConnection conn = (NIOTCPAsyncConnection) key.attachment();
if (key.isReadable()) {
if (conn.readHandler != null) readOP(key, socket, conn);
} else if (key.isWritable()) {
if (conn.writeHandler != null) writeOP(key, socket, conn);
}
}
private void readOP(SelectionKey key, SocketChannel socket, NIOTCPAsyncConnection conn) {
try {
final int rs = socket.read(conn.readBuffer);
key.interestOps(SelectionKey.OP_CONNECT);
if (rs <= 0) return;
context.runAsync(() -> conn.completeRead(rs));
} catch (Throwable t) {
context.runAsync(() -> conn.faileRead(t));
}
}
private void writeOP(SelectionKey key, SocketChannel socket, NIOTCPAsyncConnection conn) {
try {
int rs = 0;
if (conn.writeOneBuffer == null) {
final ByteBuffer[] buffers = conn.writeBuffers;
int offset = conn.writeOffset;
int length = conn.writeLength;
for (;;) {
rs += socket.write(buffers, offset, length);
boolean over = true;
int end = offset + length;
for (int i = offset; i < end; i++) {
if (buffers[i].hasRemaining()) {
over = false;
length -= i - offset;
offset = i;
}
}
if (over) break;
}
} else {
final ByteBuffer buffer = conn.writeOneBuffer;
while (buffer.hasRemaining()) rs += socket.write(buffer);
}
key.interestOps(SelectionKey.OP_CONNECT);
final int rs0 = rs;
context.runAsync(() -> conn.completeWrite(rs0));
} catch (Throwable t) {
context.runAsync(() -> conn.faileWrite(t));
}
}
}
}
}