Client增加newConnection

This commit is contained in:
redkale
2023-09-07 17:07:54 +08:00
parent 388505c899
commit ddd3d33edc

View File

@@ -299,15 +299,23 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final CompletableFuture<C> connect() { public final CompletableFuture<C> connect() {
return connect(true);
}
public final CompletableFuture<C> newConnection() {
return connect(true);
}
private CompletableFuture<C> connect(final boolean pool) {
final int size = this.connArray.length; final int size = this.connArray.length;
WorkThread workThread = WorkThread.currentWorkThread(); WorkThread workThread = WorkThread.currentWorkThread();
final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size; final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size;
C cc = (C) this.connArray[connIndex]; C cc = (C) this.connArray[connIndex];
if (cc != null && cc.isOpen()) { if (pool && cc != null && cc.isOpen()) {
return CompletableFuture.completedFuture(cc); return CompletableFuture.completedFuture(cc);
} }
final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex]; final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex];
if (this.connOpenStates[connIndex].compareAndSet(false, true)) { if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines)); .thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
@@ -325,6 +333,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return future.thenCompose(c -> { return future.thenCompose(c -> {
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
c.setAuthenticated(true); c.setAuthenticated(true);
if (pool) {
this.connArray[connIndex] = c; this.connArray[connIndex] = c;
CompletableFuture<C> f; CompletableFuture<C> f;
while ((f = waitQueue.poll()) != null) { while ((f = waitQueue.poll()) != null) {
@@ -332,10 +341,11 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
f.complete(c); f.complete(c);
} }
} }
}
return c; return c;
}, c.channel.getWriteIOThread()); }, c.channel.getWriteIOThread());
}).whenComplete((r, t) -> { }).whenComplete((r, t) -> {
if (t != null) { if (pool && t != null) {
this.connOpenStates[connIndex].set(false); this.connOpenStates[connIndex].set(false);
} }
}); });
@@ -348,16 +358,26 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
//指定地址获取连接 //指定地址获取连接
public final CompletableFuture<C> connect(final SocketAddress addr) { public final CompletableFuture<C> connect(final SocketAddress addr) {
return connect(true, addr);
}
//指定地址获取连接
public final CompletableFuture<C> newConnection(final SocketAddress addr) {
return connect(false, addr);
}
//指定地址获取连接
private CompletableFuture<C> connect(final boolean pool, final SocketAddress addr) {
if (addr == null) { if (addr == null) {
return connect(); return connect();
} }
final AddressConnEntry<C> entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry()); final AddressConnEntry<C> entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry());
C ec = entry.connection; C ec = entry.connection;
if (ec != null && ec.isOpen()) { if (pool && ec != null && ec.isOpen()) {
return CompletableFuture.completedFuture(ec); return CompletableFuture.completedFuture(ec);
} }
final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings; final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings;
if (entry.connOpenState.compareAndSet(false, true)) { if (!pool || entry.connOpenState.compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<C> future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines)); .thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
@@ -375,6 +395,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return future.thenCompose(c -> { return future.thenCompose(c -> {
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
c.setAuthenticated(true); c.setAuthenticated(true);
if (pool) {
entry.connection = c; entry.connection = c;
CompletableFuture<C> f; CompletableFuture<C> f;
while ((f = waitQueue.poll()) != null) { while ((f = waitQueue.poll()) != null) {
@@ -382,10 +403,11 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
f.complete(c); f.complete(c);
} }
} }
}
return c; return c;
}, c.channel.getWriteIOThread()); }, c.channel.getWriteIOThread());
}).whenComplete((r, t) -> { }).whenComplete((r, t) -> {
if (t != null) { if (pool && t != null) {
entry.connOpenState.set(false); entry.connOpenState.set(false);
} }
}); });