diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index b13a7ea20..efaf84be7 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -219,6 +219,149 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return future; } + private static class NIOTCPAsyncConnection extends AsyncConnection { + + private int readTimeoutSeconds; + + private int writeTimeoutSeconds; + + private final SocketChannel channel; + + private final SocketAddress remoteAddress; + + public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + this.channel = ch; + this.readTimeoutSeconds = readTimeoutSeconds0; + this.writeTimeoutSeconds = writeTimeoutSeconds0; + this.remoteAddress = addr; + this.livingCounter = livingCounter; + this.closedCounter = closedCounter; + } + + @Override + public void setReadTimeoutSeconds(int readTimeoutSeconds) { + this.readTimeoutSeconds = readTimeoutSeconds; + } + + @Override + public void setWriteTimeoutSeconds(int writeTimeoutSeconds) { + this.writeTimeoutSeconds = writeTimeoutSeconds; + } + + @Override + public int getReadTimeoutSeconds() { + return this.readTimeoutSeconds; + } + + @Override + public int getWriteTimeoutSeconds() { + return this.writeTimeoutSeconds; + } + + @Override + public final SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SocketAddress getLocalAddress() { + try { + return channel.getLocalAddress(); + } catch (IOException e) { + return null; + } + } + + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + try { + int rs = (int) channel.write(srcs, offset, length); + this.writetime = System.currentTimeMillis(); + if (handler != null) handler.completed(rs, attachment); + } catch (Exception e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + try { + int rs = channel.read(dst); + this.readtime = System.currentTimeMillis(); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + read(dst, attachment, handler); + } + + @Override + public Future read(ByteBuffer dst) { + try { + int rs = channel.read(dst); + this.readtime = System.currentTimeMillis(); + return CompletableFuture.completedFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + try { + int rs = channel.write(src); + this.writetime = System.currentTimeMillis(); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public Future write(ByteBuffer src) { + try { + int rs = channel.read(src); + this.writetime = System.currentTimeMillis(); + return CompletableFuture.completedFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public final void close() throws IOException { + super.close(); + channel.close(); + } + + @Override + public final boolean isOpen() { + return channel.isOpen(); + } + + @Override + public final boolean isTCP() { + return true; + } + } + + public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { + return new NIOTCPAsyncConnection(ch, addr, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + } + + public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new NIOTCPAsyncConnection(ch, addr, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + } + private static class BIOUDPAsyncConnection extends AsyncConnection { private int readTimeoutSeconds; @@ -240,7 +383,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl this.writeTimeoutSeconds = writeTimeoutSeconds0; this.remoteAddress = addr; this.livingCounter = livingCounter; - this.closedCounter = this.closedCounter; + this.closedCounter = closedCounter; } @Override