From 8c0811209766d60873d12387c37853e5f08d9418 Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 27 Jun 2023 17:55:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96Response.completeInIOThread?= =?UTF-8?q?=E9=87=8C=E7=9A=84readRegister?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/AsyncNioConnection.java | 38 +++++++++++-------- src/main/java/org/redkale/net/Response.java | 2 +- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 9e68f0511..6802c9ded 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -127,28 +127,34 @@ abstract class AsyncNioConnection extends AsyncConnection { handler.failed(new NotYetConnectedException(), null); return; } - if (this.readPending) { - handler.failed(new ReadPendingException(), null); - return; - } - this.readPending = true; - if (this.readTimeoutSeconds > 0) { - AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler; - newHandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer); - this.readCompletionHandler = newHandler; - newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS); + if (handler != protocolCodec) { + if (this.readPending) { + handler.failed(new ReadPendingException(), null); + return; + } + this.readPending = true; + if (this.readTimeoutSeconds > 0) { + AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler; + newHandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer); + this.readCompletionHandler = newHandler; + newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS); + } else { + this.readCompletionHandler = handler; + } } else { this.readCompletionHandler = handler; + this.readPending = true; } - try { if (readKey == null) { ioReadThread.register(selector -> { - try { - readKey = implRegister(selector, SelectionKey.OP_READ); - readKey.attach(this); - } catch (ClosedChannelException e) { - handleRead(0, e); + if (readKey == null) { + try { + readKey = implRegister(selector, SelectionKey.OP_READ); + readKey.attach(this); + } catch (ClosedChannelException e) { + handleRead(0, e); + } } }); } else { diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index b26842773..3e242dcbb 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -306,7 +306,7 @@ public abstract class Response> { AsyncConnection conn = removeChannel(); if (conn != null && conn.protocolCodec != null) { this.responseConsumer.accept(this); - conn.readRegisterInIOThreadSafe(conn.protocolCodec); + conn.readRegister(conn.protocolCodec); } else { Supplier poolSupplier = this.responseSupplier; Consumer poolConsumer = this.responseConsumer;