From bb454700783eb335e9ff7dbcd09f1c623804bd3a Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 7 May 2018 17:03:39 +0800 Subject: [PATCH] --- src/org/redkale/net/AsyncConnection.java | 20 +++- src/org/redkale/net/ProtocolServer.java | 135 ++++++++++++++++++++--- 2 files changed, 136 insertions(+), 19 deletions(-) diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 11e819c55..241213873 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -229,12 +229,20 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl private final SocketAddress remoteAddress; - public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr, + public NIOTCPAsyncConnection(final SocketChannel ch, SocketAddress addr0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { this.channel = ch; this.readTimeoutSeconds = readTimeoutSeconds0; this.writeTimeoutSeconds = writeTimeoutSeconds0; + SocketAddress addr = addr0; + if (addr == null) { + try { + addr = ch.getRemoteAddress(); + } catch (Exception e) { + //do nothing + } + } this.remoteAddress = addr; this.livingCounter = livingCounter; this.closedCounter = closedCounter; @@ -374,13 +382,21 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl private final boolean client; - public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr, + public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr0, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { this.channel = ch; this.client = client0; this.readTimeoutSeconds = readTimeoutSeconds0; this.writeTimeoutSeconds = writeTimeoutSeconds0; + SocketAddress addr = addr0; + if (addr == null) { + try { + addr = ch.getRemoteAddress(); + } catch (Exception e) { + //do nothing + } + } this.remoteAddress = addr; this.livingCounter = livingCounter; this.closedCounter = closedCounter; diff --git a/src/org/redkale/net/ProtocolServer.java b/src/org/redkale/net/ProtocolServer.java index ccb7c0b2d..1698fdfba 100644 --- a/src/org/redkale/net/ProtocolServer.java +++ b/src/org/redkale/net/ProtocolServer.java @@ -62,7 +62,7 @@ public abstract class ProtocolServer { public abstract void setOption(SocketOption name, T value) throws IOException; - public abstract void accept(); + public abstract void accept() throws IOException; public void setMaxconns(int maxconns) { this.maxconns = maxconns; @@ -70,8 +70,6 @@ public abstract class ProtocolServer { public abstract void close() throws IOException; - public abstract AsynchronousChannelGroup getChannelGroup(); - public long getCreateCount() { return createCounter.longValue(); } @@ -86,7 +84,7 @@ public abstract class ProtocolServer { //--------------------------------------------------------------------- public static ProtocolServer create(String protocol, Context context) { - if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolTCPServer(context); + if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolAIOTCPServer(context); if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolUDPServer(context); throw new RuntimeException("ProtocolServer not support protocol " + protocol); } @@ -134,7 +132,7 @@ public abstract class ProtocolServer { } @Override - public void accept() { + public void accept() throws IOException { final DatagramChannel serchannel = this.serverChannel; final int readTimeoutSeconds = this.context.readTimeoutSeconds; final int writeTimeoutSeconds = this.context.writeTimeoutSeconds; @@ -170,11 +168,6 @@ public abstract class ProtocolServer { this.serverChannel.close(); } - @Override - public AsynchronousChannelGroup getChannelGroup() { - return null; - } - @Override public long getCreateCount() { return -1; @@ -191,7 +184,117 @@ public abstract class ProtocolServer { } } - private static final class ProtocolTCPServer extends ProtocolServer { + private static final class ProtocolNIOTCPServer extends ProtocolServer { + + private final Context context; + + private Selector acceptSelector; + + private Selector readSelector; + + private Selector writeSelector; + + private ServerSocketChannel serverChannel; + + private boolean running; + + public ProtocolNIOTCPServer(Context context) { + this.context = context; + } + + @Override + public void open(AnyValue config) throws IOException { + acceptSelector = Selector.open(); + readSelector = Selector.open(); + writeSelector = Selector.open(); + this.serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(false); + ServerSocket socket = serverChannel.socket(); + socket.setReceiveBufferSize(16 * 1024); + socket.setReuseAddress(true); + } + + @Override + public void bind(SocketAddress local, int backlog) throws IOException { + this.serverChannel.bind(local, backlog); + } + + @Override + public Set> supportedOptions() { + return this.serverChannel.supportedOptions(); + } + + @Override + public void setOption(SocketOption name, T value) throws IOException { + this.serverChannel.setOption(name, value); + } + + @Override + public void accept() throws IOException { + this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT); + final int readTimeoutSeconds = this.context.readTimeoutSeconds; + final int writeTimeoutSeconds = this.context.writeTimeoutSeconds; + final CountDownLatch cdl = new CountDownLatch(1); + this.running = true; + new Thread() { + @Override + public void run() { + cdl.countDown(); + while (running) { + try { + acceptSelector.select(); + Set selectedKeys = acceptSelector.selectedKeys(); + synchronized (selectedKeys) { + Iterator iter = selectedKeys.iterator(); + while (iter.hasNext()) { + SelectionKey key = (SelectionKey) iter.next(); + iter.remove(); + if (key.isAcceptable()) { + try { + SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + channel.configureBlocking(false); + channel.register(readSelector, SelectionKey.OP_READ); + channel.register(writeSelector, SelectionKey.OP_WRITE); + createCounter.incrementAndGet(); + livingCounter.incrementAndGet(); + AsyncConnection conn = AsyncConnection.create(channel, null, readTimeoutSeconds, writeTimeoutSeconds); + context.runAsync(new PrepareRunner(context, conn, null, null)); + } catch (IOException io) { + io.printStackTrace(); + } + } + } + } + } catch (Throwable t) { + t.printStackTrace(); + } + } + } + }.start(); + try { + cdl.await(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + this.running = false; + serverChannel.close(); + acceptSelector.close(); + readSelector.close(); + writeSelector.close(); + } + + } + + private static final class ProtocolAIOTCPServer extends ProtocolServer { private final Context context; @@ -199,7 +302,7 @@ public abstract class ProtocolServer { private AsynchronousServerSocketChannel serverChannel; - public ProtocolTCPServer(Context context) { + public ProtocolAIOTCPServer(Context context) { this.context = context; } @@ -225,9 +328,10 @@ public abstract class ProtocolServer { } @Override - public void accept() { + public void accept() throws IOException { final AsynchronousServerSocketChannel serchannel = this.serverChannel; serchannel.accept(null, new CompletionHandler() { + private boolean supportInited; private boolean supportTcpLay; @@ -275,6 +379,7 @@ public abstract class ProtocolServer { if (supportRcv) channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); if (supportSnd) channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); } catch (IOException e) { + e.printStackTrace(); } context.runAsync(new PrepareRunner(context, conn, null, null)); } @@ -292,10 +397,6 @@ public abstract class ProtocolServer { this.serverChannel.close(); } - @Override - public AsynchronousChannelGroup getChannelGroup() { - return this.group; - } } }