This commit is contained in:
Redkale
2018-05-07 17:03:39 +08:00
parent d3c6ab8dc5
commit bb45470078
2 changed files with 136 additions and 19 deletions

View File

@@ -229,12 +229,20 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
private final SocketAddress remoteAddress;
public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr,
public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
@@ -374,13 +382,21 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
private final boolean client;
public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr,
public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr0,
final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
this.channel = ch;
this.client = client0;
this.readTimeoutSeconds = readTimeoutSeconds0;
this.writeTimeoutSeconds = writeTimeoutSeconds0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;

View File

@@ -62,7 +62,7 @@ public abstract class ProtocolServer {
public abstract <T> void setOption(SocketOption<T> name, T value) throws IOException;
public abstract void accept();
public abstract void accept() throws IOException;
public void setMaxconns(int maxconns) {
this.maxconns = maxconns;
@@ -70,8 +70,6 @@ public abstract class ProtocolServer {
public abstract void close() throws IOException;
public abstract AsynchronousChannelGroup getChannelGroup();
public long getCreateCount() {
return createCounter.longValue();
}
@@ -86,7 +84,7 @@ public abstract class ProtocolServer {
//---------------------------------------------------------------------
public static ProtocolServer create(String protocol, Context context) {
if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolTCPServer(context);
if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolAIOTCPServer(context);
if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolUDPServer(context);
throw new RuntimeException("ProtocolServer not support protocol " + protocol);
}
@@ -134,7 +132,7 @@ public abstract class ProtocolServer {
}
@Override
public void accept() {
public void accept() throws IOException {
final DatagramChannel serchannel = this.serverChannel;
final int readTimeoutSeconds = this.context.readTimeoutSeconds;
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;
@@ -170,11 +168,6 @@ public abstract class ProtocolServer {
this.serverChannel.close();
}
@Override
public AsynchronousChannelGroup getChannelGroup() {
return null;
}
@Override
public long getCreateCount() {
return -1;
@@ -191,7 +184,117 @@ public abstract class ProtocolServer {
}
}
private static final class ProtocolTCPServer extends ProtocolServer {
private static final class ProtocolNIOTCPServer extends ProtocolServer {
private final Context context;
private Selector acceptSelector;
private Selector readSelector;
private Selector writeSelector;
private ServerSocketChannel serverChannel;
private boolean running;
public ProtocolNIOTCPServer(Context context) {
this.context = context;
}
@Override
public void open(AnyValue config) throws IOException {
acceptSelector = Selector.open();
readSelector = Selector.open();
writeSelector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket socket = serverChannel.socket();
socket.setReceiveBufferSize(16 * 1024);
socket.setReuseAddress(true);
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local, backlog);
}
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public void accept() throws IOException {
this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
final int readTimeoutSeconds = this.context.readTimeoutSeconds;
final int writeTimeoutSeconds = this.context.writeTimeoutSeconds;
final CountDownLatch cdl = new CountDownLatch(1);
this.running = true;
new Thread() {
@Override
public void run() {
cdl.countDown();
while (running) {
try {
acceptSelector.select();
Set<SelectionKey> selectedKeys = acceptSelector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
if (key.isAcceptable()) {
try {
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
channel.configureBlocking(false);
channel.register(readSelector, SelectionKey.OP_READ);
channel.register(writeSelector, SelectionKey.OP_WRITE);
createCounter.incrementAndGet();
livingCounter.incrementAndGet();
AsyncConnection conn = AsyncConnection.create(channel, null, readTimeoutSeconds, writeTimeoutSeconds);
context.runAsync(new PrepareRunner(context, conn, null, null));
} catch (IOException io) {
io.printStackTrace();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}.start();
try {
cdl.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
this.running = false;
serverChannel.close();
acceptSelector.close();
readSelector.close();
writeSelector.close();
}
}
private static final class ProtocolAIOTCPServer extends ProtocolServer {
private final Context context;
@@ -199,7 +302,7 @@ public abstract class ProtocolServer {
private AsynchronousServerSocketChannel serverChannel;
public ProtocolTCPServer(Context context) {
public ProtocolAIOTCPServer(Context context) {
this.context = context;
}
@@ -225,9 +328,10 @@ public abstract class ProtocolServer {
}
@Override
public void accept() {
public void accept() throws IOException {
final AsynchronousServerSocketChannel serchannel = this.serverChannel;
serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
private boolean supportInited;
private boolean supportTcpLay;
@@ -275,6 +379,7 @@ public abstract class ProtocolServer {
if (supportRcv) channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
if (supportSnd) channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
} catch (IOException e) {
e.printStackTrace();
}
context.runAsync(new PrepareRunner(context, conn, null, null));
}
@@ -292,10 +397,6 @@ public abstract class ProtocolServer {
this.serverChannel.close();
}
@Override
public AsynchronousChannelGroup getChannelGroup() {
return this.group;
}
}
}