This commit is contained in:
@@ -26,24 +26,23 @@ public final class PrepareRunner implements Runnable {
|
|||||||
|
|
||||||
private final Context context;
|
private final Context context;
|
||||||
|
|
||||||
|
private final boolean keepAlive;
|
||||||
|
|
||||||
private ByteBuffer data;
|
private ByteBuffer data;
|
||||||
|
|
||||||
private Response response;
|
public PrepareRunner(Context context, AsyncConnection channel, ByteBuffer data, boolean keepAlive) {
|
||||||
|
|
||||||
public PrepareRunner(Context context, AsyncConnection channel, ByteBuffer data, Response response) {
|
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
this.data = data;
|
this.data = data;
|
||||||
this.response = response;
|
this.keepAlive = keepAlive;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
final boolean keepalive = response != null;
|
|
||||||
final PrepareServlet prepare = context.prepare;
|
final PrepareServlet prepare = context.prepare;
|
||||||
final ObjectPool<? extends Response> responsePool = context.responsePool;
|
final ObjectPool<? extends Response> responsePool = context.responsePool;
|
||||||
|
final Response response = responsePool.get();
|
||||||
if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
|
if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
|
||||||
if (response == null) response = responsePool.get();
|
|
||||||
try {
|
try {
|
||||||
response.init(channel);
|
response.init(channel);
|
||||||
prepare.prepare(data, response.request, response);
|
prepare.prepare(data, response.request, response);
|
||||||
@@ -53,10 +52,9 @@ public final class PrepareRunner implements Runnable {
|
|||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (response == null) response = responsePool.get();
|
|
||||||
final ByteBuffer buffer = response.request.pollReadBuffer();
|
final ByteBuffer buffer = response.request.pollReadBuffer();
|
||||||
try {
|
try {
|
||||||
channel.read(buffer, keepalive ? context.getAliveTimeoutSeconds() : 0, TimeUnit.SECONDS, null,
|
channel.read(buffer, keepAlive ? context.getAliveTimeoutSeconds() : 0, TimeUnit.SECONDS, null,
|
||||||
new CompletionHandler<Integer, Void>() {
|
new CompletionHandler<Integer, Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer count, Void attachment1) {
|
public void completed(Integer count, Void attachment1) {
|
||||||
|
|||||||
@@ -150,7 +150,7 @@ public abstract class ProtocolServer {
|
|||||||
SocketAddress address = serchannel.receive(buffer);
|
SocketAddress address = serchannel.receive(buffer);
|
||||||
buffer.flip();
|
buffer.flip();
|
||||||
AsyncConnection conn = AsyncConnection.create(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds);
|
AsyncConnection conn = AsyncConnection.create(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds);
|
||||||
context.runAsync(new PrepareRunner(context, conn, buffer, null));
|
context.runAsync(new PrepareRunner(context, conn, buffer, false));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
context.offerBuffer(buffer);
|
context.offerBuffer(buffer);
|
||||||
}
|
}
|
||||||
@@ -244,7 +244,7 @@ public abstract class ProtocolServer {
|
|||||||
AsyncConnection conn = AsyncConnection.create(channel, null, context);
|
AsyncConnection conn = AsyncConnection.create(channel, null, context);
|
||||||
conn.livingCounter = livingCounter;
|
conn.livingCounter = livingCounter;
|
||||||
conn.closedCounter = closedCounter;
|
conn.closedCounter = closedCounter;
|
||||||
context.runAsync(new PrepareRunner(context, conn, null, null));
|
context.runAsync(new PrepareRunner(context, conn, null, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -170,21 +170,13 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
|||||||
protected boolean recycle() {
|
protected boolean recycle() {
|
||||||
if (!inited) return false;
|
if (!inited) return false;
|
||||||
boolean keepAlive = request.keepAlive;
|
boolean keepAlive = request.keepAlive;
|
||||||
if (recycleListener != null) {
|
|
||||||
try {
|
|
||||||
recycleListener.accept(request, this);
|
|
||||||
} catch (Exception e) {
|
|
||||||
context.logger.log(Level.WARNING, "Response.recycleListener error, request = " + request, e);
|
|
||||||
}
|
|
||||||
recycleListener = null;
|
|
||||||
}
|
|
||||||
this.output = null;
|
this.output = null;
|
||||||
this.filter = null;
|
this.filter = null;
|
||||||
this.servlet = null;
|
this.servlet = null;
|
||||||
request.recycle();
|
request.recycle();
|
||||||
if (channel != null) {
|
if (channel != null) {
|
||||||
if (keepAlive) {
|
if (keepAlive) {
|
||||||
this.context.runAsync(new PrepareRunner(context, channel, null, null));
|
this.context.runAsync(new PrepareRunner(context, channel, null, keepAlive));
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
if (channel.isOpen()) channel.close();
|
if (channel.isOpen()) channel.close();
|
||||||
@@ -255,6 +247,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
|||||||
if (!this.inited) return; //避免重复关闭
|
if (!this.inited) return; //避免重复关闭
|
||||||
//System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
|
//System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
|
||||||
if (kill) refuseAlive();
|
if (kill) refuseAlive();
|
||||||
|
if (this.recycleListener != null) {
|
||||||
|
try {
|
||||||
|
this.recycleListener.accept(request, this);
|
||||||
|
} catch (Exception e) {
|
||||||
|
context.logger.log(Level.WARNING, "Response.recycleListener error, request = " + request, e);
|
||||||
|
}
|
||||||
|
this.recycleListener = null;
|
||||||
|
}
|
||||||
this.context.responsePool.accept(this);
|
this.context.responsePool.accept(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -179,7 +179,6 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean recycle() {
|
protected boolean recycle() {
|
||||||
boolean rs = super.recycle();
|
|
||||||
this.status = 200;
|
this.status = 200;
|
||||||
this.contentLength = -1;
|
this.contentLength = -1;
|
||||||
this.contentType = null;
|
this.contentType = null;
|
||||||
@@ -187,7 +186,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
|||||||
this.headsended = false;
|
this.headsended = false;
|
||||||
this.header.clear();
|
this.header.clear();
|
||||||
this.bufferHandler = null;
|
this.bufferHandler = null;
|
||||||
return rs;
|
return super.recycle();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user