SncpRequest优化
This commit is contained in:
@@ -39,9 +39,9 @@ public class SncpRequest extends Request<SncpContext> {
|
||||
|
||||
private int headerSize;
|
||||
|
||||
private SncpHeader header;
|
||||
private ByteArray halfArray;
|
||||
|
||||
private int bodyOffset;
|
||||
private SncpHeader header;
|
||||
|
||||
private boolean ping;
|
||||
|
||||
@@ -55,11 +55,27 @@ public class SncpRequest extends Request<SncpContext> {
|
||||
protected int readHeader(ByteBuffer buffer, Request last) {
|
||||
//---------------------route----------------------------------
|
||||
if (this.readState == READ_STATE_ROUTE) {
|
||||
if (buffer.remaining() < 2) {
|
||||
return 2 - buffer.remaining(); //小于2
|
||||
int remain = buffer.remaining();
|
||||
int expect = halfArray == null ? 2 : 2 - halfArray.length();
|
||||
if (remain < expect) {
|
||||
if (remain == 1) {
|
||||
if (halfArray == null) {
|
||||
halfArray = new ByteArray();
|
||||
}
|
||||
halfArray.clear().put(buffer.get());
|
||||
}
|
||||
buffer.clear();
|
||||
return expect - remain; //小于2
|
||||
} else {
|
||||
if (halfArray == null) {
|
||||
this.headerSize = buffer.getChar();
|
||||
} else {
|
||||
halfArray.put(buffer.get());
|
||||
this.headerSize = halfArray.getChar(0);
|
||||
halfArray.clear();
|
||||
}
|
||||
}
|
||||
this.headerSize = buffer.getChar();
|
||||
if (headerSize < SncpHeader.HEADER_SUBSIZE) {
|
||||
if (this.headerSize < SncpHeader.HEADER_SUBSIZE) {
|
||||
context.getLogger().log(Level.WARNING, "sncp buffer header.length must more " + SncpHeader.HEADER_SUBSIZE + ", but " + this.headerSize);
|
||||
return -1;
|
||||
}
|
||||
@@ -67,7 +83,23 @@ public class SncpRequest extends Request<SncpContext> {
|
||||
}
|
||||
//---------------------head----------------------------------
|
||||
if (this.readState == READ_STATE_HEADER) {
|
||||
this.header = SncpHeader.read(buffer, this.headerSize);
|
||||
int remain = buffer.remaining();
|
||||
int expect = halfArray == null ? this.headerSize - 2 : this.headerSize - 2 - halfArray.length();
|
||||
if (remain < expect) {
|
||||
if (halfArray == null) {
|
||||
halfArray = new ByteArray();
|
||||
}
|
||||
halfArray.put(buffer);
|
||||
buffer.clear();
|
||||
return expect - remain;
|
||||
}
|
||||
if (halfArray == null || halfArray.length() == 0) {
|
||||
this.header = SncpHeader.read(buffer, this.headerSize);
|
||||
} else {
|
||||
halfArray.put(buffer, expect);
|
||||
this.header = SncpHeader.read(halfArray, this.headerSize);
|
||||
halfArray.clear();
|
||||
}
|
||||
if (this.header.getRetcode() != 0) { // retcode
|
||||
context.getLogger().log(Level.WARNING, "sncp buffer header.retcode not 0");
|
||||
return -1;
|
||||
@@ -76,29 +108,41 @@ public class SncpRequest extends Request<SncpContext> {
|
||||
context.getLogger().log(Level.WARNING, "sncp buffer body.length must lower " + context.getMaxBody() + ", but " + this.header.getBodyLength());
|
||||
return -1;
|
||||
}
|
||||
this.body = new byte[this.header.getBodyLength()];
|
||||
this.traceid = this.header.getTraceid();
|
||||
this.readState = READ_STATE_BODY;
|
||||
}
|
||||
//---------------------body----------------------------------
|
||||
if (this.readState == READ_STATE_BODY) {
|
||||
int bodyLength = this.header.getBodyLength();
|
||||
if (bodyLength == 0) {
|
||||
this.body = new byte[0];
|
||||
this.readState = READ_STATE_END;
|
||||
halfArray = null;
|
||||
if (this.header.getSeqid() == 0 && this.header.getServiceid() == Uint128.ZERO && this.header.getActionid() == Uint128.ZERO) {
|
||||
this.ping = true;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int len = Math.min(bodyLength - this.bodyOffset, buffer.remaining());
|
||||
buffer.get(body, this.bodyOffset, len);
|
||||
this.bodyOffset += len;
|
||||
int rs = bodyLength - this.bodyOffset;
|
||||
if (rs == 0) {
|
||||
this.readState = READ_STATE_END;
|
||||
} else {
|
||||
int remain = buffer.remaining();
|
||||
int expect = halfArray == null ? bodyLength : bodyLength - halfArray.length();
|
||||
if (remain < expect) {
|
||||
if (halfArray == null) {
|
||||
halfArray = new ByteArray();
|
||||
}
|
||||
halfArray.put(buffer);
|
||||
buffer.clear();
|
||||
return expect - remain;
|
||||
}
|
||||
return rs;
|
||||
if (halfArray == null || halfArray.length() == 0) {
|
||||
this.body = new byte[bodyLength];
|
||||
buffer.get(body);
|
||||
} else {
|
||||
halfArray.put(buffer, expect);
|
||||
this.body = halfArray.getBytes();
|
||||
}
|
||||
this.readState = READ_STATE_END;
|
||||
halfArray = null;
|
||||
return 0;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@@ -116,8 +160,7 @@ public class SncpRequest extends Request<SncpContext> {
|
||||
@Override
|
||||
public String toString() {
|
||||
return SncpRequest.class.getSimpleName() + "_" + Objects.hashCode(this)
|
||||
+ "{header=" + this.header + ",bodyOffset=" + this.bodyOffset
|
||||
+ ",body=[" + (this.body == null ? -1 : this.body.length) + "]}";
|
||||
+ "{header=" + this.header + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}";
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -125,7 +168,7 @@ public class SncpRequest extends Request<SncpContext> {
|
||||
this.reader.clear();
|
||||
this.readState = READ_STATE_ROUTE;
|
||||
this.header = null;
|
||||
this.bodyOffset = 0;
|
||||
this.halfArray = null;
|
||||
this.body = null;
|
||||
this.ping = false;
|
||||
super.recycle();
|
||||
|
||||
@@ -0,0 +1,80 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkale.test.sncp;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.logging.Logger;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.redkale.net.*;
|
||||
import org.redkale.net.client.ClientAddress;
|
||||
import org.redkale.net.sncp.*;
|
||||
import org.redkale.util.*;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class SncpRequestParseTest {
|
||||
|
||||
private boolean main;
|
||||
|
||||
public static void main(String[] args) throws Throwable {
|
||||
SncpRequestParseTest test = new SncpRequestParseTest();
|
||||
test.main = true;
|
||||
test.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void run() throws Exception {
|
||||
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389);
|
||||
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344);
|
||||
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
|
||||
SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16);
|
||||
SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection());
|
||||
|
||||
SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig();
|
||||
config.logger = Logger.getLogger(SncpRequestParseTest.class.getSimpleName());
|
||||
config.serverAddress = sncpAddress;
|
||||
config.maxBody = 1024 * 1024 * 1024;
|
||||
SncpContext context = new SncpContext(config);
|
||||
|
||||
SncpHeader header = SncpHeader.create(sncpAddress, Uint128.ZERO, "", Uint128.ZERO, "");
|
||||
SncpClientRequest clientRequest = new SncpClientRequest();
|
||||
ByteArray writeArray = new ByteArray();
|
||||
clientRequest.prepare(header, 1, "aa", new byte[20]);
|
||||
clientRequest.writeTo(conn, writeArray);
|
||||
byte[] bs = writeArray.getBytes();
|
||||
int headerSize = SncpHeader.calcHeaderSize(clientRequest);
|
||||
System.out.println("整个sncp请求长度: " + bs.length + "." + Arrays.toString(bs));
|
||||
System.out.println(" " + Arrays.toString(Arrays.copyOfRange(bs, 2, bs.length)));
|
||||
|
||||
SncpRequestTest request = new SncpRequestTest(context);
|
||||
Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), null));
|
||||
Assertions.assertEquals(headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), null));
|
||||
Assertions.assertEquals(0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, bs.length)), null));
|
||||
Assertions.assertEquals("aa", request.getHeader().getTraceid());
|
||||
|
||||
System.out.println("测试第二段");
|
||||
request = new SncpRequestTest(context);
|
||||
Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), null));
|
||||
Assertions.assertEquals(headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), null));
|
||||
Assertions.assertEquals(headerSize - headerSize / 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, headerSize / 2)), null));
|
||||
Assertions.assertEquals(0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, headerSize / 2, bs.length)), null));
|
||||
Assertions.assertEquals("aa", request.getHeader().getTraceid());
|
||||
}
|
||||
|
||||
public static class SncpRequestTest extends SncpRequest {
|
||||
|
||||
protected SncpRequestTest(SncpContext context) {
|
||||
super(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int readHeader(ByteBuffer buffer, Request last) {
|
||||
return super.readHeader(buffer, last);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user