ClusterAgent

This commit is contained in:
redkale
2024-07-01 13:10:25 +08:00
parent 9b48e85b26
commit 69c2baae51
3 changed files with 36 additions and 36 deletions

View File

@@ -203,7 +203,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
newaddr.resname = entry.resourceName; newaddr.resname = entry.resourceName;
newaddr.nodeid = this.nodeid; newaddr.nodeid = this.nodeid;
newaddr.time = System.currentTimeMillis(); newaddr.time = System.currentTimeMillis();
source.hset(entry.checkName, entry.checkid, AddressEntry.class, newaddr); source.hset(entry.checkName, entry.checkId, AddressEntry.class, newaddr);
} }
@Override // 获取SNCP远程服务的可用ip列表 @Override // 获取SNCP远程服务的可用ip列表
@@ -309,7 +309,7 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
entry.resname = clusterEntry.resourceName; entry.resname = clusterEntry.resourceName;
entry.nodeid = this.nodeid; entry.nodeid = this.nodeid;
entry.time = System.currentTimeMillis(); entry.time = System.currentTimeMillis();
source.hset(clusterEntry.serviceName, clusterEntry.serviceid, AddressEntry.class, entry); source.hset(clusterEntry.serviceName, clusterEntry.serviceId, AddressEntry.class, entry);
return clusterEntry; return clusterEntry;
} }
@@ -320,14 +320,14 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
protected void deregister(NodeServer ns, String protocol, Service service, boolean realCanceled) { protected void deregister(NodeServer ns, String protocol, Service service, boolean realCanceled) {
String serviceName = generateServiceName(ns, protocol, service); String serviceName = generateServiceName(ns, protocol, service);
String serviceid = generateServiceId(ns, protocol, service); String serviceId = generateServiceId(ns, protocol, service);
Predicate<ClusterEntry> predicate = Predicate<ClusterEntry> predicate =
entry -> Objects.equals(entry.serviceName, serviceName) && Objects.equals(entry.serviceid, serviceid); entry -> Objects.equals(entry.serviceName, serviceName) && Objects.equals(entry.serviceId, serviceId);
ClusterEntry currEntry = Utility.find(localEntrys.values(), predicate); ClusterEntry currEntry = Utility.find(localEntrys.values(), predicate);
if (currEntry == null) { if (currEntry == null) {
currEntry = Utility.find(remoteEntrys.values(), predicate); currEntry = Utility.find(remoteEntrys.values(), predicate);
} }
source.hdel(serviceName, serviceid); source.hdel(serviceName, serviceId);
if (realCanceled && currEntry != null) { if (realCanceled && currEntry != null) {
currEntry.canceled = true; currEntry.canceled = true;
} }

View File

@@ -61,10 +61,10 @@ public abstract class ClusterAgent {
protected Set<String> tags; protected Set<String> tags;
// key: serviceid // key: serviceId
protected final ConcurrentHashMap<String, ClusterEntry> localEntrys = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<String, ClusterEntry> localEntrys = new ConcurrentHashMap<>();
// key: serviceid // key: serviceId
protected final ConcurrentHashMap<String, ClusterEntry> remoteEntrys = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<String, ClusterEntry> remoteEntrys = new ConcurrentHashMap<>();
public void init(AnyValue config) { public void init(AnyValue config) {
@@ -75,7 +75,7 @@ public abstract class ClusterAgent {
this.protocols = Utility.isEmpty(ps) ? null : ps.split(";"); this.protocols = Utility.isEmpty(ps) ? null : ps.split(";");
String ts = config.getValue("ports", ""); String ts = config.getValue("ports", "");
if (ts != null && !ts.isEmpty()) { if (Utility.isNotEmpty(ts)) {
String[] its = ts.split(";"); String[] its = ts.split(";");
List<Integer> list = new ArrayList<>(); List<Integer> list = new ArrayList<>();
for (String str : its) { for (String str : its) {
@@ -152,14 +152,14 @@ public abstract class ClusterAgent {
continue; continue;
} }
ClusterEntry htentry = register(ns, protocol, service); ClusterEntry htentry = register(ns, protocol, service);
localEntrys.put(htentry.serviceid, htentry); localEntrys.put(htentry.serviceId, htentry);
} }
// 远程模式加载IP列表, 只支持SNCP协议 // 远程模式加载IP列表, 只支持SNCP协议
if (ns.isSNCP()) { if (ns.isSNCP()) {
for (Service service : remoteServices) { for (Service service : remoteServices) {
ClusterEntry entry = new ClusterEntry(ns, protocol, service); ClusterEntry entry = new ClusterEntry(ns, protocol, service);
updateSncpAddress(entry); updateSncpAddress(entry);
remoteEntrys.put(entry.serviceid, entry); remoteEntrys.put(entry.serviceId, entry);
} }
} }
} }
@@ -248,7 +248,7 @@ public abstract class ClusterAgent {
try { try {
Set<InetSocketAddress> addrs = ClusterAgent.this.queryAddress(entry).join(); Set<InetSocketAddress> addrs = ClusterAgent.this.queryAddress(entry).join();
SncpRpcGroups rpcGroups = application.getSncpRpcGroups(); SncpRpcGroups rpcGroups = application.getSncpRpcGroups();
rpcGroups.putClusterAddress(entry.resourceid, addrs); rpcGroups.putClusterAddress(entry.resourceId, addrs);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, entry + " updateSncpAddress error", e); logger.log(Level.SEVERE, entry + " updateSncpAddress error", e);
} }
@@ -368,7 +368,7 @@ public abstract class ClusterAgent {
public class ClusterEntry { public class ClusterEntry {
// serviceName+nodeid为主 服务的单个实例 // serviceName+nodeid为主 服务的单个实例
public String serviceid; public String serviceId;
// 以协议+Rest资源名为主 服务类名 // 以协议+Rest资源名为主 服务类名
public String serviceName; public String serviceName;
@@ -377,9 +377,9 @@ public abstract class ClusterAgent {
public final String resourceName; public final String resourceName;
public final String resourceid; public final String resourceId;
public String checkid; public String checkId;
public String checkName; public String checkName;
@@ -397,14 +397,14 @@ public abstract class ClusterAgent {
public boolean canceled; public boolean canceled;
public ClusterEntry(NodeServer ns, String protocol, Service service) { public ClusterEntry(NodeServer ns, String protocol, Service service) {
this.serviceid = generateServiceId(ns, protocol, service); this.serviceId = generateServiceId(ns, protocol, service);
this.serviceName = generateServiceName(ns, protocol, service); this.serviceName = generateServiceName(ns, protocol, service);
this.checkid = generateCheckId(ns, protocol, service); this.checkId = generateCheckId(ns, protocol, service);
this.checkName = generateCheckName(ns, protocol, service); this.checkName = generateCheckName(ns, protocol, service);
Class restype = Sncp.getResourceType(service); Class restype = Sncp.getResourceType(service);
this.resourceType = restype.getName(); this.resourceType = restype.getName();
this.resourceName = Sncp.getResourceName(service); this.resourceName = Sncp.getResourceName(service);
this.resourceid = Sncp.resourceid(resourceName, restype); this.resourceId = Sncp.resourceid(resourceName, restype);
this.protocol = protocol; this.protocol = protocol;
InetSocketAddress addr = ns.getSocketAddress(); InetSocketAddress addr = ns.getSocketAddress();
String host = addr.getHostString(); String host = addr.getHostString();

View File

@@ -181,9 +181,9 @@ public class CronExpression {
private static final String[] DAYS = new String[] {"MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN"}; private static final String[] DAYS = new String[] {"MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN"};
private final Type type; private final CronType type;
protected CronField(Type type) { protected CronField(CronType type) {
this.type = type; this.type = type;
} }
@@ -310,7 +310,7 @@ public class CronExpression {
@Nullable @Nullable
public abstract <T extends Temporal & Comparable<? super T>> T nextOrSame(T temporal); public abstract <T extends Temporal & Comparable<? super T>> T nextOrSame(T temporal);
protected Type type() { protected CronType type() {
return this.type; return this.type;
} }
@@ -319,7 +319,7 @@ public class CronExpression {
return (T) temporal; return (T) temporal;
} }
protected enum Type { protected enum CronType {
NANO(ChronoField.NANO_OF_SECOND, ChronoUnit.SECONDS), NANO(ChronoField.NANO_OF_SECOND, ChronoUnit.SECONDS),
SECOND(ChronoField.SECOND_OF_MINUTE, ChronoUnit.MINUTES, ChronoField.NANO_OF_SECOND), SECOND(ChronoField.SECOND_OF_MINUTE, ChronoUnit.MINUTES, ChronoField.NANO_OF_SECOND),
MINUTE( MINUTE(
@@ -362,7 +362,7 @@ public class CronExpression {
private final ChronoField[] lowerOrders; private final ChronoField[] lowerOrders;
Type(ChronoField field, ChronoUnit higherOrder, ChronoField... lowerOrders) { CronType(ChronoField field, ChronoUnit higherOrder, ChronoField... lowerOrders) {
this.field = field; this.field = field;
this.higherOrder = higherOrder; this.higherOrder = higherOrder;
this.lowerOrders = lowerOrders; this.lowerOrders = lowerOrders;
@@ -438,13 +438,13 @@ public class CronExpression {
// we store at most 60 bits, for seconds and minutes, so a 64-bit long suffices // we store at most 60 bits, for seconds and minutes, so a 64-bit long suffices
private long bits; private long bits;
private BitsCronField(Type type) { private BitsCronField(CronType type) {
super(type); super(type);
} }
public static BitsCronField zeroNanos() { public static BitsCronField zeroNanos() {
if (zeroNanos == null) { if (zeroNanos == null) {
BitsCronField field = new BitsCronField(Type.NANO); BitsCronField field = new BitsCronField(CronType.NANO);
field.setBit(0); field.setBit(0);
zeroNanos = field; zeroNanos = field;
} }
@@ -452,27 +452,27 @@ public class CronExpression {
} }
public static BitsCronField parseSeconds(String value) { public static BitsCronField parseSeconds(String value) {
return parseField(value, Type.SECOND); return parseField(value, CronType.SECOND);
} }
public static BitsCronField parseMinutes(String value) { public static BitsCronField parseMinutes(String value) {
return BitsCronField.parseField(value, Type.MINUTE); return BitsCronField.parseField(value, CronType.MINUTE);
} }
public static BitsCronField parseHours(String value) { public static BitsCronField parseHours(String value) {
return BitsCronField.parseField(value, Type.HOUR); return BitsCronField.parseField(value, CronType.HOUR);
} }
public static BitsCronField parseDaysOfMonth(String value) { public static BitsCronField parseDaysOfMonth(String value) {
return parseDate(value, Type.DAY_OF_MONTH); return parseDate(value, CronType.DAY_OF_MONTH);
} }
public static BitsCronField parseMonth(String value) { public static BitsCronField parseMonth(String value) {
return BitsCronField.parseField(value, Type.MONTH); return BitsCronField.parseField(value, CronType.MONTH);
} }
public static BitsCronField parseDaysOfWeek(String value) { public static BitsCronField parseDaysOfWeek(String value) {
BitsCronField result = parseDate(value, Type.DAY_OF_WEEK); BitsCronField result = parseDate(value, CronType.DAY_OF_WEEK);
if (result.getBit(0)) { if (result.getBit(0)) {
// cron supports 0 for Sunday; we use 7 like java.time // cron supports 0 for Sunday; we use 7 like java.time
result.setBit(7); result.setBit(7);
@@ -481,14 +481,14 @@ public class CronExpression {
return result; return result;
} }
private static BitsCronField parseDate(String value, BitsCronField.Type type) { private static BitsCronField parseDate(String value, CronType type) {
if (value.equals("?")) { if (value.equals("?")) {
value = "*"; value = "*";
} }
return BitsCronField.parseField(value, type); return BitsCronField.parseField(value, type);
} }
private static BitsCronField parseField(String value, Type type) { private static BitsCronField parseField(String value, CronType type) {
if (Utility.isBlank(value)) { if (Utility.isBlank(value)) {
throw new RedkaleException("Value must not be empty"); throw new RedkaleException("Value must not be empty");
} }
@@ -525,7 +525,7 @@ public class CronExpression {
} }
} }
private static ValueRange parseRange(String value, Type type) { private static ValueRange parseRange(String value, CronType type) {
if (value.equals("*")) { if (value.equals("*")) {
return type.range(); return type.range();
} else { } else {
@@ -538,7 +538,7 @@ public class CronExpression {
int max = Integer.parseInt(value, hyphenPos + 1, value.length(), 10); int max = Integer.parseInt(value, hyphenPos + 1, value.length(), 10);
min = type.checkValidValue(min); min = type.checkValidValue(min);
max = type.checkValidValue(max); max = type.checkValidValue(max);
if (type == Type.DAY_OF_WEEK && min == 7) { if (type == CronType.DAY_OF_WEEK && min == 7) {
// If used as a minimum in a range, Sunday means 0 (not 7) // If used as a minimum in a range, Sunday means 0 (not 7)
min = 0; min = 0;
} }
@@ -660,13 +660,13 @@ public class CronExpression {
private final String value; private final String value;
private CompositeCronField(Type type, CronField[] fields, String value) { private CompositeCronField(CronType type, CronField[] fields, String value) {
super(type); super(type);
this.fields = fields; this.fields = fields;
this.value = value; this.value = value;
} }
public static CronField compose(CronField[] fields, Type type, String value) { public static CronField compose(CronField[] fields, CronType type, String value) {
if (fields == null || fields.length < 1) { if (fields == null || fields.length < 1) {
throw new RedkaleException("Fields must not be empty"); throw new RedkaleException("Fields must not be empty");
} }