This commit is contained in:
Redkale
2021-01-14 13:37:45 +08:00
parent 5781ad1de0
commit 8654b5eef8

View File

@@ -100,27 +100,21 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
return t; return t;
}); });
//delay为了错开请求
this.scheduler.scheduleAtFixedRate(() -> { this.scheduler.scheduleAtFixedRate(() -> {
checkApplicationHealth(); try {
checkHttpAddressHealth(); checkApplicationHealth();
}, 18, Math.max(2000, ttls * 1000 - 168), TimeUnit.MILLISECONDS); checkHttpAddressHealth();
loadMqtpAddressHealth();
this.scheduler.scheduleAtFixedRate(() -> { localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> {
loadMqtpAddressHealth(); checkLocalHealth(entry);
}, 88 * 2, Math.max(2000, ttls * 1000 - 168), TimeUnit.MILLISECONDS); });
remoteEntrys.values().stream().filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)).forEach(entry -> {
this.scheduler.scheduleAtFixedRate(() -> { updateSncpTransport(entry);
localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> { });
checkLocalHealth(entry); } catch (Exception e) {
}); logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e);
}, 128 * 3, Math.max(2000, ttls * 1000 - 168), TimeUnit.MILLISECONDS); }
}, Math.max(2000, ttls * 1000), Math.max(2000, ttls * 1000), TimeUnit.MILLISECONDS);
this.scheduler.scheduleAtFixedRate(() -> {
remoteEntrys.values().stream().filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)).forEach(entry -> {
updateSncpTransport(entry);
});
}, 188 * 4, Math.max(2000, ttls * 1000 - 168), TimeUnit.MILLISECONDS);
} }
} }
@@ -150,14 +144,11 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
} }
protected void checkLocalHealth(final ClusterEntry entry) { protected void checkLocalHealth(final ClusterEntry entry) {
AddressEntry oldaddr = (AddressEntry) source.hget(entry.checkname, entry.checkid, AddressEntry.class);
AddressEntry newaddr = new AddressEntry(); AddressEntry newaddr = new AddressEntry();
newaddr.addr = entry.address; newaddr.addr = entry.address;
newaddr.nodeid = this.nodeid; newaddr.nodeid = this.nodeid;
newaddr.time = System.currentTimeMillis(); newaddr.time = System.currentTimeMillis();
source.hset(entry.checkname, entry.checkid, AddressEntry.class, newaddr); source.hset(entry.checkname, entry.checkid, AddressEntry.class, newaddr);
boolean ok = oldaddr != null && (System.currentTimeMillis() - oldaddr.time) / 1000 < ttls;
if (!ok) logger.log(Level.SEVERE, entry.checkid + " check error: " + oldaddr);
} }
@Override //获取MQTP的HTTP远程服务的可用ip列表, key = servicename的后半段 @Override //获取MQTP的HTTP远程服务的可用ip列表, key = servicename的后半段