From b3862cea72a8a0223d29bb9d8bb9cb25c9efc2f5 Mon Sep 17 00:00:00 2001 From: Redkale Date: Sun, 1 Jan 2023 13:53:19 +0800 Subject: [PATCH] =?UTF-8?q?Client=E5=AE=9E=E7=8E=B0requestid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/client/ClientCodec.java | 4 +- .../redkale/net/client/ClientConnection.java | 51 ++++++++++++++----- .../org/redkale/net/client/ClientFuture.java | 6 ++- .../org/redkale/net/client/ClientRequest.java | 9 +++- .../redkale/net/client/ClientResponse.java | 6 +++ 5 files changed, 59 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index b31bd1e79..0b3344b14 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -34,8 +34,8 @@ public abstract class ClientCodec { //返回true: array会clear, 返回false: buffer会clear public abstract boolean decodeMessages(ByteBuffer buffer, ByteArray array); - protected Queue responseQueue() { - return connection.responseQueue; + protected Iterator responseIterator() { + return connection.responseQueue2.iterator(); } public List> pollMessages() { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 7a002104e..e0b6c1125 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -5,6 +5,7 @@ */ package org.redkale.net.client; +import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; @@ -51,7 +52,10 @@ public abstract class ClientConnection implements Co protected final Queue requestQueue = new ArrayDeque<>(); - protected final ArrayDeque responseQueue = new ArrayDeque<>(); + final ArrayDeque responseQueue2 = new ArrayDeque<>(); + + //key: requestid + final HashMap responseMap = new LinkedHashMap<>(); protected final CompletionHandler writeHandler = new CompletionHandler() { @@ -91,6 +95,10 @@ public abstract class ClientConnection implements Co return this; } + protected boolean isWaitingResponseEmpty() { + return responseQueue2.isEmpty() && responseMap.isEmpty(); + } + protected void resumeWrite() { this.pauseWriting.set(false); } @@ -99,7 +107,7 @@ public abstract class ClientConnection implements Co ClientConnection conn = this; ByteArray rw = conn.writeArray; rw.clear(); - int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue.size()) : 1; + int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue2.size() - responseMap.size()) : 1; if (must && pipelines < 1) { pipelines = 1; } @@ -120,7 +128,7 @@ public abstract class ClientConnection implements Co break; } writeLastRequest = req; - if (req.canMerge(conn)) { + if (req.getRequestid() == null && req.canMerge(conn)) { R r; while ((r = requestQueue.poll()) != null) { i++; @@ -179,7 +187,12 @@ public abstract class ClientConnection implements Co if (respFuture != null) { if (!respFuture.request.isCompleted()) { if (rs.exc == null) { - responseQueue.offerFirst(respFuture); + Serializable reqid = respFuture.request.getRequestid(); + if (reqid == null) { + responseQueue2.offerFirst(respFuture); + } else { + responseMap.put(reqid, respFuture); + } pauseWriting.set(false); return; } else { //异常了需要清掉半包 @@ -264,13 +277,14 @@ public abstract class ClientConnection implements Co List> results = codec.pollMessages(); if (results != null) { for (ClientResponse

rs : results) { - ClientFuture respFuture = responseQueue.poll(); + Serializable reqid = rs.getRequestid(); + ClientFuture respFuture = reqid == null ? responseQueue2.poll() : responseMap.remove(reqid); if (respFuture != null) { int mergeCount = respFuture.mergeCount; completeResponse(rs, respFuture); if (mergeCount > 0) { for (int i = 0; i < mergeCount; i++) { - respFuture = responseQueue.poll(); + respFuture = reqid == null ? responseQueue2.poll() : responseMap.remove(reqid); if (respFuture != null) { completeResponse(rs, respFuture); } @@ -282,7 +296,7 @@ public abstract class ClientConnection implements Co if (buffer.hasRemaining()) { decodeResponse(buffer); - } else if (responseQueue.isEmpty()) { //队列都已处理完了 + } else if (isWaitingResponseEmpty()) { //队列都已处理完了 buffer.clear(); channel.setReadBuffer(buffer); if (readPending.compareAndSet(true, false)) { @@ -354,12 +368,17 @@ public abstract class ClientConnection implements Co } private void writeChannelInThread(R request, ClientFuture respFuture) { + Serializable reqid = request.getRequestid(); //保证顺序一致 if (client.closeRequest != null && respFuture.request == client.closeRequest) { - responseQueue.offer(ClientFuture.EMPTY); + responseQueue2.offer(ClientFuture.EMPTY); } else { request.respFuture = respFuture; - responseQueue.offer(respFuture); + if (reqid == null) { + responseQueue2.offer(respFuture); + } else { + responseMap.put(reqid, respFuture); + } } requestQueue.offer(request); if (isAuthenticated() && client.reqWritedCounter != null) { @@ -406,9 +425,17 @@ public abstract class ClientConnection implements Co CompletableFuture f; respWaitingCounter.reset(); WorkThread thread = channel.getAsyncIOThread(); - while ((f = responseQueue.poll()) != null) { - CompletableFuture future = f; - thread.runWork(() -> future.completeExceptionally(e)); + if (!responseQueue2.isEmpty()) { + while ((f = responseQueue2.poll()) != null) { + CompletableFuture future = f; + thread.runWork(() -> future.completeExceptionally(e)); + } + } + if (!responseMap.isEmpty()) { + responseMap.forEach((key, future) -> { + responseMap.remove(key); + thread.runWork(() -> future.completeExceptionally(e)); + }); } } diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 70e60ae0f..1b70397eb 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -72,10 +72,14 @@ public class ClientFuture extends CompletableFuture implements Runnable { } private void runTimeout() { - Queue responseQueue = conn.responseQueue; + Queue responseQueue = conn.responseQueue2; if (responseQueue != null) { responseQueue.remove(this); } + if (request.getRequestid() != null) { + conn.responseMap.remove(request.getRequestid()); + } + TimeoutException ex = new TimeoutException(); WorkThread workThread = null; if (request != null) { diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index 316c3786f..34be758c6 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -5,7 +5,8 @@ */ package org.redkale.net.client; -import java.util.function.*; +import java.io.Serializable; +import java.util.function.BiConsumer; import org.redkale.net.WorkThread; import org.redkale.util.*; @@ -28,6 +29,10 @@ public abstract class ClientRequest implements BiConsumer { protected Throwable exc; + public Serializable getRequestid() { + return null; + } + public ClientResponse(P result) { this.message = result; }