This commit is contained in:
@@ -247,7 +247,7 @@ public final class Transport {
|
||||
//---------------------随机取地址------------------------
|
||||
//从连接池里取
|
||||
for (final TransportAddress taddr : taddrs) {
|
||||
if (!taddr.enable) continue;
|
||||
if (taddr.disabletime > 0) continue;
|
||||
final BlockingQueue<AsyncConnection> queue = taddr.conns;
|
||||
if (!queue.isEmpty()) {
|
||||
AsyncConnection conn;
|
||||
@@ -259,6 +259,7 @@ public final class Transport {
|
||||
//从可用/不可用的地址列表中创建连接
|
||||
AtomicInteger count = new AtomicInteger(taddrs.length);
|
||||
CompletableFuture future = new CompletableFuture();
|
||||
final long now = System.currentTimeMillis();
|
||||
for (final TransportAddress taddr : taddrs) {
|
||||
if (future.isDone()) return future;
|
||||
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
|
||||
@@ -266,7 +267,7 @@ public final class Transport {
|
||||
channel.connect(taddr.address, taddr, new CompletionHandler<Void, TransportAddress>() {
|
||||
@Override
|
||||
public void completed(Void result, TransportAddress attachment) {
|
||||
taddr.enable = true;
|
||||
taddr.disabletime = 0;
|
||||
AsyncConnection asyncConn = AsyncConnection.create(channel, attachment.address, factory.readTimeoutSecond, factory.writeTimeoutSecond);
|
||||
if (future.isDone()) {
|
||||
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
|
||||
@@ -277,7 +278,7 @@ public final class Transport {
|
||||
|
||||
@Override
|
||||
public void failed(Throwable exc, TransportAddress attachment) {
|
||||
taddr.enable = false;
|
||||
taddr.disabletime = now;
|
||||
if (count.decrementAndGet() < 1) {
|
||||
future.completeExceptionally(exc);
|
||||
}
|
||||
@@ -348,7 +349,7 @@ public final class Transport {
|
||||
|
||||
protected InetSocketAddress address;
|
||||
|
||||
protected volatile boolean enable;
|
||||
protected volatile long disabletime; //不可用时的时间, 为0表示可用
|
||||
|
||||
protected final BlockingQueue<AsyncConnection> conns = new ArrayBlockingQueue<>(MAX_POOL_LIMIT);
|
||||
|
||||
@@ -356,13 +357,13 @@ public final class Transport {
|
||||
|
||||
public TransportAddress(InetSocketAddress address) {
|
||||
this.address = address;
|
||||
this.enable = true;
|
||||
this.disabletime = 0;
|
||||
}
|
||||
|
||||
@ConstructorParameters({"address", "enable"})
|
||||
public TransportAddress(InetSocketAddress address, boolean enable) {
|
||||
@ConstructorParameters({"address", "disabletime"})
|
||||
public TransportAddress(InetSocketAddress address, long disabletime) {
|
||||
this.address = address;
|
||||
this.enable = enable;
|
||||
this.disabletime = disabletime;
|
||||
}
|
||||
|
||||
public <T> T setAttribute(String name, T value) {
|
||||
@@ -398,8 +399,8 @@ public final class Transport {
|
||||
return address;
|
||||
}
|
||||
|
||||
public boolean isEnable() {
|
||||
return enable;
|
||||
public long getDisabletime() {
|
||||
return disabletime;
|
||||
}
|
||||
|
||||
@ConvertDisabled
|
||||
|
||||
Reference in New Issue
Block a user