优化AsyncConnection

This commit is contained in:
Redkale
2023-01-01 19:52:44 +08:00
parent 1c1211a298
commit 6f6c3f70ff
2 changed files with 29 additions and 7 deletions

View File

@@ -16,6 +16,8 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
import javax.net.ssl.*;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
import org.redkale.util.*;
/**
@@ -193,6 +195,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
read(handler);
}
public final void startReadInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrThread()) {
startRead(handler);
} else {
execute(() -> startRead(handler));
}
}
public final void read(CompletionHandler<Integer, ByteBuffer> handler) {
if (sslEngine == null) {
readImpl(handler);
@@ -201,6 +211,14 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
}
}
public final void readInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {
if (inCurrThread()) {
read(handler);
} else {
execute(() -> read(handler));
}
}
//src写完才会回调
public final <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (sslEngine == null) {

View File

@@ -228,11 +228,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
AsyncConnection conn = removeChannel();
if (conn != null && conn.protocolCodec != null) {
this.responseConsumer.accept(this);
if (conn.inCurrThread()) {
conn.read(conn.protocolCodec);
} else {
conn.execute(() -> conn.read(conn.protocolCodec));
}
conn.readInIOThread(conn.protocolCodec);
} else {
Supplier<Response> poolSupplier = this.responseSupplier;
Consumer<Response> poolConsumer = this.responseConsumer;
@@ -405,7 +401,11 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void completed(Integer result, A attachment) {
channel.offerBuffer(buffer);
if (buffer != writeBuffer) {
channel.offerBuffer(buffer);
} else {
buffer.clear();
}
if (handler != null) {
handler.completed(result, attachment);
}
@@ -413,7 +413,11 @@ public abstract class Response<C extends Context, R extends Request<C>> {
@Override
public void failed(Throwable exc, A attachment) {
channel.offerBuffer(buffer);
if (buffer != writeBuffer) {
channel.offerBuffer(buffer);
} else {
buffer.clear();
}
if (handler != null) {
handler.failed(exc, attachment);
}