diff --git a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java index 3e6de7980..83115b40d 100644 --- a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java @@ -161,9 +161,9 @@ public class HttpClusterRpcClient extends HttpRpcClient { String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; if (finest) { if (clientBody != null) { - logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", body=" + new String(clientBody, StandardCharsets.UTF_8) + ", headers=" + clientHeaders); + logger.log(Level.FINEST, "forEachCollectionFuture: url: " + url + ", body: " + new String(clientBody, StandardCharsets.UTF_8) + ", headers: " + clientHeaders); } else { - logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders); + logger.log(Level.FINEST, "forEachCollectionFuture: url: " + url + ", headers: " + clientHeaders); } } if (httpSimpleClient != null) { diff --git a/src/main/java/org/redkale/net/Request.java b/src/main/java/org/redkale/net/Request.java index edbf80498..363a13ac9 100644 --- a/src/main/java/org/redkale/net/Request.java +++ b/src/main/java/org/redkale/net/Request.java @@ -151,6 +151,10 @@ public abstract class Request { return attributes; } + public boolean isKeepAlive() { + return keepAlive; + } + public C getContext() { return this.context; } diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index c0a8be959..4f478341b 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -26,7 +26,7 @@ import org.redkale.util.*; * @param 请求对象 * @param

响应对象 */ -public abstract class Client, R extends ClientRequest, P> implements Resourcable { +public abstract class Client, R extends ClientRequest, P extends ClientResult> implements Resourcable { public static final int DEFAULT_MAX_PIPELINES = 128; diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 71ae20d46..b88b69aa3 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -25,7 +25,7 @@ import org.redkale.util.*; * @param ClientRequest * @param

响应对象 */ -public abstract class ClientCodec implements CompletionHandler { +public abstract class ClientCodec implements CompletionHandler { private final List> respResults = new ArrayList<>(); @@ -166,6 +166,9 @@ public abstract class ClientCodec implements Complet Traces.removeTraceid(); }); } + if (!message.isKeepAlive()) { + connection.dispose(null); + } } else { //异常 if (workThread == null) { readThread.runWork(() -> { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index d8ee26356..bf83a4e3c 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -30,7 +30,7 @@ import org.redkale.util.*; * @param 请求对象 * @param

响应对象 */ -public abstract class ClientConnection implements Consumer { +public abstract class ClientConnection implements Consumer { //=-1 表示连接放在connAddrEntrys存储 //>=0 表示connArray的下坐标,从0开始 diff --git a/src/main/java/org/redkale/net/client/ClientResponse.java b/src/main/java/org/redkale/net/client/ClientResponse.java index 3267ad66a..780aa903e 100644 --- a/src/main/java/org/redkale/net/client/ClientResponse.java +++ b/src/main/java/org/redkale/net/client/ClientResponse.java @@ -19,7 +19,7 @@ import java.io.Serializable; * @param 请求对象 * @param

message */ -public class ClientResponse { +public class ClientResponse { protected R request; //服务端返回一个不存在的requestid,可能为null @@ -105,7 +105,7 @@ public class ClientResponse { return false; } - static class ClientErrorResponse extends ClientResponse { + static class ClientErrorResponse extends ClientResponse { public ClientErrorResponse(R request, Throwable exc) { super(request, exc); diff --git a/src/main/java/org/redkale/net/client/ClientResult.java b/src/main/java/org/redkale/net/client/ClientResult.java new file mode 100644 index 000000000..a1dce01b7 --- /dev/null +++ b/src/main/java/org/redkale/net/client/ClientResult.java @@ -0,0 +1,20 @@ +/* + * + */ +package org.redkale.net.client; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * + * @since 2.8.0 + * + */ +public interface ClientResult { + + public boolean isKeepAlive(); + +} diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java index 5ab7227ea..5f2139962 100644 --- a/src/main/java/org/redkale/net/http/HttpRequest.java +++ b/src/main/java/org/redkale/net/http/HttpRequest.java @@ -282,10 +282,6 @@ public class HttpRequest extends Request { this.keepAlive = keepAlive; } - protected boolean isKeepAlive() { - return this.keepAlive; - } - protected ConvertType getRespConvertType() { return this.respConvertType; } diff --git a/src/main/java/org/redkale/net/sncp/SncpClientRequest.java b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java index 0c2c6ca37..66ce0ef9a 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java @@ -61,9 +61,9 @@ public class SncpClientRequest extends ClientRequest { public void writeTo(ClientConnection conn, ByteArray array) { array.putPlaceholder(SncpHeader.calcHeaderSize(this)); if (bodyContent == null) { - header.writeTo(array, this, 0, 0); + header.writeTo(array, this, SncpHeader.KEEPALIVE_ON, 0, 0); } else { - header.writeTo(array, this, bodyContent.length, 0); + header.writeTo(array, this, SncpHeader.KEEPALIVE_ON, bodyContent.length, 0); array.put(bodyContent); } } diff --git a/src/main/java/org/redkale/net/sncp/SncpClientResult.java b/src/main/java/org/redkale/net/sncp/SncpClientResult.java index 117f92c76..459d49f7c 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientResult.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientResult.java @@ -6,6 +6,7 @@ package org.redkale.net.sncp; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Objects; +import org.redkale.net.client.ClientResult; import org.redkale.util.ByteArray; /** @@ -18,7 +19,7 @@ import org.redkale.util.ByteArray; * * @since 2.8.0 */ -public class SncpClientResult { +public class SncpClientResult implements ClientResult { private SncpHeader header; @@ -54,6 +55,11 @@ public class SncpClientResult { return header == null ? null : header.getSeqid(); } + @Override + public boolean isKeepAlive() { + return header == null ? false : header.isKeepAlive(); + } + @Override public String toString() { return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{" diff --git a/src/main/java/org/redkale/net/sncp/SncpHeader.java b/src/main/java/org/redkale/net/sncp/SncpHeader.java index 89dda5bc1..0ba4a4933 100644 --- a/src/main/java/org/redkale/net/sncp/SncpHeader.java +++ b/src/main/java/org/redkale/net/sncp/SncpHeader.java @@ -15,7 +15,11 @@ import org.redkale.util.*; */ public class SncpHeader { - public static final int HEADER_SUBSIZE = 72; + public static final int HEADER_SUBSIZE = 69; + + public static final byte KEEPALIVE_ON = 0; + + public static final byte KEEPALIVE_OFF = -1; private static final byte[] EMPTY_ADDR = new byte[4]; @@ -38,8 +42,8 @@ public class SncpHeader { //响应方地址端口 private int addrPort; - // 预留扩展位 - private int abilities; + //保持连接,0:keepAlive; -1:关闭连接 + private byte keepAlive; //时间戳 private long timestamp; @@ -85,7 +89,7 @@ public class SncpHeader { } buffer.get(header.addrBytes); //addr 4 header.addrPort = buffer.getChar(); //port 2 - header.abilities = buffer.getInt(); //4 + header.keepAlive = buffer.get(); //1 header.timestamp = buffer.getLong(); //8 int traceSize = buffer.getChar(); //2 if (traceSize > 0) { @@ -115,8 +119,8 @@ public class SncpHeader { offset += 4; header.addrPort = array.getChar(offset); //port 2 offset += 2; - header.abilities = array.getInt(offset); //4 - offset += 4; + header.keepAlive = array.get(offset); //1 + offset += 1; header.timestamp = array.getLong(offset); //8 offset += 8; int traceSize = array.getChar(offset); //2 @@ -132,16 +136,16 @@ public class SncpHeader { return header; } - public ByteArray writeTo(ByteArray array, SncpClientRequest clientRequest, int bodyLength, int retcode) { - return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(), clientRequest.traceBytes(), bodyLength, retcode); + public ByteArray writeTo(ByteArray array, SncpClientRequest clientRequest, byte keepAlive, int bodyLength, int retcode) { + return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(), clientRequest.traceBytes(), keepAlive, bodyLength, retcode); } - public ByteArray writeTo(ByteArray array, SncpResponse response, int bodyLength, int retcode) { + public ByteArray writeTo(ByteArray array, SncpResponse response, byte keepAlive, int bodyLength, int retcode) { SncpRequest request = response.request(); - return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(), request.traceBytes(), bodyLength, retcode); + return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(), request.traceBytes(), keepAlive, bodyLength, retcode); } - private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, byte[] traces, int bodyLength, int retcode) { + private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, byte[] traces, byte keepAlive, int bodyLength, int retcode) { if (newAddrBytes.length != 4) { throw new SncpException("address bytes length must be 4, but " + newAddrBytes.length); } @@ -167,8 +171,8 @@ public class SncpHeader { offset += 4; array.putChar(offset, (char) newAddrPort); //2 offset += 2; - array.putInt(offset, abilities); //4 - offset += 4; + array.put(offset, keepAlive); //1 + offset += 1; array.putLong(offset, System.currentTimeMillis()); //8 offset += 8; array.putChar(offset, (char) traces.length); //2 @@ -193,6 +197,7 @@ public class SncpHeader { + ",actionid=" + this.actionid + ",methodName=" + this.methodName + ",address=" + getAddress() + + ",keepAlive=" + isKeepAlive() + ",timestamp=" + this.timestamp + ",traceid=" + getTraceid() + ",retcode=" + this.retcode @@ -210,6 +215,10 @@ public class SncpHeader { return valid; } + public boolean isKeepAlive() { + return keepAlive != -1; + } + //供client端request和response的header判断 public boolean checkValid(SncpHeader other) { return Objects.equals(this.serviceid, other.serviceid) diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index 853f51e81..249d63aed 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -11,6 +11,8 @@ import java.util.concurrent.*; import org.redkale.asm.AsmDepends; import org.redkale.convert.bson.BsonWriter; import org.redkale.net.Response; +import static org.redkale.net.sncp.SncpHeader.KEEPALIVE_OFF; +import static org.redkale.net.sncp.SncpHeader.KEEPALIVE_ON; import org.redkale.util.ByteArray; import org.redkale.util.Traces; @@ -111,7 +113,7 @@ public class SncpResponse extends Response { } protected void writeHeader(ByteArray array, int bodyLength, int retcode) { - request.getHeader().writeTo(array, this, bodyLength, retcode); + request.getHeader().writeTo(array, this, request.isKeepAlive() ? KEEPALIVE_ON : KEEPALIVE_OFF, bodyLength, retcode); } @Override