This commit is contained in:
@@ -14,7 +14,6 @@ import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import static org.redkale.net.ProtocolServer.*;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -53,6 +52,14 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
||||
|
||||
public abstract boolean isTCP();
|
||||
|
||||
public abstract boolean shutdownInput();
|
||||
|
||||
public abstract boolean shutdownOutput();
|
||||
|
||||
public abstract <T> boolean setOption(SocketOption<T> name, T value);
|
||||
|
||||
public abstract Set<SocketOption<?>> supportedOptions();
|
||||
|
||||
public abstract SocketAddress getRemoteAddress();
|
||||
|
||||
public abstract SocketAddress getLocalAddress();
|
||||
@@ -165,7 +172,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
||||
*/
|
||||
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SocketAddress address,
|
||||
final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||
return createTCP(group, null, address, supportTcpNoDelay(), readTimeoutSeconds, writeTimeoutSeconds);
|
||||
return createTCP(group, null, address, readTimeoutSeconds, writeTimeoutSeconds);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -181,29 +188,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
||||
*/
|
||||
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext,
|
||||
final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||
return createTCP(group, sslContext, address, false, readTimeoutSeconds, writeTimeoutSeconds);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建TCP协议客户端连接
|
||||
*
|
||||
* @param address 连接点子
|
||||
* @param sslContext SSLContext
|
||||
* @param group 连接AsynchronousChannelGroup
|
||||
* @param noDelay TcpNoDelay
|
||||
* @param readTimeoutSeconds 读取超时秒数
|
||||
* @param writeTimeoutSeconds 写入超时秒数
|
||||
*
|
||||
* @return 连接CompletableFuture
|
||||
*/
|
||||
public static CompletableFuture<AsyncConnection> createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext,
|
||||
final SocketAddress address, final boolean noDelay, final int readTimeoutSeconds, final int writeTimeoutSeconds) {
|
||||
final CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
|
||||
try {
|
||||
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
|
||||
try {
|
||||
if (noDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
if (supportTcpKeepAlive()) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
||||
} catch (IOException e) {
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ package org.redkale.net;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.*;
|
||||
import java.nio.channels.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.redkale.util.AnyValue;
|
||||
@@ -22,24 +21,6 @@ import org.redkale.util.AnyValue;
|
||||
*/
|
||||
public abstract class ProtocolServer {
|
||||
|
||||
protected static final boolean supportTcpNoDelay;
|
||||
|
||||
protected static final boolean supportTcpKeepAlive;
|
||||
|
||||
static {
|
||||
boolean tcpNoDelay = false;
|
||||
boolean keepAlive = false;
|
||||
try {
|
||||
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
|
||||
tcpNoDelay = channel.supportedOptions().contains(StandardSocketOptions.TCP_NODELAY);
|
||||
keepAlive = channel.supportedOptions().contains(StandardSocketOptions.SO_KEEPALIVE);
|
||||
channel.close();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
supportTcpNoDelay = tcpNoDelay;
|
||||
supportTcpKeepAlive = keepAlive;
|
||||
}
|
||||
|
||||
//创建数
|
||||
protected final AtomicLong createCounter = new AtomicLong();
|
||||
|
||||
@@ -112,12 +93,4 @@ public abstract class ProtocolServer {
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean supportTcpNoDelay() {
|
||||
return supportTcpNoDelay;
|
||||
}
|
||||
|
||||
public static boolean supportTcpKeepAlive() {
|
||||
return supportTcpKeepAlive;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -6,9 +6,10 @@
|
||||
package org.redkale.net;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import javax.net.ssl.SSLContext;
|
||||
@@ -50,6 +51,41 @@ public class TcpAioAsyncConnection extends AsyncConnection {
|
||||
this.closedCounter = closedCounter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shutdownInput() {
|
||||
try {
|
||||
this.channel.shutdownInput();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shutdownOutput() {
|
||||
try {
|
||||
this.channel.shutdownOutput();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean setOption(SocketOption<T> name, T value) {
|
||||
try {
|
||||
this.channel.setOption(name, value);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SocketOption<?>> supportedOptions() {
|
||||
return this.channel.supportedOptions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||
this.readtime = System.currentTimeMillis();
|
||||
|
||||
@@ -9,6 +9,7 @@ import java.io.IOException;
|
||||
import java.net.*;
|
||||
import java.nio.channels.*;
|
||||
import java.util.Set;
|
||||
import java.util.logging.Level;
|
||||
import org.redkale.util.AnyValue;
|
||||
|
||||
/**
|
||||
@@ -72,18 +73,6 @@ public class TcpAioProtocolServer extends ProtocolServer {
|
||||
final AsynchronousServerSocketChannel serchannel = this.serverChannel;
|
||||
serchannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
|
||||
|
||||
private boolean supportInited;
|
||||
|
||||
private boolean supportTcpLay;
|
||||
|
||||
private boolean supportAlive;
|
||||
|
||||
private boolean supportReuse;
|
||||
|
||||
private boolean supportRcv;
|
||||
|
||||
private boolean supportSnd;
|
||||
|
||||
@Override
|
||||
public void completed(final AsynchronousSocketChannel channel, Void attachment) {
|
||||
serchannel.accept(null, this);
|
||||
@@ -97,26 +86,13 @@ public class TcpAioProtocolServer extends ProtocolServer {
|
||||
createCounter.incrementAndGet();
|
||||
livingCounter.incrementAndGet();
|
||||
try {
|
||||
if (!supportInited) {
|
||||
synchronized (this) {
|
||||
if (!supportInited) {
|
||||
supportInited = true;
|
||||
final Set<SocketOption<?>> options = channel.supportedOptions();
|
||||
supportTcpLay = options.contains(StandardSocketOptions.TCP_NODELAY);
|
||||
supportAlive = options.contains(StandardSocketOptions.SO_KEEPALIVE);
|
||||
supportReuse = options.contains(StandardSocketOptions.SO_REUSEADDR);
|
||||
supportRcv = options.contains(StandardSocketOptions.SO_RCVBUF);
|
||||
supportSnd = options.contains(StandardSocketOptions.SO_SNDBUF);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (supportTcpLay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
if (supportAlive) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
if (supportReuse) channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
||||
if (supportRcv) channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
|
||||
if (supportSnd) channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
|
||||
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
||||
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
|
||||
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
context.logger.log(Level.INFO, channel + " setOption error", e);
|
||||
}
|
||||
AsyncConnection conn = new TcpAioAsyncConnection(channel, context.sslContext, null, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
|
||||
conn.livingCounter = livingCounter;
|
||||
|
||||
@@ -9,6 +9,7 @@ import java.io.IOException;
|
||||
import java.net.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@@ -21,6 +22,18 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
*/
|
||||
public class TcpBioAsyncConnection extends AsyncConnection {
|
||||
|
||||
static final Set<SocketOption<?>> defaultOptions = defaultOptions();
|
||||
|
||||
private static Set<SocketOption<?>> defaultOptions() {
|
||||
HashSet<SocketOption<?>> set = new HashSet<>(5);
|
||||
set.add(StandardSocketOptions.SO_SNDBUF);
|
||||
set.add(StandardSocketOptions.SO_RCVBUF);
|
||||
set.add(StandardSocketOptions.SO_KEEPALIVE);
|
||||
set.add(StandardSocketOptions.SO_REUSEADDR);
|
||||
set.add(StandardSocketOptions.TCP_NODELAY);
|
||||
return Collections.unmodifiableSet(set);
|
||||
}
|
||||
|
||||
private int readTimeoutSeconds;
|
||||
|
||||
private int writeTimeoutSeconds;
|
||||
@@ -97,6 +110,60 @@ public class TcpBioAsyncConnection extends AsyncConnection {
|
||||
this.writeTimeoutSeconds = writeTimeoutSeconds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shutdownInput() {
|
||||
try {
|
||||
this.socket.shutdownInput();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shutdownOutput() {
|
||||
try {
|
||||
this.socket.shutdownOutput();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean setOption(SocketOption<T> name, T value) {
|
||||
try {
|
||||
if (StandardSocketOptions.SO_REUSEADDR == name) {
|
||||
this.socket.setReuseAddress((Boolean) value);
|
||||
return true;
|
||||
}
|
||||
if (StandardSocketOptions.SO_KEEPALIVE == name) {
|
||||
this.socket.setKeepAlive((Boolean) value);
|
||||
return true;
|
||||
}
|
||||
if (StandardSocketOptions.TCP_NODELAY == name) {
|
||||
this.socket.setTcpNoDelay((Boolean) value);
|
||||
return true;
|
||||
}
|
||||
if (StandardSocketOptions.SO_RCVBUF == name) {
|
||||
this.socket.setReceiveBufferSize((Integer) value);
|
||||
return true;
|
||||
}
|
||||
if (StandardSocketOptions.SO_SNDBUF == name) {
|
||||
this.socket.setSendBufferSize((Integer) value);
|
||||
return true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SocketOption<?>> supportedOptions() {
|
||||
return defaultOptions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||
try {
|
||||
|
||||
@@ -6,9 +6,10 @@
|
||||
package org.redkale.net;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@@ -108,6 +109,41 @@ public class TcpNioAsyncConnection extends AsyncConnection {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shutdownInput() {
|
||||
try {
|
||||
this.channel.shutdownInput();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shutdownOutput() {
|
||||
try {
|
||||
this.channel.shutdownOutput();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean setOption(SocketOption<T> name, T value) {
|
||||
try {
|
||||
this.channel.setOption(name, value);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SocketOption<?>> supportedOptions() {
|
||||
return this.channel.supportedOptions();
|
||||
}
|
||||
|
||||
CompletionHandler removeReadHandler() {
|
||||
CompletionHandler handler = this.readHandler;
|
||||
this.readHandler = null;
|
||||
|
||||
@@ -18,7 +18,6 @@ import java.util.logging.Level;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import org.redkale.convert.*;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import static org.redkale.net.ProtocolServer.*;
|
||||
import org.redkale.util.*;
|
||||
|
||||
/**
|
||||
@@ -221,7 +220,7 @@ public final class Transport {
|
||||
if (!rand) { //指定地址
|
||||
TransportNode node = findTransportNode(addr);
|
||||
if (node == null) {
|
||||
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay(), factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
|
||||
return AsyncConnection.createTCP(group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
|
||||
}
|
||||
final BlockingQueue<AsyncConnection> queue = node.conns;
|
||||
if (!queue.isEmpty()) {
|
||||
@@ -234,7 +233,7 @@ public final class Transport {
|
||||
}
|
||||
}
|
||||
}
|
||||
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay(), factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
|
||||
return AsyncConnection.createTCP(group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
|
||||
}
|
||||
|
||||
//---------------------随机取地址------------------------
|
||||
@@ -260,8 +259,8 @@ public final class Transport {
|
||||
}
|
||||
CompletableFuture future = new CompletableFuture();
|
||||
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
|
||||
if (supportTcpNoDelay()) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
if (supportTcpKeepAlive()) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
||||
channel.connect(one.address, one, new CompletionHandler<Void, TransportNode>() {
|
||||
@Override
|
||||
@@ -312,8 +311,8 @@ public final class Transport {
|
||||
if (node == exclude) continue;
|
||||
if (future.isDone()) return future;
|
||||
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
|
||||
if (supportTcpNoDelay()) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
if (supportTcpKeepAlive()) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
||||
channel.connect(node.address, node, new CompletionHandler<Void, TransportNode>() {
|
||||
@Override
|
||||
|
||||
@@ -6,9 +6,10 @@
|
||||
package org.redkale.net;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@@ -85,6 +86,31 @@ public class UdpBioAsyncConnection extends AsyncConnection {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shutdownInput() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shutdownOutput() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean setOption(SocketOption<T> name, T value) {
|
||||
try {
|
||||
this.channel.setOption(name, value);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SocketOption<?>> supportedOptions() {
|
||||
return this.channel.supportedOptions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user