优化DataSqlSource.onResourceChange

This commit is contained in:
Redkale
2022-12-06 21:32:21 +08:00
parent 2663b6dea0
commit 7a915c7015
2 changed files with 85 additions and 66 deletions

View File

@@ -49,55 +49,18 @@ public class DataJdbcSource extends DataSqlSource {
}
@Override
@ResourceListener
public void onResourceChange(ResourceEvent[] events) {
super.onResourceChange(events);
StringBuilder sb = new StringBuilder();
if (readConfProps == writeConfProps) {
List<ResourceEvent> allEvents = new ArrayList<>();
for (ResourceEvent event : events) { //可能需要解密
allEvents.add(ResourceEvent.create(event.name(), decryptProperty(event.name(), event.newValue().toString()), event.oldValue()));
}
this.readPool.onResourceChange(allEvents.toArray(new ResourceEvent[allEvents.size()]));
for (ResourceEvent event : allEvents) {
this.readConfProps.put(event.name(), event.newValue());
sb.append("DataSource(name=").append(resourceName()).append(") the ").append(event.name()).append(" resource changed\r\n");
}
} else {
List<ResourceEvent> readEvents = new ArrayList<>();
List<ResourceEvent> writeEvents = new ArrayList<>();
for (ResourceEvent event : events) {
if (event.name().startsWith("read.")) {
String newName = event.name().substring("read.".length());
readEvents.add(ResourceEvent.create(newName, decryptProperty(newName, event.newValue().toString()), event.oldValue()));
} else {
String newName = event.name().substring("write.".length());
writeEvents.add(ResourceEvent.create(newName, decryptProperty(newName, event.newValue().toString()), event.oldValue()));
}
sb.append("DataSource(name=").append(resourceName()).append(") the ").append(event.name()).append(" resource changed\r\n");
}
if (!readEvents.isEmpty()) {
this.readPool.onResourceChange(readEvents.toArray(new ResourceEvent[readEvents.size()]));
}
if (!writeEvents.isEmpty()) {
this.writePool.onResourceChange(writeEvents.toArray(new ResourceEvent[writeEvents.size()]));
}
//更新Properties
if (!readEvents.isEmpty()) {
for (ResourceEvent event : readEvents) {
this.readConfProps.put(event.name(), event.newValue());
}
}
if (!writeEvents.isEmpty()) {
for (ResourceEvent event : writeEvents) {
this.writeConfProps.put(event.name(), event.newValue());
}
}
}
initSqlAttributes();
if (!sb.isEmpty()) {
logger.log(Level.INFO, sb.toString());
}
protected void updateOneResourceChange(Properties newProps, ResourceEvent[] events) {
this.readPool.onResourceChange(events);
}
@Override
protected void updateReadResourceChange(Properties newReadProps, ResourceEvent[] events) {
this.readPool.onResourceChange(events);
}
@Override
protected void updateWriteResourceChange(Properties newWriteProps, ResourceEvent[] events) {
this.writePool.onResourceChange(events);
}
@Override

View File

@@ -88,29 +88,17 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
public void init(AnyValue conf) {
if (conf.getAnyValue("read") == null) { //没有读写分离
Properties rwConf = new Properties();
conf.forEach((k, v) -> rwConf.put(k, v));
conf.forEach((k, v) -> rwConf.put(k, decryptProperty(k, v)));
this.dbtype = parseDbtype(rwConf.getProperty(DATA_SOURCE_URL));
rwConf.forEach((k, v) -> {
String n = decryptProperty(k.toString(), v == null ? null : v.toString());
if (!Objects.equals(n, v)) rwConf.put(k, n);
});
initProperties(rwConf);
this.readConfProps = rwConf;
this.writeConfProps = rwConf;
} else { //读写分离
Properties readConf = new Properties();
Properties writeConf = new Properties();
conf.getAnyValue("read").forEach((k, v) -> readConf.put(k, v));
conf.getAnyValue("write").forEach((k, v) -> writeConf.put(k, v));
conf.getAnyValue("read").forEach((k, v) -> readConf.put(k, decryptProperty(k, v)));
conf.getAnyValue("write").forEach((k, v) -> writeConf.put(k, decryptProperty(k, v)));
this.dbtype = parseDbtype(readConf.getProperty(DATA_SOURCE_URL));
readConf.forEach((k, v) -> {
String n = decryptProperty(k.toString(), v == null ? null : v.toString());
if (!Objects.equals(n, v)) readConf.put(k, n);
});
writeConf.forEach((k, v) -> {
String n = decryptProperty(k.toString(), v == null ? null : v.toString());
if (!Objects.equals(n, v)) writeConf.put(k, n);
});
initProperties(readConf);
initProperties(writeConf);
this.readConfProps = readConf;
@@ -118,10 +106,10 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
this.name = conf.getValue("name", "");
this.sqlFormatter = (info, val) -> formatValueToString(info, val);
initSqlAttributes();
afterResourceChange();
}
protected void initSqlAttributes() {
protected void afterResourceChange() {
this.autoddl = "true".equals(readConfProps.getProperty(DATA_SOURCE_TABLE_AUTODDL, "false").trim());
this.containSQL = readConfProps.getProperty(DATA_SOURCE_CONTAIN_SQLTEMPLATE, "LOCATE(${keystr}, ${column}) > 0");
@@ -145,6 +133,74 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
&& (!events[0].name().startsWith("read.") && !events[0].name().startsWith("write."))) {
throw new RuntimeException("DataSource(name=" + resourceName() + ") not support to change to non read/write separation mode");
}
StringBuilder sb = new StringBuilder();
if (readConfProps == writeConfProps) {
List<ResourceEvent> allEvents = new ArrayList<>();
Properties newProps = new Properties(this.readConfProps);
for (ResourceEvent event : events) { //可能需要解密
String newValue = decryptProperty(event.name(), event.newValue().toString());
allEvents.add(ResourceEvent.create(event.name(), newValue, event.oldValue()));
newProps.put(event.name(), newValue);
sb.append("DataSource(name=").append(resourceName()).append(") the ").append(event.name()).append(" resource changed\r\n");
}
updateOneResourceChange(newProps, allEvents.toArray(new ResourceEvent[allEvents.size()]));
for (ResourceEvent event : allEvents) {
this.readConfProps.put(event.name(), event.newValue());
}
} else {
List<ResourceEvent> readEvents = new ArrayList<>();
List<ResourceEvent> writeEvents = new ArrayList<>();
Properties newReadProps = new Properties(this.readConfProps);
Properties newWriteProps = new Properties(this.writeConfProps);
for (ResourceEvent event : events) {
if (event.name().startsWith("read.")) {
String newName = event.name().substring("read.".length());
String newValue = decryptProperty(event.name(), event.newValue().toString());
readEvents.add(ResourceEvent.create(newName, newValue, event.oldValue()));
newReadProps.put(event.name(), newValue);
} else {
String newName = event.name().substring("write.".length());
String newValue = decryptProperty(event.name(), event.newValue().toString());
writeEvents.add(ResourceEvent.create(newName, newValue, event.oldValue()));
newWriteProps.put(event.name(), newValue);
}
sb.append("DataSource(name=").append(resourceName()).append(") the ").append(event.name()).append(" resource changed\r\n");
}
if (!readEvents.isEmpty()) {
updateReadResourceChange(newReadProps, readEvents.toArray(new ResourceEvent[readEvents.size()]));
}
if (!writeEvents.isEmpty()) {
updateWriteResourceChange(newWriteProps, writeEvents.toArray(new ResourceEvent[writeEvents.size()]));
}
//更新Properties
if (!readEvents.isEmpty()) {
for (ResourceEvent event : readEvents) {
this.readConfProps.put(event.name(), event.newValue());
}
}
if (!writeEvents.isEmpty()) {
for (ResourceEvent event : writeEvents) {
this.writeConfProps.put(event.name(), event.newValue());
}
}
}
afterResourceChange();
if (!sb.isEmpty()) {
logger.log(Level.INFO, sb.toString());
}
}
protected void updateOneResourceChange(Properties newProps, ResourceEvent[] events) {
throw new UnsupportedOperationException("Not supported yet.");
}
protected void updateReadResourceChange(Properties newReadProps, ResourceEvent[] events) {
throw new UnsupportedOperationException("Not supported yet.");
}
protected void updateWriteResourceChange(Properties newWriteProps, ResourceEvent[] events) {
throw new UnsupportedOperationException("Not supported yet.");
}
//解密可能存在的加密字段, 可重载