diff --git a/src/main/java/org/redkale/mq/MessageRecord.java b/src/main/java/org/redkale/mq/MessageRecord.java index d05f58a8d..7b0924abc 100644 --- a/src/main/java/org/redkale/mq/MessageRecord.java +++ b/src/main/java/org/redkale/mq/MessageRecord.java @@ -13,6 +13,7 @@ import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.HttpSimpleRequest; import org.redkale.net.sncp.SncpHeader; +import org.redkale.util.ByteArray; /** * 存在MQ里面的数据结构

@@ -318,8 +319,8 @@ public class MessageRecord implements Serializable { sb.append(",\"respTopic\":\"").append(this.respTopic).append("\""); } if (this.content != null) { - if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpHeader.HEADER_SIZE) { - int offset = SncpHeader.HEADER_SIZE + 1; //循环占位符 + if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpHeader.HEADER_SUBSIZE) { + int offset = new ByteArray(this.content).getChar(0) + 1; //循环占位符 Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset); sb.append(",\"content\":").append(rs); } else if (this.ctype == CTYPE_HTTP_REQUEST) { diff --git a/src/main/java/org/redkale/mq/SncpMessageResponse.java b/src/main/java/org/redkale/mq/SncpMessageResponse.java index db98c9880..86305262e 100644 --- a/src/main/java/org/redkale/mq/SncpMessageResponse.java +++ b/src/main/java/org/redkale/mq/SncpMessageResponse.java @@ -41,14 +41,15 @@ public class SncpMessageResponse extends SncpResponse { if (callback != null) { callback.run(); } + int headerSize = SncpHeader.calcHeaderSize(request); if (out == null) { - final ByteArray result = onlyHeaderData; - fillHeader(result, 0, retcode); + final ByteArray result = new ByteArray(headerSize).putPlaceholder(headerSize); + writeHeader(result, 0, retcode); producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, (byte[]) null)); return; } final ByteArray result = out.toByteArray(); - fillHeader(result, result.length() - SncpHeader.HEADER_SIZE, retcode); + writeHeader(result, result.length() - headerSize, retcode); producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, result.getBytes())); } } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 583120b25..d679b8c37 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -51,7 +51,7 @@ public abstract class ClientConnection implements Co protected final ByteArray writeArray = new ByteArray(); - protected final ThreadLocal arrayThreadLocal = ThreadLocal.withInitial(() -> new ByteArray()); + protected final ThreadLocal arrayThreadLocal = ThreadLocal.withInitial(ByteArray::new); protected final ByteBuffer writeBuffer; diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index cce76e68a..a4c128807 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -6,6 +6,7 @@ package org.redkale.net.client; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.function.Function; import org.redkale.net.WorkThread; import org.redkale.util.*; @@ -21,6 +22,8 @@ import org.redkale.util.*; */ public abstract class ClientRequest { + public static final byte[] EMPTY_TRACEID = new byte[0]; + protected long createTime = System.currentTimeMillis(); protected WorkThread workThread; @@ -60,6 +63,10 @@ public abstract class ClientRequest { return traceid; } + public byte[] traceBytes() { + return Utility.isEmpty(traceid) ? EMPTY_TRACEID : traceid.getBytes(StandardCharsets.UTF_8); + } + public T workThread(WorkThread thread) { this.workThread = thread; return (T) this; diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index b69182587..e5006a34d 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -36,12 +36,6 @@ import org.redkale.util.*; */ public abstract class Sncp { - private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, "", Uint128.ZERO, "") - .writeTo(new ByteArray(SncpHeader.HEADER_SIZE).putPlaceholder(SncpHeader.HEADER_SIZE), null, 0, 0, 0) - .getBytes(); - - private static final byte[] PONG_BYTES = Arrays.copyOf(PING_BYTES, PING_BYTES.length); - static final String FIELDPREFIX = "_redkale"; /** @@ -70,14 +64,6 @@ public abstract class Sncp { private Sncp() { } - public static byte[] getPingBytes() { - return Arrays.copyOf(PING_BYTES, PING_BYTES.length); - } - - public static byte[] getPongBytes() { - return Arrays.copyOf(PONG_BYTES, PONG_BYTES.length); - } - //key: actionid public static LinkedHashMap loadMethodActions(final Class serviceTypeOrImplClass) { final List list = new ArrayList<>(); diff --git a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java index 97d8a7118..82f6f5123 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java @@ -6,7 +6,6 @@ package org.redkale.net.sncp; import java.nio.ByteBuffer; import java.util.logging.Logger; import org.redkale.net.client.ClientCodec; -import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; import org.redkale.util.*; /** @@ -27,9 +26,13 @@ public class SncpClientCodec extends ClientCodec localArray = ThreadLocal.withInitial(ByteArray::new); protected SncpDispatcherServlet() { super(); @@ -83,7 +83,11 @@ public class SncpDispatcherServlet extends DispatcherServlet HEADER_SUBSIZE; //2 + header.seqid = buffer.getLong(); //8 + header.serviceid = Uint128.read(buffer); //16 + header.sncpVersion = buffer.getInt(); //4 + header.actionid = Uint128.read(buffer); //16 + if (header.addrBytes == null) { + header.addrBytes = new byte[4]; } - buffer.get(this.addrBytes); //addr 4 - this.addrPort = buffer.getChar(); //port 2 - this.abilities = buffer.getInt(); //4 - this.timestamp = buffer.getLong(); //8 - this.retcode = buffer.getInt(); //4 - this.bodyLength = buffer.getInt(); //4 - return size; + buffer.get(header.addrBytes); //addr 4 + header.addrPort = buffer.getChar(); //port 2 + header.abilities = buffer.getInt(); //4 + header.timestamp = buffer.getLong(); //8 + int traceSize = buffer.getChar(); //2 + if (traceSize > 0) { + byte[] traces = new byte[traceSize]; + buffer.get(traces); + header.traceid = new String(traces, StandardCharsets.UTF_8); + } + header.retcode = buffer.getInt(); //4 + header.bodyLength = buffer.getInt(); //4 + return header; } - //返回Header Size - public int read(ByteArray array) { + //此处的array不包含开头headerSize的两字节,返回Header Size + public static SncpHeader read(ByteArray array, final int headerSize) { + SncpHeader header = new SncpHeader(); + header.valid = headerSize > HEADER_SUBSIZE; //2 int offset = 0; - this.seqid = array.getLong(offset); //8 + header.seqid = array.getLong(offset); //8 offset += 8; - int size = array.getChar(offset); - this.valid = size != HEADER_SIZE; //2 - offset += 2; - this.serviceid = array.getUint128(offset); //16 + header.serviceid = array.getUint128(offset); //16 offset += 16; - this.serviceVersion = array.getInt(offset); //4 + header.sncpVersion = array.getInt(offset); //4 offset += 4; - this.actionid = array.getUint128(offset); //16 + header.actionid = array.getUint128(offset); //16 offset += 16; - this.addrBytes = array.getBytes(offset, 4); //addr 4 + header.addrBytes = array.getBytes(offset, 4); //addr 4 offset += 4; - this.addrPort = array.getChar(offset); //port 2 + header.addrPort = array.getChar(offset); //port 2 offset += 2; - this.abilities = array.getInt(offset); //4 + header.abilities = array.getInt(offset); //4 offset += 4; - this.timestamp = array.getLong(offset); //8 + header.timestamp = array.getLong(offset); //8 offset += 8; - this.retcode = array.getInt(offset); //4 + int traceSize = array.getChar(offset); //2 + offset += 2; + if (traceSize > 0) { + byte[] traces = array.getBytes(offset, traceSize); + header.traceid = new String(traces, StandardCharsets.UTF_8); + offset += traceSize; + } + header.retcode = array.getInt(offset); //4 offset += 4; - this.bodyLength = array.getInt(offset); //4 - return size; + header.bodyLength = array.getInt(offset); //4 + return header; } - public ByteArray writeTo(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) { - byte[] newAddrBytes = address == null ? EMPTY_ADDR : address.getAddress().getAddress(); - int newAddrPort = address == null ? 0 : address.getPort(); - return writeTo(array, newAddrBytes, newAddrPort, newSeqid, bodyLength, retcode); + public ByteArray writeTo(ByteArray array, SncpClientRequest clientRequest, int bodyLength, int retcode) { + return writeTo(array, this.addrBytes, this.addrPort, (Long) clientRequest.getRequestid(), clientRequest.traceBytes(), bodyLength, retcode); } - public ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) { + public ByteArray writeTo(ByteArray array, SncpResponse response, int bodyLength, int retcode) { + SncpRequest request = response.request(); + return writeTo(array, response.addrBytes, response.addrPort, (Long) request.getRequestid(), request.traceBytes(), bodyLength, retcode); + } + + private ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, byte[] traces, int bodyLength, int retcode) { if (newAddrBytes.length != 4) { throw new SncpException("address bytes length must be 4, but " + newAddrBytes.length); } - if (array.length() < HEADER_SIZE) { - throw new SncpException("ByteArray length must more " + HEADER_SIZE); + if (traces == null) { + traces = SncpClientRequest.EMPTY_TRACEID; + } + int size = HEADER_SUBSIZE + 2 + traces.length; + if (array.length() < size) { + throw new SncpException("ByteArray length must more " + size); } int offset = 0; + array.putChar(offset, (char) size); //2 + offset += 2; array.putLong(offset, newSeqid); //8 offset += 8; - array.putChar(offset, (char) HEADER_SIZE); //2 - offset += 2; array.putUint128(offset, serviceid); //16 offset += 16; - array.putInt(offset, serviceVersion); //4 + array.putInt(offset, sncpVersion); //4 offset += 4; array.putUint128(offset, actionid); //16 offset += 16; @@ -146,9 +171,15 @@ public class SncpHeader { offset += 4; array.putLong(offset, System.currentTimeMillis()); //8 offset += 8; + array.putChar(offset, (char) traces.length); //2 + offset += 2; + if (traces.length > 0) { + array.put(offset, traces); //traces.length + offset += traces.length; + } array.putInt(offset, retcode); //4 offset += 4; - array.putInt(offset, bodyLength); //4 + array.putInt(offset, bodyLength); //4 return array; } @@ -158,11 +189,12 @@ public class SncpHeader { + (this.seqid == null ? ("{serviceid=" + this.serviceid + ",serviceName=" + this.serviceName) : ("{seqid=" + this.seqid + ",serviceid=" + this.serviceid + ",serviceName=" + this.serviceName)) - + ",serviceVersion=" + this.serviceVersion + + ",sncpVersion=" + this.sncpVersion + ",actionid=" + this.actionid + ",methodName=" + this.methodName + ",address=" + getAddress() + ",timestamp=" + this.timestamp + + ",traceid=" + getTraceid() + ",retcode=" + this.retcode + ",bodyLength=" + this.bodyLength + "}"; @@ -186,6 +218,14 @@ public class SncpHeader { && Objects.equals(this.actionid, other.actionid); } + public static int calcHeaderSize(SncpClientRequest request) { + return HEADER_SUBSIZE + 2 + request.traceBytes().length; + } + + public static int calcHeaderSize(SncpRequest request) { + return HEADER_SUBSIZE + 2 + request.traceBytes().length; + } + public Long getSeqid() { return seqid; } @@ -194,8 +234,8 @@ public class SncpHeader { return serviceid; } - public int getServiceVersion() { - return serviceVersion; + public int getSncpVersion() { + return sncpVersion; } public Uint128 getActionid() { @@ -210,12 +250,20 @@ public class SncpHeader { return addrPort; } - public int getBodyLength() { - return bodyLength; + public long getTimestamp() { + return timestamp; + } + + public String getTraceid() { + return traceid; } public int getRetcode() { return retcode; } + public int getBodyLength() { + return bodyLength; + } + } diff --git a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java index 466e5171b..e909478a2 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java @@ -15,7 +15,7 @@ import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; import org.redkale.mq.*; import static org.redkale.net.sncp.Sncp.loadMethodActions; -import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; +import static org.redkale.net.sncp.SncpHeader.HEADER_SUBSIZE; import org.redkale.service.*; import org.redkale.util.*; @@ -88,7 +88,7 @@ public class SncpRemoteInfo { this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(resourceName, resourceType); for (Map.Entry en : loadMethodActions(Sncp.getServiceType(serviceImplClass)).entrySet()) { - this.actions.put(en.getKey().toString(), new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey())); + this.actions.put(en.getKey().toString(), new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey(), sncpClient)); } } @@ -176,11 +176,11 @@ public class SncpRemoteInfo { return null; } ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); - SncpHeader header = new SncpHeader(); - int headerSize = header.read(buffer); - if (headerSize != HEADER_SIZE) { - throw new SncpException("sncp header length must be " + HEADER_SIZE + ", but is " + headerSize); + int headerSize = buffer.getChar(); + if (headerSize <= HEADER_SUBSIZE) { + throw new SncpException("sncp header length must more " + HEADER_SUBSIZE + ", but is " + headerSize); } + SncpHeader header = SncpHeader.read(buffer, headerSize); if (!header.checkValid(action.header)) { throw new SncpException("sncp header error, response-header:" + action.header + "+, response-header:" + header); } @@ -331,7 +331,7 @@ public class SncpRemoteInfo { protected final SncpHeader header; @SuppressWarnings("unchecked") - SncpRemoteAction(final Class serviceImplClass, Class resourceType, Method method, Uint128 serviceid, Uint128 actionid) { + SncpRemoteAction(final Class serviceImplClass, Class resourceType, Method method, Uint128 serviceid, Uint128 actionid, final SncpClient sncpClient) { this.actionid = actionid == null ? Sncp.actionid(method) : actionid; Type rt = TypeToken.getGenericType(method.getGenericReturnType(), serviceImplClass); this.returnObjectType = rt == void.class || rt == Void.class ? null : rt; @@ -424,7 +424,7 @@ public class SncpRemoteInfo { this.paramHandlerClass = handlerFuncClass; this.paramHandlerResultType = handlerResultType; this.paramHandlerAttachIndex = handlerAttachIndex; - this.header = new SncpHeader(null, serviceid, resourceType.getName(), actionid, method.getName()); + this.header = SncpHeader.create(sncpClient == null ? null : sncpClient.getClientSncpAddress(), serviceid, resourceType.getName(), actionid, method.getName()); if (this.paramHandlerIndex >= 0 && method.getReturnType() != void.class) { throw new SncpException(method + " have CompletionHandler type parameter but return type is not void"); } diff --git a/src/main/java/org/redkale/net/sncp/SncpRequest.java b/src/main/java/org/redkale/net/sncp/SncpRequest.java index bf3796ce8..c1874111b 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpRequest.java @@ -7,13 +7,14 @@ package org.redkale.net.sncp; import java.io.Serializable; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.logging.Level; import org.redkale.convert.*; import org.redkale.convert.bson.BsonReader; import org.redkale.net.Request; -import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; -import org.redkale.util.Uint128; +import static org.redkale.net.client.ClientRequest.EMPTY_TRACEID; +import org.redkale.util.*; /** * @@ -36,6 +37,8 @@ public class SncpRequest extends Request { protected int readState = READ_STATE_ROUTE; + private int headerSize; + private SncpHeader header; private int bodyOffset; @@ -50,17 +53,21 @@ public class SncpRequest extends Request { @Override //request.header与response.header数据格式保持一致 protected int readHeader(ByteBuffer buffer, Request last) { - //---------------------head---------------------------------- + //---------------------route---------------------------------- if (this.readState == READ_STATE_ROUTE) { - if (buffer.remaining() < HEADER_SIZE) { - return HEADER_SIZE - buffer.remaining(); //小于60 + if (buffer.remaining() < 2) { + return 2 - buffer.remaining(); //小于2 } - this.header = new SncpHeader(); - int headerSize = this.header.read(buffer); - if (headerSize != HEADER_SIZE) { - context.getLogger().log(Level.WARNING, "sncp buffer header.length not " + HEADER_SIZE + ", but " + headerSize); + this.headerSize = buffer.getChar(); + if (headerSize < SncpHeader.HEADER_SUBSIZE) { + context.getLogger().log(Level.WARNING, "sncp buffer header.length must more " + SncpHeader.HEADER_SUBSIZE + ", but " + this.headerSize); return -1; } + this.readState = READ_STATE_HEADER; + } + //---------------------head---------------------------------- + if (this.readState == READ_STATE_HEADER) { + this.header = SncpHeader.read(buffer, this.headerSize); if (this.header.getRetcode() != 0) { // retcode context.getLogger().log(Level.WARNING, "sncp buffer header.retcode not 0"); return -1; @@ -108,7 +115,9 @@ public class SncpRequest extends Request { @Override public String toString() { - return SncpRequest.class.getSimpleName() + "_" + Objects.hashCode(this) + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}"; + return SncpRequest.class.getSimpleName() + "_" + Objects.hashCode(this) + + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}"; } @Override @@ -134,6 +143,10 @@ public class SncpRequest extends Request { return body == null ? null : reader.setBytes(body); } + public byte[] traceBytes() { + return Utility.isEmpty(traceid) ? EMPTY_TRACEID : traceid.getBytes(StandardCharsets.UTF_8); + } + public byte[] getBody() { return body; } diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index dcf2349ae..110ca7cd5 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -10,7 +10,6 @@ import java.nio.channels.CompletionHandler; import java.util.concurrent.*; import org.redkale.convert.bson.BsonWriter; import org.redkale.net.Response; -import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; import org.redkale.util.ByteArray; /** @@ -30,14 +29,12 @@ public class SncpResponse extends Response { public static final int RETCODE_THROWEXCEPTION = (1 << 4); //内部异常 - private final byte[] addrBytes; + final byte[] addrBytes; - private final int addrPort; + final int addrPort; protected final BsonWriter writer = new BsonWriter(); - protected final ByteArray onlyHeaderData = new ByteArray(HEADER_SIZE).putPlaceholder(HEADER_SIZE); - protected final CompletionHandler realHandler = new CompletionHandler() { @Override public void completed(Object result, Object attachment) { @@ -106,6 +103,14 @@ public class SncpResponse extends Response { return writer; } + protected SncpRequest request() { + return request; + } + + protected void writeHeader(ByteArray array, int bodyLength, int retcode) { + request.getHeader().writeTo(array, this, bodyLength, retcode); + } + @Override protected ExecutorService getWorkExecutor() { return super.getWorkExecutor(); @@ -127,8 +132,9 @@ public class SncpResponse extends Response { } public final void finishVoid() { + int headerSize = SncpHeader.calcHeaderSize(request); BsonWriter out = getBsonWriter(); - out.writePlaceholderTo(HEADER_SIZE); + out.writePlaceholderTo(headerSize); finish(0, out); } @@ -153,8 +159,9 @@ public class SncpResponse extends Response { } public final void finish(final Type type, final Object result) { + int headerSize = SncpHeader.calcHeaderSize(request); BsonWriter out = getBsonWriter(); - out.writePlaceholderTo(HEADER_SIZE); + out.writePlaceholderTo(headerSize); if (result != null || type != Void.class) { out.writeByte((byte) 0); //body的第一个字节为0,表示返回结果对象,而不是参数回调对象 context.getBsonConvert().convertTo(out, type, result); @@ -162,23 +169,19 @@ public class SncpResponse extends Response { finish(0, out); } - //调用此方法时out已写入SncpHeader + //调用此方法时out已写入SncpHeader的占位空间 public void finish(final int retcode, final BsonWriter out) { + int headerSize = SncpHeader.calcHeaderSize(request); if (out == null) { - final ByteArray array = onlyHeaderData; - fillHeader(array, 0, retcode); + final ByteArray array = new ByteArray(headerSize).putPlaceholder(headerSize); + writeHeader(array, 0, retcode); finish(array); return; } final ByteArray array = out.toByteArray(); - final int bodyLength = array.length() - HEADER_SIZE; - fillHeader(array, bodyLength, retcode); + final int bodyLength = array.length() - headerSize; + writeHeader(array, bodyLength, retcode); finish(array); } - protected void fillHeader(ByteArray array, int bodyLength, int retcode) { - SncpHeader header = request.getHeader(); - header.writeTo(array, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode); - } - } diff --git a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java index a5dea575e..979920a1f 100644 --- a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java @@ -43,22 +43,46 @@ public class SncpClientCodecTest { } catch (Exception e) { e.printStackTrace(); } - //---------------------------------------------- ByteBuffer realBuf; + //---------------------------------------------- + respResults.clear(); { - SncpHeader header = new SncpHeader(sncpAddress, Uint128.ZERO, "", Uint128.ZERO, ""); + SncpHeader header = SncpHeader.create(sncpAddress, Uint128.ZERO, "", Uint128.ZERO, ""); SncpClientRequest request = new SncpClientRequest(); - ByteArray writeArray = new ByteArray(); - request.prepare(header, 1, "", new byte[20]); + ByteArray writeArray1 = new ByteArray(); + request.prepare(header, 1, "aa", new byte[20]); + request.writeTo(conn, writeArray1); System.out.println("request.1 = " + request); - writeArray.put(new byte[SncpHeader.HEADER_SIZE]); - request.writeTo(conn, writeArray); - request.prepare(header, 2, "", new byte[25]); + System.out.println("headerSize = " + SncpHeader.calcHeaderSize(request) + ", arraySzie = " + writeArray1.getBytes().length); + ByteArray writeArray2 = new ByteArray(); + request.prepare(header, 2, "bb", new byte[25]); + request.writeTo(conn, writeArray2); System.out.println("request.2 = " + request); - writeArray.put(new byte[SncpHeader.HEADER_SIZE]); - request.writeTo(conn, writeArray); - System.out.println(writeArray.getBytes().length); - realBuf = ByteBuffer.wrap(writeArray.getBytes()); + System.out.println("headerSize = " + SncpHeader.calcHeaderSize(request) + ", arraySzie = " + writeArray2.getBytes().length); + writeArray1.put(writeArray2); + realBuf = ByteBuffer.wrap(writeArray1.getBytes()); + } + System.out.println("sncp.realBuf = " + realBuf.remaining()); + codec.decodeMessages(realBuf, new ByteArray()); + System.out.println("respResults.size = " + respResults.size()); + Assertions.assertEquals(2, respResults.size()); + //---------------------------------------------- + respResults.clear(); + { + SncpHeader header = SncpHeader.create(sncpAddress, Uint128.ZERO, "", Uint128.ZERO, ""); + SncpClientRequest request = new SncpClientRequest(); + ByteArray writeArray1 = new ByteArray(); + request.prepare(header, 1, "", new byte[20]); + request.writeTo(conn, writeArray1); + System.out.println("request.1 = " + request); + System.out.println("headerSize = " + SncpHeader.calcHeaderSize(request) + ", arraySzie = " + writeArray1.getBytes().length); + ByteArray writeArray2 = new ByteArray(); + request.prepare(header, 2, "", new byte[25]); + request.writeTo(conn, writeArray2); + System.out.println("request.2 = " + request); + System.out.println("headerSize = " + SncpHeader.calcHeaderSize(request) + ", arraySzie = " + writeArray2.getBytes().length); + writeArray1.put(writeArray2); + realBuf = ByteBuffer.wrap(writeArray1.getBytes()); } System.out.println("sncp.realBuf = " + realBuf.remaining()); codec.decodeMessages(realBuf, new ByteArray());