From ddd3d33edc160b81766f84af83659d640a4450dc Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 7 Sep 2023 17:07:54 +0800 Subject: [PATCH] =?UTF-8?q?Client=E5=A2=9E=E5=8A=A0newConnection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/client/Client.java | 54 +++++++++++++------ 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 92ce29f99..92909aa0a 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -299,15 +299,23 @@ public abstract class Client, R extends ClientR } public final CompletableFuture connect() { + return connect(true); + } + + public final CompletableFuture newConnection() { + return connect(true); + } + + private CompletableFuture connect(final boolean pool) { final int size = this.connArray.length; WorkThread workThread = WorkThread.currentWorkThread(); final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size; C cc = (C) this.connArray[connIndex]; - if (cc != null && cc.isOpen()) { + if (pool && cc != null && cc.isOpen()) { return CompletableFuture.completedFuture(cc); } final Queue> waitQueue = this.connAcquireWaitings[connIndex]; - if (this.connOpenStates[connIndex].compareAndSet(false, true)) { + if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) .thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); @@ -325,17 +333,19 @@ public abstract class Client, R extends ClientR return future.thenCompose(c -> { return CompletableFuture.supplyAsync(() -> { c.setAuthenticated(true); - this.connArray[connIndex] = c; - CompletableFuture f; - while ((f = waitQueue.poll()) != null) { - if (!f.isDone()) { - f.complete(c); + if (pool) { + this.connArray[connIndex] = c; + CompletableFuture f; + while ((f = waitQueue.poll()) != null) { + if (!f.isDone()) { + f.complete(c); + } } } return c; }, c.channel.getWriteIOThread()); }).whenComplete((r, t) -> { - if (t != null) { + if (pool && t != null) { this.connOpenStates[connIndex].set(false); } }); @@ -348,16 +358,26 @@ public abstract class Client, R extends ClientR //指定地址获取连接 public final CompletableFuture connect(final SocketAddress addr) { + return connect(true, addr); + } + + //指定地址获取连接 + public final CompletableFuture newConnection(final SocketAddress addr) { + return connect(false, addr); + } + + //指定地址获取连接 + private CompletableFuture connect(final boolean pool, final SocketAddress addr) { if (addr == null) { return connect(); } final AddressConnEntry entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry()); C ec = entry.connection; - if (ec != null && ec.isOpen()) { + if (pool && ec != null && ec.isOpen()) { return CompletableFuture.completedFuture(ec); } final Queue> waitQueue = entry.connAcquireWaitings; - if (entry.connOpenState.compareAndSet(false, true)) { + if (!pool || entry.connOpenState.compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds) .thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); @@ -375,17 +395,19 @@ public abstract class Client, R extends ClientR return future.thenCompose(c -> { return CompletableFuture.supplyAsync(() -> { c.setAuthenticated(true); - entry.connection = c; - CompletableFuture f; - while ((f = waitQueue.poll()) != null) { - if (!f.isDone()) { - f.complete(c); + if (pool) { + entry.connection = c; + CompletableFuture f; + while ((f = waitQueue.poll()) != null) { + if (!f.isDone()) { + f.complete(c); + } } } return c; }, c.channel.getWriteIOThread()); }).whenComplete((r, t) -> { - if (t != null) { + if (pool && t != null) { entry.connOpenState.set(false); } });