diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 747ec7e51..6fa40e71d 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -14,7 +14,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.net.ssl.SSLContext; -import static org.redkale.net.ProtocolServer.*; /** * @@ -53,6 +52,14 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl public abstract boolean isTCP(); + public abstract boolean shutdownInput(); + + public abstract boolean shutdownOutput(); + + public abstract boolean setOption(SocketOption name, T value); + + public abstract Set> supportedOptions(); + public abstract SocketAddress getRemoteAddress(); public abstract SocketAddress getLocalAddress(); @@ -165,7 +172,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl */ public static CompletableFuture createTCP(final AsynchronousChannelGroup group, final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return createTCP(group, null, address, supportTcpNoDelay(), readTimeoutSeconds, writeTimeoutSeconds); + return createTCP(group, null, address, readTimeoutSeconds, writeTimeoutSeconds); } /** @@ -181,29 +188,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl */ public static CompletableFuture createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext, final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { - return createTCP(group, sslContext, address, false, readTimeoutSeconds, writeTimeoutSeconds); - } - - /** - * 创建TCP协议客户端连接 - * - * @param address 连接点子 - * @param sslContext SSLContext - * @param group 连接AsynchronousChannelGroup - * @param noDelay TcpNoDelay - * @param readTimeoutSeconds 读取超时秒数 - * @param writeTimeoutSeconds 写入超时秒数 - * - * @return 连接CompletableFuture - */ - public static CompletableFuture createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext, - final SocketAddress address, final boolean noDelay, final int readTimeoutSeconds, final int writeTimeoutSeconds) { final CompletableFuture future = new CompletableFuture<>(); try { final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); try { - if (noDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - if (supportTcpKeepAlive()) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); } catch (IOException e) { } diff --git a/src/org/redkale/net/ProtocolServer.java b/src/org/redkale/net/ProtocolServer.java index 1cf3ff21f..ded78c4e6 100644 --- a/src/org/redkale/net/ProtocolServer.java +++ b/src/org/redkale/net/ProtocolServer.java @@ -7,7 +7,6 @@ package org.redkale.net; import java.io.IOException; import java.net.*; -import java.nio.channels.*; import java.util.*; import java.util.concurrent.atomic.AtomicLong; import org.redkale.util.AnyValue; @@ -22,24 +21,6 @@ import org.redkale.util.AnyValue; */ public abstract class ProtocolServer { - protected static final boolean supportTcpNoDelay; - - protected static final boolean supportTcpKeepAlive; - - static { - boolean tcpNoDelay = false; - boolean keepAlive = false; - try { - AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); - tcpNoDelay = channel.supportedOptions().contains(StandardSocketOptions.TCP_NODELAY); - keepAlive = channel.supportedOptions().contains(StandardSocketOptions.SO_KEEPALIVE); - channel.close(); - } catch (Exception e) { - } - supportTcpNoDelay = tcpNoDelay; - supportTcpKeepAlive = keepAlive; - } - //创建数 protected final AtomicLong createCounter = new AtomicLong(); @@ -112,12 +93,4 @@ public abstract class ProtocolServer { } } - public static boolean supportTcpNoDelay() { - return supportTcpNoDelay; - } - - public static boolean supportTcpKeepAlive() { - return supportTcpKeepAlive; - } - } diff --git a/src/org/redkale/net/TcpAioAsyncConnection.java b/src/org/redkale/net/TcpAioAsyncConnection.java index 5cdc79efc..bb0458090 100644 --- a/src/org/redkale/net/TcpAioAsyncConnection.java +++ b/src/org/redkale/net/TcpAioAsyncConnection.java @@ -6,9 +6,10 @@ package org.redkale.net; import java.io.IOException; -import java.net.SocketAddress; +import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; +import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import javax.net.ssl.SSLContext; @@ -50,6 +51,41 @@ public class TcpAioAsyncConnection extends AsyncConnection { this.closedCounter = closedCounter; } + @Override + public boolean shutdownInput() { + try { + this.channel.shutdownInput(); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public boolean shutdownOutput() { + try { + this.channel.shutdownOutput(); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public boolean setOption(SocketOption name, T value) { + try { + this.channel.setOption(name, value); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public Set> supportedOptions() { + return this.channel.supportedOptions(); + } + @Override public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { this.readtime = System.currentTimeMillis(); diff --git a/src/org/redkale/net/TcpAioProtocolServer.java b/src/org/redkale/net/TcpAioProtocolServer.java index 4f2eb4fbe..dfd2475e9 100644 --- a/src/org/redkale/net/TcpAioProtocolServer.java +++ b/src/org/redkale/net/TcpAioProtocolServer.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.net.*; import java.nio.channels.*; import java.util.Set; +import java.util.logging.Level; import org.redkale.util.AnyValue; /** @@ -72,18 +73,6 @@ public class TcpAioProtocolServer extends ProtocolServer { final AsynchronousServerSocketChannel serchannel = this.serverChannel; serchannel.accept(null, new CompletionHandler() { - private boolean supportInited; - - private boolean supportTcpLay; - - private boolean supportAlive; - - private boolean supportReuse; - - private boolean supportRcv; - - private boolean supportSnd; - @Override public void completed(final AsynchronousSocketChannel channel, Void attachment) { serchannel.accept(null, this); @@ -97,26 +86,13 @@ public class TcpAioProtocolServer extends ProtocolServer { createCounter.incrementAndGet(); livingCounter.incrementAndGet(); try { - if (!supportInited) { - synchronized (this) { - if (!supportInited) { - supportInited = true; - final Set> options = channel.supportedOptions(); - supportTcpLay = options.contains(StandardSocketOptions.TCP_NODELAY); - supportAlive = options.contains(StandardSocketOptions.SO_KEEPALIVE); - supportReuse = options.contains(StandardSocketOptions.SO_REUSEADDR); - supportRcv = options.contains(StandardSocketOptions.SO_RCVBUF); - supportSnd = options.contains(StandardSocketOptions.SO_SNDBUF); - } - } - } - if (supportTcpLay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - if (supportAlive) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - if (supportReuse) channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - if (supportRcv) channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); - if (supportSnd) channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); + channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024); } catch (IOException e) { - e.printStackTrace(); + context.logger.log(Level.INFO, channel + " setOption error", e); } AsyncConnection conn = new TcpAioAsyncConnection(channel, context.sslContext, null, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); conn.livingCounter = livingCounter; diff --git a/src/org/redkale/net/TcpBioAsyncConnection.java b/src/org/redkale/net/TcpBioAsyncConnection.java index a53f95c7b..1e4766f21 100644 --- a/src/org/redkale/net/TcpBioAsyncConnection.java +++ b/src/org/redkale/net/TcpBioAsyncConnection.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @@ -21,6 +22,18 @@ import java.util.concurrent.atomic.AtomicLong; */ public class TcpBioAsyncConnection extends AsyncConnection { + static final Set> defaultOptions = defaultOptions(); + + private static Set> defaultOptions() { + HashSet> set = new HashSet<>(5); + set.add(StandardSocketOptions.SO_SNDBUF); + set.add(StandardSocketOptions.SO_RCVBUF); + set.add(StandardSocketOptions.SO_KEEPALIVE); + set.add(StandardSocketOptions.SO_REUSEADDR); + set.add(StandardSocketOptions.TCP_NODELAY); + return Collections.unmodifiableSet(set); + } + private int readTimeoutSeconds; private int writeTimeoutSeconds; @@ -97,6 +110,60 @@ public class TcpBioAsyncConnection extends AsyncConnection { this.writeTimeoutSeconds = writeTimeoutSeconds; } + @Override + public boolean shutdownInput() { + try { + this.socket.shutdownInput(); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public boolean shutdownOutput() { + try { + this.socket.shutdownOutput(); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public boolean setOption(SocketOption name, T value) { + try { + if (StandardSocketOptions.SO_REUSEADDR == name) { + this.socket.setReuseAddress((Boolean) value); + return true; + } + if (StandardSocketOptions.SO_KEEPALIVE == name) { + this.socket.setKeepAlive((Boolean) value); + return true; + } + if (StandardSocketOptions.TCP_NODELAY == name) { + this.socket.setTcpNoDelay((Boolean) value); + return true; + } + if (StandardSocketOptions.SO_RCVBUF == name) { + this.socket.setReceiveBufferSize((Integer) value); + return true; + } + if (StandardSocketOptions.SO_SNDBUF == name) { + this.socket.setSendBufferSize((Integer) value); + return true; + } + } catch (IOException e) { + return false; + } + return false; + } + + @Override + public Set> supportedOptions() { + return defaultOptions; + } + @Override public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { try { diff --git a/src/org/redkale/net/TcpNioAsyncConnection.java b/src/org/redkale/net/TcpNioAsyncConnection.java index 943b73987..5f5c776de 100644 --- a/src/org/redkale/net/TcpNioAsyncConnection.java +++ b/src/org/redkale/net/TcpNioAsyncConnection.java @@ -6,9 +6,10 @@ package org.redkale.net; import java.io.IOException; -import java.net.SocketAddress; +import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; +import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @@ -108,6 +109,41 @@ public class TcpNioAsyncConnection extends AsyncConnection { } } + @Override + public boolean shutdownInput() { + try { + this.channel.shutdownInput(); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public boolean shutdownOutput() { + try { + this.channel.shutdownOutput(); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public boolean setOption(SocketOption name, T value) { + try { + this.channel.setOption(name, value); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public Set> supportedOptions() { + return this.channel.supportedOptions(); + } + CompletionHandler removeReadHandler() { CompletionHandler handler = this.readHandler; this.readHandler = null; diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 225b247d5..9ec73302f 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -18,7 +18,6 @@ import java.util.logging.Level; import javax.net.ssl.SSLContext; import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; -import static org.redkale.net.ProtocolServer.*; import org.redkale.util.*; /** @@ -221,7 +220,7 @@ public final class Transport { if (!rand) { //指定地址 TransportNode node = findTransportNode(addr); if (node == null) { - return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay(), factory.readTimeoutSeconds, factory.writeTimeoutSeconds); + return AsyncConnection.createTCP(group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); } final BlockingQueue queue = node.conns; if (!queue.isEmpty()) { @@ -234,7 +233,7 @@ public final class Transport { } } } - return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay(), factory.readTimeoutSeconds, factory.writeTimeoutSeconds); + return AsyncConnection.createTCP(group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); } //---------------------随机取地址------------------------ @@ -260,8 +259,8 @@ public final class Transport { } CompletableFuture future = new CompletableFuture(); final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); - if (supportTcpNoDelay()) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - if (supportTcpKeepAlive()) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.connect(one.address, one, new CompletionHandler() { @Override @@ -312,8 +311,8 @@ public final class Transport { if (node == exclude) continue; if (future.isDone()) return future; final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); - if (supportTcpNoDelay()) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - if (supportTcpKeepAlive()) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.connect(node.address, node, new CompletionHandler() { @Override diff --git a/src/org/redkale/net/UdpBioAsyncConnection.java b/src/org/redkale/net/UdpBioAsyncConnection.java index 32df9898f..54801fe98 100644 --- a/src/org/redkale/net/UdpBioAsyncConnection.java +++ b/src/org/redkale/net/UdpBioAsyncConnection.java @@ -6,9 +6,10 @@ package org.redkale.net; import java.io.IOException; -import java.net.SocketAddress; +import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; +import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @@ -85,6 +86,31 @@ public class UdpBioAsyncConnection extends AsyncConnection { } } + @Override + public boolean shutdownInput() { + return false; + } + + @Override + public boolean shutdownOutput() { + return false; + } + + @Override + public boolean setOption(SocketOption name, T value) { + try { + this.channel.setOption(name, value); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public Set> supportedOptions() { + return this.channel.supportedOptions(); + } + @Override public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { try {