去掉persistence.xml的监听文件变化功能,该有watch组件提供动态修改数据源配置

This commit is contained in:
Redkale
2018-07-05 09:49:19 +08:00
parent b08f9f5757
commit b172b66263
5 changed files with 78 additions and 86 deletions

View File

@@ -362,6 +362,14 @@ public final class Application {
return new ArrayList<>(servers);
}
public List<DataSource> getDataSources() {
return new ArrayList<>(dataSources);
}
public List<CacheSource> getCacheSources() {
return new ArrayList<>(cacheSources);
}
public File getHome() {
return home;
}

View File

@@ -6,6 +6,7 @@
package org.redkale.boot.watch;
import org.redkale.service.AbstractService;
import org.redkale.util.Comment;
import org.redkale.watch.WatchService;
/**
@@ -14,4 +15,6 @@ import org.redkale.watch.WatchService;
*/
public abstract class AbstractWatchService extends AbstractService implements WatchService {
@Comment("缺少参数")
public static final int RET_WATCH_PARAMS_ILLEGAL = 1600_0001;
}

View File

@@ -5,10 +5,15 @@
*/
package org.redkale.boot.watch;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Properties;
import javax.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.net.TransportFactory;
import org.redkale.net.http.RestService;
import org.redkale.net.http.*;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
/**
*
@@ -17,10 +22,58 @@ import org.redkale.net.http.RestService;
@RestService(name = "source", catalog = "watch", repair = false)
public class SourceWatchService extends AbstractWatchService {
@Comment("不存在的Source")
public static final int RET_SOURCE_NOT_EXISTS = 1605_0001;
@Comment("Source不支持getReadPoolSource/getWritePoolSource方法")
public static final int RET_SOURCE_CHANGE_METHOD_NOT_EXISTS = 1605_0002;
@Comment("PoolSource调用change方法失败")
public static final int RET_SOURCE_METHOD_INVOKE_NOT_EXISTS = 1605_0003;
@Resource
private Application application;
@Resource
private TransportFactory transportFactory;
@RestMapping(name = "change", auth = false, comment = "动态更改DataSource的配置")
public RetResult addNode(@RestParam(name = "name", comment = "DataSource的标识") final String name,
@RestParam(name = "properties", comment = "配置") final Properties properties) throws IOException {
if (name == null) return new RetResult(RET_WATCH_PARAMS_ILLEGAL, "not found param (name)");
if (properties == null) return new RetResult(RET_WATCH_PARAMS_ILLEGAL, "not found param (properties)");
DataSource source = null;
for (DataSource s : application.getDataSources()) {
String resName = ((Resourcable) s).resourceName();
if (resName == null) continue;
if (!resName.equals(name)) continue;
source = s;
break;
}
if (source == null) return new RetResult(RET_SOURCE_NOT_EXISTS, "not found source (name = " + name + ")");
Method readPoolMethod = null;
Method writePoolMethod = null;
Class stype = source.getClass();
do {
for (Method m : stype.getDeclaredMethods()) {
if (!PoolSource.class.isAssignableFrom(m.getReturnType())) continue;
if (m.getParameterCount() != 0) continue;
if (m.getName().equals("getReadPoolSource")) {
readPoolMethod = m;
} else if (m.getName().equals("getWritePoolSource")) {
writePoolMethod = m;
}
}
} while ((stype = stype.getSuperclass()) != Object.class);
if (readPoolMethod == null) return new RetResult(RET_SOURCE_CHANGE_METHOD_NOT_EXISTS, "not found source method(getReadPoolSource)");
if (writePoolMethod == null) return new RetResult(RET_SOURCE_CHANGE_METHOD_NOT_EXISTS, "not found source method(getWritePoolSource)");
readPoolMethod.setAccessible(true);
writePoolMethod.setAccessible(true);
try {
PoolSource readPoolSource = (PoolSource) readPoolMethod.invoke(source);
readPoolSource.change(properties);
PoolSource writePoolSource = (PoolSource) writePoolMethod.invoke(source);
writePoolSource.change(properties);
return RetResult.success();
} catch (Exception e) {
return new RetResult(RET_SOURCE_METHOD_INVOKE_NOT_EXISTS, "poolsource invoke method('change') error");
}
}
}

View File

@@ -5,12 +5,8 @@
*/
package org.redkale.source;
import java.io.*;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.net.URL;
import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
@@ -27,8 +23,6 @@ import static org.redkale.source.DataSources.*;
*/
public class PoolJdbcSource extends PoolSource<Connection> {
private static final Map<String, AbstractMap.SimpleEntry<WatchService, List<WeakReference<PoolJdbcSource>>>> maps = new HashMap<>();
private final ConnectionPoolDataSource source;
private final ArrayBlockingQueue<PooledConnection> queue;
@@ -68,12 +62,6 @@ public class PoolJdbcSource extends PoolSource<Connection> {
logger.log(Level.WARNING, "connectionErronOccurred [" + event.getSQLException().getSQLState() + "]", event.getSQLException());
}
};
try {
this.watch();
} catch (Exception e) {
logger.log(Level.WARNING, DataSource.class.getSimpleName() + " watch " + persistxml + " error", e);
}
}
private static ConnectionPoolDataSource createDataSource(Properties property) {
@@ -152,74 +140,14 @@ public class PoolJdbcSource extends PoolSource<Connection> {
return 0;
}
private void watch() throws IOException {
if (persistxml == null || unitName == null) return;
final String file = persistxml.getFile();
final File f = new File(file);
if (!f.isFile() || !f.canRead()) return;
synchronized (maps) {
AbstractMap.SimpleEntry<WatchService, List<WeakReference<PoolJdbcSource>>> entry = maps.get(file);
if (entry != null) {
entry.getValue().add(new WeakReference<>(this));
return;
}
final WatchService watcher = f.toPath().getFileSystem().newWatchService();
final List<WeakReference<PoolJdbcSource>> list = new CopyOnWriteArrayList<>();
Thread watchThread = new Thread() {
@Override
public void run() {
try {
while (!this.isInterrupted()) {
final WatchKey key = watcher.take();
long d; //防止文件正在更新过程中去读取
for (;;) {
d = f.lastModified();
Thread.sleep(2000L);
if (d == f.lastModified()) break;
}
final Map<String, Properties> m = loadPersistenceXml(new FileInputStream(file));
key.pollEvents().stream().forEach((event) -> {
if (event.kind() != ENTRY_MODIFY) return;
if (!((Path) event.context()).toFile().getName().equals(f.getName())) return;
for (WeakReference<PoolJdbcSource> ref : list) {
PoolJdbcSource pool = ref.get();
if (pool == null) continue;
try {
Properties property = m.get(unitName);
if (property == null) property = m.get(unitName + "." + pool.rwtype);
if (property != null) pool.change(property);
} catch (Exception ex) {
logger.log(Level.INFO, event.context() + " occur error", ex);
}
}
});
key.reset();
}
} catch (Exception e) {
logger.log(Level.WARNING, "DataSource watch " + file + " occur error", e);
}
}
};
f.getParentFile().toPath().register(watcher, ENTRY_MODIFY);
watchThread.setName("DataSource-Watch-" + maps.size() + "-Thread");
watchThread.setDaemon(true);
watchThread.start();
logger.log(Level.INFO, watchThread.getName() + " start watching " + file);
//-----------------------------------------------------------
list.add(new WeakReference<>(this));
maps.put(file, new AbstractMap.SimpleEntry<>(watcher, list));
}
}
@Override
public void change(Properties property) {
Method seturlm;
Class clazz = source.getClass();
String newurl = property.getProperty(JDBC_URL);
String newuser = property.getProperty(JDBC_USER);
String newpassword = property.getProperty(JDBC_PWD);
if (this.url.equals(newurl) && this.username.equals(newuser) && this.password.equals(newpassword)) return;
String newurl = property.getProperty(JDBC_URL, this.url);
String newuser = property.getProperty(JDBC_USER, this.username);
String newpassword = property.getProperty(JDBC_PWD, this.password);
if (Objects.equals(this.url, newurl) && Objects.equals(this.username, newuser) && Objects.equals(this.password, newpassword)) return;
try {
try {
seturlm = clazz.getMethod("setUrl", String.class);

View File

@@ -9,7 +9,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.sql.*;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import org.redkale.net.AsyncConnection;
@@ -70,10 +70,10 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
@Override
public void change(Properties prop) {
String newurl = prop.getProperty(JDBC_URL);
String newuser = prop.getProperty(JDBC_USER, "");
String newpassword = prop.getProperty(JDBC_PWD, "");
if (this.url.equals(newurl) && this.username.equals(newuser) && this.password.equals(newpassword)) return;
String newurl = prop.getProperty(JDBC_URL, this.url);
String newuser = prop.getProperty(JDBC_USER, this.username);
String newpassword = prop.getProperty(JDBC_PWD, this.password);
if (Objects.equals(this.url, newurl) && Objects.equals(this.username, newuser) && Objects.equals(this.password, newpassword)) return;
this.url = newurl;
this.username = newuser;
this.password = newpassword;