From e6a2167261483f6ebb34abe255e0ae6c4b3033c8 Mon Sep 17 00:00:00 2001 From: redkale Date: Fri, 7 Apr 2023 14:05:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/mq/MessageRespFutureNode.java | 2 +- src/main/java/org/redkale/net/AsyncNioConnection.java | 6 +++--- .../java/org/redkale/net/client/ClientConnection.java | 8 +++++--- src/main/java/org/redkale/net/client/ClientFuture.java | 2 +- src/main/java/org/redkale/net/http/WebSocketFuture.java | 2 +- .../java/org/redkale/net/http/WebSocketWriteHandler.java | 2 +- 6 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/redkale/mq/MessageRespFutureNode.java b/src/main/java/org/redkale/mq/MessageRespFutureNode.java index 265a2a8f2..965ea321c 100644 --- a/src/main/java/org/redkale/mq/MessageRespFutureNode.java +++ b/src/main/java/org/redkale/mq/MessageRespFutureNode.java @@ -50,7 +50,7 @@ public class MessageRespFutureNode implements Runnable { @Override //超时后被timeoutExecutor调用 public void run() { //timeout respNodes.remove(this.seqid); - future.completeExceptionally(new TimeoutException()); + future.completeExceptionally(new TimeoutException("message-record: "+message)); logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout " + (System.currentTimeMillis() - createTime) + "ms" + (message.userid != null || (message.groupid != null && !message.groupid.isEmpty()) ? (message.userid != null ? (", userid:" + message.userid) : (", groupid:" + message.groupid)) : "")); } diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index c33ca6207..8892eccfc 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -397,9 +397,9 @@ abstract class AsyncNioConnection extends AsyncConnection { if (writeCount == 0) { if (hasRemain) { - //writeCompleted = false; - //writeTotal = totalCount; - continue; //要全部输出完才返回 + writeCompleted = false; + writeTotal = totalCount; + //continue; //要全部输出完才返回 } break; } else if (writeCount < 0) { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index bb2225979..ef24e4714 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -9,7 +9,7 @@ import java.io.Serializable; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; -import java.util.*; +import java.util.Iterator; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.ReentrantLock; @@ -45,6 +45,8 @@ public abstract class ClientConnection implements Co protected final LongAdder doneResponseCounter = new LongAdder(); + protected final AtomicBoolean writePending = new AtomicBoolean(); + protected final ReentrantLock writeLock = new ReentrantLock(); protected final ByteArray writeArray = new ByteArray(); @@ -77,10 +79,10 @@ public abstract class ClientConnection implements Co private final ClientCodec codec; //respFutureQueue、respFutureMap二选一, SPSC队列模式 - private final Deque> respFutureQueue = new ConcurrentLinkedDeque<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedDeque> respFutureQueue = new ConcurrentLinkedDeque<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); //respFutureQueue、respFutureMap二选一, key: requestid, SPSC模式 - private final Map> respFutureMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> respFutureMap = new ConcurrentHashMap<>(); Iterator> currRespIterator; //必须在调用decodeMessages之前重置为null diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index b41001b49..3761cbe27 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -73,7 +73,7 @@ public class ClientFuture extends CompletableFuture< private void runTimeout() { conn.removeRespFuture(request.getRequestid(), this); - TimeoutException ex = new TimeoutException(); + TimeoutException ex = new TimeoutException("client-request: " + request); WorkThread workThread = null; if (request != null) { workThread = request.workThread; diff --git a/src/main/java/org/redkale/net/http/WebSocketFuture.java b/src/main/java/org/redkale/net/http/WebSocketFuture.java index cbd53b1b3..d08b2d56f 100644 --- a/src/main/java/org/redkale/net/http/WebSocketFuture.java +++ b/src/main/java/org/redkale/net/http/WebSocketFuture.java @@ -49,7 +49,7 @@ public class WebSocketFuture extends CompletableFuture implements Runna @Override public void run() { - TimeoutException ex = new TimeoutException(); + TimeoutException ex = new TimeoutException("packets: " + Arrays.toString(packets)); workThread.runWork(() -> completeExceptionally(ex)); } diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java index 6eb316ce6..2f345545e 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java @@ -31,7 +31,7 @@ public class WebSocketWriteHandler implements CompletionHandler { protected final List> respList = new ArrayList(); - protected final ConcurrentLinkedDeque> requestQueue = new ConcurrentLinkedDeque(); + protected final ConcurrentLinkedQueue> requestQueue = new ConcurrentLinkedQueue(); public WebSocketWriteHandler(HttpContext context, WebSocket webSocket, ObjectPool byteArrayPool) { this.context = context;