优化Response.completeInIOThread里的readRegister

This commit is contained in:
redkale
2023-06-27 17:55:02 +08:00
parent 5e6b2fb86e
commit 8c08112097
2 changed files with 23 additions and 17 deletions

View File

@@ -127,28 +127,34 @@ abstract class AsyncNioConnection extends AsyncConnection {
handler.failed(new NotYetConnectedException(), null);
return;
}
if (this.readPending) {
handler.failed(new ReadPendingException(), null);
return;
}
this.readPending = true;
if (this.readTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler;
newHandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer);
this.readCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS);
if (handler != protocolCodec) {
if (this.readPending) {
handler.failed(new ReadPendingException(), null);
return;
}
this.readPending = true;
if (this.readTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler;
newHandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer);
this.readCompletionHandler = newHandler;
newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS);
} else {
this.readCompletionHandler = handler;
}
} else {
this.readCompletionHandler = handler;
this.readPending = true;
}
try {
if (readKey == null) {
ioReadThread.register(selector -> {
try {
readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this);
} catch (ClosedChannelException e) {
handleRead(0, e);
if (readKey == null) {
try {
readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this);
} catch (ClosedChannelException e) {
handleRead(0, e);
}
}
});
} else {

View File

@@ -306,7 +306,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
AsyncConnection conn = removeChannel();
if (conn != null && conn.protocolCodec != null) {
this.responseConsumer.accept(this);
conn.readRegisterInIOThreadSafe(conn.protocolCodec);
conn.readRegister(conn.protocolCodec);
} else {
Supplier<Response> poolSupplier = this.responseSupplier;
Consumer<Response> poolConsumer = this.responseConsumer;