From 5975be44d913082b73343781a77534333c5f228f Mon Sep 17 00:00:00 2001 From: Redkale Date: Wed, 4 Jan 2023 11:53:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ClientFuture?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redkale/net/client/ClientConnection.java | 12 +++--- .../org/redkale/net/client/ClientFuture.java | 40 +++++++++++++++++-- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index d6b900ead..f58ad60da 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -142,7 +142,7 @@ public abstract class ClientConnection implements Co } ClientFuture f = entry.getValue(); if (f != null) { - f.mergeCount++; + f.incrMergeCount(); } //req.respFuture.mergeCount++; } @@ -214,9 +214,7 @@ public abstract class ClientConnection implements Co client.respDoneCounter.increment(); } try { - if (respFuture.timeout != null) { - respFuture.timeout.cancel(true); - } + respFuture.cancelTimeout(); ClientRequest request = respFuture.request; //if (client.finest) client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + ClientConnection.this + ", 回调处理, req=" + request + ", message=" + rs.message); preComplete(rs.message, (R) request, rs.exc); @@ -289,7 +287,7 @@ public abstract class ClientConnection implements Co Serializable reqid = rs.getRequestid(); ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); if (respFuture != null) { - int mergeCount = respFuture.mergeCount; + int mergeCount = respFuture.getMergeCount(); completeResponse(rs, respFuture); if (mergeCount > 0) { for (int i = 0; i < mergeCount; i++) { @@ -363,8 +361,8 @@ public abstract class ClientConnection implements Co respFuture = createClientFuture(request); int rts = this.channel.getReadTimeoutSeconds(); if (rts > 0 && respFuture.request != null) { - respFuture.conn = this; - respFuture.timeout = client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS); + respFuture.setConn(this); + respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS)); } } respWaitingCounter.increment(); //放在writeChannelInThread计数会延迟,导致不准确 diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index fb144e2d5..f9272781b 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -26,21 +26,55 @@ public class ClientFuture extends CompletableFuture implements Runnable { public boolean completeExceptionally(Throwable ex) { return true; } + + @Override + void setConn(ClientConnection conn) { + } + + @Override + void setTimeout(ScheduledFuture timeout) { + } + + @Override + void incrMergeCount() { + } + + @Override + public void run() { + } }; protected final ClientRequest request; - ScheduledFuture timeout; + private ScheduledFuture timeout; - int mergeCount; //合并的个数,不算自身 + private int mergeCount; //合并的个数,不算自身 - ClientConnection conn; + private ClientConnection conn; public ClientFuture(ClientRequest request) { super(); this.request = request; } + void setConn(ClientConnection conn) { + this.conn = conn; + } + + void setTimeout(ScheduledFuture timeout) { + this.timeout = timeout; + } + + void cancelTimeout() { + if (timeout != null) { + timeout.cancel(true); + } + } + + void incrMergeCount() { + mergeCount++; + } + public int getMergeCount() { return mergeCount; }