配置中心支持动态更改mq和cluster配置

This commit is contained in:
Redkale
2022-12-11 13:29:33 +08:00
parent 86c66aab6d
commit d36c168faa
4 changed files with 253 additions and 25 deletions

View File

@@ -142,6 +142,7 @@ public final class Application {
final ExecutorService workExecutor;
//日志配置资源
//@since 2.8.0
final Properties loggingProperties = new Properties();
//Source 原始的配置资源, 只会存在redkale.datasource(.|[) redkale.cachesource(.|[)开头的配置项
@@ -172,10 +173,18 @@ public final class Application {
//配置信息只读版Properties
private final Environment environment;
//第三方服务配置资源
//@since 2.8.0
final Properties clusterProperties = new Properties();
//第三方服务发现管理接口
//@since 2.1.0
private final ClusterAgent clusterAgent;
//MQ管理配置资源
//@since 2.8.0
final Properties messageProperties = new Properties();
//MQ管理接口
//@since 2.1.0
private final MessageAgent[] messageAgents;
@@ -779,6 +788,14 @@ public final class Application {
} else {
sourceProperties.put(key, val);
}
} else if (key.toString().startsWith("redkale.mq.") || key.toString().startsWith("redkale.mq[")) {
if (key.toString().endsWith(".name")) {
logger.log(Level.WARNING, "skip illegal key " + key + " in mq config, key cannot endsWith '.name'");
} else {
messageProperties.put(key, val);
}
} else if (key.toString().startsWith("redkale.cluster.")) {
clusterProperties.put(key, val);
}
});
}
@@ -1109,6 +1126,18 @@ public final class Application {
return null;
}
private AnyValue findMQConfig(String mqName) {
AnyValue mqsNode = config.getAnyValue("mq");
if (mqsNode != null) {
AnyValue confNode = mqsNode.getAnyValue(mqName);
if (confNode != null) { //必须要设置name属性
((DefaultAnyValue) confNode).setValue("name", mqName);
}
return confNode;
}
return null;
}
CacheSource loadCacheSource(final String sourceName, boolean autoMemory) {
long st = System.currentTimeMillis();
CacheSource old = resourceFactory.find(sourceName, CacheSource.class);
@@ -1828,10 +1857,25 @@ public final class Application {
Set<String> sourceRemovedKeys = new HashSet<>();
Properties sourceChangedProps = new Properties();
Set<String> loggingRemovedKeys = new HashSet<>();
Properties loggingChangedProps = new Properties();
Set<String> clusterRemovedKeys = new HashSet<>();
Properties clusterChangedProps = new Properties();
Set<String> messageRemovedKeys = new HashSet<>();
Properties messageChangedProps = new Properties();
for (ResourceEvent<String> event : events) {
if (namespace != null && namespace.startsWith("logging")) {
loggingChangedProps.put(event.name(), event.newValue());
if (event.newValue() == null) {
if (loggingProperties.containsKey(event.name())) {
loggingRemovedKeys.add(event.name());
}
} else {
loggingChangedProps.put(event.name(), event.newValue());
}
continue;
}
if (event.name().startsWith("redkale.datasource.") || event.name().startsWith("redkale.datasource[")
@@ -1849,6 +1893,30 @@ public final class Application {
}
}
}
} else if (event.name().startsWith("redkale.mq.") || event.name().startsWith("redkale.mq[")) {
if (event.name().endsWith(".name")) {
logger.log(Level.WARNING, "skip illegal key " + event.name() + " in mq config " + (namespace == null ? "" : namespace) + ", key cannot endsWith '.name'");
} else {
if (!Objects.equals(event.newValue(), messageProperties.getProperty(event.name()))) {
if (event.newValue() == null) {
if (messageProperties.containsKey(event.name())) {
messageRemovedKeys.add(event.name());
}
} else {
messageChangedProps.put(event.name(), event.newValue());
}
}
}
} else if (event.name().startsWith("redkale.cluster.")) {
if (!Objects.equals(event.newValue(), clusterProperties.getProperty(event.name()))) {
if (event.newValue() == null) {
if (clusterProperties.containsKey(event.name())) {
clusterRemovedKeys.add(event.name());
}
} else {
clusterChangedProps.put(event.name(), event.newValue());
}
}
} else if (event.name().startsWith("system.property.")) {
String propName = event.name().substring("system.property.".length());
if (event.newValue() == null) {
@@ -1894,10 +1962,11 @@ public final class Application {
envRemovedKeys.forEach(k -> envProperties.remove(k));
resourceFactory.register(envRegisterProps, "", Environment.class);
}
//日志配置项的变更
if (!loggingChangedProps.isEmpty()) {
if (!loggingChangedProps.isEmpty() || !loggingRemovedKeys.isEmpty()) {
//只是简单变更日志级别则直接操作,无需重新配置日志
if (loggingChangedProps.size() == 1 && loggingChangedProps.containsKey(".level")) {
if (loggingRemovedKeys.isEmpty() && loggingChangedProps.size() == 1 && loggingChangedProps.containsKey(".level")) {
try {
Level logLevel = Level.parse(loggingChangedProps.getProperty(".level"));
Logger.getGlobal().setLevel(logLevel);
@@ -1910,10 +1979,12 @@ public final class Application {
Properties newLogProps = new Properties();
newLogProps.putAll(this.loggingProperties);
newLogProps.putAll(loggingChangedProps);
loggingRemovedKeys.forEach(k -> newLogProps.remove(k));
reconfigLogging(newLogProps);
logger.log(Level.INFO, "reconfig logging finished ");
}
}
//数据源配置项的变更
if (!sourceChangedProps.isEmpty() || !sourceRemovedKeys.isEmpty()) {
Set<String> cacheSourceNames = new LinkedHashSet<>();
@@ -1935,7 +2006,7 @@ public final class Application {
//更新缓存
for (String sourceName : cacheSourceNames) {
CacheSource source = Utility.find(cacheSources, s -> Objects.equals(s.resourceName(), sourceName));
if (source == null) return; //多余的数据源
if (source == null) continue; //多余的数据源
final DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(sourceName, "cachesource");
Properties newProps = new Properties();
sourceProperties.forEach((k, v) -> {
@@ -1976,8 +2047,8 @@ public final class Application {
});
if (!changeEvents.isEmpty()) {
DefaultAnyValue back = old.copy();
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("cachesource").getAnyValue(sourceName));
try {
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("cachesource").getAnyValue(sourceName));
((AbstractCacheSource) source).onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()]));
} catch (RuntimeException e) {
old.replace(back); //还原配置
@@ -1988,7 +2059,7 @@ public final class Application {
//更新数据库
for (String sourceName : dataSourceNames) {
DataSource source = Utility.find(dataSources, s -> Objects.equals(s.resourceName(), sourceName));
if (source == null) return; //多余的数据源
if (source == null) continue; //多余的数据源
DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(sourceName, "datasource");
Properties newProps = new Properties();
sourceProperties.forEach((k, v) -> {
@@ -2029,8 +2100,8 @@ public final class Application {
});
if (!changeEvents.isEmpty()) {
DefaultAnyValue back = old.copy();
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("datasource").getAnyValue(sourceName));
try {
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("datasource").getAnyValue(sourceName));
((AbstractDataSource) source).onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()]));
} catch (RuntimeException e) {
old.replace(back); //还原配置
@@ -2041,6 +2112,119 @@ public final class Application {
sourceRemovedKeys.forEach(k -> sourceProperties.remove(k));
sourceProperties.putAll(sourceChangedProps);
}
//MQ配置项的变更
if (!messageChangedProps.isEmpty() || !messageRemovedKeys.isEmpty()) {
Set<String> messageNames = new LinkedHashSet<>();
List<String> keys = new ArrayList<>();
keys.addAll(messageRemovedKeys);
keys.addAll((Set) messageChangedProps.keySet());
for (final String key : keys) {
if (key.startsWith("redkale.mq[")) {
messageNames.add(key.substring("redkale.mq[".length(), key.indexOf(']')));
} else if (key.startsWith("redkale.mq.")) {
messageNames.add(key.substring("redkale.mq.".length(), key.indexOf('.', "redkale.mq.".length())));
}
}
//更新MQ
for (String mqName : messageNames) {
MessageAgent agent = Utility.find(messageAgents, s -> Objects.equals(s.resourceName(), mqName));
if (agent == null) continue; //多余的数据源
final DefaultAnyValue old = (DefaultAnyValue) findMQConfig(mqName);
Properties newProps = new Properties();
messageProperties.forEach((k, v) -> {
final String key = k.toString();
String prefix = "redkale.mq[" + mqName + "].";
int pos = key.indexOf(prefix);
if (pos < 0) {
prefix = "redkale.mq." + mqName + ".";
pos = key.indexOf(prefix);
}
if (pos < 0) return; //不是同一name数据源配置项
newProps.put(k, v);
});
List<ResourceEvent> changeEvents = new ArrayList<>();
messageChangedProps.forEach((k, v) -> {
final String key = k.toString();
String prefix = "redkale.mq[" + mqName + "].";
int pos = key.indexOf(prefix);
if (pos < 0) {
prefix = "redkale.mq." + mqName + ".";
pos = key.indexOf(prefix);
}
if (pos < 0) return; //不是同一name数据源配置项
newProps.put(k, v);
changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), v, messageProperties.getProperty(key)));
});
messageRemovedKeys.forEach(k -> {
final String key = k;
String prefix = "redkale.mq[" + mqName + "].";
int pos = key.indexOf(prefix);
if (pos < 0) {
prefix = "redkale.mq." + mqName + ".";
pos = key.indexOf(prefix);
}
if (pos < 0) return;
newProps.remove(k); //不是同一name数据源配置项
changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), null, messageProperties.getProperty(key)));
});
if (!changeEvents.isEmpty()) {
DefaultAnyValue back = old.copy();
try {
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("mq").getAnyValue(mqName));
agent.onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()]));
} catch (RuntimeException e) {
old.replace(back); //还原配置
throw e;
}
}
}
messageRemovedKeys.forEach(k -> messageProperties.remove(k));
messageProperties.putAll(messageChangedProps);
}
//第三方服务注册配置项的变更
if (!clusterChangedProps.isEmpty() || !clusterRemovedKeys.isEmpty()) {
if (this.clusterAgent != null) {
final DefaultAnyValue old = (DefaultAnyValue) config.getAnyValue("cluster");
Properties newProps = new Properties();
newProps.putAll(clusterProperties);
List<ResourceEvent> changeEvents = new ArrayList<>();
clusterChangedProps.forEach((k, v) -> {
final String key = k.toString();
newProps.put(k, v);
changeEvents.add(ResourceEvent.create(key.substring("redkale.cluster.".length()), v, clusterProperties.getProperty(key)));
});
clusterRemovedKeys.forEach(k -> {
final String key = k;
newProps.remove(k);
changeEvents.add(ResourceEvent.create(key.substring("redkale.cluster.".length()), null, clusterProperties.getProperty(key)));
});
if (!changeEvents.isEmpty()) {
DefaultAnyValue back = old.copy();
try {
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("cluster"));
clusterAgent.onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()]));
} catch (RuntimeException e) {
old.replace(back); //还原配置
throw e;
}
}
} else {
StringBuilder sb = new StringBuilder();
clusterChangedProps.forEach((k, v) -> {
sb.append(ClusterAgent.class.getSimpleName()).append(" skip change '").append(k).append("'\r\n");
});
clusterRemovedKeys.forEach(k -> {
sb.append(ClusterAgent.class.getSimpleName()).append(" skip change '").append(k).append("'\r\n");
});
if (!sb.isEmpty()) {
logger.log(Level.INFO, sb.toString());
}
}
clusterRemovedKeys.forEach(k -> clusterProperties.remove(k));
clusterProperties.putAll(clusterChangedProps);
}
}
}

View File

@@ -37,6 +37,8 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
protected ScheduledThreadPoolExecutor scheduler;
protected ScheduledFuture taskFuture;
//可能被HttpMessageClient用到的服务 key: serviceName
protected final ConcurrentHashMap<String, Collection<InetSocketAddress>> httpAddressMap = new ConcurrentHashMap<>();
@@ -46,13 +48,37 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
@Override
public void init(ResourceFactory factory, AnyValue config) {
super.init(factory, config);
this.sourceName = getSourceName();
this.ttls = config.getIntValue("ttls", 10);
if (this.ttls < 5) this.ttls = 10;
}
@Override
@ResourceListener
public void onResourceChange(ResourceEvent[] events) {
StringBuilder sb = new StringBuilder();
int newTtls = this.ttls;
for (ResourceEvent event : events) {
if ("ttls".equals(event.name())) {
newTtls = Integer.parseInt(event.newValue().toString());
if (newTtls < 5) {
sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") cannot change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n");
} else {
sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n");
}
} else {
sb.append(CacheClusterAgent.class.getSimpleName()).append("(name=").append(resourceName()).append(") skip change '").append(event.name()).append("' to '").append(event.coverNewValue()).append("'\r\n");
}
}
if (newTtls != this.ttls) {
this.ttls = newTtls;
start();
}
if (!sb.isEmpty()) {
logger.log(Level.INFO, sb.toString());
}
}
@Override
public void setConfig(AnyValue config) {
super.setConfig(config);
@@ -88,22 +114,29 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
return t;
});
this.scheduler.scheduleAtFixedRate(() -> {
try {
checkApplicationHealth();
checkHttpAddressHealth();
loadMqtpAddressHealth();
localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> {
checkLocalHealth(entry);
});
remoteEntrys.values().stream().filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)).forEach(entry -> {
updateSncpTransport(entry);
});
} catch (Exception e) {
logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e instanceof CompletionException ? ((CompletionException) e).getCause() : e);
}
}, Math.max(2000, ttls * 1000), Math.max(2000, ttls * 1000), TimeUnit.MILLISECONDS);
}
if (this.taskFuture != null) {
this.taskFuture.cancel(true);
}
this.taskFuture = this.scheduler.scheduleAtFixedRate(newTask(), Math.max(2000, ttls * 1000), Math.max(2000, ttls * 1000), TimeUnit.MILLISECONDS);
}
private Runnable newTask() {
return () -> {
try {
checkApplicationHealth();
checkHttpAddressHealth();
loadMqtpAddressHealth();
localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> {
checkLocalHealth(entry);
});
remoteEntrys.values().stream().filter(entry -> "SNCP".equalsIgnoreCase(entry.protocol)).forEach(entry -> {
updateSncpTransport(entry);
});
} catch (Exception e) {
logger.log(Level.SEVERE, "scheduleAtFixedRate check error", e instanceof CompletionException ? ((CompletionException) e).getCause() : e);
}
};
}
protected void loadMqtpAddressHealth() {

View File

@@ -83,6 +83,9 @@ public abstract class ClusterAgent {
}
}
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
public void destroy(AnyValue config) {
}

View File

@@ -29,7 +29,7 @@ import org.redkale.util.*;
*
* @since 2.1.0
*/
public abstract class MessageAgent {
public abstract class MessageAgent implements Resourcable {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@@ -93,6 +93,9 @@ public abstract class MessageAgent {
this.timeoutExecutor.setRemoveOnCancelPolicy(true);
}
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
public CompletableFuture<Map<String, Long>> start() {
final LinkedHashMap<String, Long> map = new LinkedHashMap<>();
final List<CompletableFuture> futures = new ArrayList<>();
@@ -143,6 +146,11 @@ public abstract class MessageAgent {
return producers;
}
@Override
public String resourceName() {
return name;
}
public MessageCoder<MessageRecord> getMessageCoder() {
return this.messageCoder;
}