From 96e9d897ecba35246b6ca7d13d614c36a7eb27d2 Mon Sep 17 00:00:00 2001 From: redkale Date: Fri, 16 Aug 2024 22:18:42 +0800 Subject: [PATCH] =?UTF-8?q?Client=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/client/Client.java | 48 +++++++++++++++---- .../redkale/net/client/ClientConnection.java | 26 ++++------ .../org/redkale/net/client/ClientFuture.java | 25 ++++++---- 3 files changed, 64 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index ba3624edb..ecb63d926 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -265,7 +265,7 @@ public abstract class Client, R extends ClientR return sendAsync(getAddress(request), (Function) null, request); } - public final CompletableFuture> sendAsync(R... requests) { + public final CompletableFuture

[] sendAsync(R... requests) { return sendAsync(getAddress(requests[0]), (Function) null, requests); } @@ -273,7 +273,7 @@ public abstract class Client, R extends ClientR return sendAsync(getAddress(request), respTransfer, request); } - public final CompletableFuture> sendAsync(Function respTransfer, R... requests) { + public final CompletableFuture[] sendAsync(Function respTransfer, R... requests) { return sendAsync(getAddress(requests[0]), respTransfer, requests); } @@ -281,7 +281,7 @@ public abstract class Client, R extends ClientR return sendAsync(addr, (Function) null, request); } - public final CompletableFuture> sendAsync(SocketAddress addr, R... requests) { + public final CompletableFuture

[] sendAsync(SocketAddress addr, R... requests) { return sendAsync(addr, (Function) null, requests); } @@ -291,26 +291,47 @@ public abstract class Client, R extends ClientR return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, respTransfer, request)); } - public final CompletableFuture> sendAsync( - SocketAddress addr, Function respTransfer, R... requests) { + public final CompletableFuture[] sendAsync(SocketAddress addr, Function respTransfer, R... requests) { String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); - for (R request : requests) { + ClientFuture[] respFutures = new ClientFuture[requests.length]; + for (int i = 0; i < respFutures.length; i++) { + R request = requests[i]; request.traceid = traceid; request.computeWorkThreadIfAbsent(); + respFutures[i] = createClientFuture(null, requests[i]); } - return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, respTransfer, requests)); + connect(requests[0].workThread, addr).whenComplete((conn, t) -> { + if (t != null) { + for (ClientFuture f : respFutures) { + f.completeExceptionally(t); + } + } else { + CompletableFuture[] fs = writeChannel(conn, respTransfer, requests); + for (int i = 0; i < respFutures.length; i++) { + final int index = i; + fs[index].whenComplete((v, e) -> { + if (e != null) { + respFutures[index].completeExceptionally(e); + } else { + respFutures[index].complete(v); + } + }); + } + } + }); + return respFutures; } protected CompletableFuture

writeChannel(ClientConnection conn, R request) { return conn.writeChannel(request); } - protected CompletableFuture> writeChannel(ClientConnection conn, R... requests) { + protected CompletableFuture

[] writeChannel(ClientConnection conn, R... requests) { requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); return conn.writeChannel(requests); } - protected CompletableFuture> writeChannel( + protected CompletableFuture[] writeChannel( ClientConnection conn, Function respTransfer, R[] requests) { return conn.writeChannel(respTransfer, requests); } @@ -433,6 +454,15 @@ public abstract class Client, R extends ClientR return entrys[index]; } + protected ClientFuture createClientFuture(ClientConnection conn, R request) { + ClientFuture respFuture = new ClientFuture(conn, request); + int rts = getReadTimeoutSeconds(); + if (rts > 0 && !request.isCloseType()) { + respFuture.setTimeout(timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS)); + } + return respFuture; + } + protected void incrReqWritedCounter() { reqWritedCounter.increment(); } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 88d382a94..d26d79e04 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -114,21 +114,21 @@ public abstract class ClientConnection> writeChannel(R[] requests) { - return writeChannel((Function) null, requests); + protected final CompletableFuture

[] writeChannel(R[] requests) { + return writeChannel0((Function) null, requests); } protected final CompletableFuture writeChannel(Function respTransfer, R request) { - return writeChannel0(respTransfer, request).thenApply(v -> Utility.isEmpty(v) ? null : v.get(0)); + return writeChannel0(respTransfer, request)[0]; } // respTransfer只会在ClientCodec的读线程里调用 - protected final CompletableFuture> writeChannel(Function respTransfer, R... requests) { + protected final CompletableFuture[] writeChannel(Function respTransfer, R... requests) { return writeChannel0(respTransfer, requests); } // respTransfer只会在ClientCodec的读线程里调用 - protected final CompletableFuture> writeChannel0(Function respTransfer, R... requests) { + protected final CompletableFuture[] writeChannel0(Function respTransfer, R... requests) { if (client.debug) { client.logger.log( Level.FINEST, @@ -136,11 +136,10 @@ public abstract class ClientConnection respFuture = createClientFuture(request); + ClientFuture respFuture = client.createClientFuture(this, request); writeLock.lock(); try { offerRespFuture(respFuture); @@ -270,15 +269,6 @@ public abstract class ClientConnection createClientFuture(R request) { - ClientFuture respFuture = new ClientFuture(this, request); - int rts = this.client.getReadTimeoutSeconds(); - if (rts > 0 && !request.isCloseType()) { - respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS)); - } - return respFuture; - } - @Override // AsyncConnection.beforeCloseListener public void accept(AsyncConnection t) { respWaitingCounter.reset(); diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 983d6fdd5..03286c2de 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -8,6 +8,7 @@ package org.redkale.net.client; import java.util.Objects; import java.util.concurrent.*; import org.redkale.annotation.Nonnull; +import org.redkale.annotation.Nullable; import org.redkale.net.*; import org.redkale.util.Traces; @@ -24,7 +25,7 @@ public class ClientFuture extends CompletableFuture< @Nonnull protected final R request; - @Nonnull + @Nullable protected final ClientConnection conn; private ScheduledFuture timeout; @@ -37,7 +38,6 @@ public class ClientFuture extends CompletableFuture< ClientFuture(ClientConnection conn, R request) { super(); - Objects.requireNonNull(conn); Objects.requireNonNull(request); this.conn = conn; this.request = request; @@ -79,7 +79,7 @@ public class ClientFuture extends CompletableFuture< private void runTimeout() { String traceid = request != null ? request.getTraceid() : null; - if (request != null) { + if (request != null && conn != null) { conn.removeRespFuture(request.getRequestid(), this); } TimeoutException ex = new TimeoutException("client-request: " + request); @@ -88,17 +88,26 @@ public class ClientFuture extends CompletableFuture< workThread = request.workThread; request.workThread = null; } - if (workThread == null || workThread.getWorkExecutor() == null) { + if (conn != null && (workThread == null || workThread.getWorkExecutor() == null)) { workThread = conn.getChannel().getReadIOThread(); } - workThread.runWork(() -> { + if (workThread == null) { Traces.currentTraceid(traceid); if (!isDone()) { completeExceptionally(ex); } - Traces.removeTraceid(); - }); - conn.dispose(ex); + } else { + workThread.runWork(() -> { + Traces.currentTraceid(traceid); + if (!isDone()) { + completeExceptionally(ex); + } + Traces.removeTraceid(); + }); + } + if (conn != null) { + conn.dispose(ex); + } } @Override