This commit is contained in:
Redkale
2020-07-30 17:35:59 +08:00
parent 84147280cf
commit 4b2c6eba63

View File

@@ -10,6 +10,7 @@ import java.lang.ref.WeakReference;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -56,6 +57,9 @@ public final class Transport {
//负载均衡策略
protected final TransportStrategy strategy;
//连接上限, 为null表示无限制
protected Semaphore semaphore;
protected Transport(String name, TransportFactory factory, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
@@ -78,6 +82,14 @@ public final class Transport {
updateRemoteAddresses(addresses);
}
public Semaphore getSemaphore() {
return semaphore;
}
public void setSemaphore(Semaphore semaphore) {
this.semaphore = semaphore;
}
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
final TransportNode[] oldNodes = this.transportNodes;
synchronized (this) {
@@ -210,6 +222,44 @@ public final class Transport {
return tcp;
}
protected CompletableFuture<AsyncConnection> pollAsync(TransportNode node, SocketAddress addr, Supplier<CompletableFuture<AsyncConnection>> func, final int count) {
if (count >= 5) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
future.completeExceptionally(new SQLException("create AsyncConnection error"));
return future;
}
final BlockingQueue<AsyncConnection> queue = node.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.isOpen()) {
return CompletableFuture.completedFuture(conn);
} else {
conn.dispose();
}
}
}
if (semaphore != null && !semaphore.tryAcquire()) {
return CompletableFuture.supplyAsync(() -> {
try {
return queue.poll(1, TimeUnit.SECONDS);
} catch (Exception t) {
return null;
}
}, factory.executor).thenCompose((conn2) -> {
if (conn2 != null && conn2.isOpen()) {
return CompletableFuture.completedFuture(conn2);
}
return pollAsync(node, addr, func, count + 1);
});
}
return func.get().thenApply(conn -> {
if (conn != null && semaphore != null) conn.beforeCloseListener((c) -> semaphore.release());
return conn;
});
}
public CompletableFuture<AsyncConnection> pollConnection(SocketAddress addr0) {
if (this.strategy != null) return strategy.pollConnection(addr0, this);
final TransportNode[] nodes = this.transportNodes;
@@ -230,18 +280,7 @@ public final class Transport {
if (node == null) {
return AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
}
final BlockingQueue<AsyncConnection> queue = node.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.isOpen()) {
return CompletableFuture.completedFuture(conn);
} else {
conn.dispose();
}
}
}
return AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
return pollAsync(node, addr, () -> AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds), 1);
}
//---------------------随机取地址------------------------
@@ -265,45 +304,53 @@ public final class Transport {
}
}
}
CompletableFuture future = new CompletableFuture();
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.connect(one.address, one, new CompletionHandler<Void, TransportNode>() {
@Override
public void completed(Void result, TransportNode attachment) {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(bufferPool, channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
} else {
future.complete(asyncConn);
}
return pollAsync(one, one.getAddress(), () -> {
CompletableFuture future = new CompletableFuture();
AsynchronousSocketChannel channel0 = null;
try {
channel0 = AsynchronousSocketChannel.open(group);
channel0.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel0.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel0.setOption(StandardSocketOptions.SO_REUSEADDR, true);
} catch (Exception ex) {
ex.printStackTrace();
}
@Override
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
try {
channel.close();
} catch (Exception e) {
final AsynchronousSocketChannel channel = channel0;
channel.connect(one.address, one, new CompletionHandler<Void, TransportNode>() {
@Override
public void completed(Void result, TransportNode attachment) {
attachment.disabletime = 0;
AsyncConnection asyncConn = AsyncConnection.create(bufferPool, channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds);
if (future.isDone()) {
if (!attachment.conns.offer(asyncConn)) asyncConn.dispose();
} else {
future.complete(asyncConn);
}
}
try {
pollConnection0(nodes, one, now).whenComplete((r, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(r);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
@Override
public void failed(Throwable exc, TransportNode attachment) {
attachment.disabletime = now;
try {
channel.close();
} catch (Exception e) {
}
try {
pollConnection0(nodes, one, now).whenComplete((r, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(r);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
}
}
}
});
return future;
});
return future;
}, 1);
}
return pollConnection0(nodes, null, now);
} catch (Exception ex) {