From cb3cb00e2b4b94c0b3574621580a8920c8cbd97e Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 21 Oct 2023 08:04:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/client/Client.java | 28 ++++++++++++++++--- .../redkale/net/client/ClientConnection.java | 3 -- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index b3db2b951..6d5cd41a3 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -13,6 +13,7 @@ import java.util.function.*; import java.util.logging.Logger; import org.redkale.net.*; import org.redkale.util.*; +import static org.redkale.util.Utility.isNotEmpty; /** * @@ -327,11 +328,26 @@ public abstract class Client, R extends ClientR final Queue> waitQueue = this.connAcquireWaitings[connIndex]; if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) - .thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines)); + .thenApply(c -> { + if (isNotEmpty(traceid)) { + Traces.computeIfAbsent(traceid); + } + return (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines); + }); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { virtualReq.traceid = traceid; - future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); + future = future.thenCompose(conn -> { + if (isNotEmpty(traceid)) { + Traces.computeIfAbsent(traceid); + } + return conn.writeVirtualRequest(virtualReq).thenApply(v -> { + if (isNotEmpty(traceid)) { + Traces.computeIfAbsent(traceid); + } + return conn; + }); + }); } else { future = future.thenApply(conn -> { conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步 @@ -342,7 +358,9 @@ public abstract class Client, R extends ClientR future = future.thenCompose(authenticate); } return future.thenApply(c -> { - Traces.computeIfAbsent(traceid); + if (isNotEmpty(traceid)) { + Traces.computeIfAbsent(traceid); + } c.setAuthenticated(true); if (pool) { this.connArray[connIndex] = c; @@ -352,7 +370,9 @@ public abstract class Client, R extends ClientR if (workThread != null) { CompletableFuture fs = f; workThread.execute(() -> { - Traces.computeIfAbsent(traceid); + if (isNotEmpty(traceid)) { + Traces.computeIfAbsent(traceid); + } fs.complete(c); }); } else { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 0c6b0b254..187ca470d 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -14,7 +14,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; -import java.util.logging.Level; import org.redkale.annotation.*; import org.redkale.net.*; import org.redkale.util.*; @@ -134,7 +133,6 @@ public abstract class ClientConnection implements Co } else { sendRequestInLocking(request, respFuture); } - client.logger.log(Level.INFO, channel + ", " + request.getTraceid()+ " 发送完请求: " + request); } finally { writeLock.unlock(); } @@ -321,7 +319,6 @@ public abstract class ClientConnection implements Co public void dispose(Throwable exc) { channel.offerWriteBuffer(writeBuffer); channel.dispose(); - System.out.println(Thread.currentThread().getName() + ": " + channel + ", 被关闭了"); Throwable e = exc == null ? new ClosedChannelException() : exc; CompletableFuture f; respWaitingCounter.reset();