AsyncGroup

This commit is contained in:
redkale
2024-06-09 07:35:50 +08:00
parent 36fb3391cc
commit 4478c521e6
5 changed files with 69 additions and 24 deletions

View File

@@ -5,9 +5,6 @@
*/
package org.redkale.net;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
@@ -18,6 +15,8 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*;
import javax.net.ssl.*;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
import org.redkale.util.*;
/**
@@ -204,9 +203,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
return ioWriteThread;
}
@Override
public abstract boolean isOpen();
public abstract boolean isTCP();
public abstract boolean shutdownInput();

View File

@@ -38,6 +38,17 @@ public abstract class AsyncGroup {
return createTCPClient(address, 0, 0, 0);
}
/**
* 创建TCP连接
*
* @see org.redkale.net.AsyncIOGroup#createTCPClient(java.net.SocketAddress, int, int, int)
*
* @param address 地址
* @param connectTimeoutSeconds 连接超时
* @param readTimeoutSeconds 读超时
* @param writeTimeoutSeconds 写超时
* @return AsyncConnection
*/
public abstract CompletableFuture<AsyncConnection> createTCPClient(
final SocketAddress address,
final int connectTimeoutSeconds,
@@ -48,6 +59,17 @@ public abstract class AsyncGroup {
return createUDPClient(address, 0, 0, 0);
}
/**
* 创建UDP连接
*
* @see org.redkale.net.AsyncIOGroup#createUDPClient(java.net.SocketAddress, int, int, int)
*
* @param address 地址
* @param connectTimeoutSeconds 连接超时
* @param readTimeoutSeconds 读超时
* @param writeTimeoutSeconds 写超时
* @return AsyncConnection
*/
public abstract CompletableFuture<AsyncConnection> createUDPClient(
final SocketAddress address,
final int connectTimeoutSeconds,
@@ -69,9 +91,31 @@ public abstract class AsyncGroup {
: createUDPClient(address, connectTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds);
}
/**
* 设置超时回调
*
* @see org.redkale.net.AsyncIOGroup#scheduleTimeout(java.lang.Runnable, long, java.util.concurrent.TimeUnit)
*
* @param callable 回调函数
* @param delay 延迟时长
* @param unit 时长单位
* @return ScheduledFuture
*/
public abstract ScheduledFuture scheduleTimeout(Runnable callable, long delay, TimeUnit unit);
/**
* 启动
* @see org.redkale.net.AsyncIOGroup#start()
*
* @return AsyncGroup
*/
public abstract AsyncGroup start();
/**
* 关闭
* @see org.redkale.net.AsyncIOGroup#close()
*
* @return AsyncGroup
*/
public abstract AsyncGroup close();
}

View File

@@ -39,7 +39,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected final AsyncNioCompletionHandler<ByteBuffer> readTimeoutCompletionHandler =
new AsyncNioCompletionHandler<>(true, this);
//值大于0才有效
// 值大于0才有效
protected int readTimeoutSeconds;
protected ByteBuffer readByteBuffer;
@@ -52,7 +52,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected final AsyncNioCompletionHandler<Object> writeTimeoutCompletionHandler =
new AsyncNioCompletionHandler<>(false, this);
//值大于0才有效
// 值大于0才有效
protected int writeTimeoutSeconds;
protected byte[] writeByteTuple1Array;

View File

@@ -85,6 +85,8 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
// respFutureQueue、respFutureMap二选一, key: requestid SPSC模式
private final ConcurrentHashMap<Serializable, ClientFuture<R, P>> respFutureMap = new ConcurrentHashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();
Iterator<ClientFuture<R, P>> currRespIterator; // 必须在调用decodeMessages之前重置为null
private int maxPipelines; // 最大并行处理数
@@ -325,23 +327,25 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
}
public void dispose(Throwable exc) {
channel.offerWriteBuffer(writeBuffer);
channel.dispose();
Throwable e = exc == null ? new ClosedChannelException() : exc;
CompletableFuture f;
respWaitingCounter.reset();
WorkThread thread = channel.getReadIOThread();
if (!respFutureQueue.isEmpty()) {
while ((f = respFutureQueue.poll()) != null) {
CompletableFuture future = f;
thread.runWork(() -> future.completeExceptionally(e));
if (closed.compareAndSet(false, true)) {
channel.offerWriteBuffer(writeBuffer);
channel.dispose();
Throwable e = exc == null ? new ClosedChannelException() : exc;
CompletableFuture f;
respWaitingCounter.reset();
WorkThread thread = channel.getReadIOThread();
if (!respFutureQueue.isEmpty()) {
while ((f = respFutureQueue.poll()) != null) {
CompletableFuture future = f;
thread.runWork(() -> future.completeExceptionally(e));
}
}
if (!respFutureMap.isEmpty()) {
respFutureMap.forEach((key, future) -> {
respFutureMap.remove(key);
thread.runWork(() -> future.completeExceptionally(e));
});
}
}
if (!respFutureMap.isEmpty()) {
respFutureMap.forEach((key, future) -> {
respFutureMap.remove(key);
thread.runWork(() -> future.completeExceptionally(e));
});
}
}

View File

@@ -90,6 +90,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
}
Traces.removeTraceid();
});
conn.dispose(ex);
}
@Override