From 4478c521e667196df7badea0840947ec8596ed4c Mon Sep 17 00:00:00 2001 From: redkale Date: Sun, 9 Jun 2024 07:35:50 +0800 Subject: [PATCH] AsyncGroup --- .../java/org/redkale/net/AsyncConnection.java | 8 +--- src/main/java/org/redkale/net/AsyncGroup.java | 44 +++++++++++++++++++ .../org/redkale/net/AsyncNioConnection.java | 4 +- .../redkale/net/client/ClientConnection.java | 36 ++++++++------- .../org/redkale/net/client/ClientFuture.java | 1 + 5 files changed, 69 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index cf5203c9f..fb835fb6a 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -5,9 +5,6 @@ */ package org.redkale.net; -import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; -import static javax.net.ssl.SSLEngineResult.Status.*; - import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; @@ -18,6 +15,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; import javax.net.ssl.*; import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; +import static javax.net.ssl.SSLEngineResult.Status.*; import org.redkale.util.*; /** @@ -204,9 +203,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return ioWriteThread; } - @Override - public abstract boolean isOpen(); - public abstract boolean isTCP(); public abstract boolean shutdownInput(); diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index e887e10d8..970b131fa 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -38,6 +38,17 @@ public abstract class AsyncGroup { return createTCPClient(address, 0, 0, 0); } + /** + * 创建TCP连接 + * + * @see org.redkale.net.AsyncIOGroup#createTCPClient(java.net.SocketAddress, int, int, int) + * + * @param address 地址 + * @param connectTimeoutSeconds 连接超时 + * @param readTimeoutSeconds 读超时 + * @param writeTimeoutSeconds 写超时 + * @return AsyncConnection + */ public abstract CompletableFuture createTCPClient( final SocketAddress address, final int connectTimeoutSeconds, @@ -48,6 +59,17 @@ public abstract class AsyncGroup { return createUDPClient(address, 0, 0, 0); } + /** + * 创建UDP连接 + * + * @see org.redkale.net.AsyncIOGroup#createUDPClient(java.net.SocketAddress, int, int, int) + * + * @param address 地址 + * @param connectTimeoutSeconds 连接超时 + * @param readTimeoutSeconds 读超时 + * @param writeTimeoutSeconds 写超时 + * @return AsyncConnection + */ public abstract CompletableFuture createUDPClient( final SocketAddress address, final int connectTimeoutSeconds, @@ -69,9 +91,31 @@ public abstract class AsyncGroup { : createUDPClient(address, connectTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds); } + /** + * 设置超时回调 + * + * @see org.redkale.net.AsyncIOGroup#scheduleTimeout(java.lang.Runnable, long, java.util.concurrent.TimeUnit) + * + * @param callable 回调函数 + * @param delay 延迟时长 + * @param unit 时长单位 + * @return ScheduledFuture + */ public abstract ScheduledFuture scheduleTimeout(Runnable callable, long delay, TimeUnit unit); + /** + * 启动 + * @see org.redkale.net.AsyncIOGroup#start() + * + * @return AsyncGroup + */ public abstract AsyncGroup start(); + /** + * 关闭 + * @see org.redkale.net.AsyncIOGroup#close() + * + * @return AsyncGroup + */ public abstract AsyncGroup close(); } diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index b0812f13f..c3e63367d 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -39,7 +39,7 @@ abstract class AsyncNioConnection extends AsyncConnection { protected final AsyncNioCompletionHandler readTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(true, this); - //值大于0才有效 + // 值大于0才有效 protected int readTimeoutSeconds; protected ByteBuffer readByteBuffer; @@ -52,7 +52,7 @@ abstract class AsyncNioConnection extends AsyncConnection { protected final AsyncNioCompletionHandler writeTimeoutCompletionHandler = new AsyncNioCompletionHandler<>(false, this); - //值大于0才有效 + // 值大于0才有效 protected int writeTimeoutSeconds; protected byte[] writeByteTuple1Array; diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 7d7504aba..95d397c79 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -85,6 +85,8 @@ public abstract class ClientConnection> respFutureMap = new ConcurrentHashMap<>(); + private final AtomicBoolean closed = new AtomicBoolean(); + Iterator> currRespIterator; // 必须在调用decodeMessages之前重置为null private int maxPipelines; // 最大并行处理数 @@ -325,23 +327,25 @@ public abstract class ClientConnection future.completeExceptionally(e)); + if (closed.compareAndSet(false, true)) { + channel.offerWriteBuffer(writeBuffer); + channel.dispose(); + Throwable e = exc == null ? new ClosedChannelException() : exc; + CompletableFuture f; + respWaitingCounter.reset(); + WorkThread thread = channel.getReadIOThread(); + if (!respFutureQueue.isEmpty()) { + while ((f = respFutureQueue.poll()) != null) { + CompletableFuture future = f; + thread.runWork(() -> future.completeExceptionally(e)); + } + } + if (!respFutureMap.isEmpty()) { + respFutureMap.forEach((key, future) -> { + respFutureMap.remove(key); + thread.runWork(() -> future.completeExceptionally(e)); + }); } - } - if (!respFutureMap.isEmpty()) { - respFutureMap.forEach((key, future) -> { - respFutureMap.remove(key); - thread.runWork(() -> future.completeExceptionally(e)); - }); } } diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 039e3ca85..daf12bedd 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -90,6 +90,7 @@ public class ClientFuture extends CompletableFuture< } Traces.removeTraceid(); }); + conn.dispose(ex); } @Override