增加ClientResult功能

This commit is contained in:
redkale
2023-10-23 23:00:03 +08:00
parent 2304e115da
commit bd9b04ce0b
12 changed files with 68 additions and 28 deletions

View File

@@ -161,9 +161,9 @@ public class HttpClusterRpcClient extends HttpRpcClient {
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi;
if (finest) {
if (clientBody != null) {
logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", body=" + new String(clientBody, StandardCharsets.UTF_8) + ", headers=" + clientHeaders);
logger.log(Level.FINEST, "forEachCollectionFuture: url: " + url + ", body: " + new String(clientBody, StandardCharsets.UTF_8) + ", headers: " + clientHeaders);
} else {
logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders);
logger.log(Level.FINEST, "forEachCollectionFuture: url: " + url + ", headers: " + clientHeaders);
}
}
if (httpSimpleClient != null) {

View File

@@ -151,6 +151,10 @@ public abstract class Request<C extends Context> {
return attributes;
}
public boolean isKeepAlive() {
return keepAlive;
}
public C getContext() {
return this.context;
}

View File

@@ -26,7 +26,7 @@ import org.redkale.util.*;
* @param <R> 请求对象
* @param <P> 响应对象
*/
public abstract class Client<C extends ClientConnection<R, P>, R extends ClientRequest, P> implements Resourcable {
public abstract class Client<C extends ClientConnection<R, P>, R extends ClientRequest, P extends ClientResult> implements Resourcable {
public static final int DEFAULT_MAX_PIPELINES = 128;

View File

@@ -25,7 +25,7 @@ import org.redkale.util.*;
* @param <R> ClientRequest
* @param <P> 响应对象
*/
public abstract class ClientCodec<R extends ClientRequest, P> implements CompletionHandler<Integer, ByteBuffer> {
public abstract class ClientCodec<R extends ClientRequest, P extends ClientResult> implements CompletionHandler<Integer, ByteBuffer> {
private final List<ClientResponse<R, P>> respResults = new ArrayList<>();
@@ -166,6 +166,9 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
Traces.removeTraceid();
});
}
if (!message.isKeepAlive()) {
connection.dispose(null);
}
} else { //异常
if (workThread == null) {
readThread.runWork(() -> {

View File

@@ -30,7 +30,7 @@ import org.redkale.util.*;
* @param <R> 请求对象
* @param <P> 响应对象
*/
public abstract class ClientConnection<R extends ClientRequest, P> implements Consumer<AsyncConnection> {
public abstract class ClientConnection<R extends ClientRequest, P extends ClientResult> implements Consumer<AsyncConnection> {
//=-1 表示连接放在connAddrEntrys存储
//>=0 表示connArray的下坐标从0开始

View File

@@ -19,7 +19,7 @@ import java.io.Serializable;
* @param <R> 请求对象
* @param <P> message
*/
public class ClientResponse<R extends ClientRequest, P> {
public class ClientResponse<R extends ClientRequest, P extends ClientResult> {
protected R request; //服务端返回一个不存在的requestid可能为null
@@ -105,7 +105,7 @@ public class ClientResponse<R extends ClientRequest, P> {
return false;
}
static class ClientErrorResponse<R extends ClientRequest, P> extends ClientResponse<R, P> {
static class ClientErrorResponse<R extends ClientRequest, P extends ClientResult> extends ClientResponse<R, P> {
public ClientErrorResponse(R request, Throwable exc) {
super(request, exc);

View File

@@ -0,0 +1,20 @@
/*
*
*/
package org.redkale.net.client;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*
*/
public interface ClientResult {
public boolean isKeepAlive();
}

View File

@@ -282,10 +282,6 @@ public class HttpRequest extends Request<HttpContext> {
this.keepAlive = keepAlive;
}
protected boolean isKeepAlive() {
return this.keepAlive;
}
protected ConvertType getRespConvertType() {
return this.respConvertType;
}

View File

@@ -61,9 +61,9 @@ public class SncpClientRequest extends ClientRequest {
public void writeTo(ClientConnection conn, ByteArray array) {
array.putPlaceholder(SncpHeader.calcHeaderSize(this));
if (bodyContent == null) {
header.writeTo(array, this, 0, 0);
header.writeTo(array, this, SncpHeader.KEEPALIVE_ON, 0, 0);
} else {
header.writeTo(array, this, bodyContent.length, 0);
header.writeTo(array, this, SncpHeader.KEEPALIVE_ON, bodyContent.length, 0);
array.put(bodyContent);
}
}

View File

@@ -6,6 +6,7 @@ package org.redkale.net.sncp;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.redkale.net.client.ClientResult;
import org.redkale.util.ByteArray;
/**
@@ -18,7 +19,7 @@ import org.redkale.util.ByteArray;
*
* @since 2.8.0
*/
public class SncpClientResult {
public class SncpClientResult implements ClientResult {
private SncpHeader header;
@@ -54,6 +55,11 @@ public class SncpClientResult {
return header == null ? null : header.getSeqid();
}
@Override
public boolean isKeepAlive() {
return header == null ? false : header.isKeepAlive();
}
@Override
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{"

View File

@@ -15,7 +15,11 @@ import org.redkale.util.*;
*/
public class SncpHeader {
public static final int HEADER_SUBSIZE = 72;
public static final int HEADER_SUBSIZE = 69;
public static final byte KEEPALIVE_ON = 0;
public static final byte KEEPALIVE_OFF = -1;
private static final byte[] EMPTY_ADDR = new byte[4];
@@ -38,8 +42,8 @@ public class SncpHeader {
//响应方地址端口
private int addrPort;
// 预留扩展位
private int abilities;
//保持连接0keepAlive; -1:关闭连接
private byte keepAlive;
//时间戳
private long timestamp;
@@ -85,7 +89,7 @@ public class SncpHeader {
}
buffer.get(header.addrBytes); //addr 4
header.addrPort = buffer.getChar(); //port 2
header.abilities = buffer.getInt(); //4
header.keepAlive = buffer.get(); //1
header.timestamp = buffer.getLong(); //8
int traceSize = buffer.getChar(); //2
if (traceSize > 0) {
@@ -115,8 +119,8 @@ public class SncpHeader {
offset += 4;
header.addrPort = array.getChar(offset); //port 2
offset += 2;
header.abilities = array.getInt(offset); //4
offset += 4;
header.keepAlive = array.get(offset); //1
offset += 1;
header.timestamp = array.getLong(offset); //8
offset += 8;
int traceSize = array.getChar(offset); //2
@@ -132,16 +136,16 @@ public class SncpHeader {
return header;
}
public ByteArray writeTo(ByteArray array, SncpClientRequest clientRequest, int bodyLength, int retcode) {
return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(), clientRequest.traceBytes(), bodyLength, retcode);
public ByteArray writeTo(ByteArray array, SncpClientRequest clientRequest, byte keepAlive, int bodyLength, int retcode) {
return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(), clientRequest.traceBytes(), keepAlive, bodyLength, retcode);
}
public ByteArray writeTo(ByteArray array, SncpResponse response, int bodyLength, int retcode) {
public ByteArray writeTo(ByteArray array, SncpResponse response, byte keepAlive, int bodyLength, int retcode) {
SncpRequest request = response.request();
return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(), request.traceBytes(), bodyLength, retcode);
return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(), request.traceBytes(), keepAlive, bodyLength, retcode);
}
private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, byte[] traces, int bodyLength, int retcode) {
private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, byte[] traces, byte keepAlive, int bodyLength, int retcode) {
if (newAddrBytes.length != 4) {
throw new SncpException("address bytes length must be 4, but " + newAddrBytes.length);
}
@@ -167,8 +171,8 @@ public class SncpHeader {
offset += 4;
array.putChar(offset, (char) newAddrPort); //2
offset += 2;
array.putInt(offset, abilities); //4
offset += 4;
array.put(offset, keepAlive); //1
offset += 1;
array.putLong(offset, System.currentTimeMillis()); //8
offset += 8;
array.putChar(offset, (char) traces.length); //2
@@ -193,6 +197,7 @@ public class SncpHeader {
+ ",actionid=" + this.actionid
+ ",methodName=" + this.methodName
+ ",address=" + getAddress()
+ ",keepAlive=" + isKeepAlive()
+ ",timestamp=" + this.timestamp
+ ",traceid=" + getTraceid()
+ ",retcode=" + this.retcode
@@ -210,6 +215,10 @@ public class SncpHeader {
return valid;
}
public boolean isKeepAlive() {
return keepAlive != -1;
}
//供client端request和response的header判断
public boolean checkValid(SncpHeader other) {
return Objects.equals(this.serviceid, other.serviceid)

View File

@@ -11,6 +11,8 @@ import java.util.concurrent.*;
import org.redkale.asm.AsmDepends;
import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response;
import static org.redkale.net.sncp.SncpHeader.KEEPALIVE_OFF;
import static org.redkale.net.sncp.SncpHeader.KEEPALIVE_ON;
import org.redkale.util.ByteArray;
import org.redkale.util.Traces;
@@ -111,7 +113,7 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
}
protected void writeHeader(ByteArray array, int bodyLength, int retcode) {
request.getHeader().writeTo(array, this, bodyLength, retcode);
request.getHeader().writeTo(array, this, request.isKeepAlive() ? KEEPALIVE_ON : KEEPALIVE_OFF, bodyLength, retcode);
}
@Override