From 2663b6dea08dfce5ff2a2410d58496970dde2331 Mon Sep 17 00:00:00 2001 From: Redkale Date: Tue, 6 Dec 2022 19:53:15 +0800 Subject: [PATCH] =?UTF-8?q?DataJdbcSource=E6=94=AF=E6=8C=81=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E4=B8=AD=E5=BF=83=E5=8A=A8=E6=80=81=E5=8F=98=E6=9B=B4?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 58 +++++++--- .../redkale/source/AbstractDataSource.java | 14 ++- .../org/redkale/source/DataJdbcSource.java | 106 ++++++++++++++++-- .../org/redkale/source/DataSqlSource.java | 35 +++++- 4 files changed, 178 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index e282f8476..025f2389e 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -989,7 +989,11 @@ public final class Application { if (redNode != null) { AnyValue sourceNode = redNode.getAnyValue(sourceType); if (sourceNode != null) { - return sourceNode.getAnyValue(sourceName); + AnyValue confNode = sourceNode.getAnyValue(sourceName); + if (confNode != null) { //必须要设置name属性 + ((DefaultAnyValue) confNode).setValue("name", sourceName); + } + return confNode; } } return null; @@ -1762,6 +1766,9 @@ public final class Application { || key.startsWith("redkale.cachesource.") || key.startsWith("redkale.cachesource[")) { if (!Objects.equals(en.getValue(), sourceProperties.get(key))) { same = false; + if (key.endsWith(".name")) { //不更改source.name属性 + throw new RuntimeException("source properties contains illegal key: " + key); + } } } else { throw new RuntimeException("source properties contains illegal key: " + key); @@ -1770,39 +1777,64 @@ public final class Application { if (same) return; //无内容改变 AnyValue redNode = AnyValue.loadFromProperties(sourceChangeCache).getAnyValue("redkale"); AnyValue cacheNode = redNode.getAnyValue("cachesource"); + Map back = new HashMap<>(); if (cacheNode != null) { - cacheNode.forEach(null, (name, conf) -> { - CacheSource source = Utility.find(cacheSources, s -> Objects.equals(s.resourceName(), name)); + cacheNode.forEach(null, (key, conf) -> { + CacheSource source = Utility.find(cacheSources, s -> Objects.equals(s.resourceName(), key)); if (source == null) return; + back.clear(); List events = new ArrayList<>(); - AnyValue old = findSourceConfig(name, "cachesource"); + DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(key, "cachesource"); conf.forEach((k, v) -> { if (old != null) { - events.add(ResourceEvent.create(k, v, old.getValue(k))); - ((DefaultAnyValue) old).setValue(k, v); + String o = old.getValue(k); + back.put(k, o); + events.add(ResourceEvent.create(k, v, o)); + old.setValue(k, v); } else { events.add(ResourceEvent.create(k, v, null)); } }); - ((AbstractCacheSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()])); + try { + ((AbstractCacheSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()])); + } catch (RuntimeException e) { + if (old != null) { //回退 + back.forEach((k, v) -> { + old.setValue(k, v); + }); + } + throw e; + } }); } AnyValue sourceNode = redNode.getAnyValue("datasource"); if (sourceNode != null) { - sourceNode.forEach(null, (name, conf) -> { - DataSource source = Utility.find(dataSources, s -> Objects.equals(s.resourceName(), name)); + sourceNode.forEach(null, (key, conf) -> { + DataSource source = Utility.find(dataSources, s -> Objects.equals(s.resourceName(), key)); if (source == null) return; + back.clear(); List events = new ArrayList<>(); - AnyValue old = findSourceConfig(name, "datasource"); + DefaultAnyValue old = (DefaultAnyValue) findSourceConfig(key, "datasource"); conf.forEach((k, v) -> { if (old != null) { - events.add(ResourceEvent.create(k, v, old.getValue(k))); - ((DefaultAnyValue) old).setValue(k, v); + String o = old.getValue(k); + back.put(k, o); + events.add(ResourceEvent.create(k, v, o)); + old.setValue(k, v); } else { events.add(ResourceEvent.create(k, v, null)); } }); - ((AbstractDataSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()])); + try { + ((AbstractDataSource) source).onResourceChange(events.toArray(new ResourceEvent[events.size()])); + } catch (RuntimeException e) { + if (old != null) { //回退 + back.forEach((k, v) -> { + old.setValue(k, v); + }); + } + throw e; + } }); } sourceProperties.putAll(sourceChangeCache); diff --git a/src/main/java/org/redkale/source/AbstractDataSource.java b/src/main/java/org/redkale/source/AbstractDataSource.java index c231cc71b..bf6184248 100644 --- a/src/main/java/org/redkale/source/AbstractDataSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSource.java @@ -94,7 +94,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data @ResourceListener public abstract void onResourceChange(ResourceEvent[] events); - + public static String parseDbtype(String url) { String dbtype = null; if (url == null) return dbtype; @@ -121,8 +121,8 @@ public abstract class AbstractDataSource extends AbstractService implements Data return dbtype; } - protected UrlInfo parseUrl(final String url) { - final UrlInfo info = new UrlInfo(); + protected SourceUrlInfo parseSourceUrl(final String url) { + final SourceUrlInfo info = new SourceUrlInfo(); info.url = url; if (url.startsWith("jdbc:h2:")) return info; String url0 = url.substring(url.indexOf("://") + 3); @@ -154,7 +154,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data return info; } - protected static class UrlInfo { + public static class SourceUrlInfo { public Properties attributes = new Properties(); @@ -164,6 +164,12 @@ public abstract class AbstractDataSource extends AbstractService implements Data public InetSocketAddress servaddr; + public String username; + + public String password; + + public String encoding; + @Override public String toString() { return JsonConvert.root().convertTo(this); diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 50f546278..9addbfc36 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -51,13 +51,59 @@ public class DataJdbcSource extends DataSqlSource { @Override @ResourceListener public void onResourceChange(ResourceEvent[] events) { - //@TODO 待实现 + super.onResourceChange(events); + StringBuilder sb = new StringBuilder(); + if (readConfProps == writeConfProps) { + List allEvents = new ArrayList<>(); + for (ResourceEvent event : events) { //可能需要解密 + allEvents.add(ResourceEvent.create(event.name(), decryptProperty(event.name(), event.newValue().toString()), event.oldValue())); + } + this.readPool.onResourceChange(allEvents.toArray(new ResourceEvent[allEvents.size()])); + for (ResourceEvent event : allEvents) { + this.readConfProps.put(event.name(), event.newValue()); + sb.append("DataSource(name=").append(resourceName()).append(") the ").append(event.name()).append(" resource changed\r\n"); + } + } else { + List readEvents = new ArrayList<>(); + List writeEvents = new ArrayList<>(); + for (ResourceEvent event : events) { + if (event.name().startsWith("read.")) { + String newName = event.name().substring("read.".length()); + readEvents.add(ResourceEvent.create(newName, decryptProperty(newName, event.newValue().toString()), event.oldValue())); + } else { + String newName = event.name().substring("write.".length()); + writeEvents.add(ResourceEvent.create(newName, decryptProperty(newName, event.newValue().toString()), event.oldValue())); + } + sb.append("DataSource(name=").append(resourceName()).append(") the ").append(event.name()).append(" resource changed\r\n"); + } + if (!readEvents.isEmpty()) { + this.readPool.onResourceChange(readEvents.toArray(new ResourceEvent[readEvents.size()])); + } + if (!writeEvents.isEmpty()) { + this.writePool.onResourceChange(writeEvents.toArray(new ResourceEvent[writeEvents.size()])); + } + //更新Properties + if (!readEvents.isEmpty()) { + for (ResourceEvent event : readEvents) { + this.readConfProps.put(event.name(), event.newValue()); + } + } + if (!writeEvents.isEmpty()) { + for (ResourceEvent event : writeEvents) { + this.writeConfProps.put(event.name(), event.newValue()); + } + } + } + initSqlAttributes(); + if (!sb.isEmpty()) { + logger.log(Level.INFO, sb.toString()); + } } @Override public void destroy(AnyValue config) { if (readPool != null) readPool.close(); - if (writePool != null) writePool.close(); + if (writePool != null && writePool != readPool) writePool.close(); } @Local @@ -65,7 +111,7 @@ public class DataJdbcSource extends DataSqlSource { public void close() throws Exception { super.close(); if (readPool != null) readPool.close(); - if (writePool != null) writePool.close(); + if (writePool != null && writePool != readPool) writePool.close(); } public static boolean acceptsConf(AnyValue conf) { @@ -1124,6 +1170,10 @@ public class DataJdbcSource extends DataSqlSource { protected String url; + protected int urlVersion; + + protected Properties clientInfo = new Properties(); + public ConnectionPool(Properties prop) { this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "6")); this.maxconns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + Utility.cpus() * 4))); @@ -1139,21 +1189,48 @@ public class DataJdbcSource extends DataSqlSource { } catch (SQLException e) { throw new RuntimeException(e); } + clientInfo.put("version", urlVersion); } @ResourceListener - public void onResourceChange(ResourceEvent[] events) { + public synchronized void onResourceChange(ResourceEvent[] events) { + String newUrl = this.url; + int newConnectTimeoutSeconds = this.connectTimeoutSeconds; + int newMaxconns = this.maxconns; + String newUser = this.connectAttrs.getProperty("user"); + String newPassword = this.connectAttrs.getProperty("password"); for (ResourceEvent event : events) { - if (event.name().equals(DATA_SOURCE_CONNECTTIMEOUT_SECONDS) || event.name().endsWith("." + DATA_SOURCE_CONNECTTIMEOUT_SECONDS)) { - this.connectTimeoutSeconds = Integer.decode(event.newValue().toString()); - } else if (event.name().equals(DATA_SOURCE_URL) || event.name().endsWith("." + DATA_SOURCE_URL)) { - this.url = event.newValue().toString(); + if (event.name().equals(DATA_SOURCE_URL) || event.name().endsWith("." + DATA_SOURCE_URL)) { + newUrl = event.newValue().toString(); + } else if (event.name().equals(DATA_SOURCE_CONNECTTIMEOUT_SECONDS) || event.name().endsWith("." + DATA_SOURCE_CONNECTTIMEOUT_SECONDS)) { + newConnectTimeoutSeconds = Integer.decode(event.newValue().toString()); } else if (event.name().equals(DATA_SOURCE_USER) || event.name().endsWith("." + DATA_SOURCE_USER)) { - this.connectAttrs.put("user", event.newValue().toString()); + newUser = event.newValue().toString(); } else if (event.name().equals(DATA_SOURCE_PASSWORD) || event.name().endsWith("." + DATA_SOURCE_PASSWORD)) { - this.connectAttrs.put("password", event.newValue().toString()); + newPassword = event.newValue().toString(); } else if (event.name().equals(DATA_SOURCE_MAXCONNS) || event.name().endsWith("." + DATA_SOURCE_MAXCONNS)) { - logger.log(Level.WARNING, event.name() + " (new-value: " + event.newValue() + ") will not take effect"); + newMaxconns = Math.max(1, Integer.decode(event.newValue().toString())); + } + } + if (!Objects.equals(newUser, this.connectAttrs.get("user")) + || !Objects.equals(newPassword, this.connectAttrs.get("password")) || !Objects.equals(newUrl, url)) { + this.urlVersion++; + Properties newClientInfo = new Properties(); + newClientInfo.put("version", urlVersion); + this.clientInfo = newClientInfo; + } + this.url = newUrl; + this.connectTimeoutSeconds = newConnectTimeoutSeconds; + this.connectAttrs.put("user", newUser); + this.connectAttrs.put("password", newPassword); + if (newMaxconns != this.maxconns) { + ArrayBlockingQueue newQueue = new ArrayBlockingQueue<>(newMaxconns); + ArrayBlockingQueue oldQueue = this.queue; + this.queue = newQueue; + this.maxconns = newMaxconns; + Connection conn; + while ((conn = oldQueue.poll()) != null) { + offerConnection(conn); } } } @@ -1183,6 +1260,7 @@ public class DataJdbcSource extends DataSqlSource { } try { conn = driver.connect(url, connectAttrs); + conn.setClientInfo(clientInfo); } catch (SQLException ex) { throw new RuntimeException(ex); } @@ -1210,7 +1288,11 @@ public class DataJdbcSource extends DataSqlSource { protected boolean checkValid(Connection conn) { try { - return !conn.isClosed() && conn.isValid(1); + boolean rs = !conn.isClosed() && conn.isValid(1); + if (!rs) return rs; + Properties prop = conn.getClientInfo(); + if (prop == null) return false; + return prop == clientInfo || Objects.equals(prop.getProperty("version"), clientInfo.getProperty("version")); } catch (SQLException ex) { if (!"08S01".equals(ex.getSQLState())) {//MySQL特性, 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException logger.log(Level.FINER, "result.getConnection from pooled connection abort [" + ex.getSQLState() + "]", ex); diff --git a/src/main/java/org/redkale/source/DataSqlSource.java b/src/main/java/org/redkale/source/DataSqlSource.java index d41d33d1c..883dd4bda 100644 --- a/src/main/java/org/redkale/source/DataSqlSource.java +++ b/src/main/java/org/redkale/source/DataSqlSource.java @@ -90,7 +90,10 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi Properties rwConf = new Properties(); conf.forEach((k, v) -> rwConf.put(k, v)); this.dbtype = parseDbtype(rwConf.getProperty(DATA_SOURCE_URL)); - decryptProperties(rwConf); + rwConf.forEach((k, v) -> { + String n = decryptProperty(k.toString(), v == null ? null : v.toString()); + if (!Objects.equals(n, v)) rwConf.put(k, n); + }); initProperties(rwConf); this.readConfProps = rwConf; this.writeConfProps = rwConf; @@ -100,8 +103,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi conf.getAnyValue("read").forEach((k, v) -> readConf.put(k, v)); conf.getAnyValue("write").forEach((k, v) -> writeConf.put(k, v)); this.dbtype = parseDbtype(readConf.getProperty(DATA_SOURCE_URL)); - decryptProperties(readConf); - decryptProperties(writeConf); + readConf.forEach((k, v) -> { + String n = decryptProperty(k.toString(), v == null ? null : v.toString()); + if (!Objects.equals(n, v)) readConf.put(k, n); + }); + writeConf.forEach((k, v) -> { + String n = decryptProperty(k.toString(), v == null ? null : v.toString()); + if (!Objects.equals(n, v)) writeConf.put(k, n); + }); initProperties(readConf); initProperties(writeConf); this.readConfProps = readConf; @@ -109,6 +118,10 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi } this.name = conf.getValue("name", ""); this.sqlFormatter = (info, val) -> formatValueToString(info, val); + initSqlAttributes(); + } + + protected void initSqlAttributes() { this.autoddl = "true".equals(readConfProps.getProperty(DATA_SOURCE_TABLE_AUTODDL, "false").trim()); this.containSQL = readConfProps.getProperty(DATA_SOURCE_CONTAIN_SQLTEMPLATE, "LOCATE(${keystr}, ${column}) > 0"); @@ -119,14 +132,24 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi this.cacheForbidden = "NONE".equalsIgnoreCase(readConfProps.getProperty(DATA_SOURCE_CACHEMODE)); } + @Override @ResourceListener public void onResourceChange(ResourceEvent[] events) { - //@TODO 待实现 + if (events == null || events.length < 1) return; + //不支持读写分离模式的动态切换 + if (readConfProps == writeConfProps + && (events[0].name().startsWith("read.") || events[0].name().startsWith("write."))) { + throw new RuntimeException("DataSource(name=" + resourceName() + ") not support to change to read/write separation mode"); + } + if (readConfProps != writeConfProps + && (!events[0].name().startsWith("read.") && !events[0].name().startsWith("write."))) { + throw new RuntimeException("DataSource(name=" + resourceName() + ") not support to change to non read/write separation mode"); + } } //解密可能存在的加密字段, 可重载 - protected void decryptProperties(Properties props) { - + protected String decryptProperty(String key, String value) { + return value; } protected void initProperties(Properties props) {