优化client

This commit is contained in:
redkale
2023-11-29 23:20:57 +08:00
parent a4b277e875
commit f0e53d4a8e

View File

@@ -62,7 +62,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
final int connLimit; //最大连接数
//连指定地址模式
final ConcurrentHashMap<SocketAddress, AddressConnEntry> connAddrEntrys = new ConcurrentHashMap<>();
final ConcurrentHashMap<SocketAddress, AddressConnEntry[]> connAddrEntrys = new ConcurrentHashMap<>();
protected int maxPipelines = DEFAULT_MAX_PIPELINES; //单个连接最大并行处理数
@@ -141,15 +141,20 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return;
}
long now = System.currentTimeMillis();
for (AddressConnEntry<ClientConnection<R, P>> entry : this.connAddrEntrys.values()) {
ClientConnection<R, P> conn = entry.connection;
if (conn == null) {
continue;
for (AddressConnEntry<ClientConnection<R, P>>[] entrys : this.connAddrEntrys.values()) {
for (AddressConnEntry<ClientConnection<R, P>> entry : entrys) {
if (entry == null) {
continue;
}
ClientConnection<R, P> conn = entry.connection;
if (conn == null) {
continue;
}
if (now - conn.getLastWriteTime() < 10_000) {
continue;
}
conn.writeChannel(req).thenAccept(p -> handlePingResult((C) conn, p));
}
if (now - conn.getLastWriteTime() < 10_000) {
continue;
}
conn.writeChannel(req).thenAccept(p -> handlePingResult((C) conn, p));
}
} catch (Throwable t) {
//do nothing
@@ -175,8 +180,12 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
public void close() {
if (closed.compareAndSet(false, true)) {
this.timeoutScheduler.shutdownNow();
for (AddressConnEntry<C> entry : this.connAddrEntrys.values()) {
closeConnection(entry.connection);
for (AddressConnEntry<C>[] entrys : this.connAddrEntrys.values()) {
for (AddressConnEntry<C> entry : entrys) {
if (entry != null) {
closeConnection(entry.connection);
}
}
}
this.connAddrEntrys.clear();
group.close();
@@ -298,7 +307,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return CompletableFuture.failedFuture(new NullPointerException("address is empty"));
}
final String traceid = Traces.currentTraceid();
final AddressConnEntry<C> entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry());
final AddressConnEntry<C> entry = getAddressConnEntry(addr, workThread);
C ec = entry.connection;
if (pool && ec != null && ec.isOpen()) {
return CompletableFuture.completedFuture(ec);
@@ -358,6 +367,19 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
}
private AddressConnEntry<C> getAddressConnEntry(SocketAddress addr, WorkThread workThread) {
final AddressConnEntry<C>[] entrys = connAddrEntrys.computeIfAbsent(addr, a -> {
AddressConnEntry<C>[] array = new AddressConnEntry[connLimit];
for (int i = 0; i < array.length; i++) {
array[i] = new AddressConnEntry<>();
}
return array;
});
ThreadLocalRandom random = ThreadLocalRandom.current();
int index = workThread == null || workThread.index() < 0 ? random.nextInt(entrys.length) : workThread.index() % entrys.length;
return entrys[index];
}
protected void incrReqWritedCounter() {
reqWritedCounter.increment();
}