client还原

This commit is contained in:
redkale
2023-04-02 23:41:22 +08:00
parent aa03d356b8
commit 8727b5165a
4 changed files with 7 additions and 7 deletions

View File

@@ -225,7 +225,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return connect().thenCompose(conn -> writeChannel(conn, request)); return connect().thenCompose(conn -> writeChannel(conn, request));
} }
public final <T> CompletableFuture<T> sendAsync(R request, BiFunction<C, P, T> respTransfer) { public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) {
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread(); request.workThread = WorkThread.currWorkThread();
} }
@@ -239,7 +239,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return connect(addr).thenCompose(conn -> writeChannel(conn, request)); return connect(addr).thenCompose(conn -> writeChannel(conn, request));
} }
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, BiFunction<C, P, T> respTransfer) { public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, Function<P, T> respTransfer) {
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currWorkThread(); request.workThread = WorkThread.currWorkThread();
} }
@@ -250,7 +250,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return conn.writeChannel(request); return conn.writeChannel(request);
} }
protected <T> CompletableFuture<T> writeChannel(ClientConnection conn, R request, BiFunction<C, P, T> respTransfer) { protected <T> CompletableFuture<T> writeChannel(ClientConnection conn, R request, Function<P, T> respTransfer) {
return conn.writeChannel(request, respTransfer); return conn.writeChannel(request, respTransfer);
} }

View File

@@ -135,7 +135,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
}); });
} }
} else { } else {
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(connection, message); final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message);
if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) {
workThread.execute(() -> { workThread.execute(() -> {
Traces.currTraceid(request.traceid); Traces.currTraceid(request.traceid);

View File

@@ -106,7 +106,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} }
//respTransfer只会在ClientCodec的读线程里调用 //respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<T> writeChannel(R request, BiFunction<? extends ClientConnection<R, P>, P, T> respTransfer) { protected final <T> CompletableFuture<T> writeChannel(R request, Function<P, T> respTransfer) {
request.respTransfer = respTransfer; request.respTransfer = respTransfer;
ClientFuture respFuture = createClientFuture(request); ClientFuture respFuture = createClientFuture(request);
int rts = this.channel.getReadTimeoutSeconds(); int rts = this.channel.getReadTimeoutSeconds();

View File

@@ -6,7 +6,7 @@
package org.redkale.net.client; package org.redkale.net.client;
import java.io.Serializable; import java.io.Serializable;
import java.util.function.BiFunction; import java.util.function.Function;
import org.redkale.net.WorkThread; import org.redkale.net.WorkThread;
import org.redkale.util.*; import org.redkale.util.*;
@@ -28,7 +28,7 @@ public abstract class ClientRequest {
protected String traceid; protected String traceid;
//只会在ClientCodec的读线程里调用 //只会在ClientCodec的读线程里调用
BiFunction respTransfer; Function respTransfer;
public abstract void writeTo(ClientConnection conn, ByteArray array); public abstract void writeTo(ClientConnection conn, ByteArray array);