sncp协议优化

This commit is contained in:
redkale
2023-07-05 14:51:28 +08:00
parent cfd22623b7
commit 1b7bdd2ad9
14 changed files with 263 additions and 159 deletions

View File

@@ -13,6 +13,7 @@ import org.redkale.convert.bson.BsonConvert;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.HttpSimpleRequest;
import org.redkale.net.sncp.SncpHeader;
import org.redkale.util.ByteArray;
/**
* 存在MQ里面的数据结构<p>
@@ -318,8 +319,8 @@ public class MessageRecord implements Serializable {
sb.append(",\"respTopic\":\"").append(this.respTopic).append("\"");
}
if (this.content != null) {
if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpHeader.HEADER_SIZE) {
int offset = SncpHeader.HEADER_SIZE + 1; //循环占位符
if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpHeader.HEADER_SUBSIZE) {
int offset = new ByteArray(this.content).getChar(0) + 1; //循环占位符
Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset);
sb.append(",\"content\":").append(rs);
} else if (this.ctype == CTYPE_HTTP_REQUEST) {

View File

@@ -41,14 +41,15 @@ public class SncpMessageResponse extends SncpResponse {
if (callback != null) {
callback.run();
}
int headerSize = SncpHeader.calcHeaderSize(request);
if (out == null) {
final ByteArray result = onlyHeaderData;
fillHeader(result, 0, retcode);
final ByteArray result = new ByteArray(headerSize).putPlaceholder(headerSize);
writeHeader(result, 0, retcode);
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, (byte[]) null));
return;
}
final ByteArray result = out.toByteArray();
fillHeader(result, result.length() - SncpHeader.HEADER_SIZE, retcode);
writeHeader(result, result.length() - headerSize, retcode);
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, result.getBytes()));
}
}

View File

@@ -51,7 +51,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final ByteArray writeArray = new ByteArray();
protected final ThreadLocal<ByteArray> arrayThreadLocal = ThreadLocal.withInitial(() -> new ByteArray());
protected final ThreadLocal<ByteArray> arrayThreadLocal = ThreadLocal.withInitial(ByteArray::new);
protected final ByteBuffer writeBuffer;

View File

@@ -6,6 +6,7 @@
package org.redkale.net.client;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.redkale.net.WorkThread;
import org.redkale.util.*;
@@ -21,6 +22,8 @@ import org.redkale.util.*;
*/
public abstract class ClientRequest {
public static final byte[] EMPTY_TRACEID = new byte[0];
protected long createTime = System.currentTimeMillis();
protected WorkThread workThread;
@@ -60,6 +63,10 @@ public abstract class ClientRequest {
return traceid;
}
public byte[] traceBytes() {
return Utility.isEmpty(traceid) ? EMPTY_TRACEID : traceid.getBytes(StandardCharsets.UTF_8);
}
public <T extends ClientRequest> T workThread(WorkThread thread) {
this.workThread = thread;
return (T) this;

View File

@@ -36,12 +36,6 @@ import org.redkale.util.*;
*/
public abstract class Sncp {
private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, "", Uint128.ZERO, "")
.writeTo(new ByteArray(SncpHeader.HEADER_SIZE).putPlaceholder(SncpHeader.HEADER_SIZE), null, 0, 0, 0)
.getBytes();
private static final byte[] PONG_BYTES = Arrays.copyOf(PING_BYTES, PING_BYTES.length);
static final String FIELDPREFIX = "_redkale";
/**
@@ -70,14 +64,6 @@ public abstract class Sncp {
private Sncp() {
}
public static byte[] getPingBytes() {
return Arrays.copyOf(PING_BYTES, PING_BYTES.length);
}
public static byte[] getPongBytes() {
return Arrays.copyOf(PONG_BYTES, PONG_BYTES.length);
}
//key: actionid
public static LinkedHashMap<Uint128, Method> loadMethodActions(final Class serviceTypeOrImplClass) {
final List<Method> list = new ArrayList<>();

View File

@@ -6,7 +6,6 @@ package org.redkale.net.sncp;
import java.nio.ByteBuffer;
import java.util.logging.Logger;
import org.redkale.net.client.ClientCodec;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.util.*;
/**
@@ -27,9 +26,13 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
private ByteArray recyclableArray;
protected ByteArray halfBodyBytes;
private ByteArray halfBodyBytes;
protected ByteArray halfHeaderBytes;
private ByteArray halfHeaderBytes;
private int halfHeaderSize;
private Byte halfHeaderSizeFirstByte;
SncpClientResult lastResult = null;
@@ -59,22 +62,20 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
public void decodeMessages(ByteBuffer realBuf, ByteArray array) {
ByteBuffer buffer = realBuf;
while (buffer.hasRemaining()) {
if (halfHeaderBytes != null) {
if (buffer.remaining() + halfHeaderBytes.length() < SncpHeader.HEADER_SIZE) { //buffer不足以读取完整header
if (this.halfHeaderBytes != null) {
if (buffer.remaining() + halfHeaderBytes.length() < halfHeaderSize - 2) { //buffer不足以读取完整header
halfHeaderBytes.put(buffer);
return;
}
halfHeaderBytes.put(buffer, SncpHeader.HEADER_SIZE - halfHeaderBytes.length());
halfHeaderBytes.put(buffer, halfHeaderSize - 2 - halfHeaderBytes.length());
//读取完整header
SncpClientResult result = new SncpClientResult();
int headerSize = result.readHeader(halfHeaderBytes);
if (headerSize != HEADER_SIZE) {
occurError(null, new SncpException("sncp header length must be " + HEADER_SIZE + ", but " + headerSize)); //request不一定存在
result.readHeader(halfHeaderBytes, halfHeaderSize);
halfHeaderSize = 0;
if (!result.getHeader().isValid()) {
occurError(null, new SncpException("sncp header not valid"));
return;
}
//if (halfHeaderBytes.length() != HEADER_SIZE) {
// logger.log(Level.SEVERE, "halfHeaderBytes.length must be " + HEADER_SIZE + ", but " + halfHeaderBytes.length());
//}
halfHeaderBytes = null;
if (result.getBodyLength() < 1) {
addMessage(findRequest(result.getRequestid()), result);
@@ -116,15 +117,30 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
lastResult = null;
continue;
}
if (buffer.remaining() < SncpHeader.HEADER_SIZE) { //buffer不足以读取完整header
halfHeaderBytes = pollArray();
halfHeaderBytes.put(buffer);
return;
if (this.halfHeaderSize < 1) {
if (buffer.remaining() < 2) { //只有一个字节
this.halfHeaderSizeFirstByte = buffer.get();
return;
} else {
if (this.halfHeaderSizeFirstByte != null) {
byte secondByte = buffer.get();
this.halfHeaderSize = (0xff00 & (this.halfHeaderSizeFirstByte << 8)) | (0xff & secondByte);
} else {
this.halfHeaderSize = buffer.getChar();
}
this.halfHeaderSizeFirstByte = null;
if (buffer.remaining() < this.halfHeaderSize - 2) { //buffer不足以读取完整header
this.halfHeaderBytes = pollArray();
this.halfHeaderBytes.put(buffer);
return;
}
}
}
SncpClientResult result = new SncpClientResult();
int headerSize = result.readHeader(buffer);
if (headerSize != HEADER_SIZE) {
occurError(null, new SncpException("sncp header length must be " + HEADER_SIZE + ", but " + headerSize)); //request不一定存在
result.readHeader(buffer, halfHeaderSize);
halfHeaderSize = 0;
if (!result.getHeader().isValid()) {
occurError(null, new SncpException("sncp header not valid"));
return;
}
if (result.getBodyLength() < 1) {

View File

@@ -47,6 +47,7 @@ public class SncpClientRequest extends ClientRequest {
boolean rs = super.recycle();
this.header = null;
this.seqid = 0;
this.traceid = null;
this.bodyContent = null;
return rs;
}
@@ -58,11 +59,11 @@ public class SncpClientRequest extends ClientRequest {
@Override
public void writeTo(ClientConnection conn, ByteArray array) {
array.putPlaceholder(SncpHeader.HEADER_SIZE);
array.putPlaceholder(SncpHeader.calcHeaderSize(this));
if (bodyContent == null) {
header.writeTo(array, header.getAddrBytes(), header.getAddrPort(), seqid, 0, 0);
header.writeTo(array, this, 0, 0);
} else {
header.writeTo(array, header.getAddrBytes(), header.getAddrPort(), seqid, bodyContent.length, 0);
header.writeTo(array, this, bodyContent.length, 0);
array.put(bodyContent);
}
}
@@ -70,7 +71,7 @@ public class SncpClientRequest extends ClientRequest {
@Override
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{"
+ "header=" + header + ", seqid =" + seqid
+ "header=" + header + ", seqid =" + seqid + ", traceid =" + traceid
+ ", body=[" + (bodyContent == null ? -1 : bodyContent.length) + "]"
+ "}";
}

View File

@@ -33,14 +33,14 @@ public class SncpClientResult {
return true;
}
protected int readHeader(ByteBuffer buffer) {
this.header = new SncpHeader();
return this.header.read(buffer);
protected boolean readHeader(ByteBuffer buffer, int headerSize) {
this.header = SncpHeader.read(buffer, headerSize);
return this.header.isValid();
}
protected int readHeader(ByteArray array) {
this.header = new SncpHeader();
return this.header.read(array);
protected boolean readHeader(ByteArray array, int headerSize) {
this.header = SncpHeader.read(array, headerSize);
return this.header.isValid();
}
protected boolean readBody(ByteBuffer buffer) {

View File

@@ -22,7 +22,7 @@ public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContex
private final ReentrantLock updateLock = new ReentrantLock();
private final byte[] pongBytes = Sncp.getPongBytes();
private final ThreadLocal<ByteArray> localArray = ThreadLocal.withInitial(ByteArray::new);
protected SncpDispatcherServlet() {
super();
@@ -83,7 +83,11 @@ public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContex
@Override
public void execute(SncpRequest request, SncpResponse response) throws IOException {
if (request.isPing()) {
response.finish(pongBytes);
ByteArray array = localArray.get().clear();
int headerSize = SncpHeader.calcHeaderSize(request);
array.putPlaceholder(headerSize);
response.writeHeader(array, 0, 0);
response.finish(array.getBytes());
return;
}
SncpServlet servlet = mappingServlet(request.getHeader().getServiceid());

View File

@@ -5,6 +5,7 @@ package org.redkale.net.sncp;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.redkale.util.*;
@@ -14,7 +15,7 @@ import org.redkale.util.*;
*/
public class SncpHeader {
public static final int HEADER_SIZE = 72;
public static final int HEADER_SUBSIZE = 72;
private static final byte[] EMPTY_ADDR = new byte[4];
@@ -24,8 +25,8 @@ public class SncpHeader {
private String serviceName;
//【预留字段】service接口版本
private int serviceVersion;
//sncp协议版本
private int sncpVersion;
private Uint128 actionid;
@@ -43,6 +44,9 @@ public class SncpHeader {
//时间戳
private long timestamp;
//日志ID
private String traceid;
//结果码非0表示错误
private int retcode;
@@ -51,90 +55,111 @@ public class SncpHeader {
private boolean valid;
public SncpHeader() {
private SncpHeader() {
}
public SncpHeader(InetSocketAddress clientSncpAddress, Uint128 serviceid, String serviceName, Uint128 actionid, String methodName) {
this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
this.serviceid = serviceid;
this.serviceName = serviceName;
this.actionid = actionid;
this.methodName = methodName;
if (addrBytes.length != 4) {
throw new SncpException("address bytes length must be 4, but " + addrBytes.length);
public static SncpHeader create(InetSocketAddress clientSncpAddress, Uint128 serviceid, String serviceName, Uint128 actionid, String methodName) {
SncpHeader header = new SncpHeader();
header.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
header.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
header.serviceid = serviceid;
header.serviceName = serviceName;
header.actionid = actionid;
header.methodName = methodName;
if (header.addrBytes.length != 4) {
throw new SncpException("address bytes length must be 4, but " + header.addrBytes.length);
}
return header;
}
//返回Header Size
public int read(ByteBuffer buffer) {
this.seqid = buffer.getLong(); //8
int size = buffer.getChar();
this.valid = size != HEADER_SIZE; //2
this.serviceid = Uint128.read(buffer); //16
this.serviceVersion = buffer.getInt(); //4
this.actionid = Uint128.read(buffer); //16
if (this.addrBytes == null) {
this.addrBytes = new byte[4];
//此处的buffer不包含开头headerSize的两字节返回Header Size
public static SncpHeader read(ByteBuffer buffer, final int headerSize) {
SncpHeader header = new SncpHeader();
header.valid = headerSize > HEADER_SUBSIZE; //2
header.seqid = buffer.getLong(); //8
header.serviceid = Uint128.read(buffer); //16
header.sncpVersion = buffer.getInt(); //4
header.actionid = Uint128.read(buffer); //16
if (header.addrBytes == null) {
header.addrBytes = new byte[4];
}
buffer.get(this.addrBytes); //addr 4
this.addrPort = buffer.getChar(); //port 2
this.abilities = buffer.getInt(); //4
this.timestamp = buffer.getLong(); //8
this.retcode = buffer.getInt(); //4
this.bodyLength = buffer.getInt(); //4
return size;
buffer.get(header.addrBytes); //addr 4
header.addrPort = buffer.getChar(); //port 2
header.abilities = buffer.getInt(); //4
header.timestamp = buffer.getLong(); //8
int traceSize = buffer.getChar(); //2
if (traceSize > 0) {
byte[] traces = new byte[traceSize];
buffer.get(traces);
header.traceid = new String(traces, StandardCharsets.UTF_8);
}
header.retcode = buffer.getInt(); //4
header.bodyLength = buffer.getInt(); //4
return header;
}
//返回Header Size
public int read(ByteArray array) {
//此处的array不包含开头headerSize的两字节返回Header Size
public static SncpHeader read(ByteArray array, final int headerSize) {
SncpHeader header = new SncpHeader();
header.valid = headerSize > HEADER_SUBSIZE; //2
int offset = 0;
this.seqid = array.getLong(offset); //8
header.seqid = array.getLong(offset); //8
offset += 8;
int size = array.getChar(offset);
this.valid = size != HEADER_SIZE; //2
offset += 2;
this.serviceid = array.getUint128(offset); //16
header.serviceid = array.getUint128(offset); //16
offset += 16;
this.serviceVersion = array.getInt(offset); //4
header.sncpVersion = array.getInt(offset); //4
offset += 4;
this.actionid = array.getUint128(offset); //16
header.actionid = array.getUint128(offset); //16
offset += 16;
this.addrBytes = array.getBytes(offset, 4); //addr 4
header.addrBytes = array.getBytes(offset, 4); //addr 4
offset += 4;
this.addrPort = array.getChar(offset); //port 2
header.addrPort = array.getChar(offset); //port 2
offset += 2;
this.abilities = array.getInt(offset); //4
header.abilities = array.getInt(offset); //4
offset += 4;
this.timestamp = array.getLong(offset); //8
header.timestamp = array.getLong(offset); //8
offset += 8;
this.retcode = array.getInt(offset); //4
int traceSize = array.getChar(offset); //2
offset += 2;
if (traceSize > 0) {
byte[] traces = array.getBytes(offset, traceSize);
header.traceid = new String(traces, StandardCharsets.UTF_8);
offset += traceSize;
}
header.retcode = array.getInt(offset); //4
offset += 4;
this.bodyLength = array.getInt(offset); //4
return size;
header.bodyLength = array.getInt(offset); //4
return header;
}
public ByteArray writeTo(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) {
byte[] newAddrBytes = address == null ? EMPTY_ADDR : address.getAddress().getAddress();
int newAddrPort = address == null ? 0 : address.getPort();
return writeTo(array, newAddrBytes, newAddrPort, newSeqid, bodyLength, retcode);
public ByteArray writeTo(ByteArray array, SncpClientRequest clientRequest, int bodyLength, int retcode) {
return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(), clientRequest.traceBytes(), bodyLength, retcode);
}
public ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) {
public ByteArray writeTo(ByteArray array, SncpResponse response, int bodyLength, int retcode) {
SncpRequest request = response.request();
return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(), request.traceBytes(), bodyLength, retcode);
}
private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, byte[] traces, int bodyLength, int retcode) {
if (newAddrBytes.length != 4) {
throw new SncpException("address bytes length must be 4, but " + newAddrBytes.length);
}
if (array.length() < HEADER_SIZE) {
throw new SncpException("ByteArray length must more " + HEADER_SIZE);
if (traces == null) {
traces = SncpClientRequest.EMPTY_TRACEID;
}
int size = HEADER_SUBSIZE + 2 + traces.length;
if (array.length() < size) {
throw new SncpException("ByteArray length must more " + size);
}
int offset = 0;
array.putChar(offset, (char) size); //2
offset += 2;
array.putLong(offset, newSeqid); //8
offset += 8;
array.putChar(offset, (char) HEADER_SIZE); //2
offset += 2;
array.putUint128(offset, serviceid); //16
offset += 16;
array.putInt(offset, serviceVersion); //4
array.putInt(offset, sncpVersion); //4
offset += 4;
array.putUint128(offset, actionid); //16
offset += 16;
@@ -146,9 +171,15 @@ public class SncpHeader {
offset += 4;
array.putLong(offset, System.currentTimeMillis()); //8
offset += 8;
array.putChar(offset, (char) traces.length); //2
offset += 2;
if (traces.length > 0) {
array.put(offset, traces); //traces.length
offset += traces.length;
}
array.putInt(offset, retcode); //4
offset += 4;
array.putInt(offset, bodyLength); //4
array.putInt(offset, bodyLength); //4
return array;
}
@@ -158,11 +189,12 @@ public class SncpHeader {
+ (this.seqid == null
? ("{serviceid=" + this.serviceid + ",serviceName=" + this.serviceName)
: ("{seqid=" + this.seqid + ",serviceid=" + this.serviceid + ",serviceName=" + this.serviceName))
+ ",serviceVersion=" + this.serviceVersion
+ ",sncpVersion=" + this.sncpVersion
+ ",actionid=" + this.actionid
+ ",methodName=" + this.methodName
+ ",address=" + getAddress()
+ ",timestamp=" + this.timestamp
+ ",traceid=" + getTraceid()
+ ",retcode=" + this.retcode
+ ",bodyLength=" + this.bodyLength
+ "}";
@@ -186,6 +218,14 @@ public class SncpHeader {
&& Objects.equals(this.actionid, other.actionid);
}
public static int calcHeaderSize(SncpClientRequest request) {
return HEADER_SUBSIZE + 2 + request.traceBytes().length;
}
public static int calcHeaderSize(SncpRequest request) {
return HEADER_SUBSIZE + 2 + request.traceBytes().length;
}
public Long getSeqid() {
return seqid;
}
@@ -194,8 +234,8 @@ public class SncpHeader {
return serviceid;
}
public int getServiceVersion() {
return serviceVersion;
public int getSncpVersion() {
return sncpVersion;
}
public Uint128 getActionid() {
@@ -210,12 +250,20 @@ public class SncpHeader {
return addrPort;
}
public int getBodyLength() {
return bodyLength;
public long getTimestamp() {
return timestamp;
}
public String getTraceid() {
return traceid;
}
public int getRetcode() {
return retcode;
}
public int getBodyLength() {
return bodyLength;
}
}

View File

@@ -15,7 +15,7 @@ import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.mq.*;
import static org.redkale.net.sncp.Sncp.loadMethodActions;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import static org.redkale.net.sncp.SncpHeader.HEADER_SUBSIZE;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -88,7 +88,7 @@ public class SncpRemoteInfo<T extends Service> {
this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(resourceName, resourceType);
for (Map.Entry<Uint128, Method> en : loadMethodActions(Sncp.getServiceType(serviceImplClass)).entrySet()) {
this.actions.put(en.getKey().toString(), new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey()));
this.actions.put(en.getKey().toString(), new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey(), sncpClient));
}
}
@@ -176,11 +176,11 @@ public class SncpRemoteInfo<T extends Service> {
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
SncpHeader header = new SncpHeader();
int headerSize = header.read(buffer);
if (headerSize != HEADER_SIZE) {
throw new SncpException("sncp header length must be " + HEADER_SIZE + ", but is " + headerSize);
int headerSize = buffer.getChar();
if (headerSize <= HEADER_SUBSIZE) {
throw new SncpException("sncp header length must more " + HEADER_SUBSIZE + ", but is " + headerSize);
}
SncpHeader header = SncpHeader.read(buffer, headerSize);
if (!header.checkValid(action.header)) {
throw new SncpException("sncp header error, response-header:" + action.header + "+, response-header:" + header);
}
@@ -331,7 +331,7 @@ public class SncpRemoteInfo<T extends Service> {
protected final SncpHeader header;
@SuppressWarnings("unchecked")
SncpRemoteAction(final Class serviceImplClass, Class resourceType, Method method, Uint128 serviceid, Uint128 actionid) {
SncpRemoteAction(final Class serviceImplClass, Class resourceType, Method method, Uint128 serviceid, Uint128 actionid, final SncpClient sncpClient) {
this.actionid = actionid == null ? Sncp.actionid(method) : actionid;
Type rt = TypeToken.getGenericType(method.getGenericReturnType(), serviceImplClass);
this.returnObjectType = rt == void.class || rt == Void.class ? null : rt;
@@ -424,7 +424,7 @@ public class SncpRemoteInfo<T extends Service> {
this.paramHandlerClass = handlerFuncClass;
this.paramHandlerResultType = handlerResultType;
this.paramHandlerAttachIndex = handlerAttachIndex;
this.header = new SncpHeader(null, serviceid, resourceType.getName(), actionid, method.getName());
this.header = SncpHeader.create(sncpClient == null ? null : sncpClient.getClientSncpAddress(), serviceid, resourceType.getName(), actionid, method.getName());
if (this.paramHandlerIndex >= 0 && method.getReturnType() != void.class) {
throw new SncpException(method + " have CompletionHandler type parameter but return type is not void");
}

View File

@@ -7,13 +7,14 @@ package org.redkale.net.sncp;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.logging.Level;
import org.redkale.convert.*;
import org.redkale.convert.bson.BsonReader;
import org.redkale.net.Request;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.util.Uint128;
import static org.redkale.net.client.ClientRequest.EMPTY_TRACEID;
import org.redkale.util.*;
/**
*
@@ -36,6 +37,8 @@ public class SncpRequest extends Request<SncpContext> {
protected int readState = READ_STATE_ROUTE;
private int headerSize;
private SncpHeader header;
private int bodyOffset;
@@ -50,17 +53,21 @@ public class SncpRequest extends Request<SncpContext> {
@Override //request.header与response.header数据格式保持一致
protected int readHeader(ByteBuffer buffer, Request last) {
//---------------------head----------------------------------
//---------------------route----------------------------------
if (this.readState == READ_STATE_ROUTE) {
if (buffer.remaining() < HEADER_SIZE) {
return HEADER_SIZE - buffer.remaining(); //小于60
if (buffer.remaining() < 2) {
return 2 - buffer.remaining(); //小于2
}
this.header = new SncpHeader();
int headerSize = this.header.read(buffer);
if (headerSize != HEADER_SIZE) {
context.getLogger().log(Level.WARNING, "sncp buffer header.length not " + HEADER_SIZE + ", but " + headerSize);
this.headerSize = buffer.getChar();
if (headerSize < SncpHeader.HEADER_SUBSIZE) {
context.getLogger().log(Level.WARNING, "sncp buffer header.length must more " + SncpHeader.HEADER_SUBSIZE + ", but " + this.headerSize);
return -1;
}
this.readState = READ_STATE_HEADER;
}
//---------------------head----------------------------------
if (this.readState == READ_STATE_HEADER) {
this.header = SncpHeader.read(buffer, this.headerSize);
if (this.header.getRetcode() != 0) { // retcode
context.getLogger().log(Level.WARNING, "sncp buffer header.retcode not 0");
return -1;
@@ -108,7 +115,9 @@ public class SncpRequest extends Request<SncpContext> {
@Override
public String toString() {
return SncpRequest.class.getSimpleName() + "_" + Objects.hashCode(this) + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}";
return SncpRequest.class.getSimpleName() + "_" + Objects.hashCode(this)
+ "{header=" + this.header + ",bodyOffset=" + this.bodyOffset
+ ",body=[" + (this.body == null ? -1 : this.body.length) + "]}";
}
@Override
@@ -134,6 +143,10 @@ public class SncpRequest extends Request<SncpContext> {
return body == null ? null : reader.setBytes(body);
}
public byte[] traceBytes() {
return Utility.isEmpty(traceid) ? EMPTY_TRACEID : traceid.getBytes(StandardCharsets.UTF_8);
}
public byte[] getBody() {
return body;
}

View File

@@ -10,7 +10,6 @@ import java.nio.channels.CompletionHandler;
import java.util.concurrent.*;
import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.util.ByteArray;
/**
@@ -30,14 +29,12 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
public static final int RETCODE_THROWEXCEPTION = (1 << 4); //内部异常
private final byte[] addrBytes;
final byte[] addrBytes;
private final int addrPort;
final int addrPort;
protected final BsonWriter writer = new BsonWriter();
protected final ByteArray onlyHeaderData = new ByteArray(HEADER_SIZE).putPlaceholder(HEADER_SIZE);
protected final CompletionHandler realHandler = new CompletionHandler() {
@Override
public void completed(Object result, Object attachment) {
@@ -106,6 +103,14 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
return writer;
}
protected SncpRequest request() {
return request;
}
protected void writeHeader(ByteArray array, int bodyLength, int retcode) {
request.getHeader().writeTo(array, this, bodyLength, retcode);
}
@Override
protected ExecutorService getWorkExecutor() {
return super.getWorkExecutor();
@@ -127,8 +132,9 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
}
public final void finishVoid() {
int headerSize = SncpHeader.calcHeaderSize(request);
BsonWriter out = getBsonWriter();
out.writePlaceholderTo(HEADER_SIZE);
out.writePlaceholderTo(headerSize);
finish(0, out);
}
@@ -153,8 +159,9 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
}
public final void finish(final Type type, final Object result) {
int headerSize = SncpHeader.calcHeaderSize(request);
BsonWriter out = getBsonWriter();
out.writePlaceholderTo(HEADER_SIZE);
out.writePlaceholderTo(headerSize);
if (result != null || type != Void.class) {
out.writeByte((byte) 0); //body的第一个字节为0表示返回结果对象而不是参数回调对象
context.getBsonConvert().convertTo(out, type, result);
@@ -162,23 +169,19 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
finish(0, out);
}
//调用此方法时out已写入SncpHeader
//调用此方法时out已写入SncpHeader的占位空间
public void finish(final int retcode, final BsonWriter out) {
int headerSize = SncpHeader.calcHeaderSize(request);
if (out == null) {
final ByteArray array = onlyHeaderData;
fillHeader(array, 0, retcode);
final ByteArray array = new ByteArray(headerSize).putPlaceholder(headerSize);
writeHeader(array, 0, retcode);
finish(array);
return;
}
final ByteArray array = out.toByteArray();
final int bodyLength = array.length() - HEADER_SIZE;
fillHeader(array, bodyLength, retcode);
final int bodyLength = array.length() - headerSize;
writeHeader(array, bodyLength, retcode);
finish(array);
}
protected void fillHeader(ByteArray array, int bodyLength, int retcode) {
SncpHeader header = request.getHeader();
header.writeTo(array, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode);
}
}

View File

@@ -43,22 +43,46 @@ public class SncpClientCodecTest {
} catch (Exception e) {
e.printStackTrace();
}
//----------------------------------------------
ByteBuffer realBuf;
//----------------------------------------------
respResults.clear();
{
SncpHeader header = new SncpHeader(sncpAddress, Uint128.ZERO, "", Uint128.ZERO, "");
SncpHeader header = SncpHeader.create(sncpAddress, Uint128.ZERO, "", Uint128.ZERO, "");
SncpClientRequest request = new SncpClientRequest();
ByteArray writeArray = new ByteArray();
request.prepare(header, 1, "", new byte[20]);
ByteArray writeArray1 = new ByteArray();
request.prepare(header, 1, "aa", new byte[20]);
request.writeTo(conn, writeArray1);
System.out.println("request.1 = " + request);
writeArray.put(new byte[SncpHeader.HEADER_SIZE]);
request.writeTo(conn, writeArray);
request.prepare(header, 2, "", new byte[25]);
System.out.println("headerSize = " + SncpHeader.calcHeaderSize(request) + ", arraySzie = " + writeArray1.getBytes().length);
ByteArray writeArray2 = new ByteArray();
request.prepare(header, 2, "bb", new byte[25]);
request.writeTo(conn, writeArray2);
System.out.println("request.2 = " + request);
writeArray.put(new byte[SncpHeader.HEADER_SIZE]);
request.writeTo(conn, writeArray);
System.out.println(writeArray.getBytes().length);
realBuf = ByteBuffer.wrap(writeArray.getBytes());
System.out.println("headerSize = " + SncpHeader.calcHeaderSize(request) + ", arraySzie = " + writeArray2.getBytes().length);
writeArray1.put(writeArray2);
realBuf = ByteBuffer.wrap(writeArray1.getBytes());
}
System.out.println("sncp.realBuf = " + realBuf.remaining());
codec.decodeMessages(realBuf, new ByteArray());
System.out.println("respResults.size = " + respResults.size());
Assertions.assertEquals(2, respResults.size());
//----------------------------------------------
respResults.clear();
{
SncpHeader header = SncpHeader.create(sncpAddress, Uint128.ZERO, "", Uint128.ZERO, "");
SncpClientRequest request = new SncpClientRequest();
ByteArray writeArray1 = new ByteArray();
request.prepare(header, 1, "", new byte[20]);
request.writeTo(conn, writeArray1);
System.out.println("request.1 = " + request);
System.out.println("headerSize = " + SncpHeader.calcHeaderSize(request) + ", arraySzie = " + writeArray1.getBytes().length);
ByteArray writeArray2 = new ByteArray();
request.prepare(header, 2, "", new byte[25]);
request.writeTo(conn, writeArray2);
System.out.println("request.2 = " + request);
System.out.println("headerSize = " + SncpHeader.calcHeaderSize(request) + ", arraySzie = " + writeArray2.getBytes().length);
writeArray1.put(writeArray2);
realBuf = ByteBuffer.wrap(writeArray1.getBytes());
}
System.out.println("sncp.realBuf = " + realBuf.remaining());
codec.decodeMessages(realBuf, new ByteArray());