Client优化

This commit is contained in:
redkale
2024-08-25 09:43:51 +08:00
parent 5ddf730c7d
commit 48007b141b
4 changed files with 8 additions and 35 deletions

View File

@@ -43,6 +43,8 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
protected final ScheduledThreadPoolExecutor timeoutScheduler;
protected final Random random = new Random();
// 结合ClientRequest.isCompleted()使用
// 使用场景批量request提交时后面的request需响应上一个request返回值来构建
// 例如: MySQL批量提交PrepareSQL场景
@@ -404,7 +406,6 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (workThread != null && workThread.threads() == entrys.length && workThread.index() > -1) {
return entrys[workThread.index()];
}
ThreadLocalRandom random = ThreadLocalRandom.current();
int index = workThread == null || workThread.index() < 0
? random.nextInt(entrys.length)
: workThread.index() % entrys.length;

View File

@@ -195,38 +195,6 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
}
}
// private void sendFastRequestInLocking(R request, ClientFuture respFuture) {
// ByteArray array = arrayThreadLocal.get();
// array.clear();
// request.writeTo(this, array);
// if (request.isCompleted()) {
// doneRequestCounter.increment();
// } else { //还剩半包没发送完
// pauseWriting.set(true);
// currHalfWriteFuture = respFuture;
// }
// channel.fastWrite(array.getBytes());
// }
//
// private void sendFastRequestInLocking(ClientFuture[] respFutures) {
// ByteArray array = arrayThreadLocal.get();
// array.clear();
// for (ClientFuture respFuture : respFutures) {
// if (pauseWriting.get()) {
// pauseRequests.add(respFuture);
// } else {
// ClientRequest request = respFuture.request;
// request.writeTo(this, array);
// if (request.isCompleted()) {
// doneRequestCounter.increment();
// } else { //还剩半包没发送完
// pauseWriting.set(true);
// currHalfWriteFuture = respFuture;
// }
// }
// }
// channel.fastWrite(array.getBytes());
// }
// 发送半包和积压的请求数据包
void sendHalfWriteInReadThread(R request, Throwable halfRequestExc) {
writeLock.lock();
@@ -237,7 +205,6 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
this.currHalfWriteFuture = null;
if (halfRequestExc == null) {
offerFirstRespFuture(respFuture);
// sendRequestInLocking(request, respFuture);
sendRequestToChannel(respFuture);
} else {
codec.responseComplete(true, respFuture, null, halfRequestExc);

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.net.client;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.*;
import org.redkale.annotation.Nonnull;
@@ -53,6 +54,10 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
}
}
public static ClientFuture[] array(Collection<ClientFuture> list) {
return list.toArray(new ClientFuture[list.size()]);
}
@Override // JDK9+
public <U> ClientFuture<R, U> newIncompleteFuture() {
ClientFuture future = new ClientFuture<>(conn, request);

View File

@@ -51,7 +51,7 @@ public class ClientWriteThread extends Thread {
list.add(respFuture);
}
}
conn.sendRequestToChannel(list.toArray(new ClientFuture[list.size()]));
conn.sendRequestToChannel(ClientFuture.array(list));
list.clear();
if (over) {
return;