This commit is contained in:
redkale
2024-09-07 19:55:51 +08:00
parent 426e8e463a
commit 9cb4889a9c
5 changed files with 23 additions and 21 deletions

View File

@@ -34,45 +34,47 @@ public abstract class AsyncGroup {
return new AsyncIOGroup(threadNameFormat, workExecutor, safeBufferPool);
}
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) {
return createTCPClient(address, 0);
public CompletableFuture<AsyncConnection> createTCPClientConnection(final SocketAddress address) {
return AsyncGroup.this.createTCPClientConnection(address, 0);
}
/**
* 创建TCP连接
*
* @see org.redkale.net.AsyncIOGroup#createTCPClient(java.net.SocketAddress, int)
* @see org.redkale.net.AsyncIOGroup#createTCPClientConnection(java.net.SocketAddress, int)
*
* @param address 地址
* @param connectTimeoutSeconds 连接超时
* @return AsyncConnection
*/
public abstract CompletableFuture<AsyncConnection> createTCPClient(
public abstract CompletableFuture<AsyncConnection> createTCPClientConnection(
SocketAddress address, int connectTimeoutSeconds);
public CompletableFuture<AsyncConnection> createUDPClient(final SocketAddress address) {
return createUDPClient(address, 0);
public CompletableFuture<AsyncConnection> createUDPClientConnection(final SocketAddress address) {
return AsyncGroup.this.createUDPClientConnection(address, 0);
}
/**
* 创建UDP连接
*
* @see org.redkale.net.AsyncIOGroup#createUDPClient(java.net.SocketAddress, int)
* @see org.redkale.net.AsyncIOGroup#createUDPClientConnection(java.net.SocketAddress, int)
*
* @param address 地址
* @param connectTimeoutSeconds 连接超时
* @return AsyncConnection
*/
public abstract CompletableFuture<AsyncConnection> createUDPClient(
public abstract CompletableFuture<AsyncConnection> createUDPClientConnection(
SocketAddress address, int connectTimeoutSeconds);
public CompletableFuture<AsyncConnection> createClient(final boolean tcp, final SocketAddress address) {
return tcp ? createTCPClient(address) : createUDPClient(address);
public CompletableFuture<AsyncConnection> createClientConnection(final boolean tcp, final SocketAddress address) {
return tcp ? createTCPClientConnection(address) : createUDPClientConnection(address);
}
public CompletableFuture<AsyncConnection> createClient(
public CompletableFuture<AsyncConnection> createClientConnection(
boolean tcp, SocketAddress address, int connectTimeoutSeconds) {
return tcp ? createTCPClient(address, connectTimeoutSeconds) : createUDPClient(address, connectTimeoutSeconds);
return tcp
? AsyncGroup.this.createTCPClientConnection(address, connectTimeoutSeconds)
: createUDPClientConnection(address, connectTimeoutSeconds);
}
/**

View File

@@ -246,7 +246,7 @@ public class AsyncIOGroup extends AsyncGroup {
}
@Override
public CompletableFuture<AsyncConnection> createTCPClient(SocketAddress address, int connectTimeoutSeconds) {
public CompletableFuture<AsyncConnection> createTCPClientConnection(SocketAddress address, int connectTimeoutSeconds) {
Objects.requireNonNull(address);
AsyncNioTcpConnection conn;
try {
@@ -322,7 +322,7 @@ public class AsyncIOGroup extends AsyncGroup {
}
@Override
public CompletableFuture<AsyncConnection> createUDPClient(SocketAddress address, int connectTimeoutSeconds) {
public CompletableFuture<AsyncConnection> createUDPClientConnection(SocketAddress address, int connectTimeoutSeconds) {
AsyncNioUdpConnection conn;
try {
conn = newUDPClientConnection(address);

View File

@@ -289,14 +289,14 @@ public final class Transport {
try {
if (!tcp) { // UDP
SocketAddress udpaddr = rand ? nodes[0].address : addr;
return asyncGroup.createUDPClient(udpaddr, 6);
return asyncGroup.createUDPClientConnection(udpaddr, 6);
}
if (!rand) { // 指定地址
TransportNode node = findTransportNode(addr);
if (node == null) {
return asyncGroup.createTCPClient(addr, 6);
return asyncGroup.createTCPClientConnection(addr, 6);
}
return pollAsync(node, addr, () -> asyncGroup.createTCPClient(addr, 6));
return pollAsync(node, addr, () -> asyncGroup.createTCPClientConnection(addr, 6));
}
// ---------------------随机取地址------------------------
@@ -323,7 +323,7 @@ public final class Transport {
}
}
return pollAsync(one, one.getAddress(), () -> {
return asyncGroup.createTCPClient(one.address, 6).whenComplete((c, t) -> {
return asyncGroup.createTCPClientConnection(one.address, 6).whenComplete((c, t) -> {
one.disabletime = t == null ? 0 : System.currentTimeMillis();
});
});
@@ -345,7 +345,7 @@ public final class Transport {
if (future.isDone()) {
return future;
}
asyncGroup.createTCPClient(node.address, 6).whenComplete((c, t) -> {
asyncGroup.createTCPClientConnection(node.address, 6).whenComplete((c, t) -> {
if (c != null && !future.complete(c)) {
node.connQueue.offer(c);
}

View File

@@ -354,7 +354,7 @@ public class TransportFactory {
continue; // 可用
}
CompletableFuture<AsyncConnection> future =
Utility.orTimeout(asyncGroup.createTCPClient(node.address), null, 2, TimeUnit.SECONDS);
Utility.orTimeout(asyncGroup.createTCPClientConnection(node.address), null, 2, TimeUnit.SECONDS);
future.whenComplete((r, t) -> {
node.disabletime = t == null ? 0 : System.currentTimeMillis();
if (r != null) {

View File

@@ -347,7 +347,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings;
if (!pool || entry.connOpenState.compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(tcp, addr, connectTimeoutSeconds)
CompletableFuture<C> future = group.createClientConnection(tcp, addr, connectTimeoutSeconds)
.thenApply(c ->
(C) createClientConnection(c).setConnEntry(entry).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect();