优化Response.channel.write

This commit is contained in:
Redkale
2023-01-01 17:51:52 +08:00
parent b3862cea72
commit 1c1211a298
2 changed files with 30 additions and 6 deletions

View File

@@ -107,8 +107,9 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
return pool == null ? safeResponsePool.get() : pool.get();
};
this.responseConsumer = (v) -> {
if (Thread.currentThread() != v.thread && v.thread != null) {
v.thread.execute(() -> {
WorkThread thread = v.channel != null ? v.channel.getAsyncIOThread() : v.thread;
if (thread != null && !thread.inCurrThread()) {
thread.execute(() -> {
ObjectPool<Response> pool = localResponsePool.get();
(pool == null ? safeResponsePool : pool).accept(v);
});

View File

@@ -47,6 +47,8 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected Servlet<C, R, ? extends Response<C, R>> servlet;
protected final ByteBuffer writeBuffer;
private final CompletionHandler finishBytesHandler = new CompletionHandler<Integer, Void>() {
@Override
@@ -65,13 +67,21 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void completed(Integer result, ByteBuffer attachment) {
channel.offerBuffer(attachment);
if (attachment != writeBuffer) {
channel.offerBuffer(attachment);
} else {
attachment.clear();
}
finish();
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
channel.offerBuffer(attachment);
if (attachment != writeBuffer) {
channel.offerBuffer(attachment);
} else {
attachment.clear();
}
finish(true);
}
@@ -105,6 +115,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.context = context;
this.request = request;
this.thread = WorkThread.currWorkThread();
this.writeBuffer = context != null ? ByteBuffer.allocateDirect(context.getBufferCapacity()) : null;
}
protected AsyncConnection removeChannel() {
@@ -217,7 +228,11 @@ public abstract class Response<C extends Context, R extends Request<C>> {
AsyncConnection conn = removeChannel();
if (conn != null && conn.protocolCodec != null) {
this.responseConsumer.accept(this);
conn.read(conn.protocolCodec);
if (conn.inCurrThread()) {
conn.read(conn.protocolCodec);
} else {
conn.execute(() -> conn.read(conn.protocolCodec));
}
} else {
Supplier<Response> poolSupplier = this.responseSupplier;
Consumer<Response> poolConsumer = this.responseConsumer;
@@ -270,7 +285,15 @@ public abstract class Response<C extends Context, R extends Request<C>> {
}
});
} else {
this.channel.write(bs, offset, length, finishBytesHandler);
ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= length) {
buffer.clear();
buffer.put(bs, offset, length);
buffer.flip();
this.channel.write(buffer, buffer, finishBufferHandler);
} else {
this.channel.write(bs, offset, length, finishBytesHandler);
}
}
}