DataJdbcSource支持配置中心动态变更配置

This commit is contained in:
Redkale
2022-12-06 19:53:15 +08:00
parent cc3c07d0c2
commit 2663b6dea0
4 changed files with 178 additions and 35 deletions

View File

@@ -989,7 +989,11 @@ public final class Application {
if (redNode != null) {
AnyValue sourceNode = redNode.getAnyValue(sourceType);
if (sourceNode != null) {
return sourceNode.getAnyValue(sourceName);
AnyValue confNode = sourceNode.getAnyValue(sourceName);
if (confNode != null) { //必须要设置name属性
((DefaultAnyValue) confNode).setValue("name", sourceName);
}
return confNode;
}
}
return null;
@@ -1762,6 +1766,9 @@ public final class Application {
|| key.startsWith("redkale.cachesource.") || key.startsWith("redkale.cachesource[")) {
if (!Objects.equals(en.getValue(), sourceProperties.get(key))) {
same = false;
if (key.endsWith(".name")) { //不更改source.name属性
throw new RuntimeException("source properties contains illegal key: " + key);
}
}
} else {
throw new RuntimeException("source properties contains illegal key: " + key);
@@ -1770,39 +1777,64 @@ public final class Application {
if (same) return; //无内容改变
AnyValue redNode = AnyValue.loadFromProperties(sourceChangeCache).getAnyValue("redkale");
AnyValue cacheNode = redNode.getAnyValue("cachesource");
Map<String, String> back = new HashMap<>();
if (cacheNode != null) {
cacheNode.forEach(null, (name, conf) -> {
CacheSource source = Utility.find(cacheSources, s -> Objects.equals(s.resourceName(), name));
cacheNode.forEach(null, (key, conf) -> {
CacheSource source = Utility.find(cacheSources, s -> Objects.equals(s.resourceName(), key));
if (source == null) return;
back.clear();
List<ResourceEvent> events = new ArrayList<>();
AnyValue old = findSourceConfig(name, "cachesource");
DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(key, "cachesource");
conf.forEach((k, v) -> {
if (old != null) {
events.add(ResourceEvent.create(k, v, old.getValue(k)));
((DefaultAnyValue) old).setValue(k, v);
String o = old.getValue(k);
back.put(k, o);
events.add(ResourceEvent.create(k, v, o));
old.setValue(k, v);
} else {
events.add(ResourceEvent.create(k, v, null));
}
});
((AbstractCacheSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()]));
try {
((AbstractCacheSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()]));
} catch (RuntimeException e) {
if (old != null) { //回退
back.forEach((k, v) -> {
old.setValue(k, v);
});
}
throw e;
}
});
}
AnyValue sourceNode = redNode.getAnyValue("datasource");
if (sourceNode != null) {
sourceNode.forEach(null, (name, conf) -> {
DataSource source = Utility.find(dataSources, s -> Objects.equals(s.resourceName(), name));
sourceNode.forEach(null, (key, conf) -> {
DataSource source = Utility.find(dataSources, s -> Objects.equals(s.resourceName(), key));
if (source == null) return;
back.clear();
List<ResourceEvent> events = new ArrayList<>();
AnyValue old = findSourceConfig(name, "datasource");
DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(key, "datasource");
conf.forEach((k, v) -> {
if (old != null) {
events.add(ResourceEvent.create(k, v, old.getValue(k)));
((DefaultAnyValue) old).setValue(k, v);
String o = old.getValue(k);
back.put(k, o);
events.add(ResourceEvent.create(k, v, o));
old.setValue(k, v);
} else {
events.add(ResourceEvent.create(k, v, null));
}
});
((AbstractDataSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()]));
try {
((AbstractDataSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()]));
} catch (RuntimeException e) {
if (old != null) { //回退
back.forEach((k, v) -> {
old.setValue(k, v);
});
}
throw e;
}
});
}
sourceProperties.putAll(sourceChangeCache);

View File

@@ -94,7 +94,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
public static String parseDbtype(String url) {
String dbtype = null;
if (url == null) return dbtype;
@@ -121,8 +121,8 @@ public abstract class AbstractDataSource extends AbstractService implements Data
return dbtype;
}
protected UrlInfo parseUrl(final String url) {
final UrlInfo info = new UrlInfo();
protected SourceUrlInfo parseSourceUrl(final String url) {
final SourceUrlInfo info = new SourceUrlInfo();
info.url = url;
if (url.startsWith("jdbc:h2:")) return info;
String url0 = url.substring(url.indexOf("://") + 3);
@@ -154,7 +154,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
return info;
}
protected static class UrlInfo {
public static class SourceUrlInfo {
public Properties attributes = new Properties();
@@ -164,6 +164,12 @@ public abstract class AbstractDataSource extends AbstractService implements Data
public InetSocketAddress servaddr;
public String username;
public String password;
public String encoding;
@Override
public String toString() {
return JsonConvert.root().convertTo(this);

View File

@@ -51,13 +51,59 @@ public class DataJdbcSource extends DataSqlSource {
@Override
@ResourceListener
public void onResourceChange(ResourceEvent[] events) {
//@TODO 待实现
super.onResourceChange(events);
StringBuilder sb = new StringBuilder();
if (readConfProps == writeConfProps) {
List<ResourceEvent> allEvents = new ArrayList<>();
for (ResourceEvent event : events) { //可能需要解密
allEvents.add(ResourceEvent.create(event.name(), decryptProperty(event.name(), event.newValue().toString()), event.oldValue()));
}
this.readPool.onResourceChange(allEvents.toArray(new ResourceEvent[allEvents.size()]));
for (ResourceEvent event : allEvents) {
this.readConfProps.put(event.name(), event.newValue());
sb.append("DataSource(name=").append(resourceName()).append(") the ").append(event.name()).append(" resource changed\r\n");
}
} else {
List<ResourceEvent> readEvents = new ArrayList<>();
List<ResourceEvent> writeEvents = new ArrayList<>();
for (ResourceEvent event : events) {
if (event.name().startsWith("read.")) {
String newName = event.name().substring("read.".length());
readEvents.add(ResourceEvent.create(newName, decryptProperty(newName, event.newValue().toString()), event.oldValue()));
} else {
String newName = event.name().substring("write.".length());
writeEvents.add(ResourceEvent.create(newName, decryptProperty(newName, event.newValue().toString()), event.oldValue()));
}
sb.append("DataSource(name=").append(resourceName()).append(") the ").append(event.name()).append(" resource changed\r\n");
}
if (!readEvents.isEmpty()) {
this.readPool.onResourceChange(readEvents.toArray(new ResourceEvent[readEvents.size()]));
}
if (!writeEvents.isEmpty()) {
this.writePool.onResourceChange(writeEvents.toArray(new ResourceEvent[writeEvents.size()]));
}
//更新Properties
if (!readEvents.isEmpty()) {
for (ResourceEvent event : readEvents) {
this.readConfProps.put(event.name(), event.newValue());
}
}
if (!writeEvents.isEmpty()) {
for (ResourceEvent event : writeEvents) {
this.writeConfProps.put(event.name(), event.newValue());
}
}
}
initSqlAttributes();
if (!sb.isEmpty()) {
logger.log(Level.INFO, sb.toString());
}
}
@Override
public void destroy(AnyValue config) {
if (readPool != null) readPool.close();
if (writePool != null) writePool.close();
if (writePool != null && writePool != readPool) writePool.close();
}
@Local
@@ -65,7 +111,7 @@ public class DataJdbcSource extends DataSqlSource {
public void close() throws Exception {
super.close();
if (readPool != null) readPool.close();
if (writePool != null) writePool.close();
if (writePool != null && writePool != readPool) writePool.close();
}
public static boolean acceptsConf(AnyValue conf) {
@@ -1124,6 +1170,10 @@ public class DataJdbcSource extends DataSqlSource {
protected String url;
protected int urlVersion;
protected Properties clientInfo = new Properties();
public ConnectionPool(Properties prop) {
this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "6"));
this.maxconns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + Utility.cpus() * 4)));
@@ -1139,21 +1189,48 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e) {
throw new RuntimeException(e);
}
clientInfo.put("version", urlVersion);
}
@ResourceListener
public void onResourceChange(ResourceEvent[] events) {
public synchronized void onResourceChange(ResourceEvent[] events) {
String newUrl = this.url;
int newConnectTimeoutSeconds = this.connectTimeoutSeconds;
int newMaxconns = this.maxconns;
String newUser = this.connectAttrs.getProperty("user");
String newPassword = this.connectAttrs.getProperty("password");
for (ResourceEvent event : events) {
if (event.name().equals(DATA_SOURCE_CONNECTTIMEOUT_SECONDS) || event.name().endsWith("." + DATA_SOURCE_CONNECTTIMEOUT_SECONDS)) {
this.connectTimeoutSeconds = Integer.decode(event.newValue().toString());
} else if (event.name().equals(DATA_SOURCE_URL) || event.name().endsWith("." + DATA_SOURCE_URL)) {
this.url = event.newValue().toString();
if (event.name().equals(DATA_SOURCE_URL) || event.name().endsWith("." + DATA_SOURCE_URL)) {
newUrl = event.newValue().toString();
} else if (event.name().equals(DATA_SOURCE_CONNECTTIMEOUT_SECONDS) || event.name().endsWith("." + DATA_SOURCE_CONNECTTIMEOUT_SECONDS)) {
newConnectTimeoutSeconds = Integer.decode(event.newValue().toString());
} else if (event.name().equals(DATA_SOURCE_USER) || event.name().endsWith("." + DATA_SOURCE_USER)) {
this.connectAttrs.put("user", event.newValue().toString());
newUser = event.newValue().toString();
} else if (event.name().equals(DATA_SOURCE_PASSWORD) || event.name().endsWith("." + DATA_SOURCE_PASSWORD)) {
this.connectAttrs.put("password", event.newValue().toString());
newPassword = event.newValue().toString();
} else if (event.name().equals(DATA_SOURCE_MAXCONNS) || event.name().endsWith("." + DATA_SOURCE_MAXCONNS)) {
logger.log(Level.WARNING, event.name() + " (new-value: " + event.newValue() + ") will not take effect");
newMaxconns = Math.max(1, Integer.decode(event.newValue().toString()));
}
}
if (!Objects.equals(newUser, this.connectAttrs.get("user"))
|| !Objects.equals(newPassword, this.connectAttrs.get("password")) || !Objects.equals(newUrl, url)) {
this.urlVersion++;
Properties newClientInfo = new Properties();
newClientInfo.put("version", urlVersion);
this.clientInfo = newClientInfo;
}
this.url = newUrl;
this.connectTimeoutSeconds = newConnectTimeoutSeconds;
this.connectAttrs.put("user", newUser);
this.connectAttrs.put("password", newPassword);
if (newMaxconns != this.maxconns) {
ArrayBlockingQueue<Connection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
ArrayBlockingQueue<Connection> oldQueue = this.queue;
this.queue = newQueue;
this.maxconns = newMaxconns;
Connection conn;
while ((conn = oldQueue.poll()) != null) {
offerConnection(conn);
}
}
}
@@ -1183,6 +1260,7 @@ public class DataJdbcSource extends DataSqlSource {
}
try {
conn = driver.connect(url, connectAttrs);
conn.setClientInfo(clientInfo);
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
@@ -1210,7 +1288,11 @@ public class DataJdbcSource extends DataSqlSource {
protected boolean checkValid(Connection conn) {
try {
return !conn.isClosed() && conn.isValid(1);
boolean rs = !conn.isClosed() && conn.isValid(1);
if (!rs) return rs;
Properties prop = conn.getClientInfo();
if (prop == null) return false;
return prop == clientInfo || Objects.equals(prop.getProperty("version"), clientInfo.getProperty("version"));
} catch (SQLException ex) {
if (!"08S01".equals(ex.getSQLState())) {//MySQL特性 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
logger.log(Level.FINER, "result.getConnection from pooled connection abort [" + ex.getSQLState() + "]", ex);

View File

@@ -90,7 +90,10 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
Properties rwConf = new Properties();
conf.forEach((k, v) -> rwConf.put(k, v));
this.dbtype = parseDbtype(rwConf.getProperty(DATA_SOURCE_URL));
decryptProperties(rwConf);
rwConf.forEach((k, v) -> {
String n = decryptProperty(k.toString(), v == null ? null : v.toString());
if (!Objects.equals(n, v)) rwConf.put(k, n);
});
initProperties(rwConf);
this.readConfProps = rwConf;
this.writeConfProps = rwConf;
@@ -100,8 +103,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
conf.getAnyValue("read").forEach((k, v) -> readConf.put(k, v));
conf.getAnyValue("write").forEach((k, v) -> writeConf.put(k, v));
this.dbtype = parseDbtype(readConf.getProperty(DATA_SOURCE_URL));
decryptProperties(readConf);
decryptProperties(writeConf);
readConf.forEach((k, v) -> {
String n = decryptProperty(k.toString(), v == null ? null : v.toString());
if (!Objects.equals(n, v)) readConf.put(k, n);
});
writeConf.forEach((k, v) -> {
String n = decryptProperty(k.toString(), v == null ? null : v.toString());
if (!Objects.equals(n, v)) writeConf.put(k, n);
});
initProperties(readConf);
initProperties(writeConf);
this.readConfProps = readConf;
@@ -109,6 +118,10 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
this.name = conf.getValue("name", "");
this.sqlFormatter = (info, val) -> formatValueToString(info, val);
initSqlAttributes();
}
protected void initSqlAttributes() {
this.autoddl = "true".equals(readConfProps.getProperty(DATA_SOURCE_TABLE_AUTODDL, "false").trim());
this.containSQL = readConfProps.getProperty(DATA_SOURCE_CONTAIN_SQLTEMPLATE, "LOCATE(${keystr}, ${column}) > 0");
@@ -119,14 +132,24 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
this.cacheForbidden = "NONE".equalsIgnoreCase(readConfProps.getProperty(DATA_SOURCE_CACHEMODE));
}
@Override
@ResourceListener
public void onResourceChange(ResourceEvent[] events) {
//@TODO 待实现
if (events == null || events.length < 1) return;
//不支持读写分离模式的动态切换
if (readConfProps == writeConfProps
&& (events[0].name().startsWith("read.") || events[0].name().startsWith("write."))) {
throw new RuntimeException("DataSource(name=" + resourceName() + ") not support to change to read/write separation mode");
}
if (readConfProps != writeConfProps
&& (!events[0].name().startsWith("read.") && !events[0].name().startsWith("write."))) {
throw new RuntimeException("DataSource(name=" + resourceName() + ") not support to change to non read/write separation mode");
}
}
//解密可能存在的加密字段, 可重载
protected void decryptProperties(Properties props) {
protected String decryptProperty(String key, String value) {
return value;
}
protected void initProperties(Properties props) {