This commit is contained in:
Redkale
2018-08-04 15:12:22 +08:00
parent 5b501c7c2f
commit c69c1bb134
3 changed files with 39 additions and 12 deletions

View File

@@ -218,7 +218,11 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
if (rs != Integer.MIN_VALUE) illRequestCounter.incrementAndGet(); if (rs != Integer.MIN_VALUE) illRequestCounter.incrementAndGet();
response.finish(true); response.finish(true);
} else if (rs == 0) { } else if (rs == 0) {
if (buffer.hasRemaining()) {
request.setMoredata(buffer);
} else {
request.offerReadBuffer(buffer); request.offerReadBuffer(buffer);
}
request.prepare(); request.prepare();
response.filter = this.headFilter; response.filter = this.headFilter;
response.servlet = this; response.servlet = this;
@@ -235,8 +239,12 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
if (ai.get() > 0) { if (ai.get() > 0) {
buffer.clear(); buffer.clear();
request.channel.read(buffer, buffer, this); request.channel.read(buffer, buffer, this);
} else {
if (buffer.hasRemaining()) {
request.setMoredata(buffer);
} else { } else {
request.offerReadBuffer(buffer); request.offerReadBuffer(buffer);
}
request.prepare(); request.prepare();
try { try {
response.filter = PrepareServlet.this.headFilter; response.filter = PrepareServlet.this.headFilter;

View File

@@ -31,6 +31,10 @@ public abstract class Request<C extends Context> {
protected boolean keepAlive; protected boolean keepAlive;
protected boolean more; //pipeline模式
protected ByteBuffer moredata; //pipeline模式
protected AsyncConnection channel; protected AsyncConnection channel;
protected ByteBuffer readBuffer; protected ByteBuffer readBuffer;
@@ -50,6 +54,16 @@ public abstract class Request<C extends Context> {
this.jsonConvert = context.getJsonConvert(); this.jsonConvert = context.getJsonConvert();
} }
protected void setMoredata(ByteBuffer buffer) {
this.moredata = buffer;
}
protected ByteBuffer removeMoredata() {
ByteBuffer rs = this.moredata;
this.moredata = null;
return rs;
}
protected ByteBuffer pollReadBuffer() { protected ByteBuffer pollReadBuffer() {
ByteBuffer buffer = this.readBuffer; ByteBuffer buffer = this.readBuffer;
this.readBuffer = null; this.readBuffer = null;
@@ -90,6 +104,8 @@ public abstract class Request<C extends Context> {
protected void recycle() { protected void recycle() {
createtime = 0; createtime = 0;
keepAlive = false; keepAlive = false;
more = false;
moredata = null;
attributes.clear(); attributes.clear();
channel = null; // close it by response channel = null; // close it by response
} }

View File

@@ -30,8 +30,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected AsyncConnection channel; protected AsyncConnection channel;
protected ByteBuffer moredata; //pipeline模式
protected ByteBuffer writeHeadBuffer; protected ByteBuffer writeHeadBuffer;
protected ByteBuffer writeBodyBuffer; protected ByteBuffer writeBodyBuffer;
@@ -169,12 +167,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
return ch; return ch;
} }
protected ByteBuffer removeMoredata() {
ByteBuffer rs = this.moredata;
this.moredata = null;
return rs;
}
protected void prepare() { protected void prepare() {
inited = true; inited = true;
} }
@@ -184,7 +176,6 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.output = null; this.output = null;
this.filter = null; this.filter = null;
this.servlet = null; this.servlet = null;
this.moredata = null;
request.recycle(); request.recycle();
if (channel != null) { if (channel != null) {
channel.dispose(); channel.dispose();
@@ -260,7 +251,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
this.recycleListener = null; this.recycleListener = null;
} }
if (request.keepAlive && channel != null) { if (request.keepAlive && !request.more && channel != null) {
if (channel.isOpen()) { if (channel.isOpen()) {
AsyncConnection conn = removeChannel(); AsyncConnection conn = removeChannel();
this.recycle(); this.recycle();
@@ -288,24 +279,36 @@ public abstract class Response<C extends Context, R extends Request<C>> {
public void finish(ByteBuffer buffer) { public void finish(ByteBuffer buffer) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
ByteBuffer data = this.request.removeMoredata();
this.request.more = data != null && this.request.keepAlive;
this.channel.write(buffer, buffer, finishHandler); this.channel.write(buffer, buffer, finishHandler);
if (this.request.more) new PrepareRunner(this.context, this.channel, data, null).run();
} }
public void finish(boolean kill, ByteBuffer buffer) { public void finish(boolean kill, ByteBuffer buffer) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
if (kill) refuseAlive(); if (kill) refuseAlive();
ByteBuffer data = this.request.removeMoredata();
this.request.more = data != null && this.request.keepAlive;
this.channel.write(buffer, buffer, finishHandler); this.channel.write(buffer, buffer, finishHandler);
if (this.request.more) new PrepareRunner(this.context, this.channel, data, null).run();
} }
public void finish(ByteBuffer... buffers) { public void finish(ByteBuffer... buffers) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
ByteBuffer data = this.request.removeMoredata();
this.request.more = data != null && this.request.keepAlive;
this.channel.write(buffers, buffers, finishHandler2); this.channel.write(buffers, buffers, finishHandler2);
if (this.request.more) new PrepareRunner(this.context, this.channel, data, null).run();
} }
public void finish(boolean kill, ByteBuffer... buffers) { public void finish(boolean kill, ByteBuffer... buffers) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
if (kill) refuseAlive(); if (kill) refuseAlive();
ByteBuffer data = this.request.removeMoredata();
this.request.more = data != null && this.request.keepAlive;
this.channel.write(buffers, buffers, finishHandler2); this.channel.write(buffers, buffers, finishHandler2);
if (this.request.more) new PrepareRunner(this.context, this.channel, data, null).run();
} }
protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) { protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) {