diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index 264570b09..6c3e67eed 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -14,6 +14,8 @@ diff --git a/src/com/wentch/redkale/boot/Application.java b/src/com/wentch/redkale/boot/Application.java index ecd56e924..cce75b7bf 100644 --- a/src/com/wentch/redkale/boot/Application.java +++ b/src/com/wentch/redkale/boot/Application.java @@ -479,7 +479,8 @@ public final class Application { } list.add(new SimpleEntry<>(name, addresses)); } - Transport transport = new Transport(name, protocol, watch, 100, addresses); + Transport transport = new Transport(name, protocol, conf.getIntValue("clients", Runtime.getRuntime().availableProcessors() * 8), + conf.getIntValue("buffers:", Runtime.getRuntime().availableProcessors() * 16), watch, addresses); factory.register(name, Transport.class, transport); if (this.nodeName.isEmpty() && host.equals(addrs[0].getValue("addr"))) { this.nodeName = name; diff --git a/src/com/wentch/redkale/net/Transport.java b/src/com/wentch/redkale/net/Transport.java index e9af5e96b..5c8d59693 100644 --- a/src/com/wentch/redkale/net/Transport.java +++ b/src/com/wentch/redkale/net/Transport.java @@ -23,17 +23,20 @@ public final class Transport { protected SocketAddress[] remoteAddres; - protected ObjectPool bufferPool; + protected final ObjectPool bufferPool; - protected String name; + protected final String name; - protected String protocol; + protected final String protocol; + + private final boolean udp; protected final AsynchronousChannelGroup group; - public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, SocketAddress... addresses) { + public Transport(String name, String protocol, int clients, int bufferPoolSize, WatchFactory watch, SocketAddress... addresses) { this.name = name; this.protocol = protocol; + this.udp = "UDP".equalsIgnoreCase(protocol); AsynchronousChannelGroup g = null; try { final AtomicInteger counter = new AtomicInteger(); @@ -72,17 +75,21 @@ public final class Transport { for (ByteBuffer buffer : buffers) offerBuffer(buffer); } + public boolean isUDP() { + return udp; + } + public AsyncConnection pollConnection() { SocketAddress addr = remoteAddres[0]; try { - if ("TCP".equalsIgnoreCase(protocol)) { - AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); - channel.connect(addr).get(2, TimeUnit.SECONDS); - return AsyncConnection.create(channel, 0, 0); - } else { + if (udp) { AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); channel.connect(addr); return AsyncConnection.create(channel, addr, true, 0, 0); + } else { + AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); + channel.connect(addr).get(2, TimeUnit.SECONDS); + return AsyncConnection.create(channel, 0, 0); } } catch (Exception ex) { throw new RuntimeException("transport address = " + addr, ex);