优化DataSource配置中心通知
This commit is contained in:
@@ -1772,13 +1772,13 @@ public final class Application {
|
||||
void updateSourceProperties(Properties sourceChangeCache) {
|
||||
if (sourceChangeCache == null || sourceChangeCache.isEmpty()) return;
|
||||
synchronized (sourceProperties) {
|
||||
boolean same = true;
|
||||
Properties changedProps = new Properties();
|
||||
for (Map.Entry<Object, Object> en : sourceChangeCache.entrySet()) {
|
||||
String key = en.getKey().toString();
|
||||
if (key.startsWith("redkale.datasource.") || key.startsWith("redkale.datasource[")
|
||||
|| key.startsWith("redkale.cachesource.") || key.startsWith("redkale.cachesource[")) {
|
||||
if (!Objects.equals(en.getValue(), sourceProperties.get(key))) {
|
||||
same = false;
|
||||
changedProps.put(en.getKey(), en.getValue());
|
||||
if (key.endsWith(".name")) { //不更改source.name属性
|
||||
throw new RuntimeException("source properties contains illegal key: " + key);
|
||||
}
|
||||
@@ -1787,70 +1787,53 @@ public final class Application {
|
||||
throw new RuntimeException("source properties contains illegal key: " + key);
|
||||
}
|
||||
}
|
||||
if (same) return; //无内容改变
|
||||
AnyValue redNode = AnyValue.loadFromProperties(sourceChangeCache).getAnyValue("redkale");
|
||||
AnyValue cacheNode = redNode.getAnyValue("cachesource");
|
||||
Map<String, String> back = new HashMap<>();
|
||||
if (cacheNode != null) {
|
||||
cacheNode.forEach(null, (key, conf) -> {
|
||||
CacheSource source = Utility.find(cacheSources, s -> Objects.equals(s.resourceName(), key));
|
||||
if (source == null) return;
|
||||
back.clear();
|
||||
if (changedProps.isEmpty()) return; //无内容改变
|
||||
AnyValue newRedNode = AnyValue.loadFromProperties(changedProps).getAnyValue("redkale");
|
||||
AnyValue newCacheNode = newRedNode.getAnyValue("cachesource");
|
||||
if (newCacheNode != null) {
|
||||
newCacheNode.forEach(null, (sourceName, newConf) -> {
|
||||
CacheSource source = Utility.find(cacheSources, s -> Objects.equals(s.resourceName(), sourceName));
|
||||
if (source == null) return; //多余的数据源
|
||||
DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(sourceName, "cachesource");
|
||||
old.merge(newConf);
|
||||
List<ResourceEvent> events = new ArrayList<>();
|
||||
DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(key, "cachesource");
|
||||
conf.forEach((k, v) -> {
|
||||
if (old != null) {
|
||||
String o = old.getValue(k);
|
||||
back.put(k, o);
|
||||
events.add(ResourceEvent.create(k, v, o));
|
||||
old.setValue(k, v);
|
||||
} else {
|
||||
events.add(ResourceEvent.create(k, v, null));
|
||||
changedProps.forEach((k, v) -> {
|
||||
final String key = k.toString();
|
||||
String prefix = "redkale.cachesource[" + sourceName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.cachesource." + sourceName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) return;
|
||||
events.add(ResourceEvent.create(key.substring(prefix.length()), v, sourceProperties.get(key)));
|
||||
});
|
||||
try {
|
||||
((AbstractCacheSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()]));
|
||||
} catch (RuntimeException e) {
|
||||
if (old != null) { //回退
|
||||
back.forEach((k, v) -> {
|
||||
old.setValue(k, v);
|
||||
});
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
((AbstractCacheSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()]));
|
||||
});
|
||||
}
|
||||
AnyValue sourceNode = redNode.getAnyValue("datasource");
|
||||
if (sourceNode != null) {
|
||||
sourceNode.forEach(null, (key, conf) -> {
|
||||
DataSource source = Utility.find(dataSources, s -> Objects.equals(s.resourceName(), key));
|
||||
if (source == null) return;
|
||||
back.clear();
|
||||
AnyValue newSourceNode = newRedNode.getAnyValue("datasource");
|
||||
if (newSourceNode != null) {
|
||||
newSourceNode.forEach(null, (sourceName, newConf) -> {
|
||||
DataSource source = Utility.find(dataSources, s -> Objects.equals(s.resourceName(), sourceName));
|
||||
if (source == null) return; //多余的数据源
|
||||
DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(sourceName, "datasource");
|
||||
old.merge(newConf);
|
||||
List<ResourceEvent> events = new ArrayList<>();
|
||||
DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(key, "datasource");
|
||||
conf.forEach((k, v) -> {
|
||||
if (old != null) {
|
||||
String o = old.getValue(k);
|
||||
back.put(k, o);
|
||||
events.add(ResourceEvent.create(k, v, o));
|
||||
old.setValue(k, v);
|
||||
} else {
|
||||
events.add(ResourceEvent.create(k, v, null));
|
||||
changedProps.forEach((k, v) -> {
|
||||
final String key = k.toString();
|
||||
String prefix = "redkale.datasource[" + sourceName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.datasource." + sourceName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) return;
|
||||
events.add(ResourceEvent.create(key.substring(prefix.length()), v, sourceProperties.get(key)));
|
||||
});
|
||||
try {
|
||||
((AbstractDataSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()]));
|
||||
} catch (RuntimeException e) {
|
||||
if (old != null) { //回退
|
||||
back.forEach((k, v) -> {
|
||||
old.setValue(k, v);
|
||||
});
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
((AbstractDataSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()]));
|
||||
});
|
||||
}
|
||||
sourceProperties.putAll(sourceChangeCache);
|
||||
sourceProperties.putAll(changedProps);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -81,11 +81,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
|
||||
//用于复制表结构使用
|
||||
protected String tablecopySQL;
|
||||
|
||||
protected AnyValue config;
|
||||
|
||||
public DataSqlSource() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(AnyValue conf) {
|
||||
this.config = conf;
|
||||
if (conf.getAnyValue("read") == null) { //没有读写分离
|
||||
Properties rwConf = new Properties();
|
||||
conf.forEach((k, v) -> rwConf.put(k, decryptProperty(k, v)));
|
||||
|
||||
Reference in New Issue
Block a user