diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 5d55cfe70..a8b3163da 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -235,6 +235,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); + protected abstract void readRegisterImpl(CompletionHandler handler); + protected abstract void readImpl(CompletionHandler handler); //src写完才会回调 @@ -255,6 +257,22 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } + public final void readRegister(CompletionHandler handler) { + if (sslEngine == null) { + readRegisterImpl(handler); + } else { + sslReadRegisterImpl(false, handler); + } + } + + public final void readRegisterInIOThread(CompletionHandler handler) { + if (inCurrReadThread()) { + readRegister(handler); + } else { + executeRead(() -> readRegister(handler)); + } + } + public final void read(CompletionHandler handler) { if (sslEngine == null) { readImpl(handler); @@ -908,7 +926,15 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } protected void sslReadImpl(final boolean handshake, final CompletionHandler handler) { - readImpl(new CompletionHandler() { + readImpl(createSslCompletionHandler(handshake, handler)); + } + + protected void sslReadRegisterImpl(final boolean handshake, final CompletionHandler handler) { + readRegisterImpl(createSslCompletionHandler(handshake, handler)); + } + + private CompletionHandler createSslCompletionHandler(final boolean handshake, final CompletionHandler handler) { + return new CompletionHandler() { @Override public void completed(Integer count, ByteBuffer attachment) { @@ -951,7 +977,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public void failed(Throwable t, ByteBuffer attachment) { handler.failed(t, attachment); } - }); + }; } //加密ssl内容数据 diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index ce3d8094e..10903f1d1 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -127,6 +127,45 @@ abstract class AsyncNioConnection extends AsyncConnection { read(handler); } + @Override + protected final void readRegisterImpl(CompletionHandler handler) { + Objects.requireNonNull(handler); + if (!this.isConnected()) { + handler.failed(new NotYetConnectedException(), null); + return; + } + if (this.readPending) { + handler.failed(new ReadPendingException(), null); + return; + } + this.readPending = true; + if (this.readTimeoutSeconds > 0) { + AsyncNioCompletionHandler newHandler = this.readTimeoutCompletionHandler; + newHandler.handler(handler, this.readByteBuffer); // new AsyncNioCompletionHandler(handler, this.readByteBuffer); + this.readCompletionHandler = newHandler; + newHandler.timeoutFuture = ioGroup.scheduleTimeout(newHandler, this.readTimeoutSeconds, TimeUnit.SECONDS); + } else { + this.readCompletionHandler = handler; + } + + try { + if (readKey == null) { + ioReadThread.register(selector -> { + try { + readKey = implRegister(selector, SelectionKey.OP_READ); + readKey.attach(this); + } catch (ClosedChannelException e) { + handleRead(0, e); + } + }); + } else { + ioReadThread.interestOpsOr(readKey, SelectionKey.OP_READ); + } + } catch (Exception e) { + handleRead(0, e); + } + } + @Override public void readImpl(CompletionHandler handler) { Objects.requireNonNull(handler); diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 8b12554ef..086800b2e 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -278,7 +278,7 @@ public abstract class Client, R extends ClientR if (virtualReq != null) { future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); } else { - future = future.thenApply(conn -> (C) conn.readChannel()); + future = future.thenApply(conn -> (C) conn.readRegisterChannel()); } if (authenticate != null) { future = future.thenCompose(authenticate); @@ -323,7 +323,7 @@ public abstract class Client, R extends ClientR if (virtualReq != null) { future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); } else { - future = future.thenApply(conn -> (C) conn.readChannel()); + future = future.thenApply(conn -> (C) conn.readRegisterChannel()); } if (authenticate != null) { future = future.thenCompose(authenticate); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 0ca75ba37..723f8adab 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -98,7 +98,7 @@ public abstract class ClientConnection implements Co } ClientFuture respFuture = createClientFuture(request); respFutureQueue.offer(respFuture); - readChannel(); + readRegisterChannel(); return respFuture; } @@ -109,8 +109,8 @@ public abstract class ClientConnection implements Co return new ClientFuture(this, request); } - protected ClientConnection readChannel() { - channel.readInIOThread(codec); + protected ClientConnection readRegisterChannel() { + channel.readRegisterInIOThread(codec); return this; }