This commit is contained in:
地平线
2015-07-23 17:19:50 +08:00
parent f8cf29ea08
commit bc65a871dd
3 changed files with 20 additions and 10 deletions

View File

@@ -14,6 +14,8 @@
<!-- <!--
远程client地址组资源. 注意: remote的name值不能为LOCAL不区分大小写 远程client地址组资源. 注意: remote的name值不能为LOCAL不区分大小写
protocol 值只能是UDP TCP 默认UDP protocol 值只能是UDP TCP 默认UDP
clients: 连接池数, 默认: CPU核数*8
buffers: ByteBuffer对象池的大小 默认: CPU核数*16
group: 组名, 默认是空字符串, 通常不同机房使用不同的group值 group: 组名, 默认是空字符串, 通常不同机房使用不同的group值
--> -->
<remote name="myremote" protocol="UDP" group=""> <remote name="myremote" protocol="UDP" group="">

View File

@@ -479,7 +479,8 @@ public final class Application {
} }
list.add(new SimpleEntry<>(name, addresses)); list.add(new SimpleEntry<>(name, addresses));
} }
Transport transport = new Transport(name, protocol, watch, 100, addresses); Transport transport = new Transport(name, protocol, conf.getIntValue("clients", Runtime.getRuntime().availableProcessors() * 8),
conf.getIntValue("buffers:", Runtime.getRuntime().availableProcessors() * 16), watch, addresses);
factory.register(name, Transport.class, transport); factory.register(name, Transport.class, transport);
if (this.nodeName.isEmpty() && host.equals(addrs[0].getValue("addr"))) { if (this.nodeName.isEmpty() && host.equals(addrs[0].getValue("addr"))) {
this.nodeName = name; this.nodeName = name;

View File

@@ -23,17 +23,20 @@ public final class Transport {
protected SocketAddress[] remoteAddres; protected SocketAddress[] remoteAddres;
protected ObjectPool<ByteBuffer> bufferPool; protected final ObjectPool<ByteBuffer> bufferPool;
protected String name; protected final String name;
protected String protocol; protected final String protocol;
private final boolean udp;
protected final AsynchronousChannelGroup group; protected final AsynchronousChannelGroup group;
public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, SocketAddress... addresses) { public Transport(String name, String protocol, int clients, int bufferPoolSize, WatchFactory watch, SocketAddress... addresses) {
this.name = name; this.name = name;
this.protocol = protocol; this.protocol = protocol;
this.udp = "UDP".equalsIgnoreCase(protocol);
AsynchronousChannelGroup g = null; AsynchronousChannelGroup g = null;
try { try {
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
@@ -72,17 +75,21 @@ public final class Transport {
for (ByteBuffer buffer : buffers) offerBuffer(buffer); for (ByteBuffer buffer : buffers) offerBuffer(buffer);
} }
public boolean isUDP() {
return udp;
}
public AsyncConnection pollConnection() { public AsyncConnection pollConnection() {
SocketAddress addr = remoteAddres[0]; SocketAddress addr = remoteAddres[0];
try { try {
if ("TCP".equalsIgnoreCase(protocol)) { if (udp) {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
channel.connect(addr).get(2, TimeUnit.SECONDS);
return AsyncConnection.create(channel, 0, 0);
} else {
AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); AsyncDatagramChannel channel = AsyncDatagramChannel.open(group);
channel.connect(addr); channel.connect(addr);
return AsyncConnection.create(channel, addr, true, 0, 0); return AsyncConnection.create(channel, addr, true, 0, 0);
} else {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
channel.connect(addr).get(2, TimeUnit.SECONDS);
return AsyncConnection.create(channel, 0, 0);
} }
} catch (Exception ex) { } catch (Exception ex) {
throw new RuntimeException("transport address = " + addr, ex); throw new RuntimeException("transport address = " + addr, ex);