diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index a5755e5ac..115e4b34c 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -1,430 +1,430 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.net.client; - -import java.io.Serializable; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.*; -import org.redkale.annotation.*; -import org.redkale.net.*; -import org.redkale.util.*; - -/** - * 注意: 要确保AsyncConnection的读写过程都必须在channel.ioThread中运行 - * - *
- * 详情见: https://redkale.org
- *
- * @author zhangjx
- * @since 2.3.0
- *
- * @param 响应对象
- */
-public abstract class ClientConnection writeChannel(R request) {
- return writeChannel(request, null);
- }
-
- protected final CompletableFuture respTransfer) {
- request.respTransfer = respTransfer;
- ClientFuture respFuture = createClientFuture(request);
- int rts = this.channel.getReadTimeoutSeconds();
- if (rts > 0 && !request.isCloseType()) {
- respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
- }
- respWaitingCounter.increment(); //放在writeChannelInWriteThread计数会延迟,导致不准确
- writeLock.lock();
- try {
- offerRespFuture(respFuture);
- if (pauseWriting.get()) {
- pauseRequests.add(respFuture);
- } else {
- sendRequestInLocking(request, respFuture);
- }
- } finally {
- writeLock.unlock();
- }
- return respFuture;
- }
-
- //respTransfer只会在ClientCodec的读线程里调用
- protected final respTransfer) {
- ClientFuture[] respFutures = new ClientFuture[requests.length];
- int rts = this.channel.getReadTimeoutSeconds();
- for (int i = 0; i < respFutures.length; i++) {
- R request = requests[i];
- request.respTransfer = respTransfer;
- ClientFuture respFuture = createClientFuture(requests[i]);
- respFutures[i] = respFuture;
- if (rts > 0 && !request.isCloseType()) {
- respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
- }
- }
- respWaitingCounter.add(respFutures.length);//放在writeChannelInWriteThread计数会延迟,导致不准确
-
- writeLock.lock();
- try {
- for (ClientFuture respFuture : respFutures) {
- offerRespFuture(respFuture);
- if (pauseWriting.get()) {
- pauseRequests.add(respFuture);
- }
- }
- sendRequestInLocking(respFutures);
- } finally {
- writeLock.unlock();
- }
- return Utility.allOfFutures(respFutures);
- }
-
- private void sendRequestInLocking(R request, ClientFuture respFuture) {
- if (true) { //新方式
- ByteArray array = arrayThreadLocal.get();
- array.clear();
- request.writeTo(this, array);
- if (request.isCompleted()) {
- doneRequestCounter.increment();
- } else { //还剩半包没发送完
- pauseWriting.set(true);
- currHalfWriteFuture = respFuture;
- }
- channel.fastWrite(array.getBytes());
- } else { //旧方式
- //发送请求数据包
- writeArray.clear();
- request.writeTo(this, writeArray);
- if (request.isCompleted()) {
- doneRequestCounter.increment();
- } else { //还剩半包没发送完
- pauseWriting.set(true);
- currHalfWriteFuture = respFuture;
- }
- if (writeArray.length() > 0) {
- if (writeBuffer.capacity() >= writeArray.length()) {
- writeBuffer.clear();
- writeBuffer.put(writeArray.content(), 0, writeArray.length());
- writeBuffer.flip();
- channel.write(writeBuffer, this, writeHandler);
- } else {
- channel.write(writeArray, this, writeHandler);
- }
- }
- }
- }
-
- private void sendRequestInLocking(ClientFuture[] respFutures) {
- ByteArray array = arrayThreadLocal.get();
- array.clear();
- for (ClientFuture respFuture : respFutures) {
- if (pauseWriting.get()) {
- pauseRequests.add(respFuture);
- } else {
- ClientRequest request = respFuture.request;
- request.writeTo(this, array);
- if (request.isCompleted()) {
- doneRequestCounter.increment();
- } else { //还剩半包没发送完
- pauseWriting.set(true);
- currHalfWriteFuture = respFuture;
- }
- }
- }
- channel.fastWrite(array.getBytes());
- }
-
- //发送半包和积压的请求数据包
- void sendHalfWriteInReadThread(R request, Throwable halfRequestExc) {
- writeLock.lock();
- try {
- pauseWriting.set(false);
- ClientFuture respFuture = this.currHalfWriteFuture;
- if (respFuture != null) {
- this.currHalfWriteFuture = null;
- if (halfRequestExc == null) {
- offerFirstRespFuture(respFuture);
- sendRequestInLocking(request, respFuture);
- } else {
- codec.responseComplete(true, respFuture, null, halfRequestExc);
- }
- }
- while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) {
- sendRequestInLocking((R) respFuture.getRequest(), respFuture);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- CompletableFuture writeVirtualRequest(R request) {
- if (!request.isVirtualType()) {
- return CompletableFuture.failedFuture(new RuntimeException("ClientVirtualRequest must be virtualType = true"));
- }
- ClientFuture
+ * 详情见: https://redkale.org
+ *
+ * @author zhangjx
+ * @since 2.3.0
+ *
+ * @param 响应对象
+ */
+public abstract class ClientConnection writeChannel(R request) {
+ return writeChannel(request, null);
+ }
+
+ protected final CompletableFuture respTransfer) {
+ request.respTransfer = respTransfer;
+ ClientFuture respFuture = createClientFuture(request);
+ int rts = this.channel.getReadTimeoutSeconds();
+ if (rts > 0 && !request.isCloseType()) {
+ respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
+ }
+ respWaitingCounter.increment(); //放在writeChannelInWriteThread计数会延迟,导致不准确
+ writeLock.lock();
+ try {
+ offerRespFuture(respFuture);
+ if (pauseWriting.get()) {
+ pauseRequests.add(respFuture);
+ } else {
+ sendRequestInLocking(request, respFuture);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ return respFuture;
+ }
+
+ //respTransfer只会在ClientCodec的读线程里调用
+ protected final respTransfer) {
+ ClientFuture[] respFutures = new ClientFuture[requests.length];
+ int rts = this.channel.getReadTimeoutSeconds();
+ for (int i = 0; i < respFutures.length; i++) {
+ R request = requests[i];
+ request.respTransfer = respTransfer;
+ ClientFuture respFuture = createClientFuture(requests[i]);
+ respFutures[i] = respFuture;
+ if (rts > 0 && !request.isCloseType()) {
+ respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
+ }
+ }
+ respWaitingCounter.add(respFutures.length);//放在writeChannelInWriteThread计数会延迟,导致不准确
+
+ writeLock.lock();
+ try {
+ for (ClientFuture respFuture : respFutures) {
+ offerRespFuture(respFuture);
+ if (pauseWriting.get()) {
+ pauseRequests.add(respFuture);
+ }
+ }
+ sendRequestInLocking(respFutures);
+ } finally {
+ writeLock.unlock();
+ }
+ return Utility.allOfFutures(respFutures);
+ }
+
+ private void sendRequestInLocking(R request, ClientFuture respFuture) {
+ if (false) { //新方式
+ ByteArray array = arrayThreadLocal.get();
+ array.clear();
+ request.writeTo(this, array);
+ if (request.isCompleted()) {
+ doneRequestCounter.increment();
+ } else { //还剩半包没发送完
+ pauseWriting.set(true);
+ currHalfWriteFuture = respFuture;
+ }
+ channel.fastWrite(array.getBytes());
+ } else { //旧方式
+ //发送请求数据包
+ writeArray.clear();
+ request.writeTo(this, writeArray);
+ if (request.isCompleted()) {
+ doneRequestCounter.increment();
+ } else { //还剩半包没发送完
+ pauseWriting.set(true);
+ currHalfWriteFuture = respFuture;
+ }
+ if (writeArray.length() > 0) {
+ if (writeBuffer.capacity() >= writeArray.length()) {
+ writeBuffer.clear();
+ writeBuffer.put(writeArray.content(), 0, writeArray.length());
+ writeBuffer.flip();
+ channel.write(writeBuffer, this, writeHandler);
+ } else {
+ channel.write(writeArray, this, writeHandler);
+ }
+ }
+ }
+ }
+
+ private void sendRequestInLocking(ClientFuture[] respFutures) {
+ ByteArray array = arrayThreadLocal.get();
+ array.clear();
+ for (ClientFuture respFuture : respFutures) {
+ if (pauseWriting.get()) {
+ pauseRequests.add(respFuture);
+ } else {
+ ClientRequest request = respFuture.request;
+ request.writeTo(this, array);
+ if (request.isCompleted()) {
+ doneRequestCounter.increment();
+ } else { //还剩半包没发送完
+ pauseWriting.set(true);
+ currHalfWriteFuture = respFuture;
+ }
+ }
+ }
+ channel.fastWrite(array.getBytes());
+ }
+
+ //发送半包和积压的请求数据包
+ void sendHalfWriteInReadThread(R request, Throwable halfRequestExc) {
+ writeLock.lock();
+ try {
+ pauseWriting.set(false);
+ ClientFuture respFuture = this.currHalfWriteFuture;
+ if (respFuture != null) {
+ this.currHalfWriteFuture = null;
+ if (halfRequestExc == null) {
+ offerFirstRespFuture(respFuture);
+ sendRequestInLocking(request, respFuture);
+ } else {
+ codec.responseComplete(true, respFuture, null, halfRequestExc);
+ }
+ }
+ while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) {
+ sendRequestInLocking((R) respFuture.getRequest(), respFuture);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ CompletableFuture writeVirtualRequest(R request) {
+ if (!request.isVirtualType()) {
+ return CompletableFuture.failedFuture(new RuntimeException("ClientVirtualRequest must be virtualType = true"));
+ }
+ ClientFuture> writeChannel(R[] requests) {
- return writeChannel(requests, null);
- }
-
- //respTransfer只会在ClientCodec的读线程里调用
- protected final
> writeChannel(R[] requests, Function
> writeChannel(R[] requests) {
+ return writeChannel(requests, null);
+ }
+
+ //respTransfer只会在ClientCodec的读线程里调用
+ protected final
> writeChannel(R[] requests, Function