diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index e52960aed..9e68f0511 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -380,9 +380,9 @@ abstract class AsyncNioConnection extends AsyncConnection { if (writeCount == 0) { if (hasRemain) { - //writeCompleted = false; - //writeTotal = totalCount; - continue; //要全部输出完才返回 + writeCompleted = false; + writeTotal = totalCount; + //continue; //要全部输出完才返回 } break; } else if (writeCount < 0) { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index ef24e4714..d993f7614 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -57,10 +57,47 @@ public abstract class ClientConnection implements Co @Override public void completed(Integer result, ClientConnection attachment) { + if (pauseWriting.get()) { //等待sendHalfWriteInReadThread调用 + if (!writePending.compareAndSet(true, false)) { + completed(0, attachment); + } + return; + } + ByteArray array = writeArray; + array.clear(); + ClientFuture respFuture; + while ((respFuture = requestQueue.poll()) != null) { + if (!respFuture.isDone()) { + R request = respFuture.request; + request.writeTo(attachment, array); + if (request.isCompleted()) { + doneRequestCounter.increment(); + } else { //还剩半包没发送完 + pauseWriting.set(true); + currHalfWriteFuture = respFuture; + break; + } + } + } + if (array.length() > 0) { + if (writeBuffer.capacity() >= array.length()) { + writeBuffer.clear(); + writeBuffer.put(array.content(), 0, array.length()); + writeBuffer.flip(); + channel.write(writeBuffer, attachment, this); + } else { + channel.write(array, attachment, this); + } + } else { + if (!writePending.compareAndSet(true, false)) { + completed(0, attachment); + } + } } @Override public void failed(Throwable exc, ClientConnection attachment) { + writePending.set(false); attachment.dispose(exc); } }; @@ -69,7 +106,8 @@ public abstract class ClientConnection implements Co final ConcurrentLinkedQueue pauseRequests = new ConcurrentLinkedQueue<>(); - ClientFuture currHalfWriteFuture; //pauseWriting=true,此字段才会有值; pauseWriting=false,此字段值为null + //pauseWriting=true,此字段才会有值; pauseWriting=false,此字段值为null + ClientFuture currHalfWriteFuture; @Nullable private final Client.AddressConnEntry connEntry; @@ -78,8 +116,10 @@ public abstract class ClientConnection implements Co private final ClientCodec codec; + private final ConcurrentLinkedQueue> requestQueue = new ConcurrentLinkedQueue(); + //respFutureQueue、respFutureMap二选一, SPSC队列模式 - private final ConcurrentLinkedDeque> respFutureQueue = new ConcurrentLinkedDeque<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedDeque> respFutureQueue = new ConcurrentLinkedDeque<>(); //respFutureQueue、respFutureMap二选一, key: requestid, SPSC模式 private final ConcurrentHashMap> respFutureMap = new ConcurrentHashMap<>(); @@ -132,24 +172,28 @@ public abstract class ClientConnection implements Co } private void sendRequestInLocking(R request, ClientFuture respFuture) { - //发送请求数据包 - writeArray.clear(); - request.writeTo(this, writeArray); - if (request.isCompleted()) { - doneRequestCounter.increment(); - } else { //还剩半包没发送完 - pauseWriting.set(true); - currHalfWriteFuture = respFuture; - } - if (writeArray.length() > 0) { - if (writeBuffer.capacity() >= writeArray.length()) { - writeBuffer.clear(); - writeBuffer.put(writeArray.content(), 0, writeArray.length()); - writeBuffer.flip(); - channel.write(writeBuffer, this, writeHandler); - } else { - channel.write(writeArray, this, writeHandler); + if (writePending.compareAndSet(false, true)) { + //发送请求数据包 + writeArray.clear(); + request.writeTo(this, writeArray); + if (request.isCompleted()) { + doneRequestCounter.increment(); + } else { //还剩半包没发送完 + pauseWriting.set(true); + currHalfWriteFuture = respFuture; } + if (writeArray.length() > 0) { + if (writeBuffer.capacity() >= writeArray.length()) { + writeBuffer.clear(); + writeBuffer.put(writeArray.content(), 0, writeArray.length()); + writeBuffer.flip(); + channel.write(writeBuffer, this, writeHandler); + } else { + channel.write(writeArray, this, writeHandler); + } + } + } else { + requestQueue.add(respFuture); } } @@ -161,14 +205,26 @@ public abstract class ClientConnection implements Co ClientFuture respFuture = this.currHalfWriteFuture; if (respFuture != null) { this.currHalfWriteFuture = null; - if (halfRequestExc == null) { - offerFirstRespFuture(respFuture); - sendRequestInLocking(request, respFuture); - } else { - codec.responseComplete(true, respFuture, null, halfRequestExc); + if (!respFuture.isDone()) { + if (halfRequestExc == null) { + offerFirstRespFuture(respFuture); + ClientFuture future; + while ((future = pauseRequests.poll()) != null) { + requestQueue.add(future); + } + sendRequestInLocking(request, respFuture); + return; + } else { + codec.responseComplete(true, respFuture, null, halfRequestExc); + } } } - while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) { + respFuture = pauseRequests.poll(); + if (respFuture != null) { + ClientFuture future; + while ((future = pauseRequests.poll()) != null) { + requestQueue.add(future); + } sendRequestInLocking((R) respFuture.getRequest(), respFuture); } } finally {