ClientConnection优化临时方案

This commit is contained in:
redkale
2023-06-27 15:25:27 +08:00
parent f3475341a0
commit 5e6b2fb86e
2 changed files with 84 additions and 28 deletions

View File

@@ -380,9 +380,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (writeCount == 0) {
if (hasRemain) {
//writeCompleted = false;
//writeTotal = totalCount;
continue; //要全部输出完才返回
writeCompleted = false;
writeTotal = totalCount;
//continue; //要全部输出完才返回
}
break;
} else if (writeCount < 0) {

View File

@@ -57,10 +57,47 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
@Override
public void completed(Integer result, ClientConnection attachment) {
if (pauseWriting.get()) { //等待sendHalfWriteInReadThread调用
if (!writePending.compareAndSet(true, false)) {
completed(0, attachment);
}
return;
}
ByteArray array = writeArray;
array.clear();
ClientFuture<R, P> respFuture;
while ((respFuture = requestQueue.poll()) != null) {
if (!respFuture.isDone()) {
R request = respFuture.request;
request.writeTo(attachment, array);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { //还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
break;
}
}
}
if (array.length() > 0) {
if (writeBuffer.capacity() >= array.length()) {
writeBuffer.clear();
writeBuffer.put(array.content(), 0, array.length());
writeBuffer.flip();
channel.write(writeBuffer, attachment, this);
} else {
channel.write(array, attachment, this);
}
} else {
if (!writePending.compareAndSet(true, false)) {
completed(0, attachment);
}
}
}
@Override
public void failed(Throwable exc, ClientConnection attachment) {
writePending.set(false);
attachment.dispose(exc);
}
};
@@ -69,7 +106,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
final ConcurrentLinkedQueue<ClientFuture> pauseRequests = new ConcurrentLinkedQueue<>();
ClientFuture currHalfWriteFuture; //pauseWriting=true此字段才会有值; pauseWriting=false此字段值为null
//pauseWriting=true此字段才会有值; pauseWriting=false此字段值为null
ClientFuture currHalfWriteFuture;
@Nullable
private final Client.AddressConnEntry connEntry;
@@ -78,8 +116,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
private final ClientCodec<R, P> codec;
private final ConcurrentLinkedQueue<ClientFuture<R, P>> requestQueue = new ConcurrentLinkedQueue();
//respFutureQueue、respFutureMap二选一 SPSC队列模式
private final ConcurrentLinkedDeque<ClientFuture<R, P>> respFutureQueue = new ConcurrentLinkedDeque<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedDeque<ClientFuture<R, P>> respFutureQueue = new ConcurrentLinkedDeque<>();
//respFutureQueue、respFutureMap二选一, key: requestid SPSC模式
private final ConcurrentHashMap<Serializable, ClientFuture<R, P>> respFutureMap = new ConcurrentHashMap<>();
@@ -132,24 +172,28 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
}
private void sendRequestInLocking(R request, ClientFuture respFuture) {
//发送请求数据包
writeArray.clear();
request.writeTo(this, writeArray);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { //还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
}
if (writeArray.length() > 0) {
if (writeBuffer.capacity() >= writeArray.length()) {
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
if (writePending.compareAndSet(false, true)) {
//发送请求数据包
writeArray.clear();
request.writeTo(this, writeArray);
if (request.isCompleted()) {
doneRequestCounter.increment();
} else { //还剩半包没发送完
pauseWriting.set(true);
currHalfWriteFuture = respFuture;
}
if (writeArray.length() > 0) {
if (writeBuffer.capacity() >= writeArray.length()) {
writeBuffer.clear();
writeBuffer.put(writeArray.content(), 0, writeArray.length());
writeBuffer.flip();
channel.write(writeBuffer, this, writeHandler);
} else {
channel.write(writeArray, this, writeHandler);
}
}
} else {
requestQueue.add(respFuture);
}
}
@@ -161,14 +205,26 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
ClientFuture respFuture = this.currHalfWriteFuture;
if (respFuture != null) {
this.currHalfWriteFuture = null;
if (halfRequestExc == null) {
offerFirstRespFuture(respFuture);
sendRequestInLocking(request, respFuture);
} else {
codec.responseComplete(true, respFuture, null, halfRequestExc);
if (!respFuture.isDone()) {
if (halfRequestExc == null) {
offerFirstRespFuture(respFuture);
ClientFuture future;
while ((future = pauseRequests.poll()) != null) {
requestQueue.add(future);
}
sendRequestInLocking(request, respFuture);
return;
} else {
codec.responseComplete(true, respFuture, null, halfRequestExc);
}
}
}
while (!pauseWriting.get() && (respFuture = pauseRequests.poll()) != null) {
respFuture = pauseRequests.poll();
if (respFuture != null) {
ClientFuture future;
while ((future = pauseRequests.poll()) != null) {
requestQueue.add(future);
}
sendRequestInLocking((R) respFuture.getRequest(), respFuture);
}
} finally {