From 4b2c6eba63c93520437bf8f62d13d7e54305f819 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Thu, 30 Jul 2020 17:35:59 +0800 Subject: [PATCH] --- src/org/redkale/net/Transport.java | 141 +++++++++++++++++++---------- 1 file changed, 94 insertions(+), 47 deletions(-) diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index e2c087fdf..a79eb278f 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -10,6 +10,7 @@ import java.lang.ref.WeakReference; import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; +import java.sql.SQLException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -56,6 +57,9 @@ public final class Transport { //负载均衡策略 protected final TransportStrategy strategy; + //连接上限, 为null表示无限制 + protected Semaphore semaphore; + protected Transport(String name, TransportFactory factory, final ObjectPool transportBufferPool, final AsynchronousChannelGroup transportChannelGroup, final SSLContext sslContext, final InetSocketAddress clientAddress, final Collection addresses, final TransportStrategy strategy) { @@ -78,6 +82,14 @@ public final class Transport { updateRemoteAddresses(addresses); } + public Semaphore getSemaphore() { + return semaphore; + } + + public void setSemaphore(Semaphore semaphore) { + this.semaphore = semaphore; + } + public final InetSocketAddress[] updateRemoteAddresses(final Collection addresses) { final TransportNode[] oldNodes = this.transportNodes; synchronized (this) { @@ -210,6 +222,44 @@ public final class Transport { return tcp; } + protected CompletableFuture pollAsync(TransportNode node, SocketAddress addr, Supplier> func, final int count) { + if (count >= 5) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new SQLException("create AsyncConnection error")); + return future; + } + final BlockingQueue queue = node.conns; + if (!queue.isEmpty()) { + AsyncConnection conn; + while ((conn = queue.poll()) != null) { + if (conn.isOpen()) { + return CompletableFuture.completedFuture(conn); + } else { + conn.dispose(); + } + } + } + if (semaphore != null && !semaphore.tryAcquire()) { + return CompletableFuture.supplyAsync(() -> { + try { + return queue.poll(1, TimeUnit.SECONDS); + } catch (Exception t) { + return null; + } + }, factory.executor).thenCompose((conn2) -> { + if (conn2 != null && conn2.isOpen()) { + return CompletableFuture.completedFuture(conn2); + } + return pollAsync(node, addr, func, count + 1); + }); + } + return func.get().thenApply(conn -> { + if (conn != null && semaphore != null) conn.beforeCloseListener((c) -> semaphore.release()); + return conn; + }); + + } + public CompletableFuture pollConnection(SocketAddress addr0) { if (this.strategy != null) return strategy.pollConnection(addr0, this); final TransportNode[] nodes = this.transportNodes; @@ -230,18 +280,7 @@ public final class Transport { if (node == null) { return AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); } - final BlockingQueue queue = node.conns; - if (!queue.isEmpty()) { - AsyncConnection conn; - while ((conn = queue.poll()) != null) { - if (conn.isOpen()) { - return CompletableFuture.completedFuture(conn); - } else { - conn.dispose(); - } - } - } - return AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); + return pollAsync(node, addr, () -> AsyncConnection.createTCP(bufferPool, group, sslContext, addr, factory.readTimeoutSeconds, factory.writeTimeoutSeconds), 1); } //---------------------随机取地址------------------------ @@ -265,45 +304,53 @@ public final class Transport { } } } - CompletableFuture future = new CompletableFuture(); - final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - channel.connect(one.address, one, new CompletionHandler() { - @Override - public void completed(Void result, TransportNode attachment) { - attachment.disabletime = 0; - AsyncConnection asyncConn = AsyncConnection.create(bufferPool, channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); - if (future.isDone()) { - if (!attachment.conns.offer(asyncConn)) asyncConn.dispose(); - } else { - future.complete(asyncConn); - } + return pollAsync(one, one.getAddress(), () -> { + CompletableFuture future = new CompletableFuture(); + AsynchronousSocketChannel channel0 = null; + try { + channel0 = AsynchronousSocketChannel.open(group); + channel0.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel0.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel0.setOption(StandardSocketOptions.SO_REUSEADDR, true); + } catch (Exception ex) { + ex.printStackTrace(); } - - @Override - public void failed(Throwable exc, TransportNode attachment) { - attachment.disabletime = now; - try { - channel.close(); - } catch (Exception e) { + final AsynchronousSocketChannel channel = channel0; + channel.connect(one.address, one, new CompletionHandler() { + @Override + public void completed(Void result, TransportNode attachment) { + attachment.disabletime = 0; + AsyncConnection asyncConn = AsyncConnection.create(bufferPool, channel, attachment.address, factory.readTimeoutSeconds, factory.writeTimeoutSeconds); + if (future.isDone()) { + if (!attachment.conns.offer(asyncConn)) asyncConn.dispose(); + } else { + future.complete(asyncConn); + } } - try { - pollConnection0(nodes, one, now).whenComplete((r, t) -> { - if (t != null) { - future.completeExceptionally(t); - } else { - future.complete(r); - } - }); - } catch (Exception e) { - future.completeExceptionally(e); + @Override + public void failed(Throwable exc, TransportNode attachment) { + attachment.disabletime = now; + try { + channel.close(); + } catch (Exception e) { + } + try { + pollConnection0(nodes, one, now).whenComplete((r, t) -> { + if (t != null) { + future.completeExceptionally(t); + } else { + future.complete(r); + } + }); + + } catch (Exception e) { + future.completeExceptionally(e); + } } - } - }); - return future; + }); + return future; + }, 1); } return pollConnection0(nodes, null, now); } catch (Exception ex) {