diff --git a/src/main/java/org/redkale/mq/spi/SncpMessageRequest.java b/src/main/java/org/redkale/mq/spi/SncpMessageRequest.java index 9d204022e..08ea6fd09 100644 --- a/src/main/java/org/redkale/mq/spi/SncpMessageRequest.java +++ b/src/main/java/org/redkale/mq/spi/SncpMessageRequest.java @@ -23,6 +23,6 @@ public class SncpMessageRequest extends SncpRequest { super(context); this.message = message; this.createTime = System.currentTimeMillis(); - readHeader(ByteBuffer.wrap(message.getContent()), null); + readHeader(ByteBuffer.wrap(message.getContent()), -1); } } diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index 8f8738296..278f46b19 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -84,7 +84,7 @@ class ProtocolCodec implements CompletionHandler { buffer.flip(); final Response response = createResponse(); try { - decode(buffer, response, 0, null); + decode(buffer, response, 0, -1); } catch (Throwable t) { // 此处不可 context.offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); response.codecError(t); @@ -106,7 +106,7 @@ class ProtocolCodec implements CompletionHandler { if (data != null) { // pipeline模式或UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了 final Response response = createResponse(); try { - decode(data, response, 0, null); + decode(data, response, 0, -1); } catch (Throwable t) { context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); response.codecError(t); @@ -127,7 +127,7 @@ class ProtocolCodec implements CompletionHandler { if (data != null) { // pipeline模式或UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了 final Response response = createResponse(); try { - decode(data, response, 0, null); + decode(data, response, 0, -1); } catch (Throwable t) { context.logger.log(Level.WARNING, "dispatch servlet abort, force to close channel ", t); response.codecError(t); @@ -144,10 +144,10 @@ class ProtocolCodec implements CompletionHandler { } } - protected void decode(ByteBuffer buffer, Response response, int pipelineIndex, Request lastReq) { + protected void decode(ByteBuffer buffer, Response response, int pipelineIndex, final int pipelineHeaderLength) { response.init(channel); final Request request = response.request; - final int rs = request.readHeader(buffer, lastReq); + final int rs = request.readHeader(buffer, pipelineHeaderLength); if (rs < 0) { // 表示数据格式不正确 final DispatcherServlet dispatcher = context.dispatcher; dispatcher.incrExecuteCounter(); @@ -162,7 +162,7 @@ class ProtocolCodec implements CompletionHandler { } else if (rs == 0) { context.dispatcher.incrExecuteCounter(); int pindex = pipelineIndex; - Request hreq = lastReq; + int plength = pipelineHeaderLength; if (buffer.hasRemaining()) { // pipeline模式 if (pindex == 0) { pindex++; @@ -170,13 +170,13 @@ class ProtocolCodec implements CompletionHandler { if (request.getRequestid() == null) { // 存在requestid则无视pipeline模式 request.pipeline(pindex, pindex + 1); } - if (hreq == null) { - hreq = request.copyHeader(); + if (plength < 0) { + plength = request.pipelineHeaderLength(); } context.executeDispatch(request, response); final Response pipelineResponse = createResponse(); try { - decode(buffer, pipelineResponse, pindex + 1, hreq); + decode(buffer, pipelineResponse, pindex + 1, plength); } catch (Throwable t) { // 此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t); pipelineResponse.codecError(t); @@ -193,7 +193,7 @@ class ProtocolCodec implements CompletionHandler { } } else { // rs > 0 channel.setReadBuffer(buffer); - channel.read(readHandler.prepare(request, response, pipelineIndex, lastReq)); + channel.read(readHandler.prepare(request, response, pipelineIndex, pipelineHeaderLength)); } } @@ -205,13 +205,14 @@ class ProtocolCodec implements CompletionHandler { private int pipelineIndex; - private Request lastReq; + private int pipelineHeaderLength; - public ReadCompletionHandler prepare(Request request, Response response, int pipelineIndex, Request lastReq) { + public ReadCompletionHandler prepare( + Request request, Response response, int pipelineIndex, int pipelineHeaderLength) { this.request = request; this.response = response; this.pipelineIndex = pipelineIndex; - this.lastReq = lastReq; + this.pipelineHeaderLength = pipelineHeaderLength; return this; } @@ -223,7 +224,7 @@ class ProtocolCodec implements CompletionHandler { return; } attachment.flip(); - decode(attachment, response, pipelineIndex, lastReq); + decode(attachment, response, pipelineIndex, pipelineHeaderLength); } @Override diff --git a/src/main/java/org/redkale/net/Request.java b/src/main/java/org/redkale/net/Request.java index e958fe9a2..02232768a 100644 --- a/src/main/java/org/redkale/net/Request.java +++ b/src/main/java/org/redkale/net/Request.java @@ -81,8 +81,8 @@ public abstract class Request { this.channel = request.channel; } - protected Request copyHeader() { - return null; + protected int pipelineHeaderLength() { + return -1; } protected Request pipeline(int pipelineIndex, int pipelineCount) { @@ -95,10 +95,10 @@ public abstract class Request { * 返回值:Integer.MIN_VALUE: 帧数据; -1:数据不合法; 0:解析完毕; >0: 需再读取的字节数。 * * @param buffer ByteBuffer对象 - * @param last 同一Channel的上一个Request + * @param pipelineHeaderLength 同一Channel的pipelien模式下上一个Request的header长度 * @return 缺少的字节数 */ - protected abstract int readHeader(ByteBuffer buffer, Request last); + protected abstract int readHeader(ByteBuffer buffer, int pipelineHeaderLength); protected abstract Serializable getRequestid(); diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index c20d79323..388e4bc24 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -417,8 +417,7 @@ public abstract class Response> { } } - public void finish( - boolean kill, final byte[] bs1, int offset1, int length1, final byte[] bs2, int offset2, int length2) { + public void finish(boolean kill, byte[] bs1, int offset1, int length1, byte[] bs2, int offset2, int length2) { if (kill) { refuseAlive(); } @@ -517,7 +516,7 @@ public abstract class Response> { } } - protected void send(final ByteBuffer buffer, final A attachment, final CompletionHandler handler) { + protected void send(ByteBuffer buffer, A attachment, CompletionHandler handler) { this.channel.write(buffer, attachment, new CompletionHandler() { @Override @@ -546,7 +545,7 @@ public abstract class Response> { }); } - protected void send(final ByteBuffer[] buffers, A attachment, final CompletionHandler handler) { + protected void send(ByteBuffer[] buffers, A attachment, CompletionHandler handler) { this.channel.write(buffers, attachment, new CompletionHandler() { @Override diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java index f1fa97574..82039ea42 100644 --- a/src/main/java/org/redkale/net/http/HttpRequest.java +++ b/src/main/java/org/redkale/net/http/HttpRequest.java @@ -326,7 +326,12 @@ public class HttpRequest extends Request { } @Override - protected int readHeader(final ByteBuffer buf, final Request last) { + protected int pipelineHeaderLength() { + return (!context.sameHeader || !context.lazyHeader) ? -1 : headerLength; + } + + @Override + protected int readHeader(final ByteBuffer buf, final int pipelineHeaderLength) { final ByteBuffer buffer = buf; ByteArray bytes = bodyBytes; if (this.readState == READ_STATE_ROUTE) { @@ -337,41 +342,19 @@ public class HttpRequest extends Request { this.readState = READ_STATE_HEADER; } if (this.readState == READ_STATE_HEADER) { - if (last != null && ((HttpRequest) last).headerLength > 0) { - final HttpRequest httpLast = (HttpRequest) last; + if (pipelineHeaderLength > 0) { int bufremain = buffer.remaining(); - int remainHalf = httpLast.headerLength - this.headerHalfLen; + int remainHalf = pipelineHeaderLength - this.headerHalfLen; if (remainHalf > bufremain) { bytes.put(buffer); this.headerHalfLen += bufremain; buffer.clear(); return 1; } - buffer.position(buffer.position() + remainHalf); - this.contentType = httpLast.contentType; - this.contentLength = httpLast.contentLength; - this.contentEncoding = httpLast.contentEncoding; - this.host = httpLast.host; - this.cookie = httpLast.cookie; - this.cookies = httpLast.cookies; - this.keepAlive = httpLast.keepAlive; - this.maybews = httpLast.maybews; - this.expect = httpLast.expect; - this.chunked = httpLast.chunked; - this.chunkedLength = httpLast.chunkedLength; - this.chunkedCurrOffset = httpLast.chunkedCurrOffset; - this.rpc = httpLast.rpc; - this.traceid = httpLast.traceid; - this.currentUserid = httpLast.currentUserid; - this.reqConvertType = httpLast.reqConvertType; - this.reqConvert = httpLast.reqConvert; - this.respConvertType = httpLast.respConvertType; - this.respConvert = httpLast.respConvert; - this.headerLength = httpLast.headerLength; - this.headerHalfLen = httpLast.headerHalfLen; - this.headerBytes = httpLast.headerBytes; - this.headerParsed = httpLast.headerParsed; - this.headers.setAll(httpLast.headers); + bytes.put(buffer, remainHalf); + this.headerBytes = bytes.getBytes(); + this.headerLength = this.headerBytes.length; + this.headerParsed = false; } else if (context.lazyHeader && getmethod) { // 非GET必须要读header,会有Content-Length int rs = loadHeaderBytes(buffer); if (rs >= 0 && this.headerLength > context.getMaxHeader()) { @@ -1235,37 +1218,6 @@ public class HttpRequest extends Request { return bytes.toString(latin1, charset); } - @Override - protected HttpRequest copyHeader() { - if (!context.sameHeader || !context.lazyHeader) { - return null; - } - HttpRequest req = new HttpRequest(context, this.bodyBytes); - req.headerLength = this.headerLength; - req.headerBytes = this.headerBytes; - req.headerParsed = this.headerParsed; - req.contentType = this.contentType; - req.contentLength = this.contentLength; - req.contentEncoding = this.contentEncoding; - req.host = this.host; - req.cookie = this.cookie; - req.cookies = this.cookies; - req.keepAlive = this.keepAlive; - req.maybews = this.maybews; - req.expect = this.expect; - req.chunked = this.chunked; - req.rpc = this.rpc; - req.traceid = this.traceid; - req.currentUserid = this.currentUserid; - req.currentUserSupplier = this.currentUserSupplier; - req.reqConvertType = this.reqConvertType; - req.reqConvert = this.reqConvert; - req.respConvert = this.respConvert; - req.respConvertType = this.respConvertType; - req.headers.setAll(this.headers); - return req; - } - @Override protected final Serializable getRequestid() { return null; diff --git a/src/main/java/org/redkale/net/sncp/SncpRequest.java b/src/main/java/org/redkale/net/sncp/SncpRequest.java index a9e9ca69f..19d49bd55 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpRequest.java @@ -40,7 +40,7 @@ public class SncpRequest extends Request { protected int readState = READ_STATE_ROUTE; - private int headerSize; + private int headerLength; private ByteArray halfArray; @@ -55,7 +55,7 @@ public class SncpRequest extends Request { } @Override // request.header与response.header数据格式保持一致 - protected int readHeader(ByteBuffer buffer, Request last) { + protected int readHeader(ByteBuffer buffer, int pipelineHeaderLength) { // ---------------------route---------------------------------- if (this.readState == READ_STATE_ROUTE) { int remain = buffer.remaining(); @@ -71,26 +71,24 @@ public class SncpRequest extends Request { return expect - remain; // 小于2 } else { if (halfArray == null) { - this.headerSize = buffer.getChar(); + this.headerLength = buffer.getChar(); } else { halfArray.put(buffer.get()); - this.headerSize = halfArray.getChar(0); + this.headerLength = halfArray.getChar(0); halfArray.clear(); } } - if (this.headerSize < SncpHeader.HEADER_SUBSIZE) { + if (this.headerLength < SncpHeader.HEADER_SUBSIZE) { context.getLogger() - .log( - Level.WARNING, + .log(Level.WARNING, "sncp header.length must more " + SncpHeader.HEADER_SUBSIZE + ", but " - + this.headerSize); + + this.headerLength); return -1; } - if (this.headerSize > context.getMaxHeader()) { + if (this.headerLength > context.getMaxHeader()) { context.getLogger() - .log( - Level.WARNING, - "sncp header.length must lower " + context.getMaxHeader() + ", but " + this.headerSize); + .log(Level.WARNING, + "sncp header.length must lower " + context.getMaxHeader() + ", but " + this.headerLength); return -1; } this.readState = READ_STATE_HEADER; @@ -98,7 +96,7 @@ public class SncpRequest extends Request { // ---------------------head---------------------------------- if (this.readState == READ_STATE_HEADER) { int remain = buffer.remaining(); - int expect = halfArray == null ? this.headerSize - 2 : this.headerSize - 2 - halfArray.length(); + int expect = halfArray == null ? this.headerLength - 2 : this.headerLength - 2 - halfArray.length(); if (remain < expect) { if (halfArray == null) { halfArray = new ByteArray(); @@ -108,10 +106,10 @@ public class SncpRequest extends Request { return expect - remain; } if (halfArray == null || halfArray.length() == 0) { - this.header = SncpHeader.read(buffer, this.headerSize); + this.header = SncpHeader.read(buffer, this.headerLength); } else { halfArray.put(buffer, expect); - this.header = SncpHeader.read(halfArray, this.headerSize); + this.header = SncpHeader.read(halfArray, this.headerLength); halfArray.clear(); } if (this.header.getRetcode() != 0) { // retcode diff --git a/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java index 4703d1c44..6262438ce 100644 --- a/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java @@ -51,22 +51,20 @@ public class SncpRequestParseTest { System.out.println(" " + Arrays.toString(Arrays.copyOfRange(bs, 2, bs.length))); SncpRequestTest request = new SncpRequestTest(context); - Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), null)); - Assertions.assertEquals( - headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), null)); - Assertions.assertEquals(0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, bs.length)), null)); + Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), -1)); + Assertions.assertEquals(headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), -1)); + Assertions.assertEquals(0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, bs.length)), -1)); Assertions.assertEquals("aa", request.getHeader().getTraceid()); System.out.println("测试第二段"); request = new SncpRequestTest(context); - Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), null)); - Assertions.assertEquals( - headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), null)); + Assertions.assertEquals(1, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 0, 1)), -1)); + Assertions.assertEquals(headerSize - 2, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 1, 2)), -1)); Assertions.assertEquals( headerSize - headerSize / 2, - request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, headerSize / 2)), null)); + request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, 2, headerSize / 2)), -1)); Assertions.assertEquals( - 0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, headerSize / 2, bs.length)), null)); + 0, request.readHeader(ByteBuffer.wrap(Arrays.copyOfRange(bs, headerSize / 2, bs.length)), -1)); Assertions.assertEquals("aa", request.getHeader().getTraceid()); } @@ -77,8 +75,8 @@ public class SncpRequestParseTest { } @Override - protected int readHeader(ByteBuffer buffer, Request last) { - return super.readHeader(buffer, last); + protected int readHeader(ByteBuffer buffer, int pipelineHeaderLength) { + return super.readHeader(buffer, pipelineHeaderLength); } } }