This commit is contained in:
kamhung
2015-11-26 14:23:27 +08:00
parent 8aca1ff103
commit 41345ec9f6
3 changed files with 41 additions and 10 deletions

View File

@@ -9,6 +9,7 @@ import java.io.IOException;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
@@ -23,6 +24,8 @@ public abstract class ProtocolServer {
public abstract void bind(SocketAddress local, int backlog) throws IOException; public abstract void bind(SocketAddress local, int backlog) throws IOException;
public abstract <T> Set<SocketOption<?>> supportedOptions();
public abstract <T> void setOption(SocketOption<T> name, T value) throws IOException; public abstract <T> void setOption(SocketOption<T> name, T value) throws IOException;
public abstract void accept(); public abstract void accept();
@@ -67,6 +70,11 @@ public abstract class ProtocolServer {
this.serverChannel.setOption(name, value); this.serverChannel.setOption(name, value);
} }
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override @Override
public void accept() { public void accept() {
final DatagramChannel serchannel = this.serverChannel; final DatagramChannel serchannel = this.serverChannel;
@@ -139,6 +147,11 @@ public abstract class ProtocolServer {
this.serverChannel.setOption(name, value); this.serverChannel.setOption(name, value);
} }
@Override
public <T> Set<SocketOption<?>> supportedOptions() {
return this.serverChannel.supportedOptions();
}
@Override @Override
public void accept() { public void accept() {
final AsynchronousServerSocketChannel serchannel = this.serverChannel; final AsynchronousServerSocketChannel serchannel = this.serverChannel;

View File

@@ -46,7 +46,7 @@ public abstract class Server {
protected int backlog; protected int backlog;
protected ProtocolServer transport; protected ProtocolServer serverChannel;
protected int capacity; protected int capacity;
@@ -78,7 +78,7 @@ public abstract class Server {
this.config = config; this.config = config;
this.address = new InetSocketAddress(config.getValue("host", "0.0.0.0"), config.getIntValue("port", 80)); this.address = new InetSocketAddress(config.getValue("host", "0.0.0.0"), config.getIntValue("port", 80));
this.charset = Charset.forName(config.getValue("charset", "UTF-8")); this.charset = Charset.forName(config.getValue("charset", "UTF-8"));
this.backlog = config.getIntValue("backlog", 10240); this.backlog = config.getIntValue("backlog", 8 * 1024);
this.readTimeoutSecond = config.getIntValue("readTimeoutSecond", 0); this.readTimeoutSecond = config.getIntValue("readTimeoutSecond", 0);
this.writeTimeoutSecond = config.getIntValue("writeTimeoutSecond", 0); this.writeTimeoutSecond = config.getIntValue("writeTimeoutSecond", 0);
this.capacity = config.getIntValue("capacity", 8 * 1024); this.capacity = config.getIntValue("capacity", 8 * 1024);
@@ -117,12 +117,13 @@ public abstract class Server {
this.context = this.createContext(); this.context = this.createContext();
this.prepare.init(this.context, config); this.prepare.init(this.context, config);
if (this.watch != null) this.watch.inject(this.prepare); if (this.watch != null) this.watch.inject(this.prepare);
this.transport = ProtocolServer.create(this.protocol, context); this.serverChannel = ProtocolServer.create(this.protocol, context);
this.transport.open(); this.serverChannel.open();
transport.setOption(StandardSocketOptions.SO_REUSEADDR, true); if (this.serverChannel.supportedOptions().contains(StandardSocketOptions.TCP_NODELAY)) {
transport.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024); this.serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
transport.bind(address, backlog); }
transport.accept(); serverChannel.bind(address, backlog);
serverChannel.accept();
final String threadName = "[" + Thread.currentThread().getName() + "] "; final String threadName = "[" + Thread.currentThread().getName() + "] ";
logger.info(threadName + this.getClass().getSimpleName() + "." + protocol + " listen: " + address logger.info(threadName + this.getClass().getSimpleName() + "." + protocol + " listen: " + address
+ ", threads: " + threads + ", bufferCapacity: " + capacity + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize + ", threads: " + threads + ", bufferCapacity: " + capacity + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize
@@ -135,7 +136,7 @@ public abstract class Server {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning"); logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdowning");
try { try {
this.transport.close(); this.serverChannel.close();
} catch (Exception e) { } catch (Exception e) {
} }
logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdow prepare servlet"); logger.info(this.getClass().getSimpleName() + "-" + this.protocol + " shutdow prepare servlet");

View File

@@ -26,6 +26,19 @@ public final class Transport {
protected static final int MAX_POOL_LIMIT = Runtime.getRuntime().availableProcessors() * 16; protected static final int MAX_POOL_LIMIT = Runtime.getRuntime().availableProcessors() * 16;
protected static final boolean supportTcpNoDelay;
static {
boolean tcpNoDelay = false;
try {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
tcpNoDelay = channel.supportedOptions().contains(StandardSocketOptions.TCP_NODELAY);
channel.close();
} catch (Exception e) {
}
supportTcpNoDelay = tcpNoDelay;
}
protected final String name; protected final String name;
protected final int bufferPoolSize; protected final int bufferPoolSize;
@@ -156,7 +169,10 @@ public final class Transport {
if (conn.isOpen()) return conn; if (conn.isOpen()) return conn;
} }
} }
if (channel == null) channel = AsynchronousSocketChannel.open(group); if (channel == null) {
channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
try { try {
channel.connect(addr).get(2, TimeUnit.SECONDS); channel.connect(addr).get(2, TimeUnit.SECONDS);
break; break;
@@ -167,6 +183,7 @@ public final class Transport {
} }
} else { } else {
channel = AsynchronousSocketChannel.open(group); channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(addr).get(2, TimeUnit.SECONDS); channel.connect(addr).get(2, TimeUnit.SECONDS);
} }
if (channel == null) return null; if (channel == null) return null;