From dd444e3a0f4f28485af0d3ca867d73272bba38b9 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 28 May 2018 22:13:50 +0800 Subject: [PATCH] --- src/META-INF/application-template.xml | 1 + src/org/redkale/boot/NodeServer.java | 1 + src/org/redkale/net/AsyncConnection.java | 802 +----------------- src/org/redkale/net/ProtocolServer.java | 541 +----------- src/org/redkale/net/Server.java | 13 +- .../redkale/net/TcpAioAsyncConnection.java | 158 ++++ src/org/redkale/net/TcpAioProtocolServer.java | 140 +++ .../redkale/net/TcpBioAsyncConnection.java | 173 ++++ .../redkale/net/TcpNioAsyncConnection.java | 329 +++++++ src/org/redkale/net/TcpNioProtocolServer.java | 309 +++++++ .../redkale/net/UdpBioAsyncConnection.java | 167 ++++ src/org/redkale/net/UdpBioProtocolServer.java | 123 +++ 12 files changed, 1467 insertions(+), 1290 deletions(-) create mode 100644 src/org/redkale/net/TcpAioAsyncConnection.java create mode 100644 src/org/redkale/net/TcpAioProtocolServer.java create mode 100644 src/org/redkale/net/TcpBioAsyncConnection.java create mode 100644 src/org/redkale/net/TcpNioAsyncConnection.java create mode 100644 src/org/redkale/net/TcpNioProtocolServer.java create mode 100644 src/org/redkale/net/UdpBioAsyncConnection.java create mode 100644 src/org/redkale/net/UdpBioProtocolServer.java diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index 6cbd57674..6d5b00c84 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -125,6 +125,7 @@ aliveTimeoutSeconds: KeepAlive读操作超时秒数, 默认30, 0表示永久不超时; -1表示禁止KeepAlive readTimeoutSeconds: 读操作超时秒数, 默认0, 表示永久不超时 writeTimeoutSeconds: 写操作超时秒数, 默认0, 表示永久不超时 + netimpl: ProtocolServer的实现类。TCP情况下值也可以是aio或nio,默认值为aio;UDP情况下值也可以是bio,默认值为bio; interceptor: 启动/关闭NodeServer时被调用的拦截器实现类,必须是org.redkale.boot.NodeInterceptor的子类,默认为null --> diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 09e45b5a5..f698fb912 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -95,6 +95,7 @@ public abstract class NodeServer { this.serverClassLoader = new RedkaleClassLoader(application.getServerClassLoader()); Thread.currentThread().setContextClassLoader(this.serverClassLoader); this.serverThread = Thread.currentThread(); + this.server.setServerClassLoader(serverClassLoader); } public static NodeServer create(Class clazz, Application application, AnyValue serconf) { diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 55a81c846..95d0e44c9 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -223,641 +223,6 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return future; } - static class AsyncNIOTCPConnection extends AsyncConnection { - - private int readTimeoutSeconds; - - private int writeTimeoutSeconds; - - private final Selector selector; - - private SelectionKey key; - - private final SocketChannel channel; - - private final SocketAddress remoteAddress; - - ByteBuffer readBuffer; - - Object readAttachment; - - CompletionHandler readHandler; - - ByteBuffer writeOneBuffer; - - ByteBuffer[] writeBuffers; - - int writingCount; - - int writeOffset; - - int writeLength; - - Object writeAttachment; - - CompletionHandler writeHandler; - - public AsyncNIOTCPConnection(final SocketChannel ch, SocketAddress addr0, - final Selector selector, - final int readTimeoutSeconds0, final int writeTimeoutSeconds0, - final AtomicLong livingCounter, final AtomicLong closedCounter) { - this.channel = ch; - this.selector = selector; - this.readTimeoutSeconds = readTimeoutSeconds0; - this.writeTimeoutSeconds = writeTimeoutSeconds0; - SocketAddress addr = addr0; - if (addr == null) { - try { - addr = ch.getRemoteAddress(); - } catch (Exception e) { - //do nothing - } - } - this.remoteAddress = addr; - this.livingCounter = livingCounter; - this.closedCounter = closedCounter; - } - - @Override - public void setReadTimeoutSeconds(int readTimeoutSeconds) { - this.readTimeoutSeconds = readTimeoutSeconds; - } - - @Override - public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { - this.writeTimeoutSeconds = writeTimeoutSeconds; - } - - @Override - public int getReadTimeoutSeconds() { - return this.readTimeoutSeconds; - } - - @Override - public int getWriteTimeoutSeconds() { - return this.writeTimeoutSeconds; - } - - @Override - public final SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SocketAddress getLocalAddress() { - try { - return channel.getLocalAddress(); - } catch (IOException e) { - return null; - } - } - - CompletionHandler removeReadHandler() { - CompletionHandler handler = this.readHandler; - this.readHandler = null; - return handler; - } - - ByteBuffer removeReadBuffer() { - ByteBuffer buffer = this.readBuffer; - this.readBuffer = null; - return buffer; - } - - Object removeReadAttachment() { - Object attach = this.readAttachment; - this.readAttachment = null; - return attach; - } - - void completeRead(int rs) { - Object attach = this.readAttachment; - CompletionHandler handler = this.readHandler; - this.readBuffer = null; - this.readAttachment = null; - this.readHandler = null; - handler.completed(rs, attach); - } - - void faileRead(Throwable t) { - Object attach = this.readAttachment; - CompletionHandler handler = this.readHandler; - this.readBuffer = null; - this.readAttachment = null; - this.readHandler = null; - handler.failed(t, attach); - } - - CompletionHandler removeWriteHandler() { - CompletionHandler handler = this.writeHandler; - this.writeHandler = null; - return handler; - } - - ByteBuffer removeWriteOneBuffer() { - ByteBuffer buffer = this.writeOneBuffer; - this.writeOneBuffer = null; - return buffer; - } - - ByteBuffer[] removeWriteBuffers() { - ByteBuffer[] buffers = this.writeBuffers; - this.writeBuffers = null; - return buffers; - } - - int removeWritingCount() { - int rs = this.writingCount; - this.writingCount = 0; - return rs; - } - - int removeWriteOffset() { - int rs = this.writeOffset; - this.writeOffset = 0; - return rs; - } - - int removeWriteLength() { - int rs = this.writeLength; - this.writeLength = 0; - return rs; - } - - Object removeWriteAttachment() { - Object attach = this.writeAttachment; - this.writeAttachment = null; - return attach; - } - - void completeWrite(int rs) { - Object attach = this.writeAttachment; - CompletionHandler handler = this.writeHandler; - this.writeOneBuffer = null; - this.writeBuffers = null; - this.writeOffset = 0; - this.writeLength = 0; - this.writeAttachment = null; - this.writeHandler = null; - handler.completed(rs, attach); - } - - void faileWrite(Throwable t) { - Object attach = this.writeAttachment; - CompletionHandler handler = this.writeHandler; - this.writeOneBuffer = null; - this.writeBuffers = null; - this.writeOffset = 0; - this.writeLength = 0; - this.writeAttachment = null; - this.writeHandler = null; - handler.failed(t, attach); - } - - @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { - if (this.readHandler != null) throw new RuntimeException("pending read"); - try { - this.readBuffer = dst; - this.readAttachment = attachment; - this.readHandler = handler; - if (key == null) { - key = channel.register(selector, SelectionKey.OP_READ); - key.attach(this); - } else { - key.interestOps(SelectionKey.OP_READ); - } - selector.wakeup(); - } catch (Exception e) { - faileRead(e); - } - } - - @Override - public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read(dst, attachment, handler); - } - - @Override - public Future read(ByteBuffer dst) { - CompletableFuture future = new CompletableFuture(); - read(dst, null, new CompletionHandler() { - @Override - public void completed(Integer result, Void attachment) { - future.complete(result); - } - - @Override - public void failed(Throwable exc, Void attachment) { - future.completeExceptionally(exc); - } - }); - return future; - } - - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - if (this.writeHandler != null) throw new RuntimeException("pending write"); - try { - this.writeBuffers = srcs; - this.writeOffset = offset; - this.writeLength = length; - this.writingCount = 0; - this.writeAttachment = attachment; - this.writeHandler = handler; - if (key == null) { - key = channel.register(selector, SelectionKey.OP_WRITE); - key.attach(this); - } else { - key.interestOps(SelectionKey.OP_WRITE); - } - selector.wakeup(); - } catch (Exception e) { - faileWrite(e); - } - } - - @Override - public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - if (this.writeHandler != null) throw new RuntimeException("pending write"); - try { - this.writeOneBuffer = src; - this.writingCount = 0; - this.writeAttachment = attachment; - this.writeHandler = handler; - if (key == null) { - key = channel.register(selector, SelectionKey.OP_WRITE); - key.attach(this); - } else { - key.interestOps(SelectionKey.OP_WRITE); - } - selector.wakeup(); - } catch (Exception e) { - faileWrite(e); - } - } - - @Override - public Future write(ByteBuffer src) { - CompletableFuture future = new CompletableFuture(); - write(src, null, new CompletionHandler() { - @Override - public void completed(Integer result, Void attachment) { - future.complete(result); - } - - @Override - public void failed(Throwable exc, Void attachment) { - future.completeExceptionally(exc); - } - }); - return future; - } - - @Override - public final void close() throws IOException { - super.close(); - channel.close(); - key.cancel(); - } - - @Override - public final boolean isOpen() { - return channel.isOpen(); - } - - @Override - public final boolean isTCP() { - return true; - } - } - - public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, - final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { - return new AsyncNIOTCPConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); - } - - public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) { - return new AsyncNIOTCPConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); - } - - public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, - final int readTimeoutSeconds0, final int writeTimeoutSeconds0, - final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new AsyncNIOTCPConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); - } - - private static class AsyncBIOUDPConnection extends AsyncConnection { - - private int readTimeoutSeconds; - - private int writeTimeoutSeconds; - - private final DatagramChannel channel; - - private final SocketAddress remoteAddress; - - private final boolean client; - - public AsyncBIOUDPConnection(final DatagramChannel ch, SocketAddress addr0, - final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, - final AtomicLong livingCounter, final AtomicLong closedCounter) { - this.channel = ch; - this.client = client0; - this.readTimeoutSeconds = readTimeoutSeconds0; - this.writeTimeoutSeconds = writeTimeoutSeconds0; - SocketAddress addr = addr0; - if (addr == null) { - try { - addr = ch.getRemoteAddress(); - } catch (Exception e) { - //do nothing - } - } - this.remoteAddress = addr; - this.livingCounter = livingCounter; - this.closedCounter = closedCounter; - } - - @Override - public void setReadTimeoutSeconds(int readTimeoutSeconds) { - this.readTimeoutSeconds = readTimeoutSeconds; - } - - @Override - public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { - this.writeTimeoutSeconds = writeTimeoutSeconds; - } - - @Override - public int getReadTimeoutSeconds() { - return this.readTimeoutSeconds; - } - - @Override - public int getWriteTimeoutSeconds() { - return this.writeTimeoutSeconds; - } - - @Override - public final SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SocketAddress getLocalAddress() { - try { - return channel.getLocalAddress(); - } catch (IOException e) { - return null; - } - } - - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - try { - int rs = 0; - for (int i = offset; i < offset + length; i++) { - rs += channel.send(srcs[i], remoteAddress); - if (i != offset) Thread.sleep(10); - } - this.writetime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (Exception e) { - if (handler != null) handler.failed(e, attachment); - } - } - - @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { - try { - int rs = channel.read(dst); - this.readtime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (IOException e) { - if (handler != null) handler.failed(e, attachment); - } - } - - @Override - public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read(dst, attachment, handler); - } - - @Override - public Future read(ByteBuffer dst) { - try { - int rs = channel.read(dst); - this.readtime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - try { - int rs = channel.send(src, remoteAddress); - this.writetime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (IOException e) { - if (handler != null) handler.failed(e, attachment); - } - } - - @Override - public Future write(ByteBuffer src) { - try { - int rs = channel.send(src, remoteAddress); - this.writetime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public final void close() throws IOException { - super.close(); - if (client) channel.close(); - } - - @Override - public final boolean isOpen() { - return channel.isOpen(); - } - - @Override - public final boolean isTCP() { - return false; - } - } - - public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, - final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { - return new AsyncBIOUDPConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); - } - - public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, - final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, - final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new AsyncBIOUDPConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); - } - - private static class AsyncBIOTCPConnection extends AsyncConnection { - - private int readTimeoutSeconds; - - private int writeTimeoutSeconds; - - private final Socket socket; - - private final ReadableByteChannel readChannel; - - private final WritableByteChannel writeChannel; - - private final SocketAddress remoteAddress; - - public AsyncBIOTCPConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, - final AtomicLong livingCounter, final AtomicLong closedCounter) { - this.socket = socket; - ReadableByteChannel rc = null; - WritableByteChannel wc = null; - try { - socket.setSoTimeout(Math.max(readTimeoutSeconds0, writeTimeoutSeconds0)); - rc = Channels.newChannel(socket.getInputStream()); - wc = Channels.newChannel(socket.getOutputStream()); - } catch (IOException e) { - e.printStackTrace(); - } - this.readChannel = rc; - this.writeChannel = wc; - this.readTimeoutSeconds = readTimeoutSeconds0; - this.writeTimeoutSeconds = writeTimeoutSeconds0; - SocketAddress addr = addr0; - if (addr == null) { - try { - addr = socket.getRemoteSocketAddress(); - } catch (Exception e) { - //do nothing - } - } - this.remoteAddress = addr; - this.livingCounter = livingCounter; - this.closedCounter = closedCounter; - } - - @Override - public boolean isTCP() { - return true; - } - - @Override - public SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SocketAddress getLocalAddress() { - return socket.getLocalSocketAddress(); - } - - @Override - public int getReadTimeoutSeconds() { - return readTimeoutSeconds; - } - - @Override - public int getWriteTimeoutSeconds() { - return writeTimeoutSeconds; - } - - @Override - public void setReadTimeoutSeconds(int readTimeoutSeconds) { - this.readTimeoutSeconds = readTimeoutSeconds; - } - - @Override - public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { - this.writeTimeoutSeconds = writeTimeoutSeconds; - } - - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - try { - int rs = 0; - for (int i = offset; i < offset + length; i++) { - rs += writeChannel.write(srcs[i]); - } - this.writetime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (IOException e) { - if (handler != null) handler.failed(e, attachment); - } - } - - @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { - try { - int rs = readChannel.read(dst); - this.readtime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (IOException e) { - if (handler != null) handler.failed(e, attachment); - } - } - - @Override - public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read(dst, attachment, handler); - } - - @Override - public Future read(ByteBuffer dst) { - try { - int rs = readChannel.read(dst); - this.readtime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - try { - int rs = writeChannel.write(src); - this.writetime = System.currentTimeMillis(); - if (handler != null) handler.completed(rs, attachment); - } catch (IOException e) { - if (handler != null) handler.failed(e, attachment); - } - } - - @Override - public Future write(ByteBuffer src) { - try { - int rs = writeChannel.write(src); - this.writetime = System.currentTimeMillis(); - return CompletableFuture.completedFuture(rs); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() throws IOException { - super.close(); - this.socket.close(); - } - - @Override - public boolean isOpen() { - return !socket.isClosed(); - } - } - /** * 通常用于 ssl socket * @@ -870,149 +235,38 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { - return new AsyncBIOTCPConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null); + return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null); } public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new AsyncBIOTCPConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter); + return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter); } - private static class AsyncAIOTCPConnection extends AsyncConnection { + public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { + return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + } - private int readTimeoutSeconds; + public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) { + return new TcpNioAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + } - private int writeTimeoutSeconds; + public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + } - private final AsynchronousSocketChannel channel; - - private final SocketAddress remoteAddress; - - public AsyncAIOTCPConnection(final AsynchronousSocketChannel ch, SSLContext sslContext, - final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, - final AtomicLong livingCounter, final AtomicLong closedCounter) { - this.channel = ch; - this.sslContext = sslContext; - this.readTimeoutSeconds = readTimeoutSeconds; - this.writeTimeoutSeconds = writeTimeoutSeconds; - SocketAddress addr = addr0; - if (addr == null) { - try { - addr = ch.getRemoteAddress(); - } catch (Exception e) { - //do nothing - } - } - this.remoteAddress = addr; - this.livingCounter = livingCounter; - this.closedCounter = closedCounter; - } - - @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { - this.readtime = System.currentTimeMillis(); - if (readTimeoutSeconds > 0) { - channel.read(dst, readTimeoutSeconds, TimeUnit.SECONDS, attachment, handler); - } else { - channel.read(dst, attachment, handler); - } - } - - @Override - public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - this.readtime = System.currentTimeMillis(); - channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler); - } - - @Override - public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - this.writetime = System.currentTimeMillis(); - if (writeTimeoutSeconds > 0) { - channel.write(src, writeTimeoutSeconds, TimeUnit.SECONDS, attachment, handler); - } else { - channel.write(src, attachment, handler); - } - } - - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler handler) { - this.writetime = System.currentTimeMillis(); - channel.write(srcs, offset, length, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS, - attachment, new CompletionHandler() { - - @Override - public void completed(Long result, A attachment) { - handler.completed(result.intValue(), attachment); - } - - @Override - public void failed(Throwable exc, A attachment) { - handler.failed(exc, attachment); - } - - }); - } - - @Override - public void setReadTimeoutSeconds(int readTimeoutSeconds) { - this.readTimeoutSeconds = readTimeoutSeconds; - } - - @Override - public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { - this.writeTimeoutSeconds = writeTimeoutSeconds; - } - - @Override - public int getReadTimeoutSeconds() { - return this.readTimeoutSeconds; - } - - @Override - public int getWriteTimeoutSeconds() { - return this.writeTimeoutSeconds; - } - - @Override - public final SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SocketAddress getLocalAddress() { - try { - return channel.getLocalAddress(); - } catch (IOException e) { - return null; - } - } - - @Override - public final Future read(ByteBuffer dst) { - return channel.read(dst); - } - - @Override - public final Future write(ByteBuffer src) { - return channel.write(src); - } - - @Override - public final void close() throws IOException { - super.close(); - channel.close(); - } - - @Override - public final boolean isOpen() { - return channel.isOpen(); - } - - @Override - public final boolean isTCP() { - return true; - } + public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, + final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { + return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + } + public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, + final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); } public static AsyncConnection create(final AsynchronousSocketChannel ch) { @@ -1020,29 +274,29 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new AsyncAIOTCPConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); } public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return new AsyncAIOTCPConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); } public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) { - return new AsyncAIOTCPConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); } public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new AsyncAIOTCPConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); } public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new AsyncAIOTCPConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); } public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context, final AtomicLong livingCounter, final AtomicLong closedCounter) { - return new AsyncAIOTCPConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); + return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); } } diff --git a/src/org/redkale/net/ProtocolServer.java b/src/org/redkale/net/ProtocolServer.java index f7166abc5..8fbdf5db5 100644 --- a/src/org/redkale/net/ProtocolServer.java +++ b/src/org/redkale/net/ProtocolServer.java @@ -7,12 +7,9 @@ package org.redkale.net; import java.io.IOException; import java.net.*; -import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; -import org.redkale.net.AsyncConnection.AsyncNIOTCPConnection; import org.redkale.util.AnyValue; /** @@ -87,10 +84,32 @@ public abstract class ProtocolServer { } //--------------------------------------------------------------------- - public static ProtocolServer create(String protocol, Context context) { - if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolAIOTCPServer(context); - if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolBIOUDPServer(context); - throw new RuntimeException("ProtocolServer not support protocol " + protocol); + public static ProtocolServer create(String protocol, Context context, ClassLoader classLoader, String netimpl) { + if (netimpl != null) netimpl = netimpl.trim(); + if ("TCP".equalsIgnoreCase(protocol)) { + if (netimpl == null || netimpl.isEmpty()) { + return new TcpAioProtocolServer(context); + } else if ("aio".equalsIgnoreCase(netimpl)) { + return new TcpAioProtocolServer(context); + } else if ("nio".equalsIgnoreCase(netimpl)) { + return new TcpNioProtocolServer(context); + } + } else if ("UDP".equalsIgnoreCase(protocol)) { + if (netimpl == null || netimpl.isEmpty()) { + return new UdpBioProtocolServer(context); + } else if ("bio".equalsIgnoreCase(netimpl)) { + return new UdpBioProtocolServer(context); + } + } else { + throw new RuntimeException("ProtocolServer not support protocol " + protocol); + } + try { + if (classLoader == null) classLoader = Thread.currentThread().getContextClassLoader(); + Class clazz = classLoader.loadClass(netimpl); + return (ProtocolServer) clazz.getDeclaredConstructor(Context.class).newInstance(context); + } catch (Exception e) { + throw new RuntimeException("ProtocolServer(netimple=" + netimpl + ") newinstance error", e); + } } public static boolean supportTcpNoDelay() { @@ -101,512 +120,4 @@ public abstract class ProtocolServer { return supportTcpKeepAlive; } - static final class ProtocolBIOUDPServer extends ProtocolServer { - - private boolean running; - - private DatagramChannel serverChannel; - - public ProtocolBIOUDPServer(Context context) { - super(context); - } - - @Override - public void open(AnyValue config) throws IOException { - DatagramChannel ch = DatagramChannel.open(); - ch.configureBlocking(true); - this.serverChannel = ch; - final Set> options = this.serverChannel.supportedOptions(); - if (options.contains(StandardSocketOptions.TCP_NODELAY)) { - this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); - } - if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) { - this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - } - if (options.contains(StandardSocketOptions.SO_REUSEADDR)) { - this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - } - if (options.contains(StandardSocketOptions.SO_RCVBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - } - if (options.contains(StandardSocketOptions.SO_SNDBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - } - } - - @Override - public void bind(SocketAddress local, int backlog) throws IOException { - this.serverChannel.bind(local); - } - - @Override - public void setOption(SocketOption name, T value) throws IOException { - this.serverChannel.setOption(name, value); - } - - @Override - public Set> supportedOptions() { - return this.serverChannel.supportedOptions(); - } - - @Override - public void accept() throws IOException { - final DatagramChannel serchannel = this.serverChannel; - final int readTimeoutSeconds = this.context.readTimeoutSeconds; - final int writeTimeoutSeconds = this.context.writeTimeoutSeconds; - final CountDownLatch cdl = new CountDownLatch(1); - this.running = true; - new Thread() { - @Override - public void run() { - cdl.countDown(); - while (running) { - final ByteBuffer buffer = context.pollBuffer(); - try { - SocketAddress address = serchannel.receive(buffer); - buffer.flip(); - AsyncConnection conn = AsyncConnection.create(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds); - context.runAsync(new PrepareRunner(context, conn, buffer, null)); - } catch (Exception e) { - context.offerBuffer(buffer); - } - } - } - }.start(); - try { - cdl.await(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public void close() throws IOException { - this.running = false; - this.serverChannel.close(); - } - - @Override - public long getCreateCount() { - return -1; - } - - @Override - public long getClosedCount() { - return -1; - } - - @Override - public long getLivingCount() { - return -1; - } - } - - static final class ProtocolAIOTCPServer extends ProtocolServer { - - private AsynchronousChannelGroup group; - - private AsynchronousServerSocketChannel serverChannel; - - public ProtocolAIOTCPServer(Context context) { - super(context); - } - - @Override - public void open(AnyValue config) throws IOException { - group = AsynchronousChannelGroup.withCachedThreadPool(context.executor, 1); - this.serverChannel = AsynchronousServerSocketChannel.open(group); - - final Set> options = this.serverChannel.supportedOptions(); - if (options.contains(StandardSocketOptions.TCP_NODELAY)) { - this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); - } - if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) { - this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - } - if (options.contains(StandardSocketOptions.SO_REUSEADDR)) { - this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - } - if (options.contains(StandardSocketOptions.SO_RCVBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - } - if (options.contains(StandardSocketOptions.SO_SNDBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - } - } - - @Override - public void bind(SocketAddress local, int backlog) throws IOException { - this.serverChannel.bind(local, backlog); - } - - @Override - public void setOption(SocketOption name, T value) throws IOException { - this.serverChannel.setOption(name, value); - } - - @Override - public Set> supportedOptions() { - return this.serverChannel.supportedOptions(); - } - - @Override - public void accept() throws IOException { - final AsynchronousServerSocketChannel serchannel = this.serverChannel; - serchannel.accept(null, new CompletionHandler() { - - private boolean supportInited; - - private boolean supportTcpLay; - - private boolean supportAlive; - - private boolean supportReuse; - - private boolean supportRcv; - - private boolean supportSnd; - - @Override - public void completed(final AsynchronousSocketChannel channel, Void attachment) { - serchannel.accept(null, this); - if (maxconns > 0 && livingCounter.get() >= maxconns) { - try { - channel.close(); - } catch (Exception e) { - } - return; - } - createCounter.incrementAndGet(); - livingCounter.incrementAndGet(); - AsyncConnection conn = AsyncConnection.create(channel, null, context); - conn.livingCounter = livingCounter; - conn.closedCounter = closedCounter; - try { - if (!supportInited) { - synchronized (this) { - if (!supportInited) { - supportInited = true; - final Set> options = channel.supportedOptions(); - supportTcpLay = options.contains(StandardSocketOptions.TCP_NODELAY); - supportAlive = options.contains(StandardSocketOptions.SO_KEEPALIVE); - supportReuse = options.contains(StandardSocketOptions.SO_REUSEADDR); - supportRcv = options.contains(StandardSocketOptions.SO_RCVBUF); - supportSnd = options.contains(StandardSocketOptions.SO_SNDBUF); - } - } - } - if (supportTcpLay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - if (supportAlive) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - if (supportReuse) channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - if (supportRcv) channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - if (supportSnd) channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - } catch (IOException e) { - e.printStackTrace(); - } - context.runAsync(new PrepareRunner(context, conn, null, null)); - } - - @Override - public void failed(Throwable exc, Void attachment) { - serchannel.accept(null, this); - //if (exc != null) context.logger.log(Level.FINEST, AsynchronousServerSocketChannel.class.getSimpleName() + " accept erroneous", exc); - } - }); - } - - @Override - public void close() throws IOException { - this.serverChannel.close(); - } - - } - - static final class ProtocolNIOTCPServer extends ProtocolServer { - - private Selector acceptSelector; - - private ServerSocketChannel serverChannel; - - private NIOThreadWorker[] workers; - - private NIOThreadWorker currWorker; - - private boolean running; - - public ProtocolNIOTCPServer(Context context) { - super(context); - } - - @Override - public void open(AnyValue config) throws IOException { - acceptSelector = Selector.open(); - this.serverChannel = ServerSocketChannel.open(); - serverChannel.configureBlocking(false); - ServerSocket socket = serverChannel.socket(); - socket.setReceiveBufferSize(16 * 1024); - socket.setReuseAddress(true); - - final Set> options = this.serverChannel.supportedOptions(); - if (options.contains(StandardSocketOptions.TCP_NODELAY)) { - this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); - } - if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) { - this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - } - if (options.contains(StandardSocketOptions.SO_REUSEADDR)) { - this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - } - if (options.contains(StandardSocketOptions.SO_RCVBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - } - if (options.contains(StandardSocketOptions.SO_SNDBUF)) { - this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - } - } - - @Override - public void bind(SocketAddress local, int backlog) throws IOException { - this.serverChannel.bind(local, backlog); - } - - @Override - public Set> supportedOptions() { - return this.serverChannel.supportedOptions(); - } - - @Override - public void setOption(SocketOption name, T value) throws IOException { - this.serverChannel.setOption(name, value); - } - - @Override - public void accept() throws IOException { - this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); - final CountDownLatch cdl = new CountDownLatch(1); - this.running = true; - this.workers = new NIOThreadWorker[Runtime.getRuntime().availableProcessors()]; - for (int i = 0; i < workers.length; i++) { - workers[i] = new NIOThreadWorker(); - workers[i].setDaemon(true); - workers[i].start(); - } - for (int i = 0; i < workers.length - 1; i++) { //构成环形 - workers[i].next = workers[i + 1]; - } - workers[workers.length - 1].next = workers[0]; - currWorker = workers[0]; - new Thread() { - @Override - public void run() { - cdl.countDown(); - while (running) { - try { - acceptSelector.select(); - Set selectedKeys = acceptSelector.selectedKeys(); - synchronized (selectedKeys) { - Iterator iter = selectedKeys.iterator(); - while (iter.hasNext()) { - SelectionKey key = (SelectionKey) iter.next(); - iter.remove(); - if (key.isAcceptable()) { - try { - SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); - channel.configureBlocking(false); - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); - createCounter.incrementAndGet(); - livingCounter.incrementAndGet(); - currWorker.addChannel(channel); - currWorker = currWorker.next; - } catch (IOException io) { - io.printStackTrace(); - } - } - } - } - } catch (Throwable t) { - t.printStackTrace(); - } - } - } - }.start(); - try { - cdl.await(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public void close() throws IOException { - if (!this.running) return; - this.running = false; - serverChannel.close(); - acceptSelector.close(); - for (NIOThreadWorker worker : workers) { - worker.interrupt(); - } - } - - class NIOThreadWorker extends Thread { - - final Selector selector; - - NIOThreadWorker next; - - public NIOThreadWorker() { - try { - this.selector = Selector.open(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public void addChannel(SocketChannel channel) throws IOException { - AsyncConnection conn = AsyncConnection.create(channel, null, this.selector, context); - context.runAsync(new PrepareRunner(context, conn, null, null)); - } - - @Override - public void run() { - while (running) { - try { - selector.select(50); - } catch (IOException e) { - e.printStackTrace(); - } - try { - Set selectedKeys = selector.selectedKeys(); - synchronized (selectedKeys) { - Iterator iter = selectedKeys.iterator(); - while (iter.hasNext()) { - SelectionKey key = (SelectionKey) iter.next(); - iter.remove(); - processKey(key); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - private void processKey(SelectionKey key) { - if (key == null || !key.isValid()) return; - SocketChannel socket = (SocketChannel) key.channel(); - AsyncNIOTCPConnection conn = (AsyncNIOTCPConnection) key.attachment(); - if (!socket.isOpen()) { - if (conn == null) { - key.cancel(); - } else { - conn.dispose(); - } - return; - } - if (conn == null) return; - if (key.isWritable()) { - if (conn.writeHandler != null) writeOP(key, socket, conn); - } else if (key.isReadable()) { - if (conn.readHandler != null) readOP(key, socket, conn); - } - } - - private void readOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) { - final CompletionHandler handler = conn.removeReadHandler(); - final ByteBuffer buffer = conn.removeReadBuffer(); - final Object attach = conn.removeReadAttachment(); - //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler); - if (handler == null || buffer == null) return; - try { - final int rs = socket.read(buffer); - { //测试 - buffer.flip(); - byte[] bs = new byte[buffer.remaining()]; - buffer.get(bs); - //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------读内容: " + new String(bs)); - } - //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------read: " + rs); - context.runAsync(() -> { - try { - handler.completed(rs, attach); - } catch (Throwable e) { - handler.failed(e, attach); - } - }); - } catch (Throwable t) { - context.runAsync(() -> handler.failed(t, attach)); - } - } - - private void writeOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) { - final CompletionHandler handler = conn.writeHandler; - final ByteBuffer oneBuffer = conn.removeWriteOneBuffer(); - final ByteBuffer[] buffers = conn.removeWriteBuffers(); - final Object attach = conn.removeWriteAttachment(); - final int writingCount = conn.removeWritingCount(); - final int writeOffset = conn.removeWriteOffset(); - final int writeLength = conn.removeWriteLength(); - if (handler == null || (oneBuffer == null && buffers == null)) return; - //System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler); - try { - int rs = 0; - if (oneBuffer == null) { - int offset = writeOffset; - int length = writeLength; - rs = (int) socket.write(buffers, offset, length); - boolean over = true; - int end = offset + length; - for (int i = offset; i < end; i++) { - if (buffers[i].hasRemaining()) { - over = false; - length -= i - offset; - offset = i; - } - } - if (!over) { - conn.writingCount += rs; - conn.writeHandler = handler; - conn.writeAttachment = attach; - conn.writeBuffers = buffers; - conn.writeOffset = offset; - conn.writeLength = length; - key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE); - key.selector().wakeup(); - return; - } - } else { - rs = socket.write(oneBuffer); - if (oneBuffer.hasRemaining()) { - conn.writingCount += rs; - conn.writeHandler = handler; - conn.writeAttachment = attach; - conn.writeOneBuffer = oneBuffer; - key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE); - key.selector().wakeup(); - return; - } - } - conn.removeWriteHandler(); - key.interestOps(SelectionKey.OP_READ); //OP_CONNECT - final int rs0 = rs + writingCount; - //System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs); - context.runAsync(() -> { - try { - handler.completed(rs0, attach); - } catch (Throwable e) { - handler.failed(e, attach); - } - }); - } catch (Throwable t) { - context.runAsync(() -> handler.failed(t, attach)); - } - } - - } - } - } diff --git a/src/org/redkale/net/Server.java b/src/org/redkale/net/Server.java index 5b44445e3..3bb4ded90 100644 --- a/src/org/redkale/net/Server.java +++ b/src/org/redkale/net/Server.java @@ -54,6 +54,9 @@ public abstract class Server prepare; + //ClassLoader + protected RedkaleClassLoader serverClassLoader; + //SSL protected SSLContext sslContext; @@ -274,7 +277,7 @@ public abstract class Server + * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class TcpAioAsyncConnection extends AsyncConnection { + + private int readTimeoutSeconds; + + private int writeTimeoutSeconds; + + private final AsynchronousSocketChannel channel; + + private final SocketAddress remoteAddress; + + public TcpAioAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext, + final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + this.channel = ch; + this.sslContext = sslContext; + this.readTimeoutSeconds = readTimeoutSeconds; + this.writeTimeoutSeconds = writeTimeoutSeconds; + SocketAddress addr = addr0; + if (addr == null) { + try { + addr = ch.getRemoteAddress(); + } catch (Exception e) { + //do nothing + } + } + this.remoteAddress = addr; + this.livingCounter = livingCounter; + this.closedCounter = closedCounter; + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + this.readtime = System.currentTimeMillis(); + if (readTimeoutSeconds > 0) { + channel.read(dst, readTimeoutSeconds, TimeUnit.SECONDS, attachment, handler); + } else { + channel.read(dst, attachment, handler); + } + } + + @Override + public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + this.readtime = System.currentTimeMillis(); + channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler); + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + this.writetime = System.currentTimeMillis(); + if (writeTimeoutSeconds > 0) { + channel.write(src, writeTimeoutSeconds, TimeUnit.SECONDS, attachment, handler); + } else { + channel.write(src, attachment, handler); + } + } + + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler handler) { + this.writetime = System.currentTimeMillis(); + channel.write(srcs, offset, length, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS, + attachment, new CompletionHandler() { + + @Override + public void completed(Long result, A attachment) { + handler.completed(result.intValue(), attachment); + } + + @Override + public void failed(Throwable exc, A attachment) { + handler.failed(exc, attachment); + } + + }); + } + + @Override + public void setReadTimeoutSeconds(int readTimeoutSeconds) { + this.readTimeoutSeconds = readTimeoutSeconds; + } + + @Override + public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { + this.writeTimeoutSeconds = writeTimeoutSeconds; + } + + @Override + public int getReadTimeoutSeconds() { + return this.readTimeoutSeconds; + } + + @Override + public int getWriteTimeoutSeconds() { + return this.writeTimeoutSeconds; + } + + @Override + public final SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SocketAddress getLocalAddress() { + try { + return channel.getLocalAddress(); + } catch (IOException e) { + return null; + } + } + + @Override + public final Future read(ByteBuffer dst) { + return channel.read(dst); + } + + @Override + public final Future write(ByteBuffer src) { + return channel.write(src); + } + + @Override + public final void close() throws IOException { + super.close(); + channel.close(); + } + + @Override + public final boolean isOpen() { + return channel.isOpen(); + } + + @Override + public final boolean isTCP() { + return true; + } + +} diff --git a/src/org/redkale/net/TcpAioProtocolServer.java b/src/org/redkale/net/TcpAioProtocolServer.java new file mode 100644 index 000000000..4f2eb4fbe --- /dev/null +++ b/src/org/redkale/net/TcpAioProtocolServer.java @@ -0,0 +1,140 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.io.IOException; +import java.net.*; +import java.nio.channels.*; +import java.util.Set; +import org.redkale.util.AnyValue; + +/** + * 协议底层Server + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class TcpAioProtocolServer extends ProtocolServer { + + private AsynchronousChannelGroup group; + + private AsynchronousServerSocketChannel serverChannel; + + public TcpAioProtocolServer(Context context) { + super(context); + } + + @Override + public void open(AnyValue config) throws IOException { + group = AsynchronousChannelGroup.withCachedThreadPool(context.executor, 1); + this.serverChannel = AsynchronousServerSocketChannel.open(group); + + final Set> options = this.serverChannel.supportedOptions(); + if (options.contains(StandardSocketOptions.TCP_NODELAY)) { + this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); + } + if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) { + this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + } + if (options.contains(StandardSocketOptions.SO_REUSEADDR)) { + this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + } + if (options.contains(StandardSocketOptions.SO_RCVBUF)) { + this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + } + if (options.contains(StandardSocketOptions.SO_SNDBUF)) { + this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + } + } + + @Override + public void bind(SocketAddress local, int backlog) throws IOException { + this.serverChannel.bind(local, backlog); + } + + @Override + public void setOption(SocketOption name, T value) throws IOException { + this.serverChannel.setOption(name, value); + } + + @Override + public Set> supportedOptions() { + return this.serverChannel.supportedOptions(); + } + + @Override + public void accept() throws IOException { + final AsynchronousServerSocketChannel serchannel = this.serverChannel; + serchannel.accept(null, new CompletionHandler() { + + private boolean supportInited; + + private boolean supportTcpLay; + + private boolean supportAlive; + + private boolean supportReuse; + + private boolean supportRcv; + + private boolean supportSnd; + + @Override + public void completed(final AsynchronousSocketChannel channel, Void attachment) { + serchannel.accept(null, this); + if (maxconns > 0 && livingCounter.get() >= maxconns) { + try { + channel.close(); + } catch (Exception e) { + } + return; + } + createCounter.incrementAndGet(); + livingCounter.incrementAndGet(); + try { + if (!supportInited) { + synchronized (this) { + if (!supportInited) { + supportInited = true; + final Set> options = channel.supportedOptions(); + supportTcpLay = options.contains(StandardSocketOptions.TCP_NODELAY); + supportAlive = options.contains(StandardSocketOptions.SO_KEEPALIVE); + supportReuse = options.contains(StandardSocketOptions.SO_REUSEADDR); + supportRcv = options.contains(StandardSocketOptions.SO_RCVBUF); + supportSnd = options.contains(StandardSocketOptions.SO_SNDBUF); + } + } + } + if (supportTcpLay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + if (supportAlive) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + if (supportReuse) channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + if (supportRcv) channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + if (supportSnd) channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + } catch (IOException e) { + e.printStackTrace(); + } + AsyncConnection conn = new TcpAioAsyncConnection(channel, context.sslContext, null, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + conn.livingCounter = livingCounter; + conn.closedCounter = closedCounter; + context.runAsync(new PrepareRunner(context, conn, null, null)); + } + + @Override + public void failed(Throwable exc, Void attachment) { + serchannel.accept(null, this); + //if (exc != null) context.logger.log(Level.FINEST, AsynchronousServerSocketChannel.class.getSimpleName() + " accept erroneous", exc); + } + }); + } + + @Override + public void close() throws IOException { + this.serverChannel.close(); + } + +} diff --git a/src/org/redkale/net/TcpBioAsyncConnection.java b/src/org/redkale/net/TcpBioAsyncConnection.java new file mode 100644 index 000000000..a53f95c7b --- /dev/null +++ b/src/org/redkale/net/TcpBioAsyncConnection.java @@ -0,0 +1,173 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class TcpBioAsyncConnection extends AsyncConnection { + + private int readTimeoutSeconds; + + private int writeTimeoutSeconds; + + private final Socket socket; + + private final ReadableByteChannel readChannel; + + private final WritableByteChannel writeChannel; + + private final SocketAddress remoteAddress; + + public TcpBioAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + this.socket = socket; + ReadableByteChannel rc = null; + WritableByteChannel wc = null; + try { + socket.setSoTimeout(Math.max(readTimeoutSeconds0, writeTimeoutSeconds0)); + rc = Channels.newChannel(socket.getInputStream()); + wc = Channels.newChannel(socket.getOutputStream()); + } catch (IOException e) { + e.printStackTrace(); + } + this.readChannel = rc; + this.writeChannel = wc; + this.readTimeoutSeconds = readTimeoutSeconds0; + this.writeTimeoutSeconds = writeTimeoutSeconds0; + SocketAddress addr = addr0; + if (addr == null) { + try { + addr = socket.getRemoteSocketAddress(); + } catch (Exception e) { + //do nothing + } + } + this.remoteAddress = addr; + this.livingCounter = livingCounter; + this.closedCounter = closedCounter; + } + + @Override + public boolean isTCP() { + return true; + } + + @Override + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SocketAddress getLocalAddress() { + return socket.getLocalSocketAddress(); + } + + @Override + public int getReadTimeoutSeconds() { + return readTimeoutSeconds; + } + + @Override + public int getWriteTimeoutSeconds() { + return writeTimeoutSeconds; + } + + @Override + public void setReadTimeoutSeconds(int readTimeoutSeconds) { + this.readTimeoutSeconds = readTimeoutSeconds; + } + + @Override + public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { + this.writeTimeoutSeconds = writeTimeoutSeconds; + } + + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + try { + int rs = 0; + for (int i = offset; i < offset + length; i++) { + rs += writeChannel.write(srcs[i]); + } + this.writetime = System.currentTimeMillis(); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + try { + int rs = readChannel.read(dst); + this.readtime = System.currentTimeMillis(); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + read(dst, attachment, handler); + } + + @Override + public Future read(ByteBuffer dst) { + try { + int rs = readChannel.read(dst); + this.readtime = System.currentTimeMillis(); + return CompletableFuture.completedFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + try { + int rs = writeChannel.write(src); + this.writetime = System.currentTimeMillis(); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public Future write(ByteBuffer src) { + try { + int rs = writeChannel.write(src); + this.writetime = System.currentTimeMillis(); + return CompletableFuture.completedFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + super.close(); + this.socket.close(); + } + + @Override + public boolean isOpen() { + return !socket.isClosed(); + } +} diff --git a/src/org/redkale/net/TcpNioAsyncConnection.java b/src/org/redkale/net/TcpNioAsyncConnection.java new file mode 100644 index 000000000..943b73987 --- /dev/null +++ b/src/org/redkale/net/TcpNioAsyncConnection.java @@ -0,0 +1,329 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class TcpNioAsyncConnection extends AsyncConnection { + + private int readTimeoutSeconds; + + private int writeTimeoutSeconds; + + private final Selector selector; + + private SelectionKey key; + + private final SocketChannel channel; + + private final SocketAddress remoteAddress; + + ByteBuffer readBuffer; + + Object readAttachment; + + CompletionHandler readHandler; + + ByteBuffer writeOneBuffer; + + ByteBuffer[] writeBuffers; + + int writingCount; + + int writeOffset; + + int writeLength; + + Object writeAttachment; + + CompletionHandler writeHandler; + + public TcpNioAsyncConnection(final SocketChannel ch, SocketAddress addr0, + final Selector selector, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + this.channel = ch; + this.selector = selector; + this.readTimeoutSeconds = readTimeoutSeconds0; + this.writeTimeoutSeconds = writeTimeoutSeconds0; + SocketAddress addr = addr0; + if (addr == null) { + try { + addr = ch.getRemoteAddress(); + } catch (Exception e) { + //do nothing + } + } + this.remoteAddress = addr; + this.livingCounter = livingCounter; + this.closedCounter = closedCounter; + } + + @Override + public void setReadTimeoutSeconds(int readTimeoutSeconds) { + this.readTimeoutSeconds = readTimeoutSeconds; + } + + @Override + public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { + this.writeTimeoutSeconds = writeTimeoutSeconds; + } + + @Override + public int getReadTimeoutSeconds() { + return this.readTimeoutSeconds; + } + + @Override + public int getWriteTimeoutSeconds() { + return this.writeTimeoutSeconds; + } + + @Override + public final SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SocketAddress getLocalAddress() { + try { + return channel.getLocalAddress(); + } catch (IOException e) { + return null; + } + } + + CompletionHandler removeReadHandler() { + CompletionHandler handler = this.readHandler; + this.readHandler = null; + return handler; + } + + ByteBuffer removeReadBuffer() { + ByteBuffer buffer = this.readBuffer; + this.readBuffer = null; + return buffer; + } + + Object removeReadAttachment() { + Object attach = this.readAttachment; + this.readAttachment = null; + return attach; + } + + void completeRead(int rs) { + Object attach = this.readAttachment; + CompletionHandler handler = this.readHandler; + this.readBuffer = null; + this.readAttachment = null; + this.readHandler = null; + handler.completed(rs, attach); + } + + void faileRead(Throwable t) { + Object attach = this.readAttachment; + CompletionHandler handler = this.readHandler; + this.readBuffer = null; + this.readAttachment = null; + this.readHandler = null; + handler.failed(t, attach); + } + + CompletionHandler removeWriteHandler() { + CompletionHandler handler = this.writeHandler; + this.writeHandler = null; + return handler; + } + + ByteBuffer removeWriteOneBuffer() { + ByteBuffer buffer = this.writeOneBuffer; + this.writeOneBuffer = null; + return buffer; + } + + ByteBuffer[] removeWriteBuffers() { + ByteBuffer[] buffers = this.writeBuffers; + this.writeBuffers = null; + return buffers; + } + + int removeWritingCount() { + int rs = this.writingCount; + this.writingCount = 0; + return rs; + } + + int removeWriteOffset() { + int rs = this.writeOffset; + this.writeOffset = 0; + return rs; + } + + int removeWriteLength() { + int rs = this.writeLength; + this.writeLength = 0; + return rs; + } + + Object removeWriteAttachment() { + Object attach = this.writeAttachment; + this.writeAttachment = null; + return attach; + } + + void completeWrite(int rs) { + Object attach = this.writeAttachment; + CompletionHandler handler = this.writeHandler; + this.writeOneBuffer = null; + this.writeBuffers = null; + this.writeOffset = 0; + this.writeLength = 0; + this.writeAttachment = null; + this.writeHandler = null; + handler.completed(rs, attach); + } + + void faileWrite(Throwable t) { + Object attach = this.writeAttachment; + CompletionHandler handler = this.writeHandler; + this.writeOneBuffer = null; + this.writeBuffers = null; + this.writeOffset = 0; + this.writeLength = 0; + this.writeAttachment = null; + this.writeHandler = null; + handler.failed(t, attach); + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + if (this.readHandler != null) throw new RuntimeException("pending read"); + try { + this.readBuffer = dst; + this.readAttachment = attachment; + this.readHandler = handler; + if (key == null) { + key = channel.register(selector, SelectionKey.OP_READ); + key.attach(this); + } else { + key.interestOps(SelectionKey.OP_READ); + } + selector.wakeup(); + } catch (Exception e) { + faileRead(e); + } + } + + @Override + public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + read(dst, attachment, handler); + } + + @Override + public Future read(ByteBuffer dst) { + CompletableFuture future = new CompletableFuture(); + read(dst, null, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { + future.complete(result); + } + + @Override + public void failed(Throwable exc, Void attachment) { + future.completeExceptionally(exc); + } + }); + return future; + } + + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + if (this.writeHandler != null) throw new RuntimeException("pending write"); + try { + this.writeBuffers = srcs; + this.writeOffset = offset; + this.writeLength = length; + this.writingCount = 0; + this.writeAttachment = attachment; + this.writeHandler = handler; + if (key == null) { + key = channel.register(selector, SelectionKey.OP_WRITE); + key.attach(this); + } else { + key.interestOps(SelectionKey.OP_WRITE); + } + selector.wakeup(); + } catch (Exception e) { + faileWrite(e); + } + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + if (this.writeHandler != null) throw new RuntimeException("pending write"); + try { + this.writeOneBuffer = src; + this.writingCount = 0; + this.writeAttachment = attachment; + this.writeHandler = handler; + if (key == null) { + key = channel.register(selector, SelectionKey.OP_WRITE); + key.attach(this); + } else { + key.interestOps(SelectionKey.OP_WRITE); + } + selector.wakeup(); + } catch (Exception e) { + faileWrite(e); + } + } + + @Override + public Future write(ByteBuffer src) { + CompletableFuture future = new CompletableFuture(); + write(src, null, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { + future.complete(result); + } + + @Override + public void failed(Throwable exc, Void attachment) { + future.completeExceptionally(exc); + } + }); + return future; + } + + @Override + public final void close() throws IOException { + super.close(); + channel.close(); + key.cancel(); + } + + @Override + public final boolean isOpen() { + return channel.isOpen(); + } + + @Override + public final boolean isTCP() { + return true; + } +} diff --git a/src/org/redkale/net/TcpNioProtocolServer.java b/src/org/redkale/net/TcpNioProtocolServer.java new file mode 100644 index 000000000..9cf54fa4e --- /dev/null +++ b/src/org/redkale/net/TcpNioProtocolServer.java @@ -0,0 +1,309 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import org.redkale.util.AnyValue; + +/** + * 协议底层Server + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class TcpNioProtocolServer extends ProtocolServer { + + private Selector acceptSelector; + + private ServerSocketChannel serverChannel; + + private NIOThreadWorker[] workers; + + private NIOThreadWorker currWorker; + + private boolean running; + + public TcpNioProtocolServer(Context context) { + super(context); + } + + @Override + public void open(AnyValue config) throws IOException { + acceptSelector = Selector.open(); + this.serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(false); + ServerSocket socket = serverChannel.socket(); + socket.setReceiveBufferSize(16 * 1024); + socket.setReuseAddress(true); + + final Set> options = this.serverChannel.supportedOptions(); + if (options.contains(StandardSocketOptions.TCP_NODELAY)) { + this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); + } + if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) { + this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + } + if (options.contains(StandardSocketOptions.SO_REUSEADDR)) { + this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + } + if (options.contains(StandardSocketOptions.SO_RCVBUF)) { + this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + } + if (options.contains(StandardSocketOptions.SO_SNDBUF)) { + this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + } + } + + @Override + public void bind(SocketAddress local, int backlog) throws IOException { + this.serverChannel.bind(local, backlog); + } + + @Override + public Set> supportedOptions() { + return this.serverChannel.supportedOptions(); + } + + @Override + public void setOption(SocketOption name, T value) throws IOException { + this.serverChannel.setOption(name, value); + } + + @Override + public void accept() throws IOException { + this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); + final CountDownLatch cdl = new CountDownLatch(1); + this.running = true; + this.workers = new NIOThreadWorker[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < workers.length; i++) { + workers[i] = new NIOThreadWorker(); + workers[i].setDaemon(true); + workers[i].start(); + } + for (int i = 0; i < workers.length - 1; i++) { //构成环形 + workers[i].next = workers[i + 1]; + } + workers[workers.length - 1].next = workers[0]; + currWorker = workers[0]; + new Thread() { + @Override + public void run() { + cdl.countDown(); + while (running) { + try { + acceptSelector.select(); + Set selectedKeys = acceptSelector.selectedKeys(); + synchronized (selectedKeys) { + Iterator iter = selectedKeys.iterator(); + while (iter.hasNext()) { + SelectionKey key = (SelectionKey) iter.next(); + iter.remove(); + if (key.isAcceptable()) { + try { + SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); + channel.configureBlocking(false); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + createCounter.incrementAndGet(); + livingCounter.incrementAndGet(); + currWorker.addChannel(channel); + currWorker = currWorker.next; + } catch (IOException io) { + io.printStackTrace(); + } + } + } + } + } catch (Throwable t) { + t.printStackTrace(); + } + } + } + }.start(); + try { + cdl.await(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + if (!this.running) return; + this.running = false; + serverChannel.close(); + acceptSelector.close(); + for (NIOThreadWorker worker : workers) { + worker.interrupt(); + } + } + + class NIOThreadWorker extends Thread { + + final Selector selector; + + NIOThreadWorker next; + + public NIOThreadWorker() { + try { + this.selector = Selector.open(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void addChannel(SocketChannel channel) throws IOException { + AsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + context.runAsync(new PrepareRunner(context, conn, null, null)); + } + + @Override + public void run() { + while (running) { + try { + selector.select(50); + } catch (IOException e) { + e.printStackTrace(); + } + try { + Set selectedKeys = selector.selectedKeys(); + synchronized (selectedKeys) { + Iterator iter = selectedKeys.iterator(); + while (iter.hasNext()) { + SelectionKey key = (SelectionKey) iter.next(); + iter.remove(); + processKey(key); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private void processKey(SelectionKey key) { + if (key == null || !key.isValid()) return; + SocketChannel socket = (SocketChannel) key.channel(); + TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment(); + if (!socket.isOpen()) { + if (conn == null) { + key.cancel(); + } else { + conn.dispose(); + } + return; + } + if (conn == null) return; + if (key.isWritable()) { + if (conn.writeHandler != null) writeOP(key, socket, conn); + } else if (key.isReadable()) { + if (conn.readHandler != null) readOP(key, socket, conn); + } + } + + private void readOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) { + final CompletionHandler handler = conn.removeReadHandler(); + final ByteBuffer buffer = conn.removeReadBuffer(); + final Object attach = conn.removeReadAttachment(); + //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler); + if (handler == null || buffer == null) return; + try { + final int rs = socket.read(buffer); + { //测试 + buffer.flip(); + byte[] bs = new byte[buffer.remaining()]; + buffer.get(bs); + //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------读内容: " + new String(bs)); + } + //System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------read: " + rs); + context.runAsync(() -> { + try { + handler.completed(rs, attach); + } catch (Throwable e) { + handler.failed(e, attach); + } + }); + } catch (Throwable t) { + context.runAsync(() -> handler.failed(t, attach)); + } + } + + private void writeOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) { + final CompletionHandler handler = conn.writeHandler; + final ByteBuffer oneBuffer = conn.removeWriteOneBuffer(); + final ByteBuffer[] buffers = conn.removeWriteBuffers(); + final Object attach = conn.removeWriteAttachment(); + final int writingCount = conn.removeWritingCount(); + final int writeOffset = conn.removeWriteOffset(); + final int writeLength = conn.removeWriteLength(); + if (handler == null || (oneBuffer == null && buffers == null)) return; + //System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler); + try { + int rs = 0; + if (oneBuffer == null) { + int offset = writeOffset; + int length = writeLength; + rs = (int) socket.write(buffers, offset, length); + boolean over = true; + int end = offset + length; + for (int i = offset; i < end; i++) { + if (buffers[i].hasRemaining()) { + over = false; + length -= i - offset; + offset = i; + } + } + if (!over) { + conn.writingCount += rs; + conn.writeHandler = handler; + conn.writeAttachment = attach; + conn.writeBuffers = buffers; + conn.writeOffset = offset; + conn.writeLength = length; + key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE); + key.selector().wakeup(); + return; + } + } else { + rs = socket.write(oneBuffer); + if (oneBuffer.hasRemaining()) { + conn.writingCount += rs; + conn.writeHandler = handler; + conn.writeAttachment = attach; + conn.writeOneBuffer = oneBuffer; + key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE); + key.selector().wakeup(); + return; + } + } + conn.removeWriteHandler(); + key.interestOps(SelectionKey.OP_READ); //OP_CONNECT + final int rs0 = rs + writingCount; + //System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs); + context.runAsync(() -> { + try { + handler.completed(rs0, attach); + } catch (Throwable e) { + handler.failed(e, attach); + } + }); + } catch (Throwable t) { + context.runAsync(() -> handler.failed(t, attach)); + } + } + + } +} diff --git a/src/org/redkale/net/UdpBioAsyncConnection.java b/src/org/redkale/net/UdpBioAsyncConnection.java new file mode 100644 index 000000000..32df9898f --- /dev/null +++ b/src/org/redkale/net/UdpBioAsyncConnection.java @@ -0,0 +1,167 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class UdpBioAsyncConnection extends AsyncConnection { + + private int readTimeoutSeconds; + + private int writeTimeoutSeconds; + + private final DatagramChannel channel; + + private final SocketAddress remoteAddress; + + private final boolean client; + + public UdpBioAsyncConnection(final DatagramChannel ch, SocketAddress addr0, + final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + this.channel = ch; + this.client = client0; + this.readTimeoutSeconds = readTimeoutSeconds0; + this.writeTimeoutSeconds = writeTimeoutSeconds0; + SocketAddress addr = addr0; + if (addr == null) { + try { + addr = ch.getRemoteAddress(); + } catch (Exception e) { + //do nothing + } + } + this.remoteAddress = addr; + this.livingCounter = livingCounter; + this.closedCounter = closedCounter; + } + + @Override + public void setReadTimeoutSeconds(int readTimeoutSeconds) { + this.readTimeoutSeconds = readTimeoutSeconds; + } + + @Override + public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { + this.writeTimeoutSeconds = writeTimeoutSeconds; + } + + @Override + public int getReadTimeoutSeconds() { + return this.readTimeoutSeconds; + } + + @Override + public int getWriteTimeoutSeconds() { + return this.writeTimeoutSeconds; + } + + @Override + public final SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SocketAddress getLocalAddress() { + try { + return channel.getLocalAddress(); + } catch (IOException e) { + return null; + } + } + + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + try { + int rs = 0; + for (int i = offset; i < offset + length; i++) { + rs += channel.send(srcs[i], remoteAddress); + if (i != offset) Thread.sleep(10); + } + this.writetime = System.currentTimeMillis(); + if (handler != null) handler.completed(rs, attachment); + } catch (Exception e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + try { + int rs = channel.read(dst); + this.readtime = System.currentTimeMillis(); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + read(dst, attachment, handler); + } + + @Override + public Future read(ByteBuffer dst) { + try { + int rs = channel.read(dst); + this.readtime = System.currentTimeMillis(); + return CompletableFuture.completedFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + try { + int rs = channel.send(src, remoteAddress); + this.writetime = System.currentTimeMillis(); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public Future write(ByteBuffer src) { + try { + int rs = channel.send(src, remoteAddress); + this.writetime = System.currentTimeMillis(); + return CompletableFuture.completedFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public final void close() throws IOException { + super.close(); + if (client) channel.close(); + } + + @Override + public final boolean isOpen() { + return channel.isOpen(); + } + + @Override + public final boolean isTCP() { + return false; + } +} diff --git a/src/org/redkale/net/UdpBioProtocolServer.java b/src/org/redkale/net/UdpBioProtocolServer.java new file mode 100644 index 000000000..13a45c05c --- /dev/null +++ b/src/org/redkale/net/UdpBioProtocolServer.java @@ -0,0 +1,123 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import org.redkale.util.AnyValue; + +/** + * 协议底层Server + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class UdpBioProtocolServer extends ProtocolServer { + + private boolean running; + + private DatagramChannel serverChannel; + + public UdpBioProtocolServer(Context context) { + super(context); + } + + @Override + public void open(AnyValue config) throws IOException { + DatagramChannel ch = DatagramChannel.open(); + ch.configureBlocking(true); + this.serverChannel = ch; + final Set> options = this.serverChannel.supportedOptions(); + if (options.contains(StandardSocketOptions.TCP_NODELAY)) { + this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); + } + if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) { + this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + } + if (options.contains(StandardSocketOptions.SO_REUSEADDR)) { + this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + } + if (options.contains(StandardSocketOptions.SO_RCVBUF)) { + this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + } + if (options.contains(StandardSocketOptions.SO_SNDBUF)) { + this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + } + } + + @Override + public void bind(SocketAddress local, int backlog) throws IOException { + this.serverChannel.bind(local); + } + + @Override + public void setOption(SocketOption name, T value) throws IOException { + this.serverChannel.setOption(name, value); + } + + @Override + public Set> supportedOptions() { + return this.serverChannel.supportedOptions(); + } + + @Override + public void accept() throws IOException { + final DatagramChannel serchannel = this.serverChannel; + final int readTimeoutSeconds = this.context.readTimeoutSeconds; + final int writeTimeoutSeconds = this.context.writeTimeoutSeconds; + final CountDownLatch cdl = new CountDownLatch(1); + this.running = true; + new Thread() { + @Override + public void run() { + cdl.countDown(); + while (running) { + final ByteBuffer buffer = context.pollBuffer(); + try { + SocketAddress address = serchannel.receive(buffer); + buffer.flip(); + AsyncConnection conn = new UdpBioAsyncConnection(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null); + context.runAsync(new PrepareRunner(context, conn, buffer, null)); + } catch (Exception e) { + context.offerBuffer(buffer); + } + } + } + }.start(); + try { + cdl.await(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + this.running = false; + this.serverChannel.close(); + } + + @Override + public long getCreateCount() { + return -1; + } + + @Override + public long getClosedCount() { + return -1; + } + + @Override + public long getLivingCount() { + return -1; + } +}