Client.connect
This commit is contained in:
@@ -83,6 +83,8 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
protected int writeTimeoutSeconds;
|
protected int writeTimeoutSeconds;
|
||||||
//------------------ LocalThreadMode模式 ------------------
|
//------------------ LocalThreadMode模式 ------------------
|
||||||
|
|
||||||
|
final CopyOnWriteArrayList<C> localConnList = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
final ThreadLocal<C> localConnection = new ThreadLocal();
|
final ThreadLocal<C> localConnection = new ThreadLocal();
|
||||||
|
|
||||||
//------------------ 可选项 ------------------
|
//------------------ 可选项 ------------------
|
||||||
@@ -282,7 +284,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected CompletableFuture<C> connect() {
|
public final CompletableFuture<C> connect() {
|
||||||
if (isThreadLocalConnMode()) {
|
if (isThreadLocalConnMode()) {
|
||||||
C conn = localConnection.get();
|
C conn = localConnection.get();
|
||||||
if (conn == null || !conn.isOpen()) {
|
if (conn == null || !conn.isOpen()) {
|
||||||
@@ -292,6 +294,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
return CompletableFuture.failedFuture(e);
|
return CompletableFuture.failedFuture(e);
|
||||||
}
|
}
|
||||||
localConnection.set(conn);
|
localConnection.set(conn);
|
||||||
|
localConnList.add(conn);
|
||||||
}
|
}
|
||||||
return CompletableFuture.completedFuture(conn);
|
return CompletableFuture.completedFuture(conn);
|
||||||
} else {
|
} else {
|
||||||
@@ -299,7 +302,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected C connect1() {
|
private C connect1() {
|
||||||
CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds)
|
CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds)
|
||||||
.thenApply(c -> (C) createConnection(-2, c).setMaxPipelines(maxPipelines));
|
.thenApply(c -> (C) createConnection(-2, c).setMaxPipelines(maxPipelines));
|
||||||
R virtualReq = createVirtualRequestAfterConnect();
|
R virtualReq = createVirtualRequestAfterConnect();
|
||||||
@@ -312,7 +315,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
return future.thenApply(c -> (C) c.setAuthenticated(true)).join();
|
return future.thenApply(c -> (C) c.setAuthenticated(true)).join();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected CompletableFuture<C> connect0() {
|
private CompletableFuture<C> connect0() {
|
||||||
final int size = this.connArray.length;
|
final int size = this.connArray.length;
|
||||||
WorkThread workThread = WorkThread.currWorkThread();
|
WorkThread workThread = WorkThread.currWorkThread();
|
||||||
final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size;
|
final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size;
|
||||||
@@ -354,9 +357,9 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
|||||||
}
|
}
|
||||||
|
|
||||||
//指定地址获取连接
|
//指定地址获取连接
|
||||||
protected CompletableFuture<C> connect(final SocketAddress addr) {
|
public final CompletableFuture<C> connect(final SocketAddress addr) {
|
||||||
if (addr == null) {
|
if (addr == null) {
|
||||||
return connect();
|
return connect0();
|
||||||
}
|
}
|
||||||
final AddressConnEntry<C> entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry());
|
final AddressConnEntry<C> entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry());
|
||||||
C ec = entry.connection;
|
C ec = entry.connection;
|
||||||
|
|||||||
@@ -30,8 +30,8 @@ import org.redkale.util.ByteArray;
|
|||||||
*/
|
*/
|
||||||
public abstract class ClientConnection<R extends ClientRequest, P> implements Consumer<AsyncConnection> {
|
public abstract class ClientConnection<R extends ClientRequest, P> implements Consumer<AsyncConnection> {
|
||||||
|
|
||||||
//=-1 表示连接放在connAddrEntrys存储
|
|
||||||
//=-2 表示连接放在ThreadLocal存储
|
//=-2 表示连接放在ThreadLocal存储
|
||||||
|
//=-1 表示连接放在connAddrEntrys存储
|
||||||
//>=0 表示connArray的下坐标,从0开始
|
//>=0 表示connArray的下坐标,从0开始
|
||||||
protected final int index;
|
protected final int index;
|
||||||
|
|
||||||
@@ -186,8 +186,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
|
|||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
client.connOpenStates[index].set(false);
|
client.connOpenStates[index].set(false);
|
||||||
client.connArray[index] = null; //必须connOpenStates之后
|
client.connArray[index] = null; //必须connOpenStates之后
|
||||||
} else if (connEntry != null) {
|
} else if (connEntry != null) { //index=-1
|
||||||
connEntry.connOpenState.set(false);
|
connEntry.connOpenState.set(false);
|
||||||
|
} else {//index=-2
|
||||||
|
client.localConnList.remove(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.redkale.net.sncp;
|
package org.redkale.net.sncp;
|
||||||
|
|
||||||
import java.net.*;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.redkale.net.*;
|
import org.redkale.net.*;
|
||||||
@@ -46,11 +46,6 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
|
|||||||
return seqno.incrementAndGet();
|
return seqno.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected CompletableFuture<SncpClientConnection> connect(SocketAddress addr) {
|
|
||||||
return super.connect(addr);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected CompletableFuture<SncpClientResult> writeChannel(ClientConnection conn, SncpClientRequest request) {
|
protected CompletableFuture<SncpClientResult> writeChannel(ClientConnection conn, SncpClientRequest request) {
|
||||||
return super.writeChannel(conn, request);
|
return super.writeChannel(conn, request);
|
||||||
|
|||||||
Reference in New Issue
Block a user