Request.pipelineHeaderLength

This commit is contained in:
redkale
2024-09-12 16:35:16 +08:00
parent c370c885e6
commit 1035052f02
7 changed files with 57 additions and 109 deletions

View File

@@ -23,6 +23,6 @@ public class SncpMessageRequest extends SncpRequest {
super(context);
this.message = message;
this.createTime = System.currentTimeMillis();
readHeader(ByteBuffer.wrap(message.getContent()), null);
readHeader(ByteBuffer.wrap(message.getContent()), -1);
}
}

View File

@@ -84,7 +84,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
buffer.flip();
final Response response = createResponse();
try {
decode(buffer, response, 0, null);
decode(buffer, response, 0, -1);
} catch (Throwable t) { // 此处不可 context.offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.codecError(t);
@@ -106,7 +106,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
if (data != null) { // pipeline模式或UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
final Response response = createResponse();
try {
decode(data, response, 0, null);
decode(data, response, 0, -1);
} catch (Throwable t) {
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.codecError(t);
@@ -127,7 +127,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
if (data != null) { // pipeline模式或UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
final Response response = createResponse();
try {
decode(data, response, 0, null);
decode(data, response, 0, -1);
} catch (Throwable t) {
context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t);
response.codecError(t);
@@ -144,10 +144,10 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
}
}
protected void decode(ByteBuffer buffer, Response response, int pipelineIndex, Request lastReq) {
protected void decode(ByteBuffer buffer, Response response, int pipelineIndex, final int pipelineHeaderLength) {
response.init(channel);
final Request request = response.request;
final int rs = request.readHeader(buffer, lastReq);
final int rs = request.readHeader(buffer, pipelineHeaderLength);
if (rs < 0) { // 表示数据格式不正确
final DispatcherServlet dispatcher = context.dispatcher;
dispatcher.incrExecuteCounter();
@@ -162,7 +162,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
} else if (rs == 0) {
context.dispatcher.incrExecuteCounter();
int pindex = pipelineIndex;
Request hreq = lastReq;
int plength = pipelineHeaderLength;
if (buffer.hasRemaining()) { // pipeline模式
if (pindex == 0) {
pindex++;
@@ -170,13 +170,13 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
if (request.getRequestid() == null) { // 存在requestid则无视pipeline模式
request.pipeline(pindex, pindex + 1);
}
if (hreq == null) {
hreq = request.copyHeader();
if (plength < 0) {
plength = request.pipelineHeaderLength();
}
context.executeDispatch(request, response);
final Response pipelineResponse = createResponse();
try {
decode(buffer, pipelineResponse, pindex + 1, hreq);
decode(buffer, pipelineResponse, pindex + 1, plength);
} catch (Throwable t) { // 此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t);
pipelineResponse.codecError(t);
@@ -193,7 +193,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
}
} else { // rs > 0
channel.setReadBuffer(buffer);
channel.read(readHandler.prepare(request, response, pipelineIndex, lastReq));
channel.read(readHandler.prepare(request, response, pipelineIndex, pipelineHeaderLength));
}
}
@@ -205,13 +205,14 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
private int pipelineIndex;
private Request lastReq;
private int pipelineHeaderLength;
public ReadCompletionHandler prepare(Request request, Response response, int pipelineIndex, Request lastReq) {
public ReadCompletionHandler prepare(
Request request, Response response, int pipelineIndex, int pipelineHeaderLength) {
this.request = request;
this.response = response;
this.pipelineIndex = pipelineIndex;
this.lastReq = lastReq;
this.pipelineHeaderLength = pipelineHeaderLength;
return this;
}
@@ -223,7 +224,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
return;
}
attachment.flip();
decode(attachment, response, pipelineIndex, lastReq);
decode(attachment, response, pipelineIndex, pipelineHeaderLength);
}
@Override

View File

@@ -81,8 +81,8 @@ public abstract class Request<C extends Context> {
this.channel = request.channel;
}
protected Request copyHeader() {
return null;
protected int pipelineHeaderLength() {
return -1;
}
protected Request pipeline(int pipelineIndex, int pipelineCount) {
@@ -95,10 +95,10 @@ public abstract class Request<C extends Context> {
* 返回值Integer.MIN_VALUE: 帧数据; -1数据不合法 0解析完毕 &gt;0: 需再读取的字节数。
*
* @param buffer ByteBuffer对象
* @param last 同一Channel的上一个Request
* @param pipelineHeaderLength 同一Channel的pipelien模式下上一个Request的header长度
* @return 缺少的字节数
*/
protected abstract int readHeader(ByteBuffer buffer, Request last);
protected abstract int readHeader(ByteBuffer buffer, int pipelineHeaderLength);
protected abstract Serializable getRequestid();

View File

@@ -417,8 +417,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
}
}
public void finish(
boolean kill, final byte[] bs1, int offset1, int length1, final byte[] bs2, int offset2, int length2) {
public void finish(boolean kill, byte[] bs1, int offset1, int length1, byte[] bs2, int offset2, int length2) {
if (kill) {
refuseAlive();
}
@@ -517,7 +516,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
}
}
protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) {
protected <A> void send(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) {
this.channel.write(buffer, attachment, new CompletionHandler<Integer, A>() {
@Override
@@ -546,7 +545,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
});
}
protected <A> void send(final ByteBuffer[] buffers, A attachment, final CompletionHandler<Integer, A> handler) {
protected <A> void send(ByteBuffer[] buffers, A attachment, CompletionHandler<Integer, A> handler) {
this.channel.write(buffers, attachment, new CompletionHandler<Integer, A>() {
@Override

View File

@@ -326,7 +326,12 @@ public class HttpRequest extends Request<HttpContext> {
}
@Override
protected int readHeader(final ByteBuffer buf, final Request last) {
protected int pipelineHeaderLength() {
return (!context.sameHeader || !context.lazyHeader) ? -1 : headerLength;
}
@Override
protected int readHeader(final ByteBuffer buf, final int pipelineHeaderLength) {
final ByteBuffer buffer = buf;
ByteArray bytes = bodyBytes;
if (this.readState == READ_STATE_ROUTE) {
@@ -337,41 +342,19 @@ public class HttpRequest extends Request<HttpContext> {
this.readState = READ_STATE_HEADER;
}
if (this.readState == READ_STATE_HEADER) {
if (last != null && ((HttpRequest) last).headerLength > 0) {
final HttpRequest httpLast = (HttpRequest) last;
if (pipelineHeaderLength > 0) {
int bufremain = buffer.remaining();
int remainHalf = httpLast.headerLength - this.headerHalfLen;
int remainHalf = pipelineHeaderLength - this.headerHalfLen;
if (remainHalf > bufremain) {
bytes.put(buffer);
this.headerHalfLen += bufremain;
buffer.clear();
return 1;
}
buffer.position(buffer.position() + remainHalf);
this.contentType = httpLast.contentType;
this.contentLength = httpLast.contentLength;
this.contentEncoding = httpLast.contentEncoding;
this.host = httpLast.host;
this.cookie = httpLast.cookie;
this.cookies = httpLast.cookies;
this.keepAlive = httpLast.keepAlive;
this.maybews = httpLast.maybews;
this.expect = httpLast.expect;
this.chunked = httpLast.chunked;
this.chunkedLength = httpLast.chunkedLength;
this.chunkedCurrOffset = httpLast.chunkedCurrOffset;
this.rpc = httpLast.rpc;
this.traceid = httpLast.traceid;
this.currentUserid = httpLast.currentUserid;
this.reqConvertType = httpLast.reqConvertType;
this.reqConvert = httpLast.reqConvert;
this.respConvertType = httpLast.respConvertType;
this.respConvert = httpLast.respConvert;
this.headerLength = httpLast.headerLength;
this.headerHalfLen = httpLast.headerHalfLen;
this.headerBytes = httpLast.headerBytes;
this.headerParsed = httpLast.headerParsed;
this.headers.setAll(httpLast.headers);
bytes.put(buffer, remainHalf);
this.headerBytes = bytes.getBytes();
this.headerLength = this.headerBytes.length;
this.headerParsed = false;
} else if (context.lazyHeader && getmethod) { // 非GET必须要读header会有Content-Length
int rs = loadHeaderBytes(buffer);
if (rs >= 0 && this.headerLength > context.getMaxHeader()) {
@@ -1235,37 +1218,6 @@ public class HttpRequest extends Request<HttpContext> {
return bytes.toString(latin1, charset);
}
@Override
protected HttpRequest copyHeader() {
if (!context.sameHeader || !context.lazyHeader) {
return null;
}
HttpRequest req = new HttpRequest(context, this.bodyBytes);
req.headerLength = this.headerLength;
req.headerBytes = this.headerBytes;
req.headerParsed = this.headerParsed;
req.contentType = this.contentType;
req.contentLength = this.contentLength;
req.contentEncoding = this.contentEncoding;
req.host = this.host;
req.cookie = this.cookie;
req.cookies = this.cookies;
req.keepAlive = this.keepAlive;
req.maybews = this.maybews;
req.expect = this.expect;
req.chunked = this.chunked;
req.rpc = this.rpc;
req.traceid = this.traceid;
req.currentUserid = this.currentUserid;
req.currentUserSupplier = this.currentUserSupplier;
req.reqConvertType = this.reqConvertType;
req.reqConvert = this.reqConvert;
req.respConvert = this.respConvert;
req.respConvertType = this.respConvertType;
req.headers.setAll(this.headers);
return req;
}
@Override
protected final Serializable getRequestid() {
return null;

View File

@@ -40,7 +40,7 @@ public class SncpRequest extends Request<SncpContext> {
protected int readState = READ_STATE_ROUTE;
private int headerSize;
private int headerLength;
private ByteArray halfArray;
@@ -55,7 +55,7 @@ public class SncpRequest extends Request<SncpContext> {
}
@Override // request.header与response.header数据格式保持一致
protected int readHeader(ByteBuffer buffer, Request last) {
protected int readHeader(ByteBuffer buffer, int pipelineHeaderLength) {
// ---------------------route----------------------------------
if (this.readState == READ_STATE_ROUTE) {
int remain = buffer.remaining();
@@ -71,26 +71,24 @@ public class SncpRequest extends Request<SncpContext> {
return expect - remain; // 小于2
} else {
if (halfArray == null) {
this.headerSize = buffer.getChar();
this.headerLength = buffer.getChar();
} else {
halfArray.put(buffer.get());
this.headerSize = halfArray.getChar(0);
this.headerLength = halfArray.getChar(0);
halfArray.clear();
}
}
if (this.headerSize < SncpHeader.HEADER_SUBSIZE) {
if (this.headerLength < SncpHeader.HEADER_SUBSIZE) {
context.getLogger()
.log(
Level.WARNING,
.log(Level.WARNING,
"sncp header.length must more " + SncpHeader.HEADER_SUBSIZE + ", but "
+ this.headerSize);
+ this.headerLength);
return -1;
}
if (this.headerSize > context.getMaxHeader()) {
if (this.headerLength > context.getMaxHeader()) {
context.getLogger()
.log(
Level.WARNING,
"sncp header.length must lower " + context.getMaxHeader() + ", but " + this.headerSize);
.log(Level.WARNING,
"sncp header.length must lower " + context.getMaxHeader() + ", but " + this.headerLength);
return -1;
}
this.readState = READ_STATE_HEADER;
@@ -98,7 +96,7 @@ public class SncpRequest extends Request<SncpContext> {
// ---------------------head----------------------------------
if (this.readState == READ_STATE_HEADER) {
int remain = buffer.remaining();
int expect = halfArray == null ? this.headerSize - 2 : this.headerSize - 2 - halfArray.length();
int expect = halfArray == null ? this.headerLength - 2 : this.headerLength - 2 - halfArray.length();
if (remain < expect) {
if (halfArray == null) {
halfArray = new ByteArray();
@@ -108,10 +106,10 @@ public class SncpRequest extends Request<SncpContext> {
return expect - remain;
}
if (halfArray == null || halfArray.length() == 0) {
this.header = SncpHeader.read(buffer, this.headerSize);
this.header = SncpHeader.read(buffer, this.headerLength);
} else {
halfArray.put(buffer, expect);
this.header = SncpHeader.read(halfArray, this.headerSize);
this.header = SncpHeader.read(halfArray, this.headerLength);
halfArray.clear();
}
if (this.header.getRetcode() != 0) { // retcode

View File

@@ -51,22 +51,20 @@ public class SncpRequestParseTest {
System.out.println(" " + Arrays.toString(Arrays.copyOfRange(bs, 2, bs.length)));
SncpRequestTest request = new SncpRequestTest(context);
Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), null));
Assertions.assertEquals(
headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), null));
Assertions.assertEquals(0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, bs.length)), null));
Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), -1));
Assertions.assertEquals(headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), -1));
Assertions.assertEquals(0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, bs.length)), -1));
Assertions.assertEquals("aa", request.getHeader().getTraceid());
System.out.println("测试第二段");
request = new SncpRequestTest(context);
Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), null));
Assertions.assertEquals(
headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), null));
Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), -1));
Assertions.assertEquals(headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), -1));
Assertions.assertEquals(
headerSize - headerSize / 2,
request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, headerSize / 2)), null));
request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, headerSize / 2)), -1));
Assertions.assertEquals(
0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, headerSize / 2, bs.length)), null));
0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, headerSize / 2, bs.length)), -1));
Assertions.assertEquals("aa", request.getHeader().getTraceid());
}
@@ -77,8 +75,8 @@ public class SncpRequestParseTest {
}
@Override
protected int readHeader(ByteBuffer buffer, Request last) {
return super.readHeader(buffer, last);
protected int readHeader(ByteBuffer buffer, int pipelineHeaderLength) {
return super.readHeader(buffer, pipelineHeaderLength);
}
}
}