This commit is contained in:
地平线
2015-07-23 17:29:16 +08:00
parent bc65a871dd
commit 05496b5979

View File

@@ -33,10 +33,13 @@ public final class Transport {
protected final AsynchronousChannelGroup group; protected final AsynchronousChannelGroup group;
protected BlockingQueue<AsyncConnection> queue;
public Transport(String name, String protocol, int clients, int bufferPoolSize, WatchFactory watch, SocketAddress... addresses) { public Transport(String name, String protocol, int clients, int bufferPoolSize, WatchFactory watch, SocketAddress... addresses) {
this.name = name; this.name = name;
this.protocol = protocol; this.protocol = protocol;
this.udp = "UDP".equalsIgnoreCase(protocol); this.udp = "UDP".equalsIgnoreCase(protocol);
this.queue = this.udp ? null : new ArrayBlockingQueue<>(clients);
AsynchronousChannelGroup g = null; AsynchronousChannelGroup g = null;
try { try {
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
@@ -80,6 +83,12 @@ public final class Transport {
} }
public AsyncConnection pollConnection() { public AsyncConnection pollConnection() {
if (udp) return createConnection();
AsyncConnection conn = queue.poll();
return (conn != null && conn.isOpen()) ? conn : createConnection();
}
private AsyncConnection createConnection() {
SocketAddress addr = remoteAddres[0]; SocketAddress addr = remoteAddres[0];
try { try {
if (udp) { if (udp) {
@@ -97,10 +106,19 @@ public final class Transport {
} }
public void offerConnection(AsyncConnection conn) { public void offerConnection(AsyncConnection conn) {
if (udp) {
try { try {
conn.close(); conn.close();
} catch (IOException io) { } catch (IOException io) {
} }
} else if (conn.isOpen()) {
if (!queue.offer(conn)) {
try {
conn.close();
} catch (IOException io) {
}
}
}
} }
public <A> void async(final ByteBuffer buffer, A att, final CompletionHandler<Integer, A> handler) { public <A> void async(final ByteBuffer buffer, A att, final CompletionHandler<Integer, A> handler) {