This commit is contained in:
Redkale
2018-05-28 22:13:50 +08:00
parent 79de8dbbca
commit dd444e3a0f
12 changed files with 1467 additions and 1290 deletions

View File

@@ -125,6 +125,7 @@
aliveTimeoutSeconds: KeepAlive读操作超时秒数 默认30 0表示永久不超时; -1表示禁止KeepAlive aliveTimeoutSeconds: KeepAlive读操作超时秒数 默认30 0表示永久不超时; -1表示禁止KeepAlive
readTimeoutSeconds: 读操作超时秒数, 默认0 表示永久不超时 readTimeoutSeconds: 读操作超时秒数, 默认0 表示永久不超时
writeTimeoutSeconds: 写操作超时秒数, 默认0 表示永久不超时 writeTimeoutSeconds: 写操作超时秒数, 默认0 表示永久不超时
netimpl: ProtocolServer的实现类。TCP情况下值也可以是aio或nio默认值为aioUDP情况下值也可以是bio默认值为bio
interceptor: 启动/关闭NodeServer时被调用的拦截器实现类必须是org.redkale.boot.NodeInterceptor的子类默认为null interceptor: 启动/关闭NodeServer时被调用的拦截器实现类必须是org.redkale.boot.NodeInterceptor的子类默认为null
--> -->
<server protocol="HTTP" host="127.0.0.1" port="6060" root="root" lib=""> <server protocol="HTTP" host="127.0.0.1" port="6060" root="root" lib="">

View File

@@ -95,6 +95,7 @@ public abstract class NodeServer {
this.serverClassLoader = new RedkaleClassLoader(application.getServerClassLoader()); this.serverClassLoader = new RedkaleClassLoader(application.getServerClassLoader());
Thread.currentThread().setContextClassLoader(this.serverClassLoader); Thread.currentThread().setContextClassLoader(this.serverClassLoader);
this.serverThread = Thread.currentThread(); this.serverThread = Thread.currentThread();
this.server.setServerClassLoader(serverClassLoader);
} }
public static <T extends NodeServer> NodeServer create(Class<T> clazz, Application application, AnyValue serconf) { public static <T extends NodeServer> NodeServer create(Class<T> clazz, Application application, AnyValue serconf) {

View File

@@ -223,641 +223,6 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return future; return future;
} }
static class AsyncNIOTCPConnection extends AsyncConnection {
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final Selector selector;
private SelectionKey key;
private final SocketChannel channel;
private final SocketAddress remoteAddress;
ByteBuffer readBuffer;
Object readAttachment;
CompletionHandler readHandler;
ByteBuffer writeOneBuffer;
ByteBuffer[] writeBuffers;
int writingCount;
int writeOffset;
int writeLength;
Object writeAttachment;
CompletionHandler writeHandler;
public AsyncNIOTCPConnection(final SocketChannel ch, SocketAddress addr0,
final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.selector = selector;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public int getReadTimeoutSeconds() {
return this.readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return this.writeTimeoutSeconds;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
CompletionHandler removeReadHandler() {
CompletionHandler handler = this.readHandler;
this.readHandler = null;
return handler;
}
ByteBuffer removeReadBuffer() {
ByteBuffer buffer = this.readBuffer;
this.readBuffer = null;
return buffer;
}
Object removeReadAttachment() {
Object attach = this.readAttachment;
this.readAttachment = null;
return attach;
}
void completeRead(int rs) {
Object attach = this.readAttachment;
CompletionHandler handler = this.readHandler;
this.readBuffer = null;
this.readAttachment = null;
this.readHandler = null;
handler.completed(rs, attach);
}
void faileRead(Throwable t) {
Object attach = this.readAttachment;
CompletionHandler handler = this.readHandler;
this.readBuffer = null;
this.readAttachment = null;
this.readHandler = null;
handler.failed(t, attach);
}
CompletionHandler removeWriteHandler() {
CompletionHandler handler = this.writeHandler;
this.writeHandler = null;
return handler;
}
ByteBuffer removeWriteOneBuffer() {
ByteBuffer buffer = this.writeOneBuffer;
this.writeOneBuffer = null;
return buffer;
}
ByteBuffer[] removeWriteBuffers() {
ByteBuffer[] buffers = this.writeBuffers;
this.writeBuffers = null;
return buffers;
}
int removeWritingCount() {
int rs = this.writingCount;
this.writingCount = 0;
return rs;
}
int removeWriteOffset() {
int rs = this.writeOffset;
this.writeOffset = 0;
return rs;
}
int removeWriteLength() {
int rs = this.writeLength;
this.writeLength = 0;
return rs;
}
Object removeWriteAttachment() {
Object attach = this.writeAttachment;
this.writeAttachment = null;
return attach;
}
void completeWrite(int rs) {
Object attach = this.writeAttachment;
CompletionHandler handler = this.writeHandler;
this.writeOneBuffer = null;
this.writeBuffers = null;
this.writeOffset = 0;
this.writeLength = 0;
this.writeAttachment = null;
this.writeHandler = null;
handler.completed(rs, attach);
}
void faileWrite(Throwable t) {
Object attach = this.writeAttachment;
CompletionHandler handler = this.writeHandler;
this.writeOneBuffer = null;
this.writeBuffers = null;
this.writeOffset = 0;
this.writeLength = 0;
this.writeAttachment = null;
this.writeHandler = null;
handler.failed(t, attach);
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.readHandler != null) throw new RuntimeException("pending read");
try {
this.readBuffer = dst;
this.readAttachment = attachment;
this.readHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_READ);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_READ);
}
selector.wakeup();
} catch (Exception e) {
faileRead(e);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
CompletableFuture future = new CompletableFuture();
read(dst, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
future.complete(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(exc);
}
});
return future;
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.writeHandler != null) throw new RuntimeException("pending write");
try {
this.writeBuffers = srcs;
this.writeOffset = offset;
this.writeLength = length;
this.writingCount = 0;
this.writeAttachment = attachment;
this.writeHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_WRITE);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_WRITE);
}
selector.wakeup();
} catch (Exception e) {
faileWrite(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.writeHandler != null) throw new RuntimeException("pending write");
try {
this.writeOneBuffer = src;
this.writingCount = 0;
this.writeAttachment = attachment;
this.writeHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_WRITE);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_WRITE);
}
selector.wakeup();
} catch (Exception e) {
faileWrite(e);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
CompletableFuture future = new CompletableFuture();
write(src, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
future.complete(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(exc);
}
});
return future;
}
@Override
public final void close() throws IOException {
super.close();
channel.close();
key.cancel();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return true;
}
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new AsyncNIOTCPConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) {
return new AsyncNIOTCPConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new AsyncNIOTCPConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
private static class AsyncBIOUDPConnection extends AsyncConnection {
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final DatagramChannel channel;
private final SocketAddress remoteAddress;
private final boolean client;
public AsyncBIOUDPConnection(final DatagramChannel ch, SocketAddress addr0,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.client = client0;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public int getReadTimeoutSeconds() {
return this.readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return this.writeTimeoutSeconds;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = 0;
for (int i = offset; i < offset + length; i++) {
rs += channel.send(srcs[i], remoteAddress);
if (i != offset) Thread.sleep(10);
}
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (Exception e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = channel.send(src, remoteAddress);
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = channel.send(src, remoteAddress);
this.writetime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public final void close() throws IOException {
super.close();
if (client) channel.close();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return false;
}
}
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new AsyncBIOUDPConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new AsyncBIOUDPConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
private static class AsyncBIOTCPConnection extends AsyncConnection {
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final Socket socket;
private final ReadableByteChannel readChannel;
private final WritableByteChannel writeChannel;
private final SocketAddress remoteAddress;
public AsyncBIOTCPConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.socket = socket;
ReadableByteChannel rc = null;
WritableByteChannel wc = null;
try {
socket.setSoTimeout(Math.max(readTimeoutSeconds0, writeTimeoutSeconds0));
rc = Channels.newChannel(socket.getInputStream());
wc = Channels.newChannel(socket.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
}
this.readChannel = rc;
this.writeChannel = wc;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = socket.getRemoteSocketAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public boolean isTCP() {
return true;
}
@Override
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
return socket.getLocalSocketAddress();
}
@Override
public int getReadTimeoutSeconds() {
return readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return writeTimeoutSeconds;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = 0;
for (int i = offset; i < offset + length; i++) {
rs += writeChannel.write(srcs[i]);
}
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = readChannel.read(dst);
this.readtime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
int rs = readChannel.read(dst);
this.readtime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = writeChannel.write(src);
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = writeChannel.write(src);
this.writetime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
super.close();
this.socket.close();
}
@Override
public boolean isOpen() {
return !socket.isClosed();
}
}
/** /**
* 通常用于 ssl socket * 通常用于 ssl socket
* *
@@ -870,149 +235,38 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
} }
public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new AsyncBIOTCPConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null); return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null);
} }
public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0,
final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) { final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new AsyncBIOTCPConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter); return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter);
} }
private static class AsyncAIOTCPConnection extends AsyncConnection { public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
private int readTimeoutSeconds; public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) {
return new TcpNioAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
}
private int writeTimeoutSeconds; public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
}
private final AsynchronousSocketChannel channel; public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) {
private final SocketAddress remoteAddress; return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
}
public AsyncAIOTCPConnection(final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.sslContext = sslContext;
this.readTimeoutSeconds = readTimeoutSeconds;
this.writeTimeoutSeconds = writeTimeoutSeconds;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
this.readtime = System.currentTimeMillis();
if (readTimeoutSeconds > 0) {
channel.read(dst, readTimeoutSeconds, TimeUnit.SECONDS, attachment, handler);
} else {
channel.read(dst, attachment, handler);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
this.readtime = System.currentTimeMillis();
channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler);
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
this.writetime = System.currentTimeMillis();
if (writeTimeoutSeconds > 0) {
channel.write(src, writeTimeoutSeconds, TimeUnit.SECONDS, attachment, handler);
} else {
channel.write(src, attachment, handler);
}
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler<Integer, ? super A> handler) {
this.writetime = System.currentTimeMillis();
channel.write(srcs, offset, length, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS,
attachment, new CompletionHandler<Long, A>() {
@Override
public void completed(Long result, A attachment) {
handler.completed(result.intValue(), attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
handler.failed(exc, attachment);
}
});
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public int getReadTimeoutSeconds() {
return this.readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return this.writeTimeoutSeconds;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
public final Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);
}
@Override
public final Future<Integer> write(ByteBuffer src) {
return channel.write(src);
}
@Override
public final void close() throws IOException {
super.close();
channel.close();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return true;
}
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter);
} }
public static AsyncConnection create(final AsynchronousSocketChannel ch) { public static AsyncConnection create(final AsynchronousSocketChannel ch) {
@@ -1020,29 +274,29 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
} }
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new AsyncAIOTCPConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
} }
public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
return new AsyncAIOTCPConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null);
} }
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) { public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) {
return new AsyncAIOTCPConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
} }
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds,
final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new AsyncAIOTCPConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
} }
public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds,
final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new AsyncAIOTCPConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter);
} }
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0,
final Context context, final AtomicLong livingCounter, final AtomicLong closedCounter) { final Context context, final AtomicLong livingCounter, final AtomicLong closedCounter) {
return new AsyncAIOTCPConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter);
} }
} }

View File

@@ -7,12 +7,9 @@ package org.redkale.net;
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.redkale.net.AsyncConnection.AsyncNIOTCPConnection;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
/** /**
@@ -87,10 +84,32 @@ public abstract class ProtocolServer {
} }
//--------------------------------------------------------------------- //---------------------------------------------------------------------
public static ProtocolServer create(String protocol, Context context) { public static ProtocolServer create(String protocol, Context context, ClassLoader classLoader, String netimpl) {
if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolAIOTCPServer(context); if (netimpl != null) netimpl = netimpl.trim();
if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolBIOUDPServer(context); if ("TCP".equalsIgnoreCase(protocol)) {
throw new RuntimeException("ProtocolServer not support protocol " + protocol); if (netimpl == null || netimpl.isEmpty()) {
return new TcpAioProtocolServer(context);
} else if ("aio".equalsIgnoreCase(netimpl)) {
return new TcpAioProtocolServer(context);
} else if ("nio".equalsIgnoreCase(netimpl)) {
return new TcpNioProtocolServer(context);
}
} else if ("UDP".equalsIgnoreCase(protocol)) {
if (netimpl == null || netimpl.isEmpty()) {
return new UdpBioProtocolServer(context);
} else if ("bio".equalsIgnoreCase(netimpl)) {
return new UdpBioProtocolServer(context);
}
} else {
throw new RuntimeException("ProtocolServer not support protocol " + protocol);
}
try {
if (classLoader == null) classLoader = Thread.currentThread().getContextClassLoader();
Class clazz = classLoader.loadClass(netimpl);
return (ProtocolServer) clazz.getDeclaredConstructor(Context.class).newInstance(context);
} catch (Exception e) {
throw new RuntimeException("ProtocolServer(netimple=" + netimpl + ") newinstance error", e);
}
} }
public static boolean supportTcpNoDelay() { public static boolean supportTcpNoDelay() {
@@ -101,512 +120,4 @@ public abstract class ProtocolServer {
return supportTcpKeepAlive; return supportTcpKeepAlive;
} }
static final class ProtocolBIOUDPServer extends ProtocolServer {
private boolean running;
private DatagramChannel serverChannel;
public ProtocolBIOUDPServer(Context context) {
super(context);
}
@Override
public void open(AnyValue config) throws IOException {
DatagramChannel ch = DatagramChannel.open();
ch.configureBlocking(true);
this.serverChannel = ch;
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
if (options.contains(StandardSocketOptions.TCP_NODELAY)) {
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) {
this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
if (options.contains(StandardSocketOptions.SO_REUSEADDR)) {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
}
if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
}
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local);
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public void accept() throws IOException {
final DatagramChannel serchannel = this.serverChannel;
final int readTimeoutSeconds = this.context.readTimeoutSeconds;
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;
final CountDownLatch cdl = new CountDownLatch(1);
this.running = true;
new Thread() {
@Override
public void run() {
cdl.countDown();
while (running) {
final ByteBuffer buffer = context.pollBuffer();
try {
SocketAddress address = serchannel.receive(buffer);
buffer.flip();
AsyncConnection conn = AsyncConnection.create(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds);
context.runAsync(new PrepareRunner(context, conn, buffer, null));
} catch (Exception e) {
context.offerBuffer(buffer);
}
}
}
}.start();
try {
cdl.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
this.running = false;
this.serverChannel.close();
}
@Override
public long getCreateCount() {
return -1;
}
@Override
public long getClosedCount() {
return -1;
}
@Override
public long getLivingCount() {
return -1;
}
}
static final class ProtocolAIOTCPServer extends ProtocolServer {
private AsynchronousChannelGroup group;
private AsynchronousServerSocketChannel serverChannel;
public ProtocolAIOTCPServer(Context context) {
super(context);
}
@Override
public void open(AnyValue config) throws IOException {
group = AsynchronousChannelGroup.withCachedThreadPool(context.executor, 1);
this.serverChannel = AsynchronousServerSocketChannel.open(group);
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
if (options.contains(StandardSocketOptions.TCP_NODELAY)) {
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) {
this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
if (options.contains(StandardSocketOptions.SO_REUSEADDR)) {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
}
if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
}
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local, backlog);
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public void accept() throws IOException {
final AsynchronousServerSocketChannel serchannel = this.serverChannel;
serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
private boolean supportInited;
private boolean supportTcpLay;
private boolean supportAlive;
private boolean supportReuse;
private boolean supportRcv;
private boolean supportSnd;
@Override
public void completed(final AsynchronousSocketChannel channel, Void attachment) {
serchannel.accept(null, this);
if (maxconns > 0 && livingCounter.get() >= maxconns) {
try {
channel.close();
} catch (Exception e) {
}
return;
}
createCounter.incrementAndGet();
livingCounter.incrementAndGet();
AsyncConnection conn = AsyncConnection.create(channel, null, context);
conn.livingCounter = livingCounter;
conn.closedCounter = closedCounter;
try {
if (!supportInited) {
synchronized (this) {
if (!supportInited) {
supportInited = true;
final Set<SocketOption<?>> options = channel.supportedOptions();
supportTcpLay = options.contains(StandardSocketOptions.TCP_NODELAY);
supportAlive = options.contains(StandardSocketOptions.SO_KEEPALIVE);
supportReuse = options.contains(StandardSocketOptions.SO_REUSEADDR);
supportRcv = options.contains(StandardSocketOptions.SO_RCVBUF);
supportSnd = options.contains(StandardSocketOptions.SO_SNDBUF);
}
}
}
if (supportTcpLay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
if (supportAlive) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
if (supportReuse) channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
if (supportRcv) channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
if (supportSnd) channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
} catch (IOException e) {
e.printStackTrace();
}
context.runAsync(new PrepareRunner(context, conn, null, null));
}
@Override
public void failed(Throwable exc, Void attachment) {
serchannel.accept(null, this);
//if (exc != null) context.logger.log(Level.FINEST, AsynchronousServerSocketChannel.class.getSimpleName() + " accept erroneous", exc);
}
});
}
@Override
public void close() throws IOException {
this.serverChannel.close();
}
}
static final class ProtocolNIOTCPServer extends ProtocolServer {
private Selector acceptSelector;
private ServerSocketChannel serverChannel;
private NIOThreadWorker[] workers;
private NIOThreadWorker currWorker;
private boolean running;
public ProtocolNIOTCPServer(Context context) {
super(context);
}
@Override
public void open(AnyValue config) throws IOException {
acceptSelector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket socket = serverChannel.socket();
socket.setReceiveBufferSize(16 * 1024);
socket.setReuseAddress(true);
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
if (options.contains(StandardSocketOptions.TCP_NODELAY)) {
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) {
this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
if (options.contains(StandardSocketOptions.SO_REUSEADDR)) {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
}
if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
}
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local, backlog);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public void accept() throws IOException {
this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
final CountDownLatch cdl = new CountDownLatch(1);
this.running = true;
this.workers = new NIOThreadWorker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new NIOThreadWorker();
workers[i].setDaemon(true);
workers[i].start();
}
for (int i = 0; i < workers.length - 1; i++) { //构成环形
workers[i].next = workers[i + 1];
}
workers[workers.length - 1].next = workers[0];
currWorker = workers[0];
new Thread() {
@Override
public void run() {
cdl.countDown();
while (running) {
try {
acceptSelector.select();
Set<SelectionKey> selectedKeys = acceptSelector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
if (key.isAcceptable()) {
try {
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
createCounter.incrementAndGet();
livingCounter.incrementAndGet();
currWorker.addChannel(channel);
currWorker = currWorker.next;
} catch (IOException io) {
io.printStackTrace();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}.start();
try {
cdl.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
if (!this.running) return;
this.running = false;
serverChannel.close();
acceptSelector.close();
for (NIOThreadWorker worker : workers) {
worker.interrupt();
}
}
class NIOThreadWorker extends Thread {
final Selector selector;
NIOThreadWorker next;
public NIOThreadWorker() {
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void addChannel(SocketChannel channel) throws IOException {
AsyncConnection conn = AsyncConnection.create(channel, null, this.selector, context);
context.runAsync(new PrepareRunner(context, conn, null, null));
}
@Override
public void run() {
while (running) {
try {
selector.select(50);
} catch (IOException e) {
e.printStackTrace();
}
try {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
processKey(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void processKey(SelectionKey key) {
if (key == null || !key.isValid()) return;
SocketChannel socket = (SocketChannel) key.channel();
AsyncNIOTCPConnection conn = (AsyncNIOTCPConnection) key.attachment();
if (!socket.isOpen()) {
if (conn == null) {
key.cancel();
} else {
conn.dispose();
}
return;
}
if (conn == null) return;
if (key.isWritable()) {
if (conn.writeHandler != null) writeOP(key, socket, conn);
} else if (key.isReadable()) {
if (conn.readHandler != null) readOP(key, socket, conn);
}
}
private void readOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) {
final CompletionHandler handler = conn.removeReadHandler();
final ByteBuffer buffer = conn.removeReadBuffer();
final Object attach = conn.removeReadAttachment();
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler);
if (handler == null || buffer == null) return;
try {
final int rs = socket.read(buffer);
{ //测试
buffer.flip();
byte[] bs = new byte[buffer.remaining()];
buffer.get(bs);
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------读内容: " + new String(bs));
}
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------read: " + rs);
context.runAsync(() -> {
try {
handler.completed(rs, attach);
} catch (Throwable e) {
handler.failed(e, attach);
}
});
} catch (Throwable t) {
context.runAsync(() -> handler.failed(t, attach));
}
}
private void writeOP(SelectionKey key, SocketChannel socket, AsyncNIOTCPConnection conn) {
final CompletionHandler handler = conn.writeHandler;
final ByteBuffer oneBuffer = conn.removeWriteOneBuffer();
final ByteBuffer[] buffers = conn.removeWriteBuffers();
final Object attach = conn.removeWriteAttachment();
final int writingCount = conn.removeWritingCount();
final int writeOffset = conn.removeWriteOffset();
final int writeLength = conn.removeWriteLength();
if (handler == null || (oneBuffer == null && buffers == null)) return;
//System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler);
try {
int rs = 0;
if (oneBuffer == null) {
int offset = writeOffset;
int length = writeLength;
rs = (int) socket.write(buffers, offset, length);
boolean over = true;
int end = offset + length;
for (int i = offset; i < end; i++) {
if (buffers[i].hasRemaining()) {
over = false;
length -= i - offset;
offset = i;
}
}
if (!over) {
conn.writingCount += rs;
conn.writeHandler = handler;
conn.writeAttachment = attach;
conn.writeBuffers = buffers;
conn.writeOffset = offset;
conn.writeLength = length;
key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
key.selector().wakeup();
return;
}
} else {
rs = socket.write(oneBuffer);
if (oneBuffer.hasRemaining()) {
conn.writingCount += rs;
conn.writeHandler = handler;
conn.writeAttachment = attach;
conn.writeOneBuffer = oneBuffer;
key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
key.selector().wakeup();
return;
}
}
conn.removeWriteHandler();
key.interestOps(SelectionKey.OP_READ); //OP_CONNECT
final int rs0 = rs + writingCount;
//System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs);
context.runAsync(() -> {
try {
handler.completed(rs0, attach);
} catch (Throwable e) {
handler.failed(e, attach);
}
});
} catch (Throwable t) {
context.runAsync(() -> handler.failed(t, attach));
}
}
}
}
} }

View File

@@ -54,6 +54,9 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
//服务的根Servlet //服务的根Servlet
protected final PrepareServlet<K, C, R, P, S> prepare; protected final PrepareServlet<K, C, R, P, S> prepare;
//ClassLoader
protected RedkaleClassLoader serverClassLoader;
//SSL //SSL
protected SSLContext sslContext; protected SSLContext sslContext;
@@ -274,7 +277,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
public void start() throws IOException { public void start() throws IOException {
this.context = this.createContext(); this.context = this.createContext();
this.prepare.init(this.context, config); this.prepare.init(this.context, config);
this.serverChannel = ProtocolServer.create(this.protocol, context); this.serverChannel = ProtocolServer.create(this.protocol, context, this.serverClassLoader, config == null ? null : config.getValue("netimpl"));
this.serverChannel.open(config); this.serverChannel.open(config);
serverChannel.bind(address, backlog); serverChannel.bind(address, backlog);
serverChannel.accept(); serverChannel.accept();
@@ -299,6 +302,14 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
logger.info(this.getClass().getSimpleName() + " shutdown in " + e + " ms"); logger.info(this.getClass().getSimpleName() + " shutdown in " + e + " ms");
} }
public RedkaleClassLoader getServerClassLoader() {
return serverClassLoader;
}
public void setServerClassLoader(RedkaleClassLoader serverClassLoader) {
this.serverClassLoader = serverClassLoader;
}
/** /**
* 判断是否存在Filter * 判断是否存在Filter
* *

View File

@@ -0,0 +1,158 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class TcpAioAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final AsynchronousSocketChannel channel;
private final SocketAddress remoteAddress;
public TcpAioAsyncConnection(final AsynchronousSocketChannel ch, SSLContext sslContext,
final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.sslContext = sslContext;
this.readTimeoutSeconds = readTimeoutSeconds;
this.writeTimeoutSeconds = writeTimeoutSeconds;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
this.readtime = System.currentTimeMillis();
if (readTimeoutSeconds > 0) {
channel.read(dst, readTimeoutSeconds, TimeUnit.SECONDS, attachment, handler);
} else {
channel.read(dst, attachment, handler);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
this.readtime = System.currentTimeMillis();
channel.read(dst, timeout < 0 ? 0 : timeout, unit, attachment, handler);
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
this.writetime = System.currentTimeMillis();
if (writeTimeoutSeconds > 0) {
channel.write(src, writeTimeoutSeconds, TimeUnit.SECONDS, attachment, handler);
} else {
channel.write(src, attachment, handler);
}
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, final CompletionHandler<Integer, ? super A> handler) {
this.writetime = System.currentTimeMillis();
channel.write(srcs, offset, length, writeTimeoutSeconds > 0 ? writeTimeoutSeconds : 60, TimeUnit.SECONDS,
attachment, new CompletionHandler<Long, A>() {
@Override
public void completed(Long result, A attachment) {
handler.completed(result.intValue(), attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
handler.failed(exc, attachment);
}
});
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public int getReadTimeoutSeconds() {
return this.readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return this.writeTimeoutSeconds;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
public final Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);
}
@Override
public final Future<Integer> write(ByteBuffer src) {
return channel.write(src);
}
@Override
public final void close() throws IOException {
super.close();
channel.close();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return true;
}
}

View File

@@ -0,0 +1,140 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.nio.channels.*;
import java.util.Set;
import org.redkale.util.AnyValue;
/**
* 协议底层Server
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class TcpAioProtocolServer extends ProtocolServer {
private AsynchronousChannelGroup group;
private AsynchronousServerSocketChannel serverChannel;
public TcpAioProtocolServer(Context context) {
super(context);
}
@Override
public void open(AnyValue config) throws IOException {
group = AsynchronousChannelGroup.withCachedThreadPool(context.executor, 1);
this.serverChannel = AsynchronousServerSocketChannel.open(group);
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
if (options.contains(StandardSocketOptions.TCP_NODELAY)) {
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) {
this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
if (options.contains(StandardSocketOptions.SO_REUSEADDR)) {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
}
if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
}
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local, backlog);
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public void accept() throws IOException {
final AsynchronousServerSocketChannel serchannel = this.serverChannel;
serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
private boolean supportInited;
private boolean supportTcpLay;
private boolean supportAlive;
private boolean supportReuse;
private boolean supportRcv;
private boolean supportSnd;
@Override
public void completed(final AsynchronousSocketChannel channel, Void attachment) {
serchannel.accept(null, this);
if (maxconns > 0 && livingCounter.get() >= maxconns) {
try {
channel.close();
} catch (Exception e) {
}
return;
}
createCounter.incrementAndGet();
livingCounter.incrementAndGet();
try {
if (!supportInited) {
synchronized (this) {
if (!supportInited) {
supportInited = true;
final Set<SocketOption<?>> options = channel.supportedOptions();
supportTcpLay = options.contains(StandardSocketOptions.TCP_NODELAY);
supportAlive = options.contains(StandardSocketOptions.SO_KEEPALIVE);
supportReuse = options.contains(StandardSocketOptions.SO_REUSEADDR);
supportRcv = options.contains(StandardSocketOptions.SO_RCVBUF);
supportSnd = options.contains(StandardSocketOptions.SO_SNDBUF);
}
}
}
if (supportTcpLay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
if (supportAlive) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
if (supportReuse) channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
if (supportRcv) channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
if (supportSnd) channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
} catch (IOException e) {
e.printStackTrace();
}
AsyncConnection conn = new TcpAioAsyncConnection(channel, context.sslContext, null, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
conn.livingCounter = livingCounter;
conn.closedCounter = closedCounter;
context.runAsync(new PrepareRunner(context, conn, null, null));
}
@Override
public void failed(Throwable exc, Void attachment) {
serchannel.accept(null, this);
//if (exc != null) context.logger.log(Level.FINEST, AsynchronousServerSocketChannel.class.getSimpleName() + " accept erroneous", exc);
}
});
}
@Override
public void close() throws IOException {
this.serverChannel.close();
}
}

View File

@@ -0,0 +1,173 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class TcpBioAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final Socket socket;
private final ReadableByteChannel readChannel;
private final WritableByteChannel writeChannel;
private final SocketAddress remoteAddress;
public TcpBioAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.socket = socket;
ReadableByteChannel rc = null;
WritableByteChannel wc = null;
try {
socket.setSoTimeout(Math.max(readTimeoutSeconds0, writeTimeoutSeconds0));
rc = Channels.newChannel(socket.getInputStream());
wc = Channels.newChannel(socket.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
}
this.readChannel = rc;
this.writeChannel = wc;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = socket.getRemoteSocketAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public boolean isTCP() {
return true;
}
@Override
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
return socket.getLocalSocketAddress();
}
@Override
public int getReadTimeoutSeconds() {
return readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return writeTimeoutSeconds;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = 0;
for (int i = offset; i < offset + length; i++) {
rs += writeChannel.write(srcs[i]);
}
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = readChannel.read(dst);
this.readtime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
int rs = readChannel.read(dst);
this.readtime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = writeChannel.write(src);
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = writeChannel.write(src);
this.writetime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
super.close();
this.socket.close();
}
@Override
public boolean isOpen() {
return !socket.isClosed();
}
}

View File

@@ -0,0 +1,329 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class TcpNioAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final Selector selector;
private SelectionKey key;
private final SocketChannel channel;
private final SocketAddress remoteAddress;
ByteBuffer readBuffer;
Object readAttachment;
CompletionHandler readHandler;
ByteBuffer writeOneBuffer;
ByteBuffer[] writeBuffers;
int writingCount;
int writeOffset;
int writeLength;
Object writeAttachment;
CompletionHandler writeHandler;
public TcpNioAsyncConnection(final SocketChannel ch, SocketAddress addr0,
final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.selector = selector;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public int getReadTimeoutSeconds() {
return this.readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return this.writeTimeoutSeconds;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
CompletionHandler removeReadHandler() {
CompletionHandler handler = this.readHandler;
this.readHandler = null;
return handler;
}
ByteBuffer removeReadBuffer() {
ByteBuffer buffer = this.readBuffer;
this.readBuffer = null;
return buffer;
}
Object removeReadAttachment() {
Object attach = this.readAttachment;
this.readAttachment = null;
return attach;
}
void completeRead(int rs) {
Object attach = this.readAttachment;
CompletionHandler handler = this.readHandler;
this.readBuffer = null;
this.readAttachment = null;
this.readHandler = null;
handler.completed(rs, attach);
}
void faileRead(Throwable t) {
Object attach = this.readAttachment;
CompletionHandler handler = this.readHandler;
this.readBuffer = null;
this.readAttachment = null;
this.readHandler = null;
handler.failed(t, attach);
}
CompletionHandler removeWriteHandler() {
CompletionHandler handler = this.writeHandler;
this.writeHandler = null;
return handler;
}
ByteBuffer removeWriteOneBuffer() {
ByteBuffer buffer = this.writeOneBuffer;
this.writeOneBuffer = null;
return buffer;
}
ByteBuffer[] removeWriteBuffers() {
ByteBuffer[] buffers = this.writeBuffers;
this.writeBuffers = null;
return buffers;
}
int removeWritingCount() {
int rs = this.writingCount;
this.writingCount = 0;
return rs;
}
int removeWriteOffset() {
int rs = this.writeOffset;
this.writeOffset = 0;
return rs;
}
int removeWriteLength() {
int rs = this.writeLength;
this.writeLength = 0;
return rs;
}
Object removeWriteAttachment() {
Object attach = this.writeAttachment;
this.writeAttachment = null;
return attach;
}
void completeWrite(int rs) {
Object attach = this.writeAttachment;
CompletionHandler handler = this.writeHandler;
this.writeOneBuffer = null;
this.writeBuffers = null;
this.writeOffset = 0;
this.writeLength = 0;
this.writeAttachment = null;
this.writeHandler = null;
handler.completed(rs, attach);
}
void faileWrite(Throwable t) {
Object attach = this.writeAttachment;
CompletionHandler handler = this.writeHandler;
this.writeOneBuffer = null;
this.writeBuffers = null;
this.writeOffset = 0;
this.writeLength = 0;
this.writeAttachment = null;
this.writeHandler = null;
handler.failed(t, attach);
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.readHandler != null) throw new RuntimeException("pending read");
try {
this.readBuffer = dst;
this.readAttachment = attachment;
this.readHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_READ);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_READ);
}
selector.wakeup();
} catch (Exception e) {
faileRead(e);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
CompletableFuture future = new CompletableFuture();
read(dst, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
future.complete(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(exc);
}
});
return future;
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.writeHandler != null) throw new RuntimeException("pending write");
try {
this.writeBuffers = srcs;
this.writeOffset = offset;
this.writeLength = length;
this.writingCount = 0;
this.writeAttachment = attachment;
this.writeHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_WRITE);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_WRITE);
}
selector.wakeup();
} catch (Exception e) {
faileWrite(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (this.writeHandler != null) throw new RuntimeException("pending write");
try {
this.writeOneBuffer = src;
this.writingCount = 0;
this.writeAttachment = attachment;
this.writeHandler = handler;
if (key == null) {
key = channel.register(selector, SelectionKey.OP_WRITE);
key.attach(this);
} else {
key.interestOps(SelectionKey.OP_WRITE);
}
selector.wakeup();
} catch (Exception e) {
faileWrite(e);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
CompletableFuture future = new CompletableFuture();
write(src, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
future.complete(result);
}
@Override
public void failed(Throwable exc, Void attachment) {
future.completeExceptionally(exc);
}
});
return future;
}
@Override
public final void close() throws IOException {
super.close();
channel.close();
key.cancel();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return true;
}
}

View File

@@ -0,0 +1,309 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import org.redkale.util.AnyValue;
/**
* 协议底层Server
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class TcpNioProtocolServer extends ProtocolServer {
private Selector acceptSelector;
private ServerSocketChannel serverChannel;
private NIOThreadWorker[] workers;
private NIOThreadWorker currWorker;
private boolean running;
public TcpNioProtocolServer(Context context) {
super(context);
}
@Override
public void open(AnyValue config) throws IOException {
acceptSelector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket socket = serverChannel.socket();
socket.setReceiveBufferSize(16 * 1024);
socket.setReuseAddress(true);
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
if (options.contains(StandardSocketOptions.TCP_NODELAY)) {
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) {
this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
if (options.contains(StandardSocketOptions.SO_REUSEADDR)) {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
}
if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
}
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local, backlog);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public void accept() throws IOException {
this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
final CountDownLatch cdl = new CountDownLatch(1);
this.running = true;
this.workers = new NIOThreadWorker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new NIOThreadWorker();
workers[i].setDaemon(true);
workers[i].start();
}
for (int i = 0; i < workers.length - 1; i++) { //构成环形
workers[i].next = workers[i + 1];
}
workers[workers.length - 1].next = workers[0];
currWorker = workers[0];
new Thread() {
@Override
public void run() {
cdl.countDown();
while (running) {
try {
acceptSelector.select();
Set<SelectionKey> selectedKeys = acceptSelector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
if (key.isAcceptable()) {
try {
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
createCounter.incrementAndGet();
livingCounter.incrementAndGet();
currWorker.addChannel(channel);
currWorker = currWorker.next;
} catch (IOException io) {
io.printStackTrace();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}.start();
try {
cdl.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
if (!this.running) return;
this.running = false;
serverChannel.close();
acceptSelector.close();
for (NIOThreadWorker worker : workers) {
worker.interrupt();
}
}
class NIOThreadWorker extends Thread {
final Selector selector;
NIOThreadWorker next;
public NIOThreadWorker() {
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void addChannel(SocketChannel channel) throws IOException {
AsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
context.runAsync(new PrepareRunner(context, conn, null, null));
}
@Override
public void run() {
while (running) {
try {
selector.select(50);
} catch (IOException e) {
e.printStackTrace();
}
try {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
processKey(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void processKey(SelectionKey key) {
if (key == null || !key.isValid()) return;
SocketChannel socket = (SocketChannel) key.channel();
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
if (!socket.isOpen()) {
if (conn == null) {
key.cancel();
} else {
conn.dispose();
}
return;
}
if (conn == null) return;
if (key.isWritable()) {
if (conn.writeHandler != null) writeOP(key, socket, conn);
} else if (key.isReadable()) {
if (conn.readHandler != null) readOP(key, socket, conn);
}
}
private void readOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) {
final CompletionHandler handler = conn.removeReadHandler();
final ByteBuffer buffer = conn.removeReadBuffer();
final Object attach = conn.removeReadAttachment();
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler);
if (handler == null || buffer == null) return;
try {
final int rs = socket.read(buffer);
{ //测试
buffer.flip();
byte[] bs = new byte[buffer.remaining()];
buffer.get(bs);
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------读内容: " + new String(bs));
}
//System.out.println(conn + "------readbuf:" + buffer + "-------handler:" + handler + "-------read: " + rs);
context.runAsync(() -> {
try {
handler.completed(rs, attach);
} catch (Throwable e) {
handler.failed(e, attach);
}
});
} catch (Throwable t) {
context.runAsync(() -> handler.failed(t, attach));
}
}
private void writeOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) {
final CompletionHandler handler = conn.writeHandler;
final ByteBuffer oneBuffer = conn.removeWriteOneBuffer();
final ByteBuffer[] buffers = conn.removeWriteBuffers();
final Object attach = conn.removeWriteAttachment();
final int writingCount = conn.removeWritingCount();
final int writeOffset = conn.removeWriteOffset();
final int writeLength = conn.removeWriteLength();
if (handler == null || (oneBuffer == null && buffers == null)) return;
//System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler);
try {
int rs = 0;
if (oneBuffer == null) {
int offset = writeOffset;
int length = writeLength;
rs = (int) socket.write(buffers, offset, length);
boolean over = true;
int end = offset + length;
for (int i = offset; i < end; i++) {
if (buffers[i].hasRemaining()) {
over = false;
length -= i - offset;
offset = i;
}
}
if (!over) {
conn.writingCount += rs;
conn.writeHandler = handler;
conn.writeAttachment = attach;
conn.writeBuffers = buffers;
conn.writeOffset = offset;
conn.writeLength = length;
key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
key.selector().wakeup();
return;
}
} else {
rs = socket.write(oneBuffer);
if (oneBuffer.hasRemaining()) {
conn.writingCount += rs;
conn.writeHandler = handler;
conn.writeAttachment = attach;
conn.writeOneBuffer = oneBuffer;
key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
key.selector().wakeup();
return;
}
}
conn.removeWriteHandler();
key.interestOps(SelectionKey.OP_READ); //OP_CONNECT
final int rs0 = rs + writingCount;
//System.out.println(conn + "------buffers:" + Arrays.toString(buffers) + "---onebuf:" + oneBuffer + "-------handler:" + handler + "-------write: " + rs);
context.runAsync(() -> {
try {
handler.completed(rs0, attach);
} catch (Throwable e) {
handler.failed(e, attach);
}
});
} catch (Throwable t) {
context.runAsync(() -> handler.failed(t, attach));
}
}
}
}

View File

@@ -0,0 +1,167 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class UdpBioAsyncConnection extends AsyncConnection {
private int readTimeoutSeconds;
private int writeTimeoutSeconds;
private final DatagramChannel channel;
private final SocketAddress remoteAddress;
private final boolean client;
public UdpBioAsyncConnection(final DatagramChannel ch, SocketAddress addr0,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.client = client0;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
}
@Override
public void setReadTimeoutSeconds(int readTimeoutSeconds) {
this.readTimeoutSeconds = readTimeoutSeconds;
}
@Override
public void setWriteTimeoutSeconds(int writeTimeoutSeconds) {
this.writeTimeoutSeconds = writeTimeoutSeconds;
}
@Override
public int getReadTimeoutSeconds() {
return this.readTimeoutSeconds;
}
@Override
public int getWriteTimeoutSeconds() {
return this.writeTimeoutSeconds;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SocketAddress getLocalAddress() {
try {
return channel.getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = 0;
for (int i = offset; i < offset + length; i++) {
rs += channel.send(srcs[i], remoteAddress);
if (i != offset) Thread.sleep(10);
}
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (Exception e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read(dst, attachment, handler);
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
int rs = channel.read(dst);
this.readtime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = channel.send(src, remoteAddress);
this.writetime = System.currentTimeMillis();
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = channel.send(src, remoteAddress);
this.writetime = System.currentTimeMillis();
return CompletableFuture.completedFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public final void close() throws IOException {
super.close();
if (client) channel.close();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return false;
}
}

View File

@@ -0,0 +1,123 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.redkale.util.AnyValue;
/**
* 协议底层Server
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public class UdpBioProtocolServer extends ProtocolServer {
private boolean running;
private DatagramChannel serverChannel;
public UdpBioProtocolServer(Context context) {
super(context);
}
@Override
public void open(AnyValue config) throws IOException {
DatagramChannel ch = DatagramChannel.open();
ch.configureBlocking(true);
this.serverChannel = ch;
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
if (options.contains(StandardSocketOptions.TCP_NODELAY)) {
this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
if (options.contains(StandardSocketOptions.SO_KEEPALIVE)) {
this.serverChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
if (options.contains(StandardSocketOptions.SO_REUSEADDR)) {
this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
if (options.contains(StandardSocketOptions.SO_RCVBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
}
if (options.contains(StandardSocketOptions.SO_SNDBUF)) {
this.serverChannel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
}
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local);
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public void accept() throws IOException {
final DatagramChannel serchannel = this.serverChannel;
final int readTimeoutSeconds = this.context.readTimeoutSeconds;
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;
final CountDownLatch cdl = new CountDownLatch(1);
this.running = true;
new Thread() {
@Override
public void run() {
cdl.countDown();
while (running) {
final ByteBuffer buffer = context.pollBuffer();
try {
SocketAddress address = serchannel.receive(buffer);
buffer.flip();
AsyncConnection conn = new UdpBioAsyncConnection(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds, null, null);
context.runAsync(new PrepareRunner(context, conn, buffer, null));
} catch (Exception e) {
context.offerBuffer(buffer);
}
}
}
}.start();
try {
cdl.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
this.running = false;
this.serverChannel.close();
}
@Override
public long getCreateCount() {
return -1;
}
@Override
public long getClosedCount() {
return -1;
}
@Override
public long getLivingCount() {
return -1;
}
}