diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 73197fb4f..6951e3edc 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -62,7 +62,7 @@ public abstract class Client, R extends ClientR final int connLimit; //最大连接数 //连指定地址模式 - final ConcurrentHashMap connAddrEntrys = new ConcurrentHashMap<>(); + final ConcurrentHashMap connAddrEntrys = new ConcurrentHashMap<>(); protected int maxPipelines = DEFAULT_MAX_PIPELINES; //单个连接最大并行处理数 @@ -141,15 +141,20 @@ public abstract class Client, R extends ClientR return; } long now = System.currentTimeMillis(); - for (AddressConnEntry> entry : this.connAddrEntrys.values()) { - ClientConnection conn = entry.connection; - if (conn == null) { - continue; + for (AddressConnEntry>[] entrys : this.connAddrEntrys.values()) { + for (AddressConnEntry> entry : entrys) { + if (entry == null) { + continue; + } + ClientConnection conn = entry.connection; + if (conn == null) { + continue; + } + if (now - conn.getLastWriteTime() < 10_000) { + continue; + } + conn.writeChannel(req).thenAccept(p -> handlePingResult((C) conn, p)); } - if (now - conn.getLastWriteTime() < 10_000) { - continue; - } - conn.writeChannel(req).thenAccept(p -> handlePingResult((C) conn, p)); } } catch (Throwable t) { //do nothing @@ -175,8 +180,12 @@ public abstract class Client, R extends ClientR public void close() { if (closed.compareAndSet(false, true)) { this.timeoutScheduler.shutdownNow(); - for (AddressConnEntry entry : this.connAddrEntrys.values()) { - closeConnection(entry.connection); + for (AddressConnEntry[] entrys : this.connAddrEntrys.values()) { + for (AddressConnEntry entry : entrys) { + if (entry != null) { + closeConnection(entry.connection); + } + } } this.connAddrEntrys.clear(); group.close(); @@ -298,7 +307,7 @@ public abstract class Client, R extends ClientR return CompletableFuture.failedFuture(new NullPointerException("address is empty")); } final String traceid = Traces.currentTraceid(); - final AddressConnEntry entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry()); + final AddressConnEntry entry = getAddressConnEntry(addr, workThread); C ec = entry.connection; if (pool && ec != null && ec.isOpen()) { return CompletableFuture.completedFuture(ec); @@ -358,6 +367,19 @@ public abstract class Client, R extends ClientR } } + private AddressConnEntry getAddressConnEntry(SocketAddress addr, WorkThread workThread) { + final AddressConnEntry[] entrys = connAddrEntrys.computeIfAbsent(addr, a -> { + AddressConnEntry[] array = new AddressConnEntry[connLimit]; + for (int i = 0; i < array.length; i++) { + array[i] = new AddressConnEntry<>(); + } + return array; + }); + ThreadLocalRandom random = ThreadLocalRandom.current(); + int index = workThread == null || workThread.index() < 0 ? random.nextInt(entrys.length) : workThread.index() % entrys.length; + return entrys[index]; + } + protected void incrReqWritedCounter() { reqWritedCounter.increment(); }