This commit is contained in:
redkale
2024-08-17 11:11:20 +08:00
parent 96e9d897ec
commit 9402db9fd5

View File

@@ -265,63 +265,20 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return sendAsync(getAddress(request), (Function) null, request);
}
public final CompletableFuture<P>[] sendAsync(R... requests) {
return sendAsync(getAddress(requests[0]), (Function) null, requests);
}
public final <T> CompletableFuture<T> sendAsync(Function<P, T> respTransfer, R request) {
return sendAsync(getAddress(request), respTransfer, request);
}
public final <T> CompletableFuture<T>[] sendAsync(Function<P, T> respTransfer, R... requests) {
return sendAsync(getAddress(requests[0]), respTransfer, requests);
}
public final CompletableFuture<P> sendAsync(SocketAddress addr, R request) {
return sendAsync(addr, (Function) null, request);
}
public final CompletableFuture<P>[] sendAsync(SocketAddress addr, R... requests) {
return sendAsync(addr, (Function) null, requests);
}
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, Function<P, T> respTransfer, R request) {
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
request.computeWorkThreadIfAbsent();
return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, respTransfer, request));
}
public final <T> CompletableFuture<T>[] sendAsync(SocketAddress addr, Function<P, T> respTransfer, R... requests) {
String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
ClientFuture[] respFutures = new ClientFuture[requests.length];
for (int i = 0; i < respFutures.length; i++) {
R request = requests[i];
request.traceid = traceid;
request.computeWorkThreadIfAbsent();
respFutures[i] = createClientFuture(null, requests[i]);
}
connect(requests[0].workThread, addr).whenComplete((conn, t) -> {
if (t != null) {
for (ClientFuture f : respFutures) {
f.completeExceptionally(t);
}
} else {
CompletableFuture<T>[] fs = writeChannel(conn, respTransfer, requests);
for (int i = 0; i < respFutures.length; i++) {
final int index = i;
fs[index].whenComplete((v, e) -> {
if (e != null) {
respFutures[index].completeExceptionally(e);
} else {
respFutures[index].complete(v);
}
});
}
}
});
return respFutures;
}
protected CompletableFuture<P> writeChannel(ClientConnection conn, R request) {
return conn.writeChannel(request);
}