This commit is contained in:
@@ -26,23 +26,24 @@ public final class PrepareRunner implements Runnable {
|
||||
|
||||
private final Context context;
|
||||
|
||||
private final boolean keepAlive;
|
||||
|
||||
private ByteBuffer data;
|
||||
|
||||
public PrepareRunner(Context context, AsyncConnection channel, ByteBuffer data, boolean keepAlive) {
|
||||
private Response response;
|
||||
|
||||
public PrepareRunner(Context context, AsyncConnection channel, ByteBuffer data, Response response) {
|
||||
this.context = context;
|
||||
this.channel = channel;
|
||||
this.data = data;
|
||||
this.keepAlive = keepAlive;
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
final boolean keepalive = response != null;
|
||||
final PrepareServlet prepare = context.prepare;
|
||||
final ObjectPool<? extends Response> responsePool = context.responsePool;
|
||||
final Response response = responsePool.get();
|
||||
if (data != null) { //BIO模式的UDP连接创建AsyncConnection时已经获取到ByteBuffer数据了
|
||||
if (response == null) response = responsePool.get();
|
||||
try {
|
||||
response.init(channel);
|
||||
prepare.prepare(data, response.request, response);
|
||||
@@ -52,9 +53,10 @@ public final class PrepareRunner implements Runnable {
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (response == null) response = responsePool.get();
|
||||
final ByteBuffer buffer = response.request.pollReadBuffer();
|
||||
try {
|
||||
channel.read(buffer, keepAlive ? context.getAliveTimeoutSeconds() : 0, TimeUnit.SECONDS, null,
|
||||
channel.read(buffer, keepalive ? context.getAliveTimeoutSeconds() : 0, TimeUnit.SECONDS, null,
|
||||
new CompletionHandler<Integer, Void>() {
|
||||
@Override
|
||||
public void completed(Integer count, Void attachment1) {
|
||||
|
||||
@@ -150,7 +150,7 @@ public abstract class ProtocolServer {
|
||||
SocketAddress address = serchannel.receive(buffer);
|
||||
buffer.flip();
|
||||
AsyncConnection conn = AsyncConnection.create(serchannel, address, false, readTimeoutSeconds, writeTimeoutSeconds);
|
||||
context.runAsync(new PrepareRunner(context, conn, buffer, false));
|
||||
context.runAsync(new PrepareRunner(context, conn, buffer, null));
|
||||
} catch (Exception e) {
|
||||
context.offerBuffer(buffer);
|
||||
}
|
||||
@@ -244,7 +244,7 @@ public abstract class ProtocolServer {
|
||||
AsyncConnection conn = AsyncConnection.create(channel, null, context);
|
||||
conn.livingCounter = livingCounter;
|
||||
conn.closedCounter = closedCounter;
|
||||
context.runAsync(new PrepareRunner(context, conn, null, false));
|
||||
context.runAsync(new PrepareRunner(context, conn, null, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -169,19 +169,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
||||
|
||||
protected boolean recycle() {
|
||||
if (!inited) return false;
|
||||
boolean keepAlive = request.keepAlive;
|
||||
this.output = null;
|
||||
this.filter = null;
|
||||
this.servlet = null;
|
||||
request.recycle();
|
||||
if (channel != null) {
|
||||
if (keepAlive) {
|
||||
this.context.runAsync(new PrepareRunner(context, channel, null, keepAlive));
|
||||
} else {
|
||||
try {
|
||||
if (channel.isOpen()) channel.close();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
try {
|
||||
if (channel.isOpen()) channel.close();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
channel = null;
|
||||
}
|
||||
@@ -255,7 +250,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
||||
}
|
||||
this.recycleListener = null;
|
||||
}
|
||||
this.context.responsePool.accept(this);
|
||||
if (request.keepAlive) {
|
||||
AsyncConnection conn = removeChannel();
|
||||
this.recycle();
|
||||
this.prepare();
|
||||
new PrepareRunner(context, conn, null, this).run();
|
||||
} else {
|
||||
this.context.responsePool.accept(this);
|
||||
}
|
||||
}
|
||||
|
||||
public void finish(final byte[] bs) {
|
||||
|
||||
Reference in New Issue
Block a user