diff --git a/src/main/java/org/redkale/net/sncp/SncpRequest.java b/src/main/java/org/redkale/net/sncp/SncpRequest.java index c1874111b..3fb743b75 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpRequest.java @@ -39,9 +39,9 @@ public class SncpRequest extends Request { private int headerSize; - private SncpHeader header; + private ByteArray halfArray; - private int bodyOffset; + private SncpHeader header; private boolean ping; @@ -55,11 +55,27 @@ public class SncpRequest extends Request { protected int readHeader(ByteBuffer buffer, Request last) { //---------------------route---------------------------------- if (this.readState == READ_STATE_ROUTE) { - if (buffer.remaining() < 2) { - return 2 - buffer.remaining(); //小于2 + int remain = buffer.remaining(); + int expect = halfArray == null ? 2 : 2 - halfArray.length(); + if (remain < expect) { + if (remain == 1) { + if (halfArray == null) { + halfArray = new ByteArray(); + } + halfArray.clear().put(buffer.get()); + } + buffer.clear(); + return expect - remain; //小于2 + } else { + if (halfArray == null) { + this.headerSize = buffer.getChar(); + } else { + halfArray.put(buffer.get()); + this.headerSize = halfArray.getChar(0); + halfArray.clear(); + } } - this.headerSize = buffer.getChar(); - if (headerSize < SncpHeader.HEADER_SUBSIZE) { + if (this.headerSize < SncpHeader.HEADER_SUBSIZE) { context.getLogger().log(Level.WARNING, "sncp buffer header.length must more " + SncpHeader.HEADER_SUBSIZE + ", but " + this.headerSize); return -1; } @@ -67,7 +83,23 @@ public class SncpRequest extends Request { } //---------------------head---------------------------------- if (this.readState == READ_STATE_HEADER) { - this.header = SncpHeader.read(buffer, this.headerSize); + int remain = buffer.remaining(); + int expect = halfArray == null ? this.headerSize - 2 : this.headerSize - 2 - halfArray.length(); + if (remain < expect) { + if (halfArray == null) { + halfArray = new ByteArray(); + } + halfArray.put(buffer); + buffer.clear(); + return expect - remain; + } + if (halfArray == null || halfArray.length() == 0) { + this.header = SncpHeader.read(buffer, this.headerSize); + } else { + halfArray.put(buffer, expect); + this.header = SncpHeader.read(halfArray, this.headerSize); + halfArray.clear(); + } if (this.header.getRetcode() != 0) { // retcode context.getLogger().log(Level.WARNING, "sncp buffer header.retcode not 0"); return -1; @@ -76,29 +108,41 @@ public class SncpRequest extends Request { context.getLogger().log(Level.WARNING, "sncp buffer body.length must lower " + context.getMaxBody() + ", but " + this.header.getBodyLength()); return -1; } - this.body = new byte[this.header.getBodyLength()]; + this.traceid = this.header.getTraceid(); this.readState = READ_STATE_BODY; } //---------------------body---------------------------------- if (this.readState == READ_STATE_BODY) { int bodyLength = this.header.getBodyLength(); if (bodyLength == 0) { + this.body = new byte[0]; this.readState = READ_STATE_END; + halfArray = null; if (this.header.getSeqid() == 0 && this.header.getServiceid() == Uint128.ZERO && this.header.getActionid() == Uint128.ZERO) { this.ping = true; } return 0; } - int len = Math.min(bodyLength - this.bodyOffset, buffer.remaining()); - buffer.get(body, this.bodyOffset, len); - this.bodyOffset += len; - int rs = bodyLength - this.bodyOffset; - if (rs == 0) { - this.readState = READ_STATE_END; - } else { + int remain = buffer.remaining(); + int expect = halfArray == null ? bodyLength : bodyLength - halfArray.length(); + if (remain < expect) { + if (halfArray == null) { + halfArray = new ByteArray(); + } + halfArray.put(buffer); buffer.clear(); + return expect - remain; } - return rs; + if (halfArray == null || halfArray.length() == 0) { + this.body = new byte[bodyLength]; + buffer.get(body); + } else { + halfArray.put(buffer, expect); + this.body = halfArray.getBytes(); + } + this.readState = READ_STATE_END; + halfArray = null; + return 0; } return 0; } @@ -116,8 +160,7 @@ public class SncpRequest extends Request { @Override public String toString() { return SncpRequest.class.getSimpleName() + "_" + Objects.hashCode(this) - + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset - + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}"; + + "{header=" + this.header + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}"; } @Override @@ -125,7 +168,7 @@ public class SncpRequest extends Request { this.reader.clear(); this.readState = READ_STATE_ROUTE; this.header = null; - this.bodyOffset = 0; + this.halfArray = null; this.body = null; this.ping = false; super.recycle(); diff --git a/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java new file mode 100644 index 000000000..14be31ebe --- /dev/null +++ b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java @@ -0,0 +1,80 @@ +/* + * + */ +package org.redkale.test.sncp; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.logging.Logger; +import org.junit.jupiter.api.*; +import org.redkale.net.*; +import org.redkale.net.client.ClientAddress; +import org.redkale.net.sncp.*; +import org.redkale.util.*; + +/** + * + * @author zhangjx + */ +public class SncpRequestParseTest { + + private boolean main; + + public static void main(String[] args) throws Throwable { + SncpRequestParseTest test = new SncpRequestParseTest(); + test.main = true; + test.run(); + } + + @Test + public void run() throws Exception { + InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389); + InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344); + final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); + SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection()); + + SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig(); + config.logger = Logger.getLogger(SncpRequestParseTest.class.getSimpleName()); + config.serverAddress = sncpAddress; + config.maxBody = 1024 * 1024 * 1024; + SncpContext context = new SncpContext(config); + + SncpHeader header = SncpHeader.create(sncpAddress, Uint128.ZERO, "", Uint128.ZERO, ""); + SncpClientRequest clientRequest = new SncpClientRequest(); + ByteArray writeArray = new ByteArray(); + clientRequest.prepare(header, 1, "aa", new byte[20]); + clientRequest.writeTo(conn, writeArray); + byte[] bs = writeArray.getBytes(); + int headerSize = SncpHeader.calcHeaderSize(clientRequest); + System.out.println("整个sncp请求长度: " + bs.length + "." + Arrays.toString(bs)); + System.out.println(" " + Arrays.toString(Arrays.copyOfRange(bs, 2, bs.length))); + + SncpRequestTest request = new SncpRequestTest(context); + Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), null)); + Assertions.assertEquals(headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), null)); + Assertions.assertEquals(0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, bs.length)), null)); + Assertions.assertEquals("aa", request.getHeader().getTraceid()); + + System.out.println("测试第二段"); + request = new SncpRequestTest(context); + Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), null)); + Assertions.assertEquals(headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), null)); + Assertions.assertEquals(headerSize - headerSize / 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, headerSize / 2)), null)); + Assertions.assertEquals(0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, headerSize / 2, bs.length)), null)); + Assertions.assertEquals("aa", request.getHeader().getTraceid()); + } + + public static class SncpRequestTest extends SncpRequest { + + protected SncpRequestTest(SncpContext context) { + super(context); + } + + @Override + protected int readHeader(ByteBuffer buffer, Request last) { + return super.readHeader(buffer, last); + } + } +}