diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index 74ace08e4..e5deec360 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -107,8 +107,9 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { return pool == null ? safeResponsePool.get() : pool.get(); }; this.responseConsumer = (v) -> { - if (Thread.currentThread() != v.thread && v.thread != null) { - v.thread.execute(() -> { + WorkThread thread = v.channel != null ? v.channel.getAsyncIOThread() : v.thread; + if (thread != null && !thread.inCurrThread()) { + thread.execute(() -> { ObjectPool pool = localResponsePool.get(); (pool == null ? safeResponsePool : pool).accept(v); }); diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index c3e3c167c..50e3bdbf1 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -47,6 +47,8 @@ public abstract class Response> { protected Servlet> servlet; + protected final ByteBuffer writeBuffer; + private final CompletionHandler finishBytesHandler = new CompletionHandler() { @Override @@ -65,13 +67,21 @@ public abstract class Response> { @Override public void completed(Integer result, ByteBuffer attachment) { - channel.offerBuffer(attachment); + if (attachment != writeBuffer) { + channel.offerBuffer(attachment); + } else { + attachment.clear(); + } finish(); } @Override public void failed(Throwable exc, ByteBuffer attachment) { - channel.offerBuffer(attachment); + if (attachment != writeBuffer) { + channel.offerBuffer(attachment); + } else { + attachment.clear(); + } finish(true); } @@ -105,6 +115,7 @@ public abstract class Response> { this.context = context; this.request = request; this.thread = WorkThread.currWorkThread(); + this.writeBuffer = context != null ? ByteBuffer.allocateDirect(context.getBufferCapacity()) : null; } protected AsyncConnection removeChannel() { @@ -217,7 +228,11 @@ public abstract class Response> { AsyncConnection conn = removeChannel(); if (conn != null && conn.protocolCodec != null) { this.responseConsumer.accept(this); - conn.read(conn.protocolCodec); + if (conn.inCurrThread()) { + conn.read(conn.protocolCodec); + } else { + conn.execute(() -> conn.read(conn.protocolCodec)); + } } else { Supplier poolSupplier = this.responseSupplier; Consumer poolConsumer = this.responseConsumer; @@ -270,7 +285,15 @@ public abstract class Response> { } }); } else { - this.channel.write(bs, offset, length, finishBytesHandler); + ByteBuffer buffer = this.writeBuffer; + if (buffer != null && buffer.capacity() >= length) { + buffer.clear(); + buffer.put(bs, offset, length); + buffer.flip(); + this.channel.write(buffer, buffer, finishBufferHandler); + } else { + this.channel.write(bs, offset, length, finishBytesHandler); + } } }