This commit is contained in:
Redkale
2018-05-05 16:05:48 +08:00
parent e3e065f36e
commit a168dab8c0

View File

@@ -219,6 +219,149 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return future;
}
private static class NIOTCPAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final SocketChannel channel;
private final SocketAddress remoteAddress;
public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public int getReadTimeoutSeconds() {
return this.readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return this.writeTimeoutSeconds;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = (int) channel.write(srcs, offset, length);
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (Exception e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = channel.write(src);
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = channel.read(src);
this.writetime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public final void close() throws IOException {
super.close();
channel.close();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return true;
}
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new NIOTCPAsyncConnection(ch, addr, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new NIOTCPAsyncConnection(ch, addr, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
private static class BIOUDPAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
@@ -240,7 +383,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
this.writeTimeoutSeconds = writeTimeoutSeconds0;
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = this.closedCounter;
this.closedCounter = closedCounter;
}
@Override