queryAddress优化

This commit is contained in:
redkale
2023-11-11 12:34:15 +08:00
parent fdfa70c765
commit cb2abab70c
5 changed files with 21 additions and 12 deletions

View File

@@ -52,7 +52,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
super.init(config); super.init(config);
this.sourceName = getSourceName(); this.sourceName = getSourceName();
this.ttls = config.getIntValue("ttls", 10); this.ttls = config.getIntValue("ttls", 10);
if (this.ttls < 5) { if (this.ttls < 5) { //值不能太小
this.ttls = 10; this.ttls = 10;
} }
} }
@@ -121,7 +121,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
if (this.taskFuture != null) { if (this.taskFuture != null) {
this.taskFuture.cancel(true); this.taskFuture.cancel(true);
} }
this.taskFuture = this.scheduler.scheduleAtFixedRate(newTask(), Math.max(2000, ttls * 1000), Math.max(2000, ttls * 1000), TimeUnit.MILLISECONDS); this.taskFuture = this.scheduler.scheduleAtFixedRate(newTask(), ttls, ttls, TimeUnit.SECONDS);
} }
private Runnable newTask() { private Runnable newTask() {
@@ -208,15 +208,22 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
} }
private CompletableFuture<Set<InetSocketAddress>> queryAddress(final String serviceName) { private CompletableFuture<Set<InetSocketAddress>> queryAddress(final String serviceName) {
final CompletableFuture<Map<String, AddressEntry>> future = source.hscanAsync(serviceName, AddressEntry.class, new AtomicLong(), 10000); return queryAddress0(serviceName, new HashSet<>(), new AtomicLong());
return future.thenApply(map -> { }
final Set<InetSocketAddress> set = new HashSet<>();
private CompletableFuture<Set<InetSocketAddress>> queryAddress0(final String serviceName, final Set<InetSocketAddress> set, final AtomicLong cursor) {
final CompletableFuture<Map<String, AddressEntry>> future = source.hscanAsync(serviceName, AddressEntry.class, cursor, 10000);
return future.thenCompose(map -> {
map.forEach((n, v) -> { map.forEach((n, v) -> {
if (v != null && (System.currentTimeMillis() - v.time) / 1000 < ttls) { if (v != null && (System.currentTimeMillis() - v.time) / 1000 < ttls) {
set.add(v.addr); set.add(v.addr);
} }
}); });
return set; if (cursor.get() == 0) {
return CompletableFuture.completedFuture(set);
} else {
return queryAddress0(serviceName, set, cursor);
}
}); });
} }

View File

@@ -18,7 +18,7 @@ import static org.redkale.util.Utility.isEmpty;
import static org.redkale.util.Utility.isNotEmpty; import static org.redkale.util.Utility.isNotEmpty;
/** /**
* 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例 * 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpRpcClient实例
* *
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org

View File

@@ -63,6 +63,9 @@ public class HttpLocalRpcClient extends HttpRpcClient {
} }
} }
} }
if (nodeHttpServer == null) {
throw new HttpException("Not found HttpServer");
}
this.currServer = nodeHttpServer.getServer(); this.currServer = nodeHttpServer.getServer();
} }
return this.currServer; return this.currServer;

View File

@@ -27,12 +27,13 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
final InetSocketAddress clientSncpAddress; final InetSocketAddress clientSncpAddress;
public SncpClient(String name, AsyncGroup group, int nodeid, InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) { public SncpClient(String name, AsyncGroup group, int nodeid,
InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) {
super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns
this.clientSncpAddress = clientSncpAddress; this.clientSncpAddress = clientSncpAddress;
this.nodeid = nodeid; this.nodeid = nodeid;
this.readTimeoutSeconds = 15; this.readTimeoutSeconds = 10;
this.writeTimeoutSeconds = 15; this.writeTimeoutSeconds = 10;
} }
@Override @Override

View File

@@ -21,8 +21,6 @@ public class SncpHeader {
public static final byte KEEPALIVE_OFF = -1; public static final byte KEEPALIVE_OFF = -1;
private static final byte[] EMPTY_ADDR = new byte[4];
private Long seqid; private Long seqid;
private Uint128 serviceid; private Uint128 serviceid;