diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 24c416fc6..4bd40e622 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -83,6 +83,8 @@ public abstract class Client, R extends ClientR protected int writeTimeoutSeconds; //------------------ LocalThreadMode模式 ------------------ + final CopyOnWriteArrayList localConnList = new CopyOnWriteArrayList<>(); + final ThreadLocal localConnection = new ThreadLocal(); //------------------ 可选项 ------------------ @@ -282,7 +284,7 @@ public abstract class Client, R extends ClientR return conn; } - protected CompletableFuture connect() { + public final CompletableFuture connect() { if (isThreadLocalConnMode()) { C conn = localConnection.get(); if (conn == null || !conn.isOpen()) { @@ -292,6 +294,7 @@ public abstract class Client, R extends ClientR return CompletableFuture.failedFuture(e); } localConnection.set(conn); + localConnList.add(conn); } return CompletableFuture.completedFuture(conn); } else { @@ -299,7 +302,7 @@ public abstract class Client, R extends ClientR } } - protected C connect1() { + private C connect1() { CompletableFuture future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) .thenApply(c -> (C) createConnection(-2, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); @@ -312,7 +315,7 @@ public abstract class Client, R extends ClientR return future.thenApply(c -> (C) c.setAuthenticated(true)).join(); } - protected CompletableFuture connect0() { + private CompletableFuture connect0() { final int size = this.connArray.length; WorkThread workThread = WorkThread.currWorkThread(); final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size; @@ -354,9 +357,9 @@ public abstract class Client, R extends ClientR } //指定地址获取连接 - protected CompletableFuture connect(final SocketAddress addr) { + public final CompletableFuture connect(final SocketAddress addr) { if (addr == null) { - return connect(); + return connect0(); } final AddressConnEntry entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry()); C ec = entry.connection; diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index ae7421f56..cd95559dc 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -30,8 +30,8 @@ import org.redkale.util.ByteArray; */ public abstract class ClientConnection implements Consumer { - //=-1 表示连接放在connAddrEntrys存储 //=-2 表示连接放在ThreadLocal存储 + //=-1 表示连接放在connAddrEntrys存储 //>=0 表示connArray的下坐标,从0开始 protected final int index; @@ -186,8 +186,10 @@ public abstract class ClientConnection implements Co if (index >= 0) { client.connOpenStates[index].set(false); client.connArray[index] = null; //必须connOpenStates之后 - } else if (connEntry != null) { + } else if (connEntry != null) { //index=-1 connEntry.connOpenState.set(false); + } else {//index=-2 + client.localConnList.remove(this); } } diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index d44c77bbe..0e98341f1 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -3,7 +3,7 @@ */ package org.redkale.net.sncp; -import java.net.*; +import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import org.redkale.net.*; @@ -46,11 +46,6 @@ public class SncpClient extends Client connect(SocketAddress addr) { - return super.connect(addr); - } - @Override protected CompletableFuture writeChannel(ClientConnection conn, SncpClientRequest request) { return super.writeChannel(conn, request);