This commit is contained in:
redkale
2024-08-27 11:42:05 +08:00
parent 1d3049b743
commit 4c6403a4ca

View File

@@ -34,9 +34,6 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
protected final Client client; protected final Client client;
@Nonnull
protected LongAdder respWaitingCounter;
protected final LongAdder doneRequestCounter = new LongAdder(); protected final LongAdder doneRequestCounter = new LongAdder();
protected final LongAdder doneResponseCounter = new LongAdder(); protected final LongAdder doneResponseCounter = new LongAdder();
@@ -45,10 +42,10 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
protected final ByteArray writeArray = new ByteArray(); protected final ByteArray writeArray = new ByteArray();
protected final ThreadLocal<ByteArray> arrayThreadLocal = Utility.withInitialThreadLocal(ByteArray::new);
protected final ByteBuffer writeBuffer; protected final ByteBuffer writeBuffer;
protected final AsyncConnection channel;
protected final CompletionHandler<Integer, ClientConnection> writeHandler = protected final CompletionHandler<Integer, ClientConnection> writeHandler =
new CompletionHandler<Integer, ClientConnection>() { new CompletionHandler<Integer, ClientConnection>() {
@@ -63,17 +60,11 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
} }
}; };
final AtomicBoolean pauseWriting = new AtomicBoolean(); @Nonnull
protected LongAdder respWaitingCounter;
final ConcurrentLinkedQueue<ClientFuture> pauseRequests = new ConcurrentLinkedQueue<>();
// pauseWriting=true此字段才会有值; pauseWriting=false此字段值为null
ClientFuture currHalfWriteFuture;
@Nonnull @Nonnull
private Client.AddressConnEntry connEntry; protected Client.AddressConnEntry connEntry;
protected final AsyncConnection channel;
private final ClientCodec<R, P> codec; private final ClientCodec<R, P> codec;
@@ -85,6 +76,13 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
final AtomicBoolean pauseWriting = new AtomicBoolean();
final ConcurrentLinkedQueue<ClientFuture> pauseRequests = new ConcurrentLinkedQueue<>();
// pauseWriting=true此字段才会有值; pauseWriting=false此字段值为null
ClientFuture currHalfWriteFuture;
Iterator<ClientFuture<R, P>> currRespIterator; // 必须在调用decodeMessages之前重置为null Iterator<ClientFuture<R, P>> currRespIterator; // 必须在调用decodeMessages之前重置为null
private int maxPipelines; // 最大并行处理数 private int maxPipelines; // 最大并行处理数
@@ -193,18 +191,18 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
} }
// 发送半包和积压的请求数据包 // 发送半包和积压的请求数据包
void sendHalfWriteInReadThread(R request, Throwable halfRequestExc) { void sendHalfWriteInReadThread(R halfRequest, Throwable halfException) {
writeLock.lock(); writeLock.lock();
try { try {
pauseWriting.set(false); pauseWriting.set(false);
ClientFuture respFuture = this.currHalfWriteFuture; ClientFuture respFuture = this.currHalfWriteFuture;
if (respFuture != null) { if (respFuture != null) {
this.currHalfWriteFuture = null; this.currHalfWriteFuture = null;
if (halfRequestExc == null) { if (halfException == null) {
offerFirstRespFuture(respFuture); offerFirstRespFuture(respFuture);
sendRequestToChannel(respFuture); sendRequestToChannel(respFuture);
} else { } else {
codec.responseComplete(true, respFuture, null, halfRequestExc); codec.responseComplete(true, respFuture, null, halfException);
} }
} }
while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) { while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) {