Client优化

This commit is contained in:
redkale
2024-08-16 22:18:42 +08:00
parent 5384e9ab96
commit 96e9d897ec
3 changed files with 64 additions and 35 deletions

View File

@@ -265,7 +265,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return sendAsync(getAddress(request), (Function) null, request);
}
public final CompletableFuture<List<P>> sendAsync(R... requests) {
public final CompletableFuture<P>[] sendAsync(R... requests) {
return sendAsync(getAddress(requests[0]), (Function) null, requests);
}
@@ -273,7 +273,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return sendAsync(getAddress(request), respTransfer, request);
}
public final <T> CompletableFuture<List<T>> sendAsync(Function<P, T> respTransfer, R... requests) {
public final <T> CompletableFuture<T>[] sendAsync(Function<P, T> respTransfer, R... requests) {
return sendAsync(getAddress(requests[0]), respTransfer, requests);
}
@@ -281,7 +281,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return sendAsync(addr, (Function) null, request);
}
public final CompletableFuture<List<P>> sendAsync(SocketAddress addr, R... requests) {
public final CompletableFuture<P>[] sendAsync(SocketAddress addr, R... requests) {
return sendAsync(addr, (Function) null, requests);
}
@@ -291,26 +291,47 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, respTransfer, request));
}
public final <T> CompletableFuture<List<T>> sendAsync(
SocketAddress addr, Function<P, T> respTransfer, R... requests) {
public final <T> CompletableFuture<T>[] sendAsync(SocketAddress addr, Function<P, T> respTransfer, R... requests) {
String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
for (R request : requests) {
ClientFuture[] respFutures = new ClientFuture[requests.length];
for (int i = 0; i < respFutures.length; i++) {
R request = requests[i];
request.traceid = traceid;
request.computeWorkThreadIfAbsent();
respFutures[i] = createClientFuture(null, requests[i]);
}
return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, respTransfer, requests));
connect(requests[0].workThread, addr).whenComplete((conn, t) -> {
if (t != null) {
for (ClientFuture f : respFutures) {
f.completeExceptionally(t);
}
} else {
CompletableFuture<T>[] fs = writeChannel(conn, respTransfer, requests);
for (int i = 0; i < respFutures.length; i++) {
final int index = i;
fs[index].whenComplete((v, e) -> {
if (e != null) {
respFutures[index].completeExceptionally(e);
} else {
respFutures[index].complete(v);
}
});
}
}
});
return respFutures;
}
protected CompletableFuture<P> writeChannel(ClientConnection conn, R request) {
return conn.writeChannel(request);
}
protected CompletableFuture<List<P>> writeChannel(ClientConnection conn, R... requests) {
protected CompletableFuture<P>[] writeChannel(ClientConnection conn, R... requests) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
return conn.writeChannel(requests);
}
protected <T> CompletableFuture<List<T>> writeChannel(
protected <T> CompletableFuture<T>[] writeChannel(
ClientConnection conn, Function<P, T> respTransfer, R[] requests) {
return conn.writeChannel(respTransfer, requests);
}
@@ -433,6 +454,15 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return entrys[index];
}
protected ClientFuture<R, P> createClientFuture(ClientConnection conn, R request) {
ClientFuture respFuture = new ClientFuture(conn, request);
int rts = getReadTimeoutSeconds();
if (rts > 0 && !request.isCloseType()) {
respFuture.setTimeout(timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
}
return respFuture;
}
protected void incrReqWritedCounter() {
reqWritedCounter.increment();
}

View File

@@ -114,21 +114,21 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
return writeChannel((Function) null, request);
}
protected final CompletableFuture<List<P>> writeChannel(R[] requests) {
return writeChannel((Function) null, requests);
protected final CompletableFuture<P>[] writeChannel(R[] requests) {
return writeChannel0((Function) null, requests);
}
protected final <T> CompletableFuture<T> writeChannel(Function<P, T> respTransfer, R request) {
return writeChannel0(respTransfer, request).thenApply(v -> Utility.isEmpty(v) ? null : v.get(0));
return writeChannel0(respTransfer, request)[0];
}
// respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<List<T>> writeChannel(Function<P, T> respTransfer, R... requests) {
protected final <T> CompletableFuture<T>[] writeChannel(Function<P, T> respTransfer, R... requests) {
return writeChannel0(respTransfer, requests);
}
// respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<List<T>> writeChannel0(Function<P, T> respTransfer, R... requests) {
protected final <T> CompletableFuture<T>[] writeChannel0(Function<P, T> respTransfer, R... requests) {
if (client.debug) {
client.logger.log(
Level.FINEST,
@@ -136,11 +136,10 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
+ Arrays.toString(requests));
}
ClientFuture[] respFutures = new ClientFuture[requests.length];
int rts = this.client.getReadTimeoutSeconds();
for (int i = 0; i < respFutures.length; i++) {
R request = requests[i];
request.respTransfer = respTransfer;
respFutures[i] = createClientFuture(requests[i]);
respFutures[i] = client.createClientFuture(this, requests[i]);
}
respWaitingCounter.add(respFutures.length); // 放在writeChannelInWriteThread计数会延迟导致不准确
@@ -160,7 +159,7 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
} finally {
writeLock.unlock();
}
return Utility.allOfFutures(respFutures);
return respFutures;
}
protected void sendRequestInLocking(ClientFuture... respFutures) {
@@ -257,7 +256,7 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
return CompletableFuture.failedFuture(
new RuntimeException("ClientVirtualRequest must be virtualType = true"));
}
ClientFuture<R, P> respFuture = createClientFuture(request);
ClientFuture<R, P> respFuture = client.createClientFuture(this, request);
writeLock.lock();
try {
offerRespFuture(respFuture);
@@ -270,15 +269,6 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
protected void preComplete(P resp, R req, Throwable exc) {}
protected ClientFuture<R, P> createClientFuture(R request) {
ClientFuture respFuture = new ClientFuture(this, request);
int rts = this.client.getReadTimeoutSeconds();
if (rts > 0 && !request.isCloseType()) {
respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS));
}
return respFuture;
}
@Override // AsyncConnection.beforeCloseListener
public void accept(AsyncConnection t) {
respWaitingCounter.reset();

View File

@@ -8,6 +8,7 @@ package org.redkale.net.client;
import java.util.Objects;
import java.util.concurrent.*;
import org.redkale.annotation.Nonnull;
import org.redkale.annotation.Nullable;
import org.redkale.net.*;
import org.redkale.util.Traces;
@@ -24,7 +25,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
@Nonnull
protected final R request;
@Nonnull
@Nullable
protected final ClientConnection conn;
private ScheduledFuture timeout;
@@ -37,7 +38,6 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
ClientFuture(ClientConnection conn, R request) {
super();
Objects.requireNonNull(conn);
Objects.requireNonNull(request);
this.conn = conn;
this.request = request;
@@ -79,7 +79,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
private void runTimeout() {
String traceid = request != null ? request.getTraceid() : null;
if (request != null) {
if (request != null && conn != null) {
conn.removeRespFuture(request.getRequestid(), this);
}
TimeoutException ex = new TimeoutException("client-request: " + request);
@@ -88,17 +88,26 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
workThread = request.workThread;
request.workThread = null;
}
if (workThread == null || workThread.getWorkExecutor() == null) {
if (conn != null && (workThread == null || workThread.getWorkExecutor() == null)) {
workThread = conn.getChannel().getReadIOThread();
}
workThread.runWork(() -> {
if (workThread == null) {
Traces.currentTraceid(traceid);
if (!isDone()) {
completeExceptionally(ex);
}
Traces.removeTraceid();
});
conn.dispose(ex);
} else {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
if (!isDone()) {
completeExceptionally(ex);
}
Traces.removeTraceid();
});
}
if (conn != null) {
conn.dispose(ex);
}
}
@Override