Client.authenticate

This commit is contained in:
redkale
2024-09-05 12:11:34 +08:00
parent 8ca6f46c4a
commit 3cda0c4dd9

View File

@@ -81,7 +81,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
protected Supplier<R> closeRequestSupplier; protected Supplier<R> closeRequestSupplier;
// 创建连接后进行的登录鉴权操作 // 创建连接后进行的登录鉴权操作
protected Function<String, Function<C, CompletableFuture<C>>> authenticate; protected BiFunction<WorkThread, String, Function<C, CompletableFuture<C>>> authenticate;
protected Client(String name, AsyncGroup group, ClientAddress address) { protected Client(String name, AsyncGroup group, ClientAddress address) {
this(name, group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); this(name, group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null);
@@ -106,7 +106,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
boolean tcp, boolean tcp,
ClientAddress address, ClientAddress address,
int maxConns, int maxConns,
Function<String, Function<C, CompletableFuture<C>>> authenticate) { BiFunction<WorkThread, String, Function<C, CompletableFuture<C>>> authenticate) {
this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, null, authenticate); this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, null, authenticate);
} }
@@ -117,7 +117,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
ClientAddress address, ClientAddress address,
int maxConns, int maxConns,
Supplier<R> closeRequestSupplier, Supplier<R> closeRequestSupplier,
Function<String, Function<C, CompletableFuture<C>>> authenticate) { BiFunction<WorkThread, String, Function<C, CompletableFuture<C>>> authenticate) {
this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate); this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate);
} }
@@ -131,7 +131,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
int maxPipelines, int maxPipelines,
Supplier<R> pingRequestSupplier, Supplier<R> pingRequestSupplier,
Supplier<R> closeRequestSupplier, Supplier<R> closeRequestSupplier,
Function<String, Function<C, CompletableFuture<C>>> authenticate) { BiFunction<WorkThread, String, Function<C, CompletableFuture<C>>> authenticate) {
if (maxPipelines < 1) { if (maxPipelines < 1) {
throw new IllegalArgumentException("maxPipelines must bigger 0"); throw new IllegalArgumentException("maxPipelines must bigger 0");
} }
@@ -346,6 +346,9 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) { if (virtualReq != null) {
virtualReq.traceid = traceid; virtualReq.traceid = traceid;
if (virtualReq.workThread == null) {
virtualReq.workThread = workThread;
}
future = future.thenCompose(conn -> { future = future.thenCompose(conn -> {
Traces.currentTraceid(traceid); Traces.currentTraceid(traceid);
return conn.writeVirtualRequest(virtualReq).thenApply(v -> conn); return conn.writeVirtualRequest(virtualReq).thenApply(v -> conn);
@@ -357,7 +360,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}); });
} }
if (authenticate != null) { if (authenticate != null) {
future = future.thenCompose(authenticate.apply(traceid)); future = future.thenCompose(authenticate.apply(workThread, traceid));
} }
return future.thenApply(c -> { return future.thenApply(c -> {
c.setAuthenticated(true); c.setAuthenticated(true);