ClientRequest增加transfer功能
This commit is contained in:
@@ -209,6 +209,13 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
return connect(null).thenCompose(conn -> writeChannel(conn, request));
|
return connect(null).thenCompose(conn -> writeChannel(conn, request));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) {
|
||||||
|
if (request.workThread == null) {
|
||||||
|
request.workThread = WorkThread.currWorkThread();
|
||||||
|
}
|
||||||
|
return connect(null).thenCompose(conn -> writeChannel(conn, request, respTransfer));
|
||||||
|
}
|
||||||
|
|
||||||
public final CompletableFuture<P> sendAsync(ChannelContext context, R request) {
|
public final CompletableFuture<P> sendAsync(ChannelContext context, R request) {
|
||||||
if (request.workThread == null) {
|
if (request.workThread == null) {
|
||||||
request.workThread = WorkThread.currWorkThread();
|
request.workThread = WorkThread.currWorkThread();
|
||||||
@@ -216,10 +223,21 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
return connect(context).thenCompose(conn -> writeChannel(conn, request));
|
return connect(context).thenCompose(conn -> writeChannel(conn, request));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public final <T> CompletableFuture<T> sendAsync(ChannelContext context, R request, Function<P, T> respTransfer) {
|
||||||
|
if (request.workThread == null) {
|
||||||
|
request.workThread = WorkThread.currWorkThread();
|
||||||
|
}
|
||||||
|
return connect(context).thenCompose(conn -> writeChannel(conn, request, respTransfer));
|
||||||
|
}
|
||||||
|
|
||||||
protected CompletableFuture<P> writeChannel(ClientConnection conn, R request) {
|
protected CompletableFuture<P> writeChannel(ClientConnection conn, R request) {
|
||||||
return conn.writeChannel(request);
|
return conn.writeChannel(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected <T> CompletableFuture<T> writeChannel(ClientConnection conn, R request, Function<P, T> respTransfer) {
|
||||||
|
return conn.writeChannel(request, respTransfer);
|
||||||
|
}
|
||||||
|
|
||||||
protected CompletableFuture<C> connect() {
|
protected CompletableFuture<C> connect() {
|
||||||
return connect(null);
|
return connect(null);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ import org.redkale.net.*;
|
|||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 每个ClientConnection绑定一个独立的ClientCodec实例
|
* 每个ClientConnection绑定一个独立的ClientCodec实例, 只会同一读线程里运行
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* 详情见: https://redkale.org
|
* 详情见: https://redkale.org
|
||||||
@@ -121,9 +121,10 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
respFuture.completeExceptionally(exc);
|
respFuture.completeExceptionally(exc);
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message);
|
||||||
workThread.runWork(() -> {
|
workThread.runWork(() -> {
|
||||||
Traces.currTraceid(request.traceid);
|
Traces.currTraceid(request.traceid);
|
||||||
respFuture.complete(message);
|
respFuture.complete(rs);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import java.nio.channels.ClosedChannelException;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.*;
|
import java.util.concurrent.atomic.*;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.*;
|
||||||
import org.redkale.net.*;
|
import org.redkale.net.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -68,6 +68,12 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
|
|||||||
protected abstract ClientCodec createCodec();
|
protected abstract ClientCodec createCodec();
|
||||||
|
|
||||||
protected final CompletableFuture<P> writeChannel(R request) {
|
protected final CompletableFuture<P> writeChannel(R request) {
|
||||||
|
return writeChannel(request, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
//respTransfer只会在ClientCodec的读线程里调用
|
||||||
|
protected final <T> CompletableFuture<T> writeChannel(R request, Function<P, T> respTransfer) {
|
||||||
|
request.respTransfer = respTransfer;
|
||||||
ClientFuture respFuture = createClientFuture(request);
|
ClientFuture respFuture = createClientFuture(request);
|
||||||
int rts = this.channel.getReadTimeoutSeconds();
|
int rts = this.channel.getReadTimeoutSeconds();
|
||||||
if (rts > 0 && !request.isCloseType()) {
|
if (rts > 0 && !request.isCloseType()) {
|
||||||
@@ -139,8 +145,8 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
|
|||||||
return channel;
|
return channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientCodec<R, P> getCodec() {
|
public <C extends ClientCodec<R, P>> C getCodec() {
|
||||||
return codec;
|
return (C) codec;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getMaxPipelines() {
|
public int getMaxPipelines() {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
package org.redkale.net.client;
|
package org.redkale.net.client;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.function.Function;
|
||||||
import org.redkale.net.WorkThread;
|
import org.redkale.net.WorkThread;
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
|
|
||||||
@@ -26,6 +27,9 @@ public abstract class ClientRequest {
|
|||||||
|
|
||||||
protected String traceid;
|
protected String traceid;
|
||||||
|
|
||||||
|
//只会在ClientCodec的读线程里调用
|
||||||
|
Function respTransfer;
|
||||||
|
|
||||||
public abstract void writeTo(ClientConnection conn, ByteArray array);
|
public abstract void writeTo(ClientConnection conn, ByteArray array);
|
||||||
|
|
||||||
public Serializable getRequestid() {
|
public Serializable getRequestid() {
|
||||||
@@ -55,6 +59,11 @@ public abstract class ClientRequest {
|
|||||||
return (T) this;
|
return (T) this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <T extends ClientRequest> T currTraceid(String traceid) {
|
||||||
|
this.traceid = traceid;
|
||||||
|
return (T) this;
|
||||||
|
}
|
||||||
|
|
||||||
//数据是否全部写入,如果只写部分,返回false, 配合ClientConnection.pauseWriting使用
|
//数据是否全部写入,如果只写部分,返回false, 配合ClientConnection.pauseWriting使用
|
||||||
protected boolean isCompleted() {
|
protected boolean isCompleted() {
|
||||||
return true;
|
return true;
|
||||||
@@ -63,11 +72,13 @@ public abstract class ClientRequest {
|
|||||||
protected void prepare() {
|
protected void prepare() {
|
||||||
this.createTime = System.currentTimeMillis();
|
this.createTime = System.currentTimeMillis();
|
||||||
this.traceid = Traces.currTraceid();
|
this.traceid = Traces.currTraceid();
|
||||||
|
this.respTransfer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean recycle() {
|
protected boolean recycle() {
|
||||||
this.createTime = 0;
|
this.createTime = 0;
|
||||||
this.traceid = null;
|
this.traceid = null;
|
||||||
|
this.respTransfer = null;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user