ClientCodec优化
This commit is contained in:
@@ -83,7 +83,7 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
|
|||||||
messageListener.onMessage(connection, cr);
|
messageListener.onMessage(connection, cr);
|
||||||
respPool.accept(cr);
|
respPool.accept(cr);
|
||||||
} else {
|
} else {
|
||||||
ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid());
|
ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid());
|
||||||
if (respFuture != null) {
|
if (respFuture != null) {
|
||||||
if (respFuture.request != cr.request) {
|
if (respFuture.request != cr.request) {
|
||||||
connection.dispose(new RedkaleException("request pipeline error"));
|
connection.dispose(new RedkaleException("request pipeline error"));
|
||||||
@@ -116,7 +116,7 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void responseComplete(boolean halfCompleted, ClientFuture<R, P> respFuture, P message, Throwable exc) {
|
void responseComplete(boolean halfCompleted, ClientFuture<R, Object> respFuture, P message, Throwable exc) {
|
||||||
R request = respFuture.request;
|
R request = respFuture.request;
|
||||||
Traces.currentTraceid(request.getTraceid());
|
Traces.currentTraceid(request.getTraceid());
|
||||||
AsyncIOThread readThread = connection.channel.getReadIOThread();
|
AsyncIOThread readThread = connection.channel.getReadIOThread();
|
||||||
@@ -144,7 +144,7 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
|
|||||||
connection.preComplete(message, (R) request, exc);
|
connection.preComplete(message, (R) request, exc);
|
||||||
|
|
||||||
if (exc == null) {
|
if (exc == null) {
|
||||||
final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
|
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message);
|
||||||
if (workThread == null) {
|
if (workThread == null) {
|
||||||
readThread.runWork(() -> {
|
readThread.runWork(() -> {
|
||||||
Traces.currentTraceid(request.traceid);
|
Traces.currentTraceid(request.traceid);
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ public abstract class ClientRequest {
|
|||||||
|
|
||||||
protected String traceid;
|
protected String traceid;
|
||||||
|
|
||||||
//只会在ClientCodec的读线程里调用
|
//只会在ClientCodec的读线程里调用, 将ClientResult转成最终结果对象
|
||||||
Function respTransfer;
|
Function respTransfer;
|
||||||
|
|
||||||
public abstract void writeTo(ClientConnection conn, ByteArray array);
|
public abstract void writeTo(ClientConnection conn, ByteArray array);
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
package org.redkale.net.client;
|
package org.redkale.net.client;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import org.redkale.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@@ -21,6 +22,7 @@ import java.io.Serializable;
|
|||||||
*/
|
*/
|
||||||
public class ClientResponse<R extends ClientRequest, P extends ClientResult> {
|
public class ClientResponse<R extends ClientRequest, P extends ClientResult> {
|
||||||
|
|
||||||
|
@Nullable
|
||||||
protected R request; //服务端返回一个不存在的requestid,可能为null
|
protected R request; //服务端返回一个不存在的requestid,可能为null
|
||||||
|
|
||||||
protected P message;
|
protected P message;
|
||||||
|
|||||||
Reference in New Issue
Block a user