diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index e250d6cab..efd4d4835 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -209,6 +209,13 @@ public abstract class Client, R extends ClientR return connect(null).thenCompose(conn -> writeChannel(conn, request)); } + public final CompletableFuture sendAsync(R request, Function respTransfer) { + if (request.workThread == null) { + request.workThread = WorkThread.currWorkThread(); + } + return connect(null).thenCompose(conn -> writeChannel(conn, request, respTransfer)); + } + public final CompletableFuture

sendAsync(ChannelContext context, R request) { if (request.workThread == null) { request.workThread = WorkThread.currWorkThread(); @@ -216,10 +223,21 @@ public abstract class Client, R extends ClientR return connect(context).thenCompose(conn -> writeChannel(conn, request)); } + public final CompletableFuture sendAsync(ChannelContext context, R request, Function respTransfer) { + if (request.workThread == null) { + request.workThread = WorkThread.currWorkThread(); + } + return connect(context).thenCompose(conn -> writeChannel(conn, request, respTransfer)); + } + protected CompletableFuture

writeChannel(ClientConnection conn, R request) { return conn.writeChannel(request); } + protected CompletableFuture writeChannel(ClientConnection conn, R request, Function respTransfer) { + return conn.writeChannel(request, respTransfer); + } + protected CompletableFuture connect() { return connect(null); } diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index ef69c45b4..f8a896625 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -15,7 +15,7 @@ import org.redkale.net.*; import org.redkale.util.*; /** - * 每个ClientConnection绑定一个独立的ClientCodec实例 + * 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程里运行 * *

* 详情见: https://redkale.org @@ -121,9 +121,10 @@ public abstract class ClientCodec implements Complet respFuture.completeExceptionally(exc); }); } else { + final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message); workThread.runWork(() -> { Traces.currTraceid(request.traceid); - respFuture.complete(message); + respFuture.complete(rs); }); } } catch (Throwable t) { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 29630687c..9e226b612 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -10,7 +10,7 @@ import java.nio.channels.ClosedChannelException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import java.util.function.Consumer; +import java.util.function.*; import org.redkale.net.*; /** @@ -68,6 +68,12 @@ public abstract class ClientConnection implements Co protected abstract ClientCodec createCodec(); protected final CompletableFuture

writeChannel(R request) { + return writeChannel(request, null); + } + + //respTransfer只会在ClientCodec的读线程里调用 + protected final CompletableFuture writeChannel(R request, Function respTransfer) { + request.respTransfer = respTransfer; ClientFuture respFuture = createClientFuture(request); int rts = this.channel.getReadTimeoutSeconds(); if (rts > 0 && !request.isCloseType()) { @@ -139,8 +145,8 @@ public abstract class ClientConnection implements Co return channel; } - public ClientCodec getCodec() { - return codec; + public > C getCodec() { + return (C) codec; } public int getMaxPipelines() { diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index fd8ede409..46e118de2 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -6,6 +6,7 @@ package org.redkale.net.client; import java.io.Serializable; +import java.util.function.Function; import org.redkale.net.WorkThread; import org.redkale.util.*; @@ -26,6 +27,9 @@ public abstract class ClientRequest { protected String traceid; + //只会在ClientCodec的读线程里调用 + Function respTransfer; + public abstract void writeTo(ClientConnection conn, ByteArray array); public Serializable getRequestid() { @@ -55,6 +59,11 @@ public abstract class ClientRequest { return (T) this; } + public T currTraceid(String traceid) { + this.traceid = traceid; + return (T) this; + } + //数据是否全部写入,如果只写部分,返回false, 配合ClientConnection.pauseWriting使用 protected boolean isCompleted() { return true; @@ -63,11 +72,13 @@ public abstract class ClientRequest { protected void prepare() { this.createTime = System.currentTimeMillis(); this.traceid = Traces.currTraceid(); + this.respTransfer = null; } protected boolean recycle() { this.createTime = 0; this.traceid = null; + this.respTransfer = null; return true; } }