This commit is contained in:
Redkale
2018-04-28 20:03:01 +08:00
parent cdacc30633
commit c470d31605
2 changed files with 37 additions and 23 deletions

View File

@@ -23,6 +23,24 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public abstract class ProtocolServer {
protected static final boolean supportTcpNoDelay;
protected static final boolean supportTcpKeepAlive;
static {
boolean tcpNoDelay = false;
boolean keepAlive = false;
try {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
tcpNoDelay = channel.supportedOptions().contains(StandardSocketOptions.TCP_NODELAY);
keepAlive = channel.supportedOptions().contains(StandardSocketOptions.SO_KEEPALIVE);
channel.close();
} catch (Exception e) {
}
supportTcpNoDelay = tcpNoDelay;
supportTcpKeepAlive = keepAlive;
}
//创建数
protected final AtomicLong createCounter = new AtomicLong();
@@ -72,6 +90,14 @@ public abstract class ProtocolServer {
throw new RuntimeException("ProtocolServer not support protocol " + protocol);
}
public static boolean supportTcpNoDelay() {
return supportTcpNoDelay;
}
public static boolean supportTcpKeepAlive() {
return supportTcpKeepAlive;
}
private static final class ProtocolUDPServer extends ProtocolServer {
private boolean running;
@@ -214,6 +240,11 @@ public abstract class ProtocolServer {
}
createCounter.incrementAndGet();
livingCounter.incrementAndGet();
try {
if (supportTcpNoDelay()) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
if (supportTcpKeepAlive()) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
} catch (IOException e) {
}
AsyncConnection conn = AsyncConnection.create(channel, null, context);
conn.livingCounter = livingCounter;
conn.closedCounter = closedCounter;

View File

@@ -18,6 +18,7 @@ import java.util.logging.Level;
import javax.net.ssl.SSLContext;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import static org.redkale.net.ProtocolServer.*;
import org.redkale.util.*;
/**
@@ -32,24 +33,6 @@ public final class Transport {
public static final String DEFAULT_PROTOCOL = "TCP";
protected static final boolean supportTcpNoDelay;
protected static final boolean supportKeepAlive;
static {
boolean tcpNoDelay = false;
boolean keepAlive = false;
try {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
tcpNoDelay = channel.supportedOptions().contains(StandardSocketOptions.TCP_NODELAY);
keepAlive = channel.supportedOptions().contains(StandardSocketOptions.SO_KEEPALIVE);
channel.close();
} catch (Exception e) {
}
supportTcpNoDelay = tcpNoDelay;
supportKeepAlive = keepAlive;
}
protected final AtomicInteger seq = new AtomicInteger(-1);
protected final TransportFactory factory;
@@ -238,7 +221,7 @@ public final class Transport {
if (!rand) { //指定地址
TransportNode node = findTransportNode(addr);
if (node == null) {
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay(), factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
}
final BlockingQueue<AsyncConnection> queue = node.conns;
if (!queue.isEmpty()) {
@@ -247,7 +230,7 @@ public final class Transport {
if (conn.isOpen()) return CompletableFuture.completedFuture(conn);
}
}
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
return AsyncConnection.createTCP(group, sslContext, addr, supportTcpNoDelay(), factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
}
//---------------------随机取地址------------------------
@@ -269,8 +252,8 @@ public final class Transport {
}
CompletableFuture future = new CompletableFuture();
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
if (supportKeepAlive) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
if (supportTcpNoDelay()) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
if (supportTcpKeepAlive()) channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.connect(one.address, one, new CompletionHandler<Void, TransportNode>() {
@Override
public void completed(Void result, TransportNode attachment) {
@@ -320,7 +303,7 @@ public final class Transport {
if (node == exclude) continue;
if (future.isDone()) return future;
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
if (supportTcpNoDelay()) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(node.address, node, new CompletionHandler<Void, TransportNode>() {
@Override
public void completed(Void result, TransportNode attachment) {