From 7bf75b69cd83ea64b0ca41cc33302d3566a62e40 Mon Sep 17 00:00:00 2001 From: redkale Date: Fri, 26 Jan 2024 23:23:26 +0800 Subject: [PATCH] ClusterAgent --- .../cluster/spi/CacheClusterAgent.java | 36 ++++++++++--------- .../org/redkale/cluster/spi/ClusterAgent.java | 2 ++ 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/redkale/cluster/spi/CacheClusterAgent.java b/src/main/java/org/redkale/cluster/spi/CacheClusterAgent.java index d03a3a8d9..b83151021 100644 --- a/src/main/java/org/redkale/cluster/spi/CacheClusterAgent.java +++ b/src/main/java/org/redkale/cluster/spi/CacheClusterAgent.java @@ -67,12 +67,15 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { if ("ttls".equals(event.name())) { newTtls = Integer.parseInt(event.newValue().toString()); if (newTtls < 5) { - sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") cannot change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n"); + sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()) + .append(") cannot change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n"); } else { - sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n"); + sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()) + .append(") change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n"); } } else { - sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") skip change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n"); + sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()) + .append(") skip change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n"); } } if (newTtls != this.ttls) { @@ -128,15 +131,15 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { private Runnable newTask() { return () -> { try { - checkApplicationHealth(); - checkHttpAddressHealth(); - loadSncpAddressHealth(); localEntrys.values().stream() .filter(e -> !e.canceled) .forEach(this::checkLocalHealth); remoteEntrys.values().stream() .filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)) .forEach(this::updateSncpAddress); + checkApplicationHealth(); + checkHttpAddressHealth(); + loadSncpAddressHealth(); } catch (Exception e) { logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e); } @@ -212,11 +215,11 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { return queryAddress0(serviceName, new HashSet<>(), new AtomicLong()); } - private CompletableFuture> queryAddress0(final String serviceName, final Set set, final AtomicLong cursor) { + private CompletableFuture> queryAddress0(String serviceName, Set set, AtomicLong cursor) { final CompletableFuture> future = source.hscanAsync(serviceName, AddressEntry.class, cursor, 10000); return future.thenCompose(map -> { map.forEach((n, v) -> { - if (v != null && (System.currentTimeMillis() - v.time) / 1000 < ttls) { + if (v != null && (System.currentTimeMillis() - v.time) / 1000 <= ttls) { set.add(v.addr); } }); @@ -232,17 +235,17 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { String serviceName = generateApplicationServiceName(); String serviceid = generateApplicationServiceId(); AddressEntry entry = (AddressEntry) source.hget(serviceName, serviceid, AddressEntry.class); - return entry != null && (System.currentTimeMillis() - entry.time) / 1000 < ttls; + return entry != null && (System.currentTimeMillis() - entry.time) / 1000 <= ttls; } protected void checkApplicationHealth() { - String checkname = generateApplicationServiceName(); + String checkName = generateApplicationServiceName(); String checkid = generateApplicationCheckId(); AddressEntry entry = new AddressEntry(); entry.addr = this.appAddress; entry.nodeid = this.nodeid; entry.time = System.currentTimeMillis(); - source.hset(checkname, checkid, AddressEntry.class, entry); + source.hset(checkName, checkid, AddressEntry.class, entry); } @Override @@ -263,8 +266,9 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { @Override public void deregister(Application application) { + String serviceid = generateApplicationServiceId(); String serviceName = generateApplicationServiceName(); - source.del(serviceName); + source.hdel(serviceName, serviceid); } @Override @@ -286,26 +290,26 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable { deregister(ns, protocol, service, true); } - protected void deregister(NodeServer ns, String protocol, Service service, boolean realcanceled) { + protected void deregister(NodeServer ns, String protocol, Service service, boolean realCanceled) { String serviceName = generateServiceName(ns, protocol, service); String serviceid = generateServiceId(ns, protocol, service); ClusterEntry currEntry = null; for (final ClusterEntry entry : localEntrys.values()) { - if (entry.serviceName.equals(serviceName) && entry.serviceid.equals(serviceid)) { + if (Objects.equals(entry.serviceName, serviceName) && Objects.equals(entry.serviceid, serviceid)) { currEntry = entry; break; } } if (currEntry == null) { for (final ClusterEntry entry : remoteEntrys.values()) { - if (entry.serviceName.equals(serviceName) && entry.serviceid.equals(serviceid)) { + if (Objects.equals(entry.serviceName, serviceName) && Objects.equals(entry.serviceid, serviceid)) { currEntry = entry; break; } } } source.hdel(serviceName, serviceid); - if (realcanceled && currEntry != null) { + if (realCanceled && currEntry != null) { currEntry.canceled = true; } } diff --git a/src/main/java/org/redkale/cluster/spi/ClusterAgent.java b/src/main/java/org/redkale/cluster/spi/ClusterAgent.java index 5c7c25837..30b284659 100644 --- a/src/main/java/org/redkale/cluster/spi/ClusterAgent.java +++ b/src/main/java/org/redkale/cluster/spi/ClusterAgent.java @@ -63,8 +63,10 @@ public abstract class ClusterAgent { protected Set tags; + //key: serviceid protected final ConcurrentHashMap localEntrys = new ConcurrentHashMap<>(); + //key: serviceid protected final ConcurrentHashMap remoteEntrys = new ConcurrentHashMap<>(); public void init(AnyValue config) {