diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index a06a852ea..c28e1fa8c 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -16,6 +16,8 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; import static javax.net.ssl.SSLEngineResult.Status.*; import javax.net.ssl.*; +import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; +import static javax.net.ssl.SSLEngineResult.Status.*; import org.redkale.util.*; /** @@ -193,6 +195,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl read(handler); } + public final void startReadInIOThread(CompletionHandler handler) { + if (inCurrThread()) { + startRead(handler); + } else { + execute(() -> startRead(handler)); + } + } + public final void read(CompletionHandler handler) { if (sslEngine == null) { readImpl(handler); @@ -201,6 +211,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } } + public final void readInIOThread(CompletionHandler handler) { + if (inCurrThread()) { + read(handler); + } else { + execute(() -> read(handler)); + } + } + //src写完才会回调 public final void write(ByteBuffer src, A attachment, CompletionHandler handler) { if (sslEngine == null) { diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index 50e3bdbf1..dac8ae6e2 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -228,11 +228,7 @@ public abstract class Response> { AsyncConnection conn = removeChannel(); if (conn != null && conn.protocolCodec != null) { this.responseConsumer.accept(this); - if (conn.inCurrThread()) { - conn.read(conn.protocolCodec); - } else { - conn.execute(() -> conn.read(conn.protocolCodec)); - } + conn.readInIOThread(conn.protocolCodec); } else { Supplier poolSupplier = this.responseSupplier; Consumer poolConsumer = this.responseConsumer; @@ -405,7 +401,11 @@ public abstract class Response> { @Override public void completed(Integer result, A attachment) { - channel.offerBuffer(buffer); + if (buffer != writeBuffer) { + channel.offerBuffer(buffer); + } else { + buffer.clear(); + } if (handler != null) { handler.completed(result, attachment); } @@ -413,7 +413,11 @@ public abstract class Response> { @Override public void failed(Throwable exc, A attachment) { - channel.offerBuffer(buffer); + if (buffer != writeBuffer) { + channel.offerBuffer(buffer); + } else { + buffer.clear(); + } if (handler != null) { handler.failed(exc, attachment); }