优化ProtocolCodec的续读功能
This commit is contained in:
@@ -159,6 +159,8 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
|
|||||||
context.dispatcher.incrExecuteCounter();
|
context.dispatcher.incrExecuteCounter();
|
||||||
int pindex = pipelineIndex;
|
int pindex = pipelineIndex;
|
||||||
boolean pipeline = false;
|
boolean pipeline = false;
|
||||||
|
boolean seted = false;
|
||||||
|
boolean completed = request.completed;
|
||||||
Request hreq = lastReq;
|
Request hreq = lastReq;
|
||||||
if (buffer.hasRemaining()) {
|
if (buffer.hasRemaining()) {
|
||||||
pipeline = true;
|
pipeline = true;
|
||||||
@@ -176,6 +178,7 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
|
|||||||
request.pipeline(pindex, pindex);
|
request.pipeline(pindex, pindex);
|
||||||
}
|
}
|
||||||
channel.setReadBuffer(buffer.clear());
|
channel.setReadBuffer(buffer.clear());
|
||||||
|
seted = true;
|
||||||
}
|
}
|
||||||
context.executeDispatch(request, response);
|
context.executeDispatch(request, response);
|
||||||
if (pipeline) {
|
if (pipeline) {
|
||||||
@@ -186,6 +189,11 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
|
|||||||
context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t);
|
context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t);
|
||||||
pipelineResponse.codecError(t);
|
pipelineResponse.codecError(t);
|
||||||
}
|
}
|
||||||
|
} else if (completed) {
|
||||||
|
if (!seted) {
|
||||||
|
channel.setReadBuffer(buffer.clear());
|
||||||
|
}
|
||||||
|
channel.readRegister(this);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
channel.setReadBuffer(buffer);
|
channel.setReadBuffer(buffer);
|
||||||
|
|||||||
@@ -33,6 +33,10 @@ public abstract class Request<C extends Context> {
|
|||||||
|
|
||||||
protected boolean keepAlive;
|
protected boolean keepAlive;
|
||||||
|
|
||||||
|
//请求包是否完成读取完毕,用于ProtocolCodec继续读的判断条件
|
||||||
|
//需要在readHeader方法中设置
|
||||||
|
protected boolean completed;
|
||||||
|
|
||||||
protected int pipelineIndex;
|
protected int pipelineIndex;
|
||||||
|
|
||||||
protected int pipelineCount;
|
protected int pipelineCount;
|
||||||
@@ -103,6 +107,7 @@ public abstract class Request<C extends Context> {
|
|||||||
pipelineIndex = 0;
|
pipelineIndex = 0;
|
||||||
pipelineCount = 0;
|
pipelineCount = 0;
|
||||||
pipelineCompleted = false;
|
pipelineCompleted = false;
|
||||||
|
completed = false;
|
||||||
keepAlive = false;
|
keepAlive = false;
|
||||||
attributes.clear();
|
attributes.clear();
|
||||||
channel = null; // close it by response
|
channel = null; // close it by response
|
||||||
|
|||||||
@@ -329,11 +329,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
|||||||
}
|
}
|
||||||
this.recycleListener = null;
|
this.recycleListener = null;
|
||||||
}
|
}
|
||||||
|
boolean completed = request.completed;
|
||||||
if (request.keepAlive && (request.pipelineIndex == 0 || request.pipelineCompleted)) {
|
if (request.keepAlive && (request.pipelineIndex == 0 || request.pipelineCompleted)) {
|
||||||
AsyncConnection conn = removeChannel();
|
AsyncConnection conn = removeChannel();
|
||||||
if (conn != null && conn.protocolCodec != null) {
|
if (conn != null && conn.protocolCodec != null) {
|
||||||
this.responseConsumer.accept(this);
|
this.responseConsumer.accept(this);
|
||||||
conn.readRegister(conn.protocolCodec);
|
if (!completed) {
|
||||||
|
conn.readRegister(conn.protocolCodec);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
Supplier<Response> poolSupplier = this.responseSupplier;
|
Supplier<Response> poolSupplier = this.responseSupplier;
|
||||||
Consumer<Response> poolConsumer = this.responseConsumer;
|
Consumer<Response> poolConsumer = this.responseConsumer;
|
||||||
|
|||||||
@@ -295,6 +295,7 @@ public class HttpRequest extends Request<HttpContext> {
|
|||||||
this.readState = READ_STATE_HEADER;
|
this.readState = READ_STATE_HEADER;
|
||||||
}
|
}
|
||||||
if (this.readState == READ_STATE_HEADER) {
|
if (this.readState == READ_STATE_HEADER) {
|
||||||
|
this.completed = true;
|
||||||
if (last != null && ((HttpRequest) last).headerLength > 0) {
|
if (last != null && ((HttpRequest) last).headerLength > 0) {
|
||||||
final HttpRequest httplast = (HttpRequest) last;
|
final HttpRequest httplast = (HttpRequest) last;
|
||||||
int bufremain = buffer.remaining();
|
int bufremain = buffer.remaining();
|
||||||
@@ -350,6 +351,7 @@ public class HttpRequest extends Request<HttpContext> {
|
|||||||
this.boundary = true;
|
this.boundary = true;
|
||||||
}
|
}
|
||||||
if (this.boundary) {
|
if (this.boundary) {
|
||||||
|
this.completed = false; //completed=true时ProtocolCodec会继续读下一个request
|
||||||
this.keepAlive = false; //文件上传必须设置keepAlive为false,因为文件过大时用户不一定会skip掉多余的数据
|
this.keepAlive = false; //文件上传必须设置keepAlive为false,因为文件过大时用户不一定会skip掉多余的数据
|
||||||
}
|
}
|
||||||
this.readState = READ_STATE_BODY;
|
this.readState = READ_STATE_BODY;
|
||||||
|
|||||||
@@ -83,6 +83,7 @@ public class SncpRequest extends Request<SncpContext> {
|
|||||||
}
|
}
|
||||||
//---------------------head----------------------------------
|
//---------------------head----------------------------------
|
||||||
if (this.readState == READ_STATE_HEADER) {
|
if (this.readState == READ_STATE_HEADER) {
|
||||||
|
this.completed = true;
|
||||||
int remain = buffer.remaining();
|
int remain = buffer.remaining();
|
||||||
int expect = halfArray == null ? this.headerSize - 2 : this.headerSize - 2 - halfArray.length();
|
int expect = halfArray == null ? this.headerSize - 2 : this.headerSize - 2 - halfArray.length();
|
||||||
if (remain < expect) {
|
if (remain < expect) {
|
||||||
|
|||||||
@@ -66,6 +66,6 @@ public class SncpSleepTest {
|
|||||||
System.out.println("耗时: " + e + " ms");
|
System.out.println("耗时: " + e + " ms");
|
||||||
server.shutdown();
|
server.shutdown();
|
||||||
workExecutor.shutdown();
|
workExecutor.shutdown();
|
||||||
Assertions.assertTrue(e < 900);
|
Assertions.assertTrue(e < 600);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user