ClusterAgent

This commit is contained in:
redkale
2024-01-26 23:23:26 +08:00
parent c94c34e6a8
commit 7bf75b69cd
2 changed files with 22 additions and 16 deletions

View File

@@ -67,12 +67,15 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
if ("ttls".equals(event.name())) {
newTtls = Integer.parseInt(event.newValue().toString());
if (newTtls < 5) {
sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") cannot change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n");
sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName())
.append(") cannot change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n");
} else {
sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n");
sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName())
.append(") change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n");
}
} else {
sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") skip change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n");
sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName())
.append(") skip change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n");
}
}
if (newTtls != this.ttls) {
@@ -128,15 +131,15 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
private Runnable newTask() {
return () -> {
try {
checkApplicationHealth();
checkHttpAddressHealth();
loadSncpAddressHealth();
localEntrys.values().stream()
.filter(e -> !e.canceled)
.forEach(this::checkLocalHealth);
remoteEntrys.values().stream()
.filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol))
.forEach(this::updateSncpAddress);
checkApplicationHealth();
checkHttpAddressHealth();
loadSncpAddressHealth();
} catch (Exception e) {
logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e);
}
@@ -212,11 +215,11 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
return queryAddress0(serviceName, new HashSet<>(), new AtomicLong());
}
private CompletableFuture<Set<InetSocketAddress>> queryAddress0(final String serviceName, final Set<InetSocketAddress> set, final AtomicLong cursor) {
private CompletableFuture<Set<InetSocketAddress>> queryAddress0(String serviceName, Set<InetSocketAddress> set, AtomicLong cursor) {
final CompletableFuture<Map<String, AddressEntry>> future = source.hscanAsync(serviceName, AddressEntry.class, cursor, 10000);
return future.thenCompose(map -> {
map.forEach((n, v) -> {
if (v != null && (System.currentTimeMillis() - v.time) / 1000 < ttls) {
if (v != null && (System.currentTimeMillis() - v.time) / 1000 <= ttls) {
set.add(v.addr);
}
});
@@ -232,17 +235,17 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
String serviceName = generateApplicationServiceName();
String serviceid = generateApplicationServiceId();
AddressEntry entry = (AddressEntry) source.hget(serviceName, serviceid, AddressEntry.class);
return entry != null && (System.currentTimeMillis() - entry.time) / 1000 < ttls;
return entry != null && (System.currentTimeMillis() - entry.time) / 1000 <= ttls;
}
protected void checkApplicationHealth() {
String checkname = generateApplicationServiceName();
String checkName = generateApplicationServiceName();
String checkid = generateApplicationCheckId();
AddressEntry entry = new AddressEntry();
entry.addr = this.appAddress;
entry.nodeid = this.nodeid;
entry.time = System.currentTimeMillis();
source.hset(checkname, checkid, AddressEntry.class, entry);
source.hset(checkName, checkid, AddressEntry.class, entry);
}
@Override
@@ -263,8 +266,9 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
@Override
public void deregister(Application application) {
String serviceid = generateApplicationServiceId();
String serviceName = generateApplicationServiceName();
source.del(serviceName);
source.hdel(serviceName, serviceid);
}
@Override
@@ -286,26 +290,26 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
deregister(ns, protocol, service, true);
}
protected void deregister(NodeServer ns, String protocol, Service service, boolean realcanceled) {
protected void deregister(NodeServer ns, String protocol, Service service, boolean realCanceled) {
String serviceName = generateServiceName(ns, protocol, service);
String serviceid = generateServiceId(ns, protocol, service);
ClusterEntry currEntry = null;
for (final ClusterEntry entry : localEntrys.values()) {
if (entry.serviceName.equals(serviceName) && entry.serviceid.equals(serviceid)) {
if (Objects.equals(entry.serviceName, serviceName) && Objects.equals(entry.serviceid, serviceid)) {
currEntry = entry;
break;
}
}
if (currEntry == null) {
for (final ClusterEntry entry : remoteEntrys.values()) {
if (entry.serviceName.equals(serviceName) && entry.serviceid.equals(serviceid)) {
if (Objects.equals(entry.serviceName, serviceName) && Objects.equals(entry.serviceid, serviceid)) {
currEntry = entry;
break;
}
}
}
source.hdel(serviceName, serviceid);
if (realcanceled && currEntry != null) {
if (realCanceled && currEntry != null) {
currEntry.canceled = true;
}
}

View File

@@ -63,8 +63,10 @@ public abstract class ClusterAgent {
protected Set<String> tags;
//key: serviceid
protected final ConcurrentHashMap<String, ClusterEntry> localEntrys = new ConcurrentHashMap<>();
//key: serviceid
protected final ConcurrentHashMap<String, ClusterEntry> remoteEntrys = new ConcurrentHashMap<>();
public void init(AnyValue config) {