diff --git a/src/com/wentch/redkale/net/Transport.java b/src/com/wentch/redkale/net/Transport.java index 5c8d59693..e3c293c82 100644 --- a/src/com/wentch/redkale/net/Transport.java +++ b/src/com/wentch/redkale/net/Transport.java @@ -33,10 +33,13 @@ public final class Transport { protected final AsynchronousChannelGroup group; + protected BlockingQueue queue; + public Transport(String name, String protocol, int clients, int bufferPoolSize, WatchFactory watch, SocketAddress... addresses) { this.name = name; this.protocol = protocol; this.udp = "UDP".equalsIgnoreCase(protocol); + this.queue = this.udp ? null : new ArrayBlockingQueue<>(clients); AsynchronousChannelGroup g = null; try { final AtomicInteger counter = new AtomicInteger(); @@ -80,6 +83,12 @@ public final class Transport { } public AsyncConnection pollConnection() { + if (udp) return createConnection(); + AsyncConnection conn = queue.poll(); + return (conn != null && conn.isOpen()) ? conn : createConnection(); + } + + private AsyncConnection createConnection() { SocketAddress addr = remoteAddres[0]; try { if (udp) { @@ -97,9 +106,18 @@ public final class Transport { } public void offerConnection(AsyncConnection conn) { - try { - conn.close(); - } catch (IOException io) { + if (udp) { + try { + conn.close(); + } catch (IOException io) { + } + } else if (conn.isOpen()) { + if (!queue.offer(conn)) { + try { + conn.close(); + } catch (IOException io) { + } + } } }