ModuleEngine大优化
This commit is contained in:
384
src/main/java/org/redkale/boot/AppConfig.java
Normal file
384
src/main/java/org/redkale/boot/AppConfig.java
Normal file
@@ -0,0 +1,384 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkale.boot;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.logging.Logger;
|
||||
import java.util.logging.SimpleFormatter;
|
||||
import static org.redkale.boot.Application.RESNAME_APP_CONF_DIR;
|
||||
import static org.redkale.boot.Application.RESNAME_APP_CONF_FILE;
|
||||
import static org.redkale.boot.Application.RESNAME_APP_HOME;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.RedkaleClassLoader;
|
||||
import org.redkale.util.RedkaleException;
|
||||
import org.redkale.util.Utility;
|
||||
|
||||
/**
|
||||
* 加载系统参数配置
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
class AppConfig {
|
||||
|
||||
//日志
|
||||
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
//是否用于main方法运行
|
||||
final boolean singletonMode;
|
||||
|
||||
//是否用于编译模式运行
|
||||
final boolean compileMode;
|
||||
|
||||
//application.xml原始配置信息
|
||||
AnyValue config;
|
||||
|
||||
//是否从/META-INF中读取配置
|
||||
boolean configFromCache;
|
||||
|
||||
//本进程节点ID
|
||||
int nodeid;
|
||||
|
||||
//本进程节点ID
|
||||
String name;
|
||||
|
||||
//本地IP地址
|
||||
InetSocketAddress localAddress;
|
||||
|
||||
//配置信息Properties
|
||||
Properties envProperties;
|
||||
//进程根目录
|
||||
|
||||
File home;
|
||||
|
||||
//进程根目录
|
||||
String homePath;
|
||||
|
||||
//配置文件目录
|
||||
File confFile;
|
||||
|
||||
//配置文件目录
|
||||
URI confPath;
|
||||
|
||||
//根ClassLoader
|
||||
RedkaleClassLoader classLoader;
|
||||
|
||||
//Server根ClassLoader
|
||||
RedkaleClassLoader serverClassLoader;
|
||||
|
||||
//本地文件非system.property.开头的配置项
|
||||
Properties localEnvProperties = new Properties();
|
||||
|
||||
//本地文件设置System.properties且不存于System.properties的配置项
|
||||
Properties localSysProperties = new Properties();
|
||||
|
||||
//本地文件日志配置项
|
||||
Properties locaLogProperties = new Properties();
|
||||
|
||||
public AppConfig(boolean singletonMode, boolean compileMode) {
|
||||
this.singletonMode = singletonMode;
|
||||
this.compileMode = compileMode;
|
||||
}
|
||||
|
||||
public static AppConfig create(boolean singletonMode, boolean compileMode) throws IOException {
|
||||
AppConfig rs = new AppConfig(singletonMode, compileMode);
|
||||
rs.init(loadAppConfig());
|
||||
return rs;
|
||||
}
|
||||
|
||||
private void init(AnyValue conf) {
|
||||
this.config = conf;
|
||||
this.name = checkName(config.getValue("name", ""));
|
||||
this.nodeid = config.getIntValue("nodeid", 0);
|
||||
this.configFromCache = "true".equals(config.getValue("[config-from-cache]"));
|
||||
//初始化classLoader、serverClassLoader
|
||||
this.initClassLoader();
|
||||
//初始化home、confPath、localAddress等信息
|
||||
this.initAppHome();
|
||||
//读取本地参数配置
|
||||
this.initLocalProperties();
|
||||
//读取本地日志配置
|
||||
this.initLogProperties();
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化classLoader、serverClassLoader
|
||||
*/
|
||||
private void initClassLoader() {
|
||||
ClassLoader currClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
if (currClassLoader instanceof RedkaleClassLoader) {
|
||||
this.classLoader = (RedkaleClassLoader) currClassLoader;
|
||||
} else {
|
||||
Set<String> cacheClasses = null;
|
||||
if (!singletonMode && !compileMode) {
|
||||
try {
|
||||
InputStream in = Application.class.getResourceAsStream(RedkaleClassLoader.RESOURCE_CACHE_CLASSES_PATH);
|
||||
if (in != null) {
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8), 1024);
|
||||
List<String> list = new ArrayList<>();
|
||||
reader.lines().forEach(list::add);
|
||||
Collections.sort(list);
|
||||
if (!list.isEmpty()) {
|
||||
cacheClasses = new LinkedHashSet<>(list);
|
||||
}
|
||||
in.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
//do nothing
|
||||
}
|
||||
}
|
||||
if (cacheClasses == null) {
|
||||
this.classLoader = new RedkaleClassLoader(currClassLoader);
|
||||
} else {
|
||||
this.classLoader = new RedkaleClassLoader.RedkaleCacheClassLoader(currClassLoader, cacheClasses);
|
||||
}
|
||||
Thread.currentThread().setContextClassLoader(this.classLoader);
|
||||
}
|
||||
if (compileMode || this.classLoader instanceof RedkaleClassLoader.RedkaleCacheClassLoader) {
|
||||
this.serverClassLoader = this.classLoader;
|
||||
} else {
|
||||
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化home、confPath、localAddress等信息
|
||||
*/
|
||||
private void initAppHome() {
|
||||
final File root = new File(System.getProperty(RESNAME_APP_HOME, ""));
|
||||
final String rootPath = getCanonicalPath(root);
|
||||
this.home = new File(rootPath);
|
||||
this.homePath = this.home.getPath();
|
||||
String confDir = System.getProperty(RESNAME_APP_CONF_DIR, "conf");
|
||||
if (confDir.contains("://") || confDir.startsWith("file:")
|
||||
|| confDir.startsWith("resource:") || confDir.contains("!")) { //graalvm native-image startwith resource:META-INF
|
||||
this.confPath = URI.create(confDir);
|
||||
if (confDir.startsWith("file:")) {
|
||||
this.confFile = getCanonicalFile(new File(this.confPath.getPath()));
|
||||
}
|
||||
} else if (confDir.charAt(0) == '/' || confDir.indexOf(':') > -1) {
|
||||
this.confFile = getCanonicalFile(new File(confDir));
|
||||
this.confPath = confFile.toURI();
|
||||
} else {
|
||||
this.confFile = new File(getCanonicalPath(new File(this.home, confDir)));
|
||||
this.confPath = confFile.toURI();
|
||||
}
|
||||
String localaddr = config.getValue("address", "").trim();
|
||||
InetAddress addr = localaddr.isEmpty() ? Utility.localInetAddress() : new InetSocketAddress(localaddr, config.getIntValue("port")).getAddress();
|
||||
this.localAddress = new InetSocketAddress(addr, config.getIntValue("port"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 读取本地参数配置
|
||||
*/
|
||||
private void initLocalProperties() {
|
||||
AnyValue propsConf = this.config.getAnyValue("properties");
|
||||
if (propsConf != null) { //设置配置文件中的系统变量
|
||||
for (AnyValue prop : propsConf.getAnyValues("property")) {
|
||||
String key = prop.getValue("name", "");
|
||||
String value = prop.getValue("value");
|
||||
if (value != null && key.startsWith("system.property.")) {
|
||||
String propName = key.substring("system.property.".length());
|
||||
if (System.getProperty(propName) == null) { //命令行传参数优先级高
|
||||
localSysProperties.put(propName, value);
|
||||
}
|
||||
} else if (value != null) {
|
||||
localEnvProperties.put(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
//设置Convert默认配置项
|
||||
if (System.getProperty("redkale.convert.pool.size") == null
|
||||
&& localSysProperties.getProperty("redkale.convert.pool.size") == null) {
|
||||
localSysProperties.put("redkale.convert.pool.size", "128");
|
||||
}
|
||||
if (System.getProperty("redkale.convert.writer.buffer.defsize") == null
|
||||
&& localSysProperties.getProperty("redkale.convert.writer.buffer.defsize") == null) {
|
||||
localSysProperties.put("redkale.convert.writer.buffer.defsize", "4096");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 读取本地日志配置
|
||||
*/
|
||||
private void initLogProperties() {
|
||||
URI logConfURI;
|
||||
File logConfFile = null;
|
||||
if (configFromCache) {
|
||||
logConfURI = RedkaleClassLoader.getConfResourceAsURI(null, "logging.properties");
|
||||
} else if ("file".equals(confPath.getScheme())) {
|
||||
logConfFile = new File(confPath.getPath(), "logging.properties");
|
||||
logConfURI = logConfFile.toURI();
|
||||
if (!logConfFile.isFile() || !logConfFile.canRead()) {
|
||||
logConfFile = null;
|
||||
}
|
||||
} else {
|
||||
logConfURI = URI.create(confPath + (confPath.toString().endsWith("/") ? "" : "/") + "logging.properties");
|
||||
}
|
||||
if (!"file".equals(confPath.getScheme()) || logConfFile != null) {
|
||||
try {
|
||||
InputStream fin = logConfURI.toURL().openStream();
|
||||
Properties properties0 = new Properties();
|
||||
properties0.load(fin);
|
||||
fin.close();
|
||||
properties0.forEach((k, v) -> locaLogProperties.put(k.toString(), v.toString()));
|
||||
} catch (IOException e) {
|
||||
throw new RedkaleException("read logging.properties error", e);
|
||||
}
|
||||
}
|
||||
if (compileMode) {
|
||||
RedkaleClassLoader.putReflectionClass(java.lang.Class.class.getName());
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(SimpleFormatter.class, SimpleFormatter.class.getName());
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(LoggingSearchHandler.class, LoggingSearchHandler.class.getName());
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(LoggingFileHandler.class, LoggingFileHandler.class.getName());
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(LoggingFileHandler.LoggingFormater.class, LoggingFileHandler.LoggingFormater.class.getName());
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(LoggingFileHandler.LoggingConsoleHandler.class, LoggingFileHandler.LoggingConsoleHandler.class.getName());
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(LoggingFileHandler.LoggingSncpFileHandler.class, LoggingFileHandler.LoggingSncpFileHandler.class.getName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从本地application.xml或application.properties文件加载配置信息
|
||||
*
|
||||
* @return 配置信息
|
||||
* @throws IOException
|
||||
*/
|
||||
static AnyValue loadAppConfig() throws IOException {
|
||||
final String home = new File(System.getProperty(RESNAME_APP_HOME, "")).getCanonicalPath().replace('\\', '/');
|
||||
String sysConfFile = System.getProperty(RESNAME_APP_CONF_FILE);
|
||||
if (sysConfFile != null) {
|
||||
String text;
|
||||
if (sysConfFile.contains("://")) {
|
||||
text = Utility.readThenClose(URI.create(sysConfFile).toURL().openStream());
|
||||
} else {
|
||||
File f = new File(sysConfFile);
|
||||
if (f.isFile() && f.canRead()) {
|
||||
text = Utility.readThenClose(new FileInputStream(f));
|
||||
} else {
|
||||
throw new IOException("Read application conf file (" + sysConfFile + ") error ");
|
||||
}
|
||||
}
|
||||
return text.trim().startsWith("<") ? AnyValue.loadFromXml(text, (k, v) -> v.replace("${" + RESNAME_APP_HOME + "}", home))
|
||||
.getAnyValue("application") : AnyValue.loadFromProperties(text).getAnyValue("redkale");
|
||||
}
|
||||
String confDir = System.getProperty(RESNAME_APP_CONF_DIR, "conf");
|
||||
URI appConfFile;
|
||||
boolean fromCache = false;
|
||||
if (confDir.contains("://")) { //jar内部资源
|
||||
appConfFile = URI.create(confDir + (confDir.endsWith("/") ? "" : "/") + "application.xml");
|
||||
try {
|
||||
appConfFile.toURL().openStream().close();
|
||||
} catch (IOException e) { //没有application.xml就尝试读application.properties
|
||||
appConfFile = URI.create(confDir + (confDir.endsWith("/") ? "" : "/") + "application.properties");
|
||||
}
|
||||
} else if (confDir.charAt(0) == '/' || confDir.indexOf(':') > 0) { //绝对路径
|
||||
File f = new File(confDir, "application.xml");
|
||||
if (f.isFile() && f.canRead()) {
|
||||
appConfFile = f.toURI();
|
||||
confDir = f.getParentFile().getCanonicalPath();
|
||||
} else {
|
||||
f = new File(confDir, "application.properties");
|
||||
if (f.isFile() && f.canRead()) {
|
||||
appConfFile = f.toURI();
|
||||
confDir = f.getParentFile().getCanonicalPath();
|
||||
} else {
|
||||
appConfFile = RedkaleClassLoader.getConfResourceAsURI(null, "application.xml"); //不能传confDir
|
||||
try {
|
||||
appConfFile.toURL().openStream().close();
|
||||
} catch (IOException e) { //没有application.xml就尝试读application.properties
|
||||
appConfFile = RedkaleClassLoader.getConfResourceAsURI(null, "application.properties");
|
||||
}
|
||||
confDir = appConfFile.toString().replace("/application.xml", "").replace("/application.properties", "");
|
||||
fromCache = true;
|
||||
}
|
||||
}
|
||||
} else { //相对路径
|
||||
File f = new File(new File(home, confDir), "application.xml");
|
||||
if (f.isFile() && f.canRead()) {
|
||||
appConfFile = f.toURI();
|
||||
confDir = f.getParentFile().getCanonicalPath();
|
||||
} else {
|
||||
f = new File(new File(home, confDir), "application.properties");
|
||||
if (f.isFile() && f.canRead()) {
|
||||
appConfFile = f.toURI();
|
||||
confDir = f.getParentFile().getCanonicalPath();
|
||||
} else {
|
||||
appConfFile = RedkaleClassLoader.getConfResourceAsURI(null, "application.xml"); //不能传confDir
|
||||
try {
|
||||
appConfFile.toURL().openStream().close();
|
||||
} catch (IOException e) { //没有application.xml就尝试读application.properties
|
||||
appConfFile = RedkaleClassLoader.getConfResourceAsURI(null, "application.properties");
|
||||
}
|
||||
confDir = appConfFile.toString().replace("/application.xml", "").replace("/application.properties", "");
|
||||
fromCache = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
System.setProperty(RESNAME_APP_CONF_DIR, confDir);
|
||||
String text = Utility.readThenClose(appConfFile.toURL().openStream());
|
||||
AnyValue conf;
|
||||
if (text.trim().startsWith("<")) {
|
||||
conf = AnyValue.loadFromXml(text, (k, v) -> v.replace("${APP_HOME}", home)).getAnyValue("application");
|
||||
} else {
|
||||
conf = AnyValue.loadFromProperties(text).getAnyValue("redkale");
|
||||
}
|
||||
if (fromCache) {
|
||||
((AnyValue.DefaultAnyValue) conf).addValue("[config-from-cache]", "true");
|
||||
}
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查name是否含特殊字符
|
||||
*
|
||||
* @param name
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
private String checkName(String name) {
|
||||
if (name == null || name.isEmpty()) {
|
||||
return name;
|
||||
}
|
||||
for (char ch : name.toCharArray()) {
|
||||
if (!((ch >= '0' && ch <= '9') || ch == '_' || ch == '.' || ch == '-' || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) { //不能含特殊字符
|
||||
throw new RedkaleException("name only 0-9 a-z A-Z _ - . cannot begin 0-9");
|
||||
}
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
private String getCanonicalPath(File file) {
|
||||
try {
|
||||
return file.getCanonicalPath();
|
||||
} catch (IOException e) {
|
||||
return file.getAbsolutePath();
|
||||
}
|
||||
}
|
||||
|
||||
private File getCanonicalFile(File file) {
|
||||
try {
|
||||
return file.getCanonicalFile();
|
||||
} catch (IOException e) {
|
||||
return file;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -112,7 +112,7 @@ public class LoggingSearchHandler extends LoggingBaseHandler {
|
||||
if (application == null) {
|
||||
return;
|
||||
}
|
||||
this.source = (SearchSource) application.loadDataSource(sourceResourceName, false);
|
||||
this.source = application.getResourceFactory().find(sourceResourceName, SearchSource.class);
|
||||
if (retryCount.get() == 1 && this.source == null) {
|
||||
System.err.println("ERROR: not load logging.source(" + sourceResourceName + ")");
|
||||
}
|
||||
@@ -160,7 +160,7 @@ public class LoggingSearchHandler extends LoggingBaseHandler {
|
||||
if (!checkTagName(tagstr.replaceAll("\\$\\{.+\\}", ""))) {
|
||||
throw new RedkaleException("found illegal logging.property " + cname + ".tag = " + tagstr);
|
||||
}
|
||||
this.tag = tagstr.replace("${" + RESNAME_APP_NAME + "}", System.getProperty(RESNAME_APP_NAME, ""));
|
||||
this.tag = tagstr.replace("${" + RESNAME_APP_NAME + "}", System.getProperty("redkale.application.name", ""));
|
||||
if (this.tag.contains("%")) {
|
||||
this.tagDateFormat = this.tag;
|
||||
Times.formatTime(this.tagDateFormat, -1, System.currentTimeMillis()); //测试时间格式是否正确
|
||||
|
||||
161
src/main/java/org/redkale/boot/ModuleEngine.java
Normal file
161
src/main/java/org/redkale/boot/ModuleEngine.java
Normal file
@@ -0,0 +1,161 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkale.boot;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.logging.Logger;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.ResourceEvent;
|
||||
import org.redkale.util.ResourceFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* 各组件的引擎类, 由Application管理
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
*
|
||||
* @since 2.8.0
|
||||
*/
|
||||
public abstract class ModuleEngine {
|
||||
|
||||
protected final Logger logger = Logger.getLogger(getClass().getSimpleName());
|
||||
|
||||
protected final Application application;
|
||||
|
||||
protected final ResourceFactory resourceFactory;
|
||||
|
||||
public ModuleEngine(Application application) {
|
||||
this.application = application;
|
||||
this.resourceFactory = application.getResourceFactory();
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.init方法时被调用
|
||||
*/
|
||||
public void onAppPreInit() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 结束Application.init方法前被调用
|
||||
*/
|
||||
public void onAppPostInit() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.start方法被调用
|
||||
*/
|
||||
public void onAppPreStart() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 结束Application.start方法前被调用
|
||||
*/
|
||||
public void onAppPostStart() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置项加载后被调用
|
||||
*
|
||||
* @param props 配置项全量
|
||||
*/
|
||||
public void onEnvironmentLoaded(Properties props) {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置项变更时被调用
|
||||
*
|
||||
* @param namespace 命名空间
|
||||
* @param events 变更项
|
||||
*/
|
||||
public void onEnvironmentChanged(String namespace, List<ResourceEvent> events) {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部启动前被调用
|
||||
*/
|
||||
public void onServersPreStart() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部启动后被调用
|
||||
*/
|
||||
public void onServersPostStart() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行Service.init方法前被调用
|
||||
*
|
||||
* @param service Service
|
||||
*/
|
||||
public void onServicePreInit(Service service) {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行Service.init方法后被调用
|
||||
*
|
||||
* @param service Service
|
||||
*/
|
||||
public void onServicePostInit(Service service) {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行Service.destroy方法前被调用
|
||||
*
|
||||
* @param service Service
|
||||
*/
|
||||
public void onServicePreDestroy(Service service) {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行Service.destroy方法后被调用
|
||||
*
|
||||
* @param service Service
|
||||
*/
|
||||
public void onServicePostDestroy(Service service) {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部停掉前被调用
|
||||
*/
|
||||
public void onServersPreStop() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部停掉后被调用
|
||||
*/
|
||||
public void onServersPostStop() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.shutdown方法被调用
|
||||
*/
|
||||
public void onAppPreShutdown() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* 结束Application.shutdown方法前被调用
|
||||
*/
|
||||
public void onAppPostShutdown() {
|
||||
//do nothing
|
||||
}
|
||||
}
|
||||
@@ -389,7 +389,7 @@ public class NodeHttpServer extends NodeServer {
|
||||
String mqname = restConf.getValue("mq");
|
||||
MessageAgent agent0 = null;
|
||||
if (mqname != null) {
|
||||
agent0 = application.getMessageAgent(mqname);
|
||||
agent0 = application.getResourceFactory().find(mqname, MessageAgent.class);
|
||||
if (agent0 == null) {
|
||||
throw new RedkaleException("not found " + MessageAgent.class.getSimpleName() + " config for (name=" + mqname + ")");
|
||||
}
|
||||
@@ -548,7 +548,7 @@ public class NodeHttpServer extends NodeServer {
|
||||
|
||||
@Override //loadServlet执行之后调用
|
||||
protected void postLoadServlets() {
|
||||
final ClusterAgent cluster = application.getClusterAgent();
|
||||
final ClusterAgent cluster = application.getResourceFactory().find("", ClusterAgent.class);
|
||||
if (!application.isCompileMode() && cluster != null) {
|
||||
NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class);
|
||||
String protocol = pros.value().toUpperCase();
|
||||
|
||||
@@ -12,6 +12,7 @@ import java.net.*;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.logging.*;
|
||||
import org.redkale.annotation.*;
|
||||
@@ -103,6 +104,8 @@ public abstract class NodeServer {
|
||||
|
||||
private volatile int maxNameLength = 0;
|
||||
|
||||
CountDownLatch serviceCdl;
|
||||
|
||||
public NodeServer(Application application, Server server) {
|
||||
this.threadName = Thread.currentThread().getName();
|
||||
this.application = application;
|
||||
@@ -328,60 +331,60 @@ public abstract class NodeServer {
|
||||
return null;
|
||||
}
|
||||
}, Service.class);
|
||||
|
||||
//------------------------------------- 注册 DataSource --------------------------------------------------------
|
||||
resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> {
|
||||
try {
|
||||
if (field.getAnnotation(Resource.class) == null && field.getAnnotation(javax.annotation.Resource.class) == null) {
|
||||
return null;
|
||||
}
|
||||
if ((srcObj instanceof Service) && Sncp.isRemote((Service) srcObj)) {
|
||||
return null; //远程模式不得注入 DataSource
|
||||
}
|
||||
DataSource source = application.loadDataSource(resourceName, false);
|
||||
field.set(srcObj, source);
|
||||
return source;
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "DataSource inject to " + srcObj + " error", e);
|
||||
return null;
|
||||
}
|
||||
}, DataSource.class);
|
||||
|
||||
//------------------------------------- 注册 CacheSource --------------------------------------------------------
|
||||
resourceFactory.register(new ResourceTypeLoader() {
|
||||
@Override
|
||||
public Object load(ResourceFactory rf, String srcResourceName, final Object srcObj, final String resourceName, Field field, final Object attachment) {
|
||||
try {
|
||||
if (field.getAnnotation(Resource.class) == null && field.getAnnotation(javax.annotation.Resource.class) == null) {
|
||||
return null;
|
||||
}
|
||||
if ((srcObj instanceof Service) && Sncp.isRemote((Service) srcObj)) {
|
||||
return null; //远程模式不需要注入 CacheSource
|
||||
}
|
||||
if (srcObj instanceof Servlet) {
|
||||
throw new RedkaleException("CacheSource cannot inject in Servlet " + srcObj);
|
||||
}
|
||||
final boolean ws = (srcObj instanceof org.redkale.net.http.WebSocketNodeService);
|
||||
CacheSource source = application.loadCacheSource(resourceName, ws);
|
||||
field.set(srcObj, source);
|
||||
Resource res = field.getAnnotation(Resource.class);
|
||||
if (res != null && res.required() && source == null) {
|
||||
throw new RedkaleException("CacheSource (resourceName = '" + resourceName + "') not found");
|
||||
} else {
|
||||
logger.info("Load CacheSource (type = " + (source == null ? null : source.getClass().getSimpleName()) + ", resourceName = '" + resourceName + "')");
|
||||
}
|
||||
return source;
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "DataSource inject error", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean autoNone() {
|
||||
return false;
|
||||
}
|
||||
}, CacheSource.class);
|
||||
//
|
||||
// //------------------------------------- 注册 DataSource --------------------------------------------------------
|
||||
// resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> {
|
||||
// try {
|
||||
// if (field.getAnnotation(Resource.class) == null && field.getAnnotation(javax.annotation.Resource.class) == null) {
|
||||
// return null;
|
||||
// }
|
||||
// if ((srcObj instanceof Service) && Sncp.isRemote((Service) srcObj)) {
|
||||
// return null; //远程模式不得注入 DataSource
|
||||
// }
|
||||
// DataSource source = application.loadDataSource(resourceName, false);
|
||||
// field.set(srcObj, source);
|
||||
// return source;
|
||||
// } catch (Exception e) {
|
||||
// logger.log(Level.SEVERE, "DataSource inject to " + srcObj + " error", e);
|
||||
// return null;
|
||||
// }
|
||||
// }, DataSource.class);
|
||||
//
|
||||
// //------------------------------------- 注册 CacheSource --------------------------------------------------------
|
||||
// resourceFactory.register(new ResourceTypeLoader() {
|
||||
// @Override
|
||||
// public Object load(ResourceFactory rf, String srcResourceName, final Object srcObj, final String resourceName, Field field, final Object attachment) {
|
||||
// try {
|
||||
// if (field.getAnnotation(Resource.class) == null && field.getAnnotation(javax.annotation.Resource.class) == null) {
|
||||
// return null;
|
||||
// }
|
||||
// if ((srcObj instanceof Service) && Sncp.isRemote((Service) srcObj)) {
|
||||
// return null; //远程模式不需要注入 CacheSource
|
||||
// }
|
||||
// if (srcObj instanceof Servlet) {
|
||||
// throw new RedkaleException("CacheSource cannot inject in Servlet " + srcObj);
|
||||
// }
|
||||
// final boolean ws = (srcObj instanceof org.redkale.net.http.WebSocketNodeService);
|
||||
// CacheSource source = application.loadCacheSource(resourceName, ws);
|
||||
// field.set(srcObj, source);
|
||||
// Resource res = field.getAnnotation(Resource.class);
|
||||
// if (res != null && res.required() && source == null) {
|
||||
// throw new RedkaleException("CacheSource (resourceName = '" + resourceName + "') not found");
|
||||
// } else {
|
||||
// logger.info("Load CacheSource (type = " + (source == null ? null : source.getClass().getSimpleName()) + ", resourceName = '" + resourceName + "')");
|
||||
// }
|
||||
// return source;
|
||||
// } catch (Exception e) {
|
||||
// logger.log(Level.SEVERE, "DataSource inject error", e);
|
||||
// return null;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public boolean autoNone() {
|
||||
// return false;
|
||||
// }
|
||||
// }, CacheSource.class);
|
||||
|
||||
//------------------------------------- 注册 WebSocketNode --------------------------------------------------------
|
||||
resourceFactory.register(new ResourceTypeLoader() {
|
||||
@@ -432,9 +435,7 @@ public abstract class NodeServer {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void loadService(ClassFilter<? extends Service> serviceFilter) throws Exception {
|
||||
if (serviceFilter == null) {
|
||||
return;
|
||||
}
|
||||
Objects.requireNonNull(serviceFilter);
|
||||
final long starts = System.currentTimeMillis();
|
||||
final Set<FilterEntry<? extends Service>> entrys = (Set) serviceFilter.getAllFilterEntrys();
|
||||
ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory;
|
||||
@@ -548,8 +549,10 @@ public abstract class NodeServer {
|
||||
|
||||
}
|
||||
long et = System.currentTimeMillis();
|
||||
application.servicecdl.countDown();
|
||||
application.servicecdl.await();
|
||||
if (serviceCdl != null) {
|
||||
serviceCdl.countDown();
|
||||
serviceCdl.await();
|
||||
}
|
||||
logger.info(this.getClass().getSimpleName() + " construct services in " + (et - starts) + " ms and await " + (System.currentTimeMillis() - et) + " ms");
|
||||
|
||||
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
|
||||
@@ -611,8 +614,9 @@ public abstract class NodeServer {
|
||||
} else {
|
||||
localServices.stream().forEach(y -> {
|
||||
long s = System.currentTimeMillis();
|
||||
application.onServicePreInit(y);
|
||||
y.init(Sncp.getResourceConf(y));
|
||||
application.schedule(y);
|
||||
application.onServicePostInit(y);
|
||||
long e = System.currentTimeMillis() - s;
|
||||
if (slist != null) {
|
||||
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
|
||||
@@ -666,7 +670,7 @@ public abstract class NodeServer {
|
||||
protected MessageAgent getMessageAgent(AnyValue serviceConf) {
|
||||
MessageAgent agent = null;
|
||||
if (serviceConf != null && serviceConf.getValue("mq") != null) {
|
||||
agent = application.getMessageAgent(serviceConf.getValue("mq"));
|
||||
agent = application.getResourceFactory().find(serviceConf.getValue("mq"), MessageAgent.class);
|
||||
if (agent != null) {
|
||||
messageAgents.put(agent.getName(), agent);
|
||||
}
|
||||
@@ -676,7 +680,7 @@ public abstract class NodeServer {
|
||||
|
||||
//Service.init执行之前调用
|
||||
protected void preInitServices(Set<Service> localServices, Set<Service> remoteServices, Set<Service> servletServices) {
|
||||
final ClusterAgent cluster = application.getClusterAgent();
|
||||
final ClusterAgent cluster = application.getResourceFactory().find("", ClusterAgent.class);
|
||||
if (!application.isCompileMode() && cluster != null) {
|
||||
NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class);
|
||||
String protocol = pros.value().toUpperCase();
|
||||
@@ -697,8 +701,8 @@ public abstract class NodeServer {
|
||||
|
||||
//Service.destroy执行之前调用
|
||||
protected void preDestroyServices(Set<Service> localServices, Set<Service> remoteServices, Set<Service> servletServices) {
|
||||
if (!application.isCompileMode() && application.getClusterAgent() != null) { //服务注销
|
||||
final ClusterAgent cluster = application.getClusterAgent();
|
||||
final ClusterAgent cluster = application.getResourceFactory().find("", ClusterAgent.class);
|
||||
if (!application.isCompileMode() && cluster != null) { //服务注销
|
||||
NodeProtocol pros = getClass().getAnnotation(NodeProtocol.class);
|
||||
String protocol = pros.value().toUpperCase();
|
||||
if (cluster.containsProtocol(protocol) && cluster.containsPort(server.getSocketAddress().getPort())) {
|
||||
@@ -877,11 +881,12 @@ public abstract class NodeServer {
|
||||
if (finest) {
|
||||
logger.finest(Sncp.getResourceType(y) + " is destroying");
|
||||
}
|
||||
application.unschedule(y);
|
||||
application.onServicePreDestroy(y);
|
||||
y.destroy(Sncp.getResourceConf(y));
|
||||
if (finest) {
|
||||
logger.finest(Sncp.getResourceType(y) + " was destroyed");
|
||||
}
|
||||
application.onServicePostDestroy(y);
|
||||
long e = System.currentTimeMillis() - s;
|
||||
if (e > 2 && sb != null) {
|
||||
sb.append(Sncp.toSimpleString(y, maxNameLength, maxTypeLength)).append(" destroy ").append(e).append("ms").append(LINE_SEPARATOR);
|
||||
|
||||
@@ -13,8 +13,8 @@ import org.redkale.mq.MessageAgent;
|
||||
import org.redkale.net.*;
|
||||
import org.redkale.net.sncp.*;
|
||||
import org.redkale.service.Local;
|
||||
import org.redkale.util.AnyValue.DefaultAnyValue;
|
||||
import org.redkale.util.*;
|
||||
import org.redkale.util.AnyValue.DefaultAnyValue;
|
||||
|
||||
/**
|
||||
* SNCP Server节点的配置Server
|
||||
@@ -145,7 +145,7 @@ public class NodeSncpServer extends NodeServer {
|
||||
dynServletMap.put(x, servlet);
|
||||
String mq = Sncp.getResourceMQ(x);
|
||||
if (mq != null) {
|
||||
MessageAgent agent = application.getMessageAgent(mq);
|
||||
MessageAgent agent = application.getResourceFactory().find(mq, MessageAgent.class);
|
||||
agent.putService(this, x, servlet);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
*/
|
||||
package org.redkale.boot;
|
||||
|
||||
import java.util.List;
|
||||
import org.redkale.annotation.Bean;
|
||||
import org.redkale.boot.ClassFilter.FilterEntry;
|
||||
import org.redkale.convert.Decodeable;
|
||||
@@ -29,14 +30,9 @@ public class PrepareCompiler {
|
||||
//必须设置redkale.resource.skip.check=true
|
||||
//因redkale-maven-plugin的maven-core依赖jsr250,会覆盖redkale的javax.annotation.Resource导致无法识别Resource.required方法
|
||||
System.setProperty("redkale.resource.skip.check", "true");
|
||||
final Application application = new Application(false, true, Application.loadAppConfig());
|
||||
final Application application = new Application(AppConfig.create(false, true));
|
||||
application.init();
|
||||
for (ApplicationListener listener : application.listeners) {
|
||||
listener.preStart(application);
|
||||
}
|
||||
for (ApplicationListener listener : application.listeners) {
|
||||
listener.preCompile(application);
|
||||
}
|
||||
application.onPreCompile();
|
||||
application.start();
|
||||
final boolean hasSncp = application.getNodeServers().stream().filter(NodeSncpServer.class::isInstance).findFirst().isPresent();
|
||||
|
||||
@@ -54,7 +50,9 @@ public class PrepareCompiler {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
application.dataSources.forEach(source -> source.compile(clz));
|
||||
List<DataSource> dataSources = application.getResourceFactory().query(DataSource.class);
|
||||
dataSources.forEach(source -> source.compile(clz));
|
||||
//application.dataSources.forEach(source -> source.compile(clz));
|
||||
JsonFactory.root().loadEncoder(clz);
|
||||
if (hasSncp) {
|
||||
BsonFactory.root().loadEncoder(clz);
|
||||
@@ -73,7 +71,9 @@ public class PrepareCompiler {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
application.dataSources.forEach(source -> source.compile(clz));
|
||||
List<DataSource> dataSources = application.getResourceFactory().query(DataSource.class);
|
||||
dataSources.forEach(source -> source.compile(clz));
|
||||
//application.dataSources.forEach(source -> source.compile(clz));
|
||||
JsonFactory.root().loadEncoder(clz);
|
||||
if (hasSncp) {
|
||||
BsonFactory.root().loadEncoder(clz);
|
||||
@@ -133,9 +133,7 @@ public class PrepareCompiler {
|
||||
//do nothing
|
||||
}
|
||||
}
|
||||
for (ApplicationListener listener : application.listeners) {
|
||||
listener.postCompile(application);
|
||||
}
|
||||
application.onPostCompile();
|
||||
application.shutdown();
|
||||
return application;
|
||||
}
|
||||
|
||||
@@ -57,7 +57,9 @@ public abstract class PropertiesAgent {
|
||||
public abstract void destroy(AnyValue conf);
|
||||
|
||||
protected void updateEnvironmentProperties(Application application, String namespace, List<ResourceEvent> events) {
|
||||
if (events == null || events.isEmpty()) return;
|
||||
if (events == null || events.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
application.updateEnvironmentProperties(namespace, events);
|
||||
}
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ public class CacheManagerService implements CacheManager, Service {
|
||||
this.localSource.init(conf);
|
||||
String remoteSourceName = conf.getValue("source");
|
||||
if (remoteSource == null && application != null && Utility.isNotBlank(remoteSourceName)) {
|
||||
CacheSource source = application.loadCacheSource(remoteSourceName, false);
|
||||
CacheSource source = application.getResourceFactory().find(remoteSourceName, CacheSource.class);
|
||||
if (source == null) {
|
||||
throw new RedkaleException("Not found CacheSource '" + remoteSourceName + "'");
|
||||
}
|
||||
|
||||
45
src/main/java/org/redkale/cache/support/CacheModuleEngine.java
vendored
Normal file
45
src/main/java/org/redkale/cache/support/CacheModuleEngine.java
vendored
Normal file
@@ -0,0 +1,45 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkale.cache.support;
|
||||
|
||||
import org.redkale.boot.Application;
|
||||
import org.redkale.boot.ModuleEngine;
|
||||
import org.redkale.util.AnyValue;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class CacheModuleEngine extends ModuleEngine {
|
||||
|
||||
//全局缓存管理器
|
||||
private CacheManagerService cacheManager;
|
||||
|
||||
public CacheModuleEngine(Application application) {
|
||||
super(application);
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.init方法时被调用
|
||||
*/
|
||||
public void onAppPreInit() {
|
||||
//设置缓存管理器
|
||||
this.cacheManager = CacheManagerService.create(null).enabled(false);
|
||||
final AnyValue cacheConf = application.getAppConfig().getAnyValue("cache");
|
||||
this.resourceFactory.inject(this.cacheManager);
|
||||
if (!application.isCompileMode() && cacheConf != null) {
|
||||
this.cacheManager.init(cacheConf);
|
||||
}
|
||||
this.resourceFactory.register("", this.cacheManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.shutdown方法被调用
|
||||
*/
|
||||
public void onAppPreShutdown() {
|
||||
if (!application.isCompileMode()) {
|
||||
this.cacheManager.destroy(this.cacheManager.getConfig());
|
||||
}
|
||||
}
|
||||
}
|
||||
224
src/main/java/org/redkale/cluster/ClusterModuleEngine.java
Normal file
224
src/main/java/org/redkale/cluster/ClusterModuleEngine.java
Normal file
@@ -0,0 +1,224 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkale.cluster;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
import java.util.logging.Level;
|
||||
import org.redkale.boot.Application;
|
||||
import org.redkale.boot.ModuleEngine;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.AnyValue.DefaultAnyValue;
|
||||
import org.redkale.util.RedkaleClassLoader;
|
||||
import org.redkale.util.ResourceEvent;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class ClusterModuleEngine extends ModuleEngine {
|
||||
|
||||
//第三方服务配置资源
|
||||
//@since 2.8.0
|
||||
private Properties clusterProperties = new Properties();
|
||||
|
||||
//第三方服务发现管理接口
|
||||
//@since 2.1.0
|
||||
private ClusterAgent clusterAgent;
|
||||
|
||||
public ClusterModuleEngine(Application application) {
|
||||
super(application);
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.init方法时被调用
|
||||
*/
|
||||
public void onAppPreInit() {
|
||||
ClusterAgent cluster = null;
|
||||
AnyValue clusterConf = application.getAppConfig().getAnyValue("cluster");
|
||||
if (clusterConf != null) {
|
||||
try {
|
||||
String classVal = application.getPropertyValue(clusterConf.getValue("type", clusterConf.getValue("value"))); //兼容value字段
|
||||
if (classVal == null || classVal.isEmpty() || classVal.indexOf('.') < 0) { //不包含.表示非类名,比如值: consul, nacos
|
||||
Iterator<ClusterAgentProvider> it = ServiceLoader.load(ClusterAgentProvider.class, application.getClassLoader()).iterator();
|
||||
RedkaleClassLoader.putServiceLoader(ClusterAgentProvider.class);
|
||||
while (it.hasNext()) {
|
||||
ClusterAgentProvider provider = it.next();
|
||||
if (provider != null) {
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(provider.getClass(), provider.getClass().getName()); //loader class
|
||||
}
|
||||
if (provider != null && provider.acceptsConf(clusterConf)) {
|
||||
cluster = provider.createInstance();
|
||||
cluster.setConfig(clusterConf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (cluster == null) {
|
||||
ClusterAgent cacheClusterAgent = new CacheClusterAgent();
|
||||
if (cacheClusterAgent.acceptsConf(clusterConf)) {
|
||||
cluster = cacheClusterAgent;
|
||||
cluster.setConfig(clusterConf);
|
||||
}
|
||||
}
|
||||
if (cluster == null) {
|
||||
logger.log(Level.SEVERE, "load application cluster resource, but not found name='type' value error: " + clusterConf);
|
||||
}
|
||||
} else {
|
||||
Class type = application.getClassLoader().loadClass(classVal);
|
||||
if (!ClusterAgent.class.isAssignableFrom(type)) {
|
||||
logger.log(Level.SEVERE, "load application cluster resource, but not found "
|
||||
+ ClusterAgent.class.getSimpleName() + " implements class error: " + clusterConf);
|
||||
} else {
|
||||
RedkaleClassLoader.putReflectionDeclaredConstructors(type, type.getName());
|
||||
cluster = (ClusterAgent) type.getDeclaredConstructor().newInstance();
|
||||
cluster.setConfig(clusterConf);
|
||||
}
|
||||
}
|
||||
//此时不能执行cluster.init,因内置的对象可能依赖config.properties配置项
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "load application cluster resource error: " + clusterConf, e);
|
||||
}
|
||||
}
|
||||
this.clusterAgent = cluster;
|
||||
}
|
||||
|
||||
/**
|
||||
* 结束Application.init方法前被调用
|
||||
*/
|
||||
@Override
|
||||
public void onAppPostInit() {
|
||||
if (this.clusterAgent != null) {
|
||||
if (logger.isLoggable(Level.FINER)) {
|
||||
logger.log(Level.FINER, "ClusterAgent (type = " + this.clusterAgent.getClass().getSimpleName() + ") initing");
|
||||
}
|
||||
long s = System.currentTimeMillis();
|
||||
if (this.clusterAgent instanceof CacheClusterAgent) {
|
||||
String sourceName = ((CacheClusterAgent) clusterAgent).getSourceName(); //必须在inject前调用,需要赋值Resourcable.name
|
||||
//loadCacheSource(sourceName, false);
|
||||
}
|
||||
this.resourceFactory.inject(clusterAgent);
|
||||
clusterAgent.init(clusterAgent.getConfig());
|
||||
this.resourceFactory.register(ClusterAgent.class, clusterAgent);
|
||||
logger.info("ClusterAgent (type = " + this.clusterAgent.getClass().getSimpleName() + ") init in " + (System.currentTimeMillis() - s) + " ms");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置项加载后被调用
|
||||
*/
|
||||
@Override
|
||||
public void onEnvironmentLoaded(Properties props) {
|
||||
props.forEach((key, val) -> {
|
||||
if (key.toString().startsWith("redkale.cluster.")) {
|
||||
this.clusterProperties.put(key, val);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置项变更时被调用
|
||||
*
|
||||
* @param namespace 命名空间
|
||||
* @param events 变更项
|
||||
*/
|
||||
public void onEnvironmentChanged(String namespace, List<ResourceEvent> events) {
|
||||
Set<String> clusterRemovedKeys = new HashSet<>();
|
||||
Properties clusterChangedProps = new Properties();
|
||||
|
||||
for (ResourceEvent<String> event : events) {
|
||||
if (event.name().startsWith("redkale.cluster.")) {
|
||||
if (!Objects.equals(event.newValue(), this.clusterProperties.getProperty(event.name()))) {
|
||||
if (event.newValue() == null) {
|
||||
if (this.clusterProperties.containsKey(event.name())) {
|
||||
clusterRemovedKeys.add(event.name());
|
||||
}
|
||||
} else {
|
||||
clusterChangedProps.put(event.name(), event.newValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//第三方服务注册配置项的变更
|
||||
if (!clusterChangedProps.isEmpty() || !clusterRemovedKeys.isEmpty()) {
|
||||
if (this.clusterAgent != null) {
|
||||
final DefaultAnyValue old = (DefaultAnyValue) application.getAppConfig().getAnyValue("cluster");
|
||||
Properties newProps = new Properties();
|
||||
newProps.putAll(clusterProperties);
|
||||
List<ResourceEvent> changeEvents = new ArrayList<>();
|
||||
clusterChangedProps.forEach((k, v) -> {
|
||||
final String key = k.toString();
|
||||
newProps.put(k, v);
|
||||
changeEvents.add(ResourceEvent.create(key.substring("redkale.cluster.".length()), v, this.clusterProperties.getProperty(key)));
|
||||
});
|
||||
clusterRemovedKeys.forEach(k -> {
|
||||
final String key = k;
|
||||
newProps.remove(k);
|
||||
changeEvents.add(ResourceEvent.create(key.substring("redkale.cluster.".length()), null, this.clusterProperties.getProperty(key)));
|
||||
});
|
||||
if (!changeEvents.isEmpty()) {
|
||||
DefaultAnyValue back = old.copy();
|
||||
try {
|
||||
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("cluster"));
|
||||
clusterAgent.onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()]));
|
||||
} catch (RuntimeException e) {
|
||||
old.replace(back); //还原配置
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
clusterChangedProps.forEach((k, v) -> {
|
||||
sb.append(ClusterAgent.class.getSimpleName()).append(" skip change '").append(k).append("'\r\n");
|
||||
});
|
||||
clusterRemovedKeys.forEach(k -> {
|
||||
sb.append(ClusterAgent.class.getSimpleName()).append(" skip change '").append(k).append("'\r\n");
|
||||
});
|
||||
if (sb.length() > 0) {
|
||||
logger.log(Level.INFO, sb.toString());
|
||||
}
|
||||
}
|
||||
clusterRemovedKeys.forEach(k -> this.clusterProperties.remove(k));
|
||||
this.clusterProperties.putAll(clusterChangedProps);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.start方法被调用
|
||||
*/
|
||||
public void onAppPreStart() {
|
||||
if (!application.isSingletonMode() && !application.isCompileMode() && this.clusterAgent != null) {
|
||||
this.clusterAgent.register(application);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部启动后被调用
|
||||
*/
|
||||
public void onServersPostStart() {
|
||||
if (this.clusterAgent != null) {
|
||||
this.clusterAgent.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部停掉后被调用
|
||||
*/
|
||||
public void onServersPostStop() {
|
||||
if (!application.isCompileMode() && clusterAgent != null) {
|
||||
if (logger.isLoggable(Level.FINER)) {
|
||||
logger.log(Level.FINER, "ClusterAgent destroying");
|
||||
}
|
||||
long s = System.currentTimeMillis();
|
||||
clusterAgent.deregister(application);
|
||||
clusterAgent.destroy(clusterAgent.getConfig());
|
||||
logger.info("ClusterAgent destroy in " + (System.currentTimeMillis() - s) + " ms");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -123,9 +123,7 @@ public abstract class MessageAgent implements Resourcable {
|
||||
Class<MessageCoder<MessageRecord>> coderClass = (Class) Thread.currentThread().getContextClassLoader().loadClass(coderType);
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(coderClass, coderClass.getName());
|
||||
MessageCoder<MessageRecord> coder = coderClass.getConstructor().newInstance();
|
||||
if (application != null) {
|
||||
application.getResourceFactory().inject(coder);
|
||||
}
|
||||
application.getResourceFactory().inject(coder);
|
||||
if (coder instanceof Service) {
|
||||
((Service) coder).init(config);
|
||||
}
|
||||
|
||||
418
src/main/java/org/redkale/mq/MessageModuleEngine.java
Normal file
418
src/main/java/org/redkale/mq/MessageModuleEngine.java
Normal file
@@ -0,0 +1,418 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkale.mq;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
import java.util.logging.Level;
|
||||
import org.redkale.boot.Application;
|
||||
import org.redkale.boot.ClassFilter;
|
||||
import org.redkale.boot.ModuleEngine;
|
||||
import org.redkale.boot.NodeServer;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import org.redkale.net.http.RestException;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.RedkaleClassLoader;
|
||||
import org.redkale.util.RedkaleException;
|
||||
import org.redkale.util.ResourceAnnotationProvider;
|
||||
import org.redkale.util.ResourceEvent;
|
||||
import org.redkale.util.ResourceFactory;
|
||||
import org.redkale.util.ResourceTypeLoader;
|
||||
import org.redkale.util.Utility;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class MessageModuleEngine extends ModuleEngine {
|
||||
|
||||
//MQ管理配置资源
|
||||
//@since 2.8.0
|
||||
private Properties messageProperties = new Properties();
|
||||
|
||||
//MQ管理接口
|
||||
//@since 2.1.0
|
||||
private MessageAgent[] messageAgents;
|
||||
|
||||
public MessageModuleEngine(Application application) {
|
||||
super(application);
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置项加载后被调用
|
||||
*/
|
||||
@Override
|
||||
public void onEnvironmentLoaded(Properties props) {
|
||||
if (this.messageAgents == null) {
|
||||
return;
|
||||
}
|
||||
props.forEach((key, val) -> {
|
||||
if (key.toString().startsWith("redkale.mq.") || key.toString().startsWith("redkale.mq[")) {
|
||||
if (key.toString().endsWith(".name")) {
|
||||
logger.log(Level.WARNING, "skip illegal key " + key + " in mq config, key cannot endsWith '.name'");
|
||||
} else {
|
||||
this.messageProperties.put(key, val);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.init方法时被调用
|
||||
*/
|
||||
public void onAppPreInit() {
|
||||
MessageAgent[] mqs = null;
|
||||
AnyValue[] mqConfs = application.getAppConfig().getAnyValues("mq");
|
||||
if (mqConfs != null && mqConfs.length > 0) {
|
||||
mqs = new MessageAgent[mqConfs.length];
|
||||
Set<String> mqnames = new HashSet<>();
|
||||
for (int i = 0; i < mqConfs.length; i++) {
|
||||
AnyValue mqConf = mqConfs[i];
|
||||
String names = application.getPropertyValue(mqConf.getValue("name")); //含,或者;表示多个别名使用同一mq对象
|
||||
if (names != null && !names.isEmpty()) {
|
||||
for (String n : names.replace(',', ';').split(";")) {
|
||||
if (n.trim().isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
if (mqnames.contains(n.trim())) {
|
||||
throw new RedkaleException("mq.name(" + n.trim() + ") is repeat");
|
||||
}
|
||||
mqnames.add(n.trim());
|
||||
}
|
||||
} else if (names != null && names.isEmpty()) {
|
||||
String n = "";
|
||||
if (mqnames.contains(n.trim())) {
|
||||
throw new RedkaleException("mq.name(" + n.trim() + ") is repeat");
|
||||
}
|
||||
mqnames.add(n);
|
||||
}
|
||||
try {
|
||||
String classVal = application.getPropertyValue(mqConf.getValue("type", mqConf.getValue("value"))); //兼容value字段
|
||||
if (classVal == null || classVal.isEmpty() || classVal.indexOf('.') < 0) { //不包含.表示非类名,比如值: kafka, pulsar
|
||||
Iterator<MessageAgentProvider> it = ServiceLoader.load(MessageAgentProvider.class, application.getClassLoader()).iterator();
|
||||
RedkaleClassLoader.putServiceLoader(MessageAgentProvider.class);
|
||||
while (it.hasNext()) {
|
||||
MessageAgentProvider provider = it.next();
|
||||
if (provider != null) {
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(provider.getClass(), provider.getClass().getName()); //loader class
|
||||
}
|
||||
if (provider != null && provider.acceptsConf(mqConf)) {
|
||||
mqs[i] = provider.createInstance();
|
||||
mqs[i].setConfig(mqConf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (mqs[i] == null) {
|
||||
logger.log(Level.SEVERE, "load application mq resource, but not found name='value' value error: " + mqConf);
|
||||
}
|
||||
} else {
|
||||
Class type = application.getClassLoader().loadClass(classVal);
|
||||
if (!MessageAgent.class.isAssignableFrom(type)) {
|
||||
logger.log(Level.SEVERE, "load application mq resource, but not found "
|
||||
+ MessageAgent.class.getSimpleName() + " implements class error: " + mqConf);
|
||||
} else {
|
||||
RedkaleClassLoader.putReflectionDeclaredConstructors(type, type.getName());
|
||||
mqs[i] = (MessageAgent) type.getDeclaredConstructor().newInstance();
|
||||
mqs[i].setConfig(mqConf);
|
||||
}
|
||||
}
|
||||
//此时不能执行mq.init,因内置的对象可能依赖config.properties配置项
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "load application mq resource error: " + mqs[i], e);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.messageAgents = mqs;
|
||||
//------------------------------------ 注册 ResourceProducer MessageProducer ------------------------------------
|
||||
resourceFactory.register(new ResourceAnnotationProvider<ResourceProducer>() {
|
||||
@Override
|
||||
public void load(ResourceFactory rf, String srcResourceName, Object srcObj, ResourceProducer annotation, Field field, Object attachment) {
|
||||
if (field.getType() != MessageProducer.class) {
|
||||
throw new RestException("@" + ResourceProducer.class.getSimpleName() + " must on " + MessageProducer.class.getName() + " type field, but on " + field);
|
||||
}
|
||||
MessageAgent agent = resourceFactory.find(annotation.mq(), MessageAgent.class);
|
||||
if (!annotation.required() && agent == null) {
|
||||
return;
|
||||
}
|
||||
if (agent == null) {
|
||||
throw new RedkaleException("Not found " + MessageAgent.class.getSimpleName() + "(name = " + annotation.mq() + ") on " + field);
|
||||
}
|
||||
try {
|
||||
MessageProducer producer = agent.loadMessageProducer(annotation);
|
||||
field.set(srcObj, producer);
|
||||
} catch (RuntimeException ex) {
|
||||
throw ex;
|
||||
} catch (Exception e) {
|
||||
throw new RedkaleException(field + "inject error", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<ResourceProducer> annotationType() {
|
||||
return ResourceProducer.class;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 结束Application.init方法前被调用
|
||||
*/
|
||||
@Override
|
||||
public void onAppPostInit() {
|
||||
if (this.messageAgents == null) {
|
||||
return;
|
||||
}
|
||||
if (logger.isLoggable(Level.FINER)) {
|
||||
logger.log(Level.FINER, "MessageAgent initing");
|
||||
}
|
||||
long s = System.currentTimeMillis();
|
||||
for (MessageAgent agent : this.messageAgents) {
|
||||
this.resourceFactory.inject(agent);
|
||||
agent.init(agent.getConfig());
|
||||
this.resourceFactory.register(agent.getName(), MessageAgent.class, agent);
|
||||
}
|
||||
logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms");
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置项变更时被调用
|
||||
*
|
||||
* @param namespace 命名空间
|
||||
* @param events 变更项
|
||||
*/
|
||||
public void onEnvironmentChanged(String namespace, List<ResourceEvent> events) {
|
||||
Set<String> messageRemovedKeys = new HashSet<>();
|
||||
Properties messageChangedProps = new Properties();
|
||||
|
||||
for (ResourceEvent<String> event : events) {
|
||||
if (event.name().startsWith("redkale.mq.") || event.name().startsWith("redkale.mq[")) {
|
||||
if (event.name().endsWith(".name")) {
|
||||
logger.log(Level.WARNING, "skip illegal key " + event.name()
|
||||
+ " in mq config " + (namespace == null ? "" : namespace) + ", key cannot endsWith '.name'");
|
||||
} else {
|
||||
if (!Objects.equals(event.newValue(), this.messageProperties.getProperty(event.name()))) {
|
||||
if (event.newValue() == null) {
|
||||
if (this.messageProperties.containsKey(event.name())) {
|
||||
messageRemovedKeys.add(event.name());
|
||||
}
|
||||
} else {
|
||||
messageChangedProps.put(event.name(), event.newValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//MQ配置项的变更
|
||||
if (!messageChangedProps.isEmpty() || !messageRemovedKeys.isEmpty()) {
|
||||
Set<String> messageNames = new LinkedHashSet<>();
|
||||
List<String> keys = new ArrayList<>();
|
||||
keys.addAll(messageRemovedKeys);
|
||||
keys.addAll((Set) messageChangedProps.keySet());
|
||||
for (final String key : keys) {
|
||||
if (key.startsWith("redkale.mq[")) {
|
||||
messageNames.add(key.substring("redkale.mq[".length(), key.indexOf(']')));
|
||||
} else if (key.startsWith("redkale.mq.")) {
|
||||
messageNames.add(key.substring("redkale.mq.".length(), key.indexOf('.', "redkale.mq.".length())));
|
||||
}
|
||||
}
|
||||
//更新MQ
|
||||
for (String mqName : messageNames) {
|
||||
MessageAgent agent = Utility.find(messageAgents, s -> Objects.equals(s.resourceName(), mqName));
|
||||
if (agent == null) {
|
||||
continue; //多余的数据源
|
||||
}
|
||||
final AnyValue.DefaultAnyValue old = (AnyValue.DefaultAnyValue) findMQConfig(mqName);
|
||||
Properties newProps = new Properties();
|
||||
this.messageProperties.forEach((k, v) -> {
|
||||
final String key = k.toString();
|
||||
String prefix = "redkale.mq[" + mqName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.mq." + mqName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) {
|
||||
return; //不是同一name数据源配置项
|
||||
}
|
||||
newProps.put(k, v);
|
||||
});
|
||||
List<ResourceEvent> changeEvents = new ArrayList<>();
|
||||
messageChangedProps.forEach((k, v) -> {
|
||||
final String key = k.toString();
|
||||
String prefix = "redkale.mq[" + mqName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.mq." + mqName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) {
|
||||
return; //不是同一name数据源配置项
|
||||
}
|
||||
newProps.put(k, v);
|
||||
changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), v, this.messageProperties.getProperty(key)));
|
||||
});
|
||||
messageRemovedKeys.forEach(k -> {
|
||||
final String key = k;
|
||||
String prefix = "redkale.mq[" + mqName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.mq." + mqName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) {
|
||||
return;
|
||||
}
|
||||
newProps.remove(k); //不是同一name数据源配置项
|
||||
changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), null, this.messageProperties.getProperty(key)));
|
||||
});
|
||||
if (!changeEvents.isEmpty()) {
|
||||
AnyValue.DefaultAnyValue back = old.copy();
|
||||
try {
|
||||
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("mq").getAnyValue(mqName));
|
||||
agent.onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()]));
|
||||
} catch (RuntimeException e) {
|
||||
old.replace(back); //还原配置
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
messageRemovedKeys.forEach(k -> this.messageProperties.remove(k));
|
||||
this.messageProperties.putAll(messageChangedProps);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部启动后被调用
|
||||
*/
|
||||
public void onServersPostStart() {
|
||||
if (this.messageAgents == null) {
|
||||
return;
|
||||
}
|
||||
//startMessageAgent
|
||||
if (logger.isLoggable(Level.FINE)) {
|
||||
logger.log(Level.FINE, MessageAgent.class.getSimpleName() + " starting");
|
||||
}
|
||||
long s = System.currentTimeMillis();
|
||||
Set<String> names = new HashSet<>();
|
||||
ResourceFactory serResourceFactory = this.resourceFactory.createChild();
|
||||
List<ResourceFactory> factorys = new ArrayList<>();
|
||||
for (NodeServer ns : application.getNodeServers()) {
|
||||
factorys.add(ns.getResourceFactory());
|
||||
}
|
||||
serResourceFactory.register(new ResourceTypeLoader() {
|
||||
@Override
|
||||
public Object load(ResourceFactory factory, String srcResourceName, Object srcObj, String resourceName, Field field, Object attachment) {
|
||||
for (ResourceFactory f : factorys) {
|
||||
Object val = f.find(resourceName, field.getGenericType());
|
||||
if (val != null) {
|
||||
return val;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean autoNone() {
|
||||
return false;
|
||||
}
|
||||
}, Object.class);
|
||||
for (MessageAgent agent : this.messageAgents) {
|
||||
names.add(agent.getName());
|
||||
List<MessageConsumer> consumers = new ArrayList<>();
|
||||
AnyValue consumerConf = agent.getConfig().getAnyValue("consumer");
|
||||
if (consumerConf != null) { //加载 MessageConsumer
|
||||
ClassFilter filter = new ClassFilter(application.getServerClassLoader(), ResourceConsumer.class, MessageConsumer.class, null, null);
|
||||
if (consumerConf.getBoolValue("autoload", true)) {
|
||||
String includes = consumerConf.getValue("includes", "");
|
||||
String excludes = consumerConf.getValue("excludes", "");
|
||||
filter.setIncludePatterns(includes.split(";"));
|
||||
filter.setExcludePatterns(excludes.split(";"));
|
||||
} else {
|
||||
filter.setRefused(true);
|
||||
}
|
||||
|
||||
try {
|
||||
ClassFilter.Loader.load(application.getHome(), application.getServerClassLoader(), filter);
|
||||
List<ClassFilter.FilterEntry<? extends MessageConsumer>> entrys = new ArrayList(filter.getFilterEntrys());
|
||||
for (ClassFilter.FilterEntry<? extends MessageConsumer> en : entrys) {
|
||||
Class<? extends MessageConsumer> clazz = en.getType();
|
||||
ResourceConsumer res = clazz.getAnnotation(ResourceConsumer.class);
|
||||
if (!Objects.equals(agent.getName(), application.getPropertyValue(res.mq()))) {
|
||||
continue;
|
||||
}
|
||||
RedkaleClassLoader.putReflectionDeclaredConstructors(clazz, clazz.getName());
|
||||
final MessageConsumer consumer = clazz.getDeclaredConstructor().newInstance();
|
||||
serResourceFactory.inject(consumer);
|
||||
consumers.add(consumer);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RedkaleException(e);
|
||||
}
|
||||
for (MessageConsumer consumer : consumers) {
|
||||
consumer.init(consumerConf);
|
||||
}
|
||||
}
|
||||
agent.start(consumers);
|
||||
}
|
||||
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") started in " + (System.currentTimeMillis() - s) + " ms");
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部停掉前被调用
|
||||
*/
|
||||
public void onServersPreStop() {
|
||||
if (application.isCompileMode() && this.messageAgents != null) {
|
||||
Set<String> names = new HashSet<>();
|
||||
if (logger.isLoggable(Level.FINER)) {
|
||||
logger.log(Level.FINER, "MessageAgent stopping");
|
||||
}
|
||||
long s = System.currentTimeMillis();
|
||||
for (MessageAgent agent : this.messageAgents) {
|
||||
names.add(agent.getName());
|
||||
agent.stop();
|
||||
}
|
||||
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") stop in " + (System.currentTimeMillis() - s) + " ms");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部停掉后被调用
|
||||
*/
|
||||
public void onServersPostStop() {
|
||||
if (this.messageAgents != null) {
|
||||
Set<String> names = new HashSet<>();
|
||||
if (logger.isLoggable(Level.FINER)) {
|
||||
logger.log(Level.FINER, "MessageAgent destroying");
|
||||
}
|
||||
long s = System.currentTimeMillis();
|
||||
for (MessageAgent agent : this.messageAgents) {
|
||||
names.add(agent.getName());
|
||||
agent.destroy(agent.getConfig());
|
||||
}
|
||||
logger.info("MessageAgent(names=" + JsonConvert.root().convertTo(names) + ") destroy in " + (System.currentTimeMillis() - s) + " ms");
|
||||
}
|
||||
}
|
||||
|
||||
private AnyValue findMQConfig(String mqName) {
|
||||
AnyValue mqsNode = application.getAppConfig().getAnyValue("mq");
|
||||
if (mqsNode != null) {
|
||||
AnyValue confNode = mqsNode.getAnyValue(mqName);
|
||||
if (confNode != null) { //必须要设置name属性
|
||||
((AnyValue.DefaultAnyValue) confNode).setValue("name", mqName);
|
||||
}
|
||||
return confNode;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -108,7 +108,7 @@ public class AsyncIOGroup extends AsyncGroup {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncGroup start() {
|
||||
public AsyncIOGroup start() {
|
||||
if (closed.get()) {
|
||||
throw new RedkaleException("group is closed");
|
||||
}
|
||||
|
||||
@@ -69,58 +69,6 @@ public abstract class Sncp {
|
||||
|
||||
}
|
||||
|
||||
public static class SchedulingEntry {
|
||||
|
||||
private Class serviceType;
|
||||
|
||||
private String methodName;
|
||||
|
||||
private Scheduled schedule;
|
||||
|
||||
private Runnable task;
|
||||
|
||||
public SchedulingEntry(Class serviceType, String methodName, Scheduled schedule, Runnable task) {
|
||||
Objects.requireNonNull(serviceType);
|
||||
Objects.requireNonNull(methodName);
|
||||
Objects.requireNonNull(schedule);
|
||||
Objects.requireNonNull(task);
|
||||
this.serviceType = serviceType;
|
||||
this.methodName = methodName;
|
||||
this.schedule = schedule;
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
public Class getServiceType() {
|
||||
return serviceType;
|
||||
}
|
||||
|
||||
public String getMethodName() {
|
||||
return methodName;
|
||||
}
|
||||
|
||||
public Scheduled getSchedule() {
|
||||
return schedule;
|
||||
}
|
||||
|
||||
public Runnable getTask() {
|
||||
return task;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "{"
|
||||
+ "serviceType=" + serviceType
|
||||
+ ", methodName=" + methodName
|
||||
+ ", schedule.cron=" + schedule.cron()
|
||||
+ ", schedule.zone=" + schedule.zone()
|
||||
+ ", schedule.fixedDelay=" + schedule.fixedDelay()
|
||||
+ ", schedule.fixedRate=" + schedule.fixedRate()
|
||||
+ ", schedule.initialDelay=" + schedule.initialDelay()
|
||||
+ ", schedule.timeUnit=" + schedule.timeUnit()
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
||||
private Sncp() {
|
||||
}
|
||||
|
||||
@@ -173,7 +121,7 @@ public abstract class Sncp {
|
||||
Method old = actionids.get(actionid);
|
||||
if (old != null) {
|
||||
if (old.getDeclaringClass().equals(method.getDeclaringClass())) {
|
||||
throw new SncpException(serviceTypeOrImplClass.getName()
|
||||
throw new SncpException(serviceTypeOrImplClass.getName()
|
||||
+ " have one more same action(Method=" + method + ", " + old + ", actionid=" + actionid + ")");
|
||||
}
|
||||
continue;
|
||||
@@ -317,7 +265,7 @@ public abstract class Sncp {
|
||||
|
||||
//格式: sncp.req.module.user
|
||||
public static String generateSncpReqTopic(Service service, int nodeid) {
|
||||
return generateSncpReqTopic(Sncp.getResourceName(service), Sncp.getResourceType(service), nodeid);
|
||||
return generateSncpReqTopic(getResourceName(service), getResourceType(service), nodeid);
|
||||
}
|
||||
|
||||
//格式: sncp.req.module.user
|
||||
@@ -325,8 +273,8 @@ public abstract class Sncp {
|
||||
if (WebSocketNode.class.isAssignableFrom(resourceType)) {
|
||||
return getSncpReqTopicPrefix() + "module.wsnode" + nodeid + (resourceName.isEmpty() ? "" : ("-" + resourceName));
|
||||
}
|
||||
return getSncpReqTopicPrefix() + "module."
|
||||
+ resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase()
|
||||
return getSncpReqTopicPrefix() + "module."
|
||||
+ resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase()
|
||||
+ (resourceName.isEmpty() ? "" : ("-" + resourceName));
|
||||
}
|
||||
|
||||
@@ -789,7 +737,7 @@ public abstract class Sncp {
|
||||
if (!java.lang.reflect.Modifier.isPublic(mod)) {
|
||||
return null;
|
||||
}
|
||||
final SncpRemoteInfo info = createSncpRemoteInfo(name, getResourceType(serviceTypeOrImplClass),
|
||||
final SncpRemoteInfo info = createSncpRemoteInfo(name, getResourceType(serviceTypeOrImplClass),
|
||||
serviceTypeOrImplClass, BsonConvert.root(), sncpRpcGroups, client, agent, remoteGroup);
|
||||
final String supDynName = serviceTypeOrImplClass.getName().replace('.', '/');
|
||||
final String sncpInfoName = SncpRemoteInfo.class.getName().replace('.', '/');
|
||||
@@ -840,7 +788,7 @@ public abstract class Sncp {
|
||||
MethodDebugVisitor mv;
|
||||
AnnotationVisitor av0;
|
||||
|
||||
cw.visit(V11, ACC_PUBLIC + ACC_SUPER, newDynName, null, serviceTypeOrImplClass.isInterface() ? "java/lang/Object" : supDynName,
|
||||
cw.visit(V11, ACC_PUBLIC + ACC_SUPER, newDynName, null, serviceTypeOrImplClass.isInterface() ? "java/lang/Object" : supDynName,
|
||||
serviceTypeOrImplClass.isInterface() ? new String[]{supDynName} : null);
|
||||
{ //给动态生成的Service类标记上Resource
|
||||
av0 = cw.visitAnnotation(resDesc, true);
|
||||
@@ -961,7 +909,7 @@ public abstract class Sncp {
|
||||
mv.visitVarInsn(ILOAD, insn);
|
||||
}
|
||||
Class bigclaz = TypeToken.primitiveToWrapper((Class) pt);
|
||||
mv.visitMethodInsn(INVOKESTATIC, bigclaz.getName().replace('.', '/'), "valueOf",
|
||||
mv.visitMethodInsn(INVOKESTATIC, bigclaz.getName().replace('.', '/'), "valueOf",
|
||||
"(" + Type.getDescriptor((Class) pt) + ")" + Type.getDescriptor(bigclaz), false);
|
||||
} else {
|
||||
mv.visitVarInsn(ALOAD, insn);
|
||||
|
||||
@@ -26,17 +26,17 @@ import org.redkale.util.*;
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
* @param <T> Service泛型
|
||||
* @param <S> Service泛型
|
||||
*
|
||||
* @since 2.8.0
|
||||
*/
|
||||
public class SncpRemoteInfo<T extends Service> {
|
||||
public class SncpRemoteInfo<S extends Service> {
|
||||
|
||||
protected static final Logger logger = Logger.getLogger(SncpRemoteInfo.class.getSimpleName());
|
||||
|
||||
protected final String name;
|
||||
|
||||
protected final Class<T> serviceType;
|
||||
protected final Class<S> serviceType;
|
||||
|
||||
protected final Uint128 serviceid;
|
||||
|
||||
@@ -71,7 +71,7 @@ public class SncpRemoteInfo<T extends Service> {
|
||||
//MQ模式下此字段才有值
|
||||
protected final MessageClient messageClient;
|
||||
|
||||
SncpRemoteInfo(String resourceName, Class<T> resourceType, Class<T> serviceImplClass, Convert convert,
|
||||
SncpRemoteInfo(String resourceName, Class<S> resourceType, Class<S> serviceImplClass, Convert convert,
|
||||
SncpRpcGroups sncpRpcGroups, SncpClient sncpClient, MessageAgent messageAgent, String remoteGroup) {
|
||||
Objects.requireNonNull(sncpRpcGroups);
|
||||
this.name = resourceName;
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkale.schedule.support;
|
||||
|
||||
import org.redkale.boot.Application;
|
||||
import org.redkale.boot.ModuleEngine;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.AnyValue;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class ScheduleModuleEngine extends ModuleEngine {
|
||||
|
||||
//全局定时任务管理器
|
||||
private ScheduleManagerService scheduleManager;
|
||||
|
||||
public ScheduleModuleEngine(Application application) {
|
||||
super(application);
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.init方法时被调用
|
||||
*/
|
||||
public void onAppPreInit() {
|
||||
//设置定时管理器
|
||||
this.scheduleManager = ScheduleManagerService.create(null).enabled(false);
|
||||
final AnyValue scheduleConf = application.getAppConfig().getAnyValue("schedule", true);
|
||||
this.resourceFactory.inject(this.scheduleManager);
|
||||
if (!application.isCompileMode()) {
|
||||
this.scheduleManager.init(scheduleConf);
|
||||
}
|
||||
this.resourceFactory.register("", this.scheduleManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行Service.init方法后被调用
|
||||
*
|
||||
* @param service Service
|
||||
*/
|
||||
public void onServicePostInit(Service service) {
|
||||
this.scheduleManager.schedule(service);
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行Service.destroy方法后被调用
|
||||
*
|
||||
* @param service Service
|
||||
*/
|
||||
@Override
|
||||
public void onServicePreDestroy(Service service) {
|
||||
this.scheduleManager.unschedule(service);
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.shutdown方法被调用
|
||||
*/
|
||||
public void onAppPreShutdown() {
|
||||
if (!application.isCompileMode()) {
|
||||
this.scheduleManager.destroy(this.scheduleManager.getConfig());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,14 +6,13 @@
|
||||
package org.redkale.service;
|
||||
|
||||
import java.io.*;
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
import java.lang.annotation.*;
|
||||
import static java.lang.annotation.ElementType.*;
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
import java.lang.reflect.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.function.BiFunction;
|
||||
import static org.redkale.boot.Application.*;
|
||||
import org.redkale.util.RedkaleClassLoader;
|
||||
|
||||
/**
|
||||
@@ -59,13 +58,21 @@ public @interface RetLabel {
|
||||
RedkaleClassLoader.putServiceLoader(RetInfoTransfer.class);
|
||||
Iterator<RetInfoTransfer> it = loader.iterator();
|
||||
RetInfoTransfer func = it.hasNext() ? it.next() : null;
|
||||
if (func != null) RedkaleClassLoader.putReflectionPublicConstructors(func.getClass(), func.getClass().getName());
|
||||
if (func != null) {
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(func.getClass(), func.getClass().getName());
|
||||
}
|
||||
RedkaleClassLoader.putReflectionPublicFields(clazz.getName());
|
||||
for (Field field : clazz.getFields()) {
|
||||
if (!Modifier.isStatic(field.getModifiers())) continue;
|
||||
if (field.getType() != int.class) continue;
|
||||
if (!Modifier.isStatic(field.getModifiers())) {
|
||||
continue;
|
||||
}
|
||||
if (field.getType() != int.class) {
|
||||
continue;
|
||||
}
|
||||
RetLabel[] infos = field.getAnnotationsByType(RetLabel.class);
|
||||
if (infos == null || infos.length == 0) continue;
|
||||
if (infos == null || infos.length == 0) {
|
||||
continue;
|
||||
}
|
||||
int value;
|
||||
try {
|
||||
value = field.getInt(null);
|
||||
@@ -74,11 +81,12 @@ public @interface RetLabel {
|
||||
continue;
|
||||
}
|
||||
for (RetLabel info : infos) {
|
||||
rets.computeIfAbsent(info.locale(), (k) -> new LinkedHashMap<>()).put(value, func == null ? info.value() : func.apply(value, info.value()));
|
||||
rets.computeIfAbsent(info.locale(), k -> new LinkedHashMap<>()).put(value, func == null ? info.value() : func.apply(value, info.value()));
|
||||
}
|
||||
}
|
||||
try {
|
||||
File propPath = new File(System.getProperty(RESNAME_APP_CONF_DIR, new File(System.getProperty(RESNAME_APP_HOME, ""), "conf").getPath()));
|
||||
File homePath = new File(System.getProperty("redkale.application.home", ""), "conf");
|
||||
File propPath = new File(System.getProperty("redkale.application.confPath", homePath.getPath()));
|
||||
if (propPath.isDirectory() && propPath.canRead()) {
|
||||
final String prefix = clazz.getSimpleName().toLowerCase();
|
||||
for (File propFile : propPath.listFiles(f -> f.getName().startsWith(prefix) && f.getName().endsWith(".properties"))) {
|
||||
@@ -93,7 +101,9 @@ public @interface RetLabel {
|
||||
in.close();
|
||||
prop.forEach((k, v) -> {
|
||||
int retcode = Integer.parseInt(k.toString());
|
||||
if (defrets.containsKey(retcode)) defrets.put(retcode, v.toString());
|
||||
if (defrets.containsKey(retcode)) {
|
||||
defrets.put(retcode, v.toString());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
499
src/main/java/org/redkale/source/SourceModuleEngine.java
Normal file
499
src/main/java/org/redkale/source/SourceModuleEngine.java
Normal file
@@ -0,0 +1,499 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkale.source;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.logging.Level;
|
||||
import org.redkale.annotation.Resource;
|
||||
import org.redkale.boot.Application;
|
||||
import org.redkale.boot.ModuleEngine;
|
||||
import org.redkale.net.Servlet;
|
||||
import org.redkale.net.sncp.Sncp;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.AnyValue;
|
||||
import org.redkale.util.InstanceProvider;
|
||||
import org.redkale.util.RedkaleClassLoader;
|
||||
import org.redkale.util.RedkaleException;
|
||||
import org.redkale.util.ResourceEvent;
|
||||
import org.redkale.util.ResourceFactory;
|
||||
import org.redkale.util.ResourceTypeLoader;
|
||||
import org.redkale.util.Utility;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class SourceModuleEngine extends ModuleEngine {
|
||||
|
||||
//Source 原始的配置资源, 只会存在redkale.datasource(.|[) redkale.cachesource(.|[)开头的配置项
|
||||
private final Properties sourceProperties = new Properties();
|
||||
|
||||
//CacheSource 资源
|
||||
private final List<CacheSource> cacheSources = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final ReentrantLock cacheSourceLock = new ReentrantLock();
|
||||
|
||||
//DataSource 资源
|
||||
private final List<DataSource> dataSources = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final ReentrantLock dataSourceLock = new ReentrantLock();
|
||||
|
||||
//原生sql解析器
|
||||
private DataNativeSqlParser nativeSqlParser;
|
||||
|
||||
public SourceModuleEngine(Application application) {
|
||||
super(application);
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置项加载后被调用
|
||||
*/
|
||||
@Override
|
||||
public void onEnvironmentLoaded(Properties props) {
|
||||
props.forEach((key, val) -> {
|
||||
if (key.toString().startsWith("redkale.datasource.") || key.toString().startsWith("redkale.datasource[")
|
||||
|| key.toString().startsWith("redkale.cachesource.") || key.toString().startsWith("redkale.cachesource[")) {
|
||||
if (key.toString().endsWith(".name")) {
|
||||
logger.log(Level.WARNING, "skip illegal key " + key + " in source config, key cannot endsWith '.name'");
|
||||
} else {
|
||||
this.sourceProperties.put(key, val);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 进入Application.init方法时被调用
|
||||
*/
|
||||
public void onAppPreInit() {
|
||||
//加载原生sql解析器
|
||||
Iterator<DataNativeSqlParserProvider> it = ServiceLoader.load(DataNativeSqlParserProvider.class, application.getClassLoader()).iterator();
|
||||
RedkaleClassLoader.putServiceLoader(DataNativeSqlParserProvider.class);
|
||||
List<DataNativeSqlParserProvider> providers = new ArrayList<>();
|
||||
while (it.hasNext()) {
|
||||
DataNativeSqlParserProvider provider = it.next();
|
||||
if (provider != null && provider.acceptsConf(null)) {
|
||||
RedkaleClassLoader.putReflectionPublicConstructors(provider.getClass(), provider.getClass().getName());
|
||||
providers.add(provider);
|
||||
}
|
||||
}
|
||||
for (DataNativeSqlParserProvider provider : InstanceProvider.sort(providers)) {
|
||||
this.nativeSqlParser = provider.createInstance();
|
||||
this.resourceFactory.register(DataNativeSqlParser.class, this.nativeSqlParser);
|
||||
break; //only first provider
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 结束Application.init方法前被调用
|
||||
*/
|
||||
@Override
|
||||
public void onAppPostInit() {
|
||||
//------------------------------------- 注册 DataSource --------------------------------------------------------
|
||||
resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> {
|
||||
try {
|
||||
if (field.getAnnotation(Resource.class) == null && field.getAnnotation(javax.annotation.Resource.class) == null) {
|
||||
return null;
|
||||
}
|
||||
if ((srcObj instanceof Service) && Sncp.isRemote((Service) srcObj)) {
|
||||
return null; //远程模式不得注入 DataSource
|
||||
}
|
||||
DataSource source = loadDataSource(resourceName, false);
|
||||
field.set(srcObj, source);
|
||||
return source;
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "DataSource inject to " + srcObj + " error", e);
|
||||
return null;
|
||||
}
|
||||
}, DataSource.class);
|
||||
|
||||
//------------------------------------- 注册 CacheSource --------------------------------------------------------
|
||||
resourceFactory.register(new ResourceTypeLoader() {
|
||||
@Override
|
||||
public Object load(ResourceFactory rf, String srcResourceName, final Object srcObj, final String resourceName, Field field, final Object attachment) {
|
||||
try {
|
||||
if (field.getAnnotation(Resource.class) == null && field.getAnnotation(javax.annotation.Resource.class) == null) {
|
||||
return null;
|
||||
}
|
||||
if ((srcObj instanceof Service) && Sncp.isRemote((Service) srcObj)) {
|
||||
return null; //远程模式不需要注入 CacheSource
|
||||
}
|
||||
if (srcObj instanceof Servlet) {
|
||||
throw new RedkaleException("CacheSource cannot inject in Servlet " + srcObj);
|
||||
}
|
||||
final boolean ws = (srcObj instanceof org.redkale.net.http.WebSocketNodeService);
|
||||
CacheSource source = loadCacheSource(resourceName, ws);
|
||||
field.set(srcObj, source);
|
||||
Resource res = field.getAnnotation(Resource.class);
|
||||
if (res != null && res.required() && source == null) {
|
||||
throw new RedkaleException("CacheSource (resourceName = '" + resourceName + "') not found");
|
||||
} else {
|
||||
logger.info("Load CacheSource (type = " + (source == null ? null : source.getClass().getSimpleName()) + ", resourceName = '" + resourceName + "')");
|
||||
}
|
||||
return source;
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "DataSource inject error", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean autoNone() {
|
||||
return false;
|
||||
}
|
||||
}, CacheSource.class);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置项变更时被调用
|
||||
*
|
||||
* @param namespace 命名空间
|
||||
* @param events 变更项
|
||||
*/
|
||||
public void onEnvironmentChanged(String namespace, List<ResourceEvent> events) {
|
||||
Set<String> sourceRemovedKeys = new HashSet<>();
|
||||
Properties sourceChangedProps = new Properties();
|
||||
|
||||
for (ResourceEvent<String> event : events) {
|
||||
if (event.name().startsWith("redkale.datasource.") || event.name().startsWith("redkale.datasource[")
|
||||
|| event.name().startsWith("redkale.cachesource.") || event.name().startsWith("redkale.cachesource[")) {
|
||||
if (event.name().endsWith(".name")) {
|
||||
logger.log(Level.WARNING, "skip illegal key " + event.name() + " in source config " + (namespace == null ? "" : namespace) + ", key cannot endsWith '.name'");
|
||||
} else {
|
||||
if (!Objects.equals(event.newValue(), this.sourceProperties.getProperty(event.name()))) {
|
||||
if (event.newValue() == null) {
|
||||
if (this.sourceProperties.containsKey(event.name())) {
|
||||
sourceRemovedKeys.add(event.name());
|
||||
}
|
||||
} else {
|
||||
sourceChangedProps.put(event.name(), event.newValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//数据源配置项的变更
|
||||
if (!sourceChangedProps.isEmpty() || !sourceRemovedKeys.isEmpty()) {
|
||||
Set<String> cacheSourceNames = new LinkedHashSet<>();
|
||||
Set<String> dataSourceNames = new LinkedHashSet<>();
|
||||
List<String> keys = new ArrayList<>();
|
||||
keys.addAll(sourceRemovedKeys);
|
||||
keys.addAll((Set) sourceChangedProps.keySet());
|
||||
for (final String key : keys) {
|
||||
if (key.startsWith("redkale.cachesource[")) {
|
||||
cacheSourceNames.add(key.substring("redkale.cachesource[".length(), key.indexOf(']')));
|
||||
} else if (key.startsWith("redkale.cachesource.")) {
|
||||
cacheSourceNames.add(key.substring("redkale.cachesource.".length(), key.indexOf('.', "redkale.cachesource.".length())));
|
||||
} else if (key.startsWith("redkale.datasource[")) {
|
||||
dataSourceNames.add(key.substring("redkale.datasource[".length(), key.indexOf(']')));
|
||||
} else if (key.startsWith("redkale.datasource.")) {
|
||||
dataSourceNames.add(key.substring("redkale.datasource.".length(), key.indexOf('.', "redkale.datasource.".length())));
|
||||
}
|
||||
}
|
||||
//更新缓存
|
||||
for (String sourceName : cacheSourceNames) {
|
||||
CacheSource source = Utility.find(cacheSources, s -> Objects.equals(s.resourceName(), sourceName));
|
||||
if (source == null) {
|
||||
continue; //多余的数据源
|
||||
}
|
||||
final AnyValue.DefaultAnyValue old = (AnyValue.DefaultAnyValue) findSourceConfig(sourceName, "cachesource");
|
||||
Properties newProps = new Properties();
|
||||
this.sourceProperties.forEach((k, v) -> {
|
||||
final String key = k.toString();
|
||||
String prefix = "redkale.cachesource[" + sourceName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.cachesource." + sourceName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) {
|
||||
return; //不是同一name数据源配置项
|
||||
}
|
||||
newProps.put(k, v);
|
||||
});
|
||||
List<ResourceEvent> changeEvents = new ArrayList<>();
|
||||
sourceChangedProps.forEach((k, v) -> {
|
||||
final String key = k.toString();
|
||||
String prefix = "redkale.cachesource[" + sourceName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.cachesource." + sourceName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) {
|
||||
return; //不是同一name数据源配置项
|
||||
}
|
||||
newProps.put(k, v);
|
||||
changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), v, this.sourceProperties.getProperty(key)));
|
||||
});
|
||||
sourceRemovedKeys.forEach(k -> {
|
||||
final String key = k;
|
||||
String prefix = "redkale.cachesource[" + sourceName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.cachesource." + sourceName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) {
|
||||
return;
|
||||
}
|
||||
newProps.remove(k); //不是同一name数据源配置项
|
||||
changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), null, this.sourceProperties.getProperty(key)));
|
||||
});
|
||||
if (!changeEvents.isEmpty()) {
|
||||
AnyValue.DefaultAnyValue back = old.copy();
|
||||
try {
|
||||
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("cachesource").getAnyValue(sourceName));
|
||||
((AbstractCacheSource) source).onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()]));
|
||||
} catch (RuntimeException e) {
|
||||
old.replace(back); //还原配置
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
//更新数据库
|
||||
for (String sourceName : dataSourceNames) {
|
||||
DataSource source = Utility.find(dataSources, s -> Objects.equals(s.resourceName(), sourceName));
|
||||
if (source == null) {
|
||||
continue; //多余的数据源
|
||||
}
|
||||
AnyValue.DefaultAnyValue old = (AnyValue.DefaultAnyValue) findSourceConfig(sourceName, "datasource");
|
||||
Properties newProps = new Properties();
|
||||
this.sourceProperties.forEach((k, v) -> {
|
||||
final String key = k.toString();
|
||||
String prefix = "redkale.datasource[" + sourceName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.datasource." + sourceName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) {
|
||||
return; //不是同一name数据源配置项
|
||||
}
|
||||
newProps.put(k, v);
|
||||
});
|
||||
List<ResourceEvent> changeEvents = new ArrayList<>();
|
||||
sourceChangedProps.forEach((k, v) -> {
|
||||
final String key = k.toString();
|
||||
String prefix = "redkale.datasource[" + sourceName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.datasource." + sourceName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) {
|
||||
return; //不是同一name数据源配置项
|
||||
}
|
||||
newProps.put(k, v);
|
||||
changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), v, this.sourceProperties.getProperty(key)));
|
||||
});
|
||||
sourceRemovedKeys.forEach(k -> {
|
||||
final String key = k;
|
||||
String prefix = "redkale.datasource[" + sourceName + "].";
|
||||
int pos = key.indexOf(prefix);
|
||||
if (pos < 0) {
|
||||
prefix = "redkale.datasource." + sourceName + ".";
|
||||
pos = key.indexOf(prefix);
|
||||
}
|
||||
if (pos < 0) {
|
||||
return;
|
||||
}
|
||||
newProps.remove(k); //不是同一name数据源配置项
|
||||
changeEvents.add(ResourceEvent.create(key.substring(prefix.length()), null, this.sourceProperties.getProperty(key)));
|
||||
});
|
||||
if (!changeEvents.isEmpty()) {
|
||||
AnyValue.DefaultAnyValue back = old.copy();
|
||||
try {
|
||||
old.replace(AnyValue.loadFromProperties(newProps).getAnyValue("redkale").getAnyValue("datasource").getAnyValue(sourceName));
|
||||
((AbstractDataSource) source).onResourceChange(changeEvents.toArray(new ResourceEvent[changeEvents.size()]));
|
||||
} catch (RuntimeException e) {
|
||||
old.replace(back); //还原配置
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
sourceRemovedKeys.forEach(this.sourceProperties::remove);
|
||||
this.sourceProperties.putAll(sourceChangedProps);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务全部停掉后被调用
|
||||
*/
|
||||
public void onServersPostStop() {
|
||||
for (DataSource source : dataSources) {
|
||||
if (source == null) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
if (source instanceof Service) {
|
||||
long s = System.currentTimeMillis();
|
||||
((Service) source).destroy(Sncp.isSncpDyn((Service) source) ? Sncp.getResourceConf((Service) source) : null);
|
||||
logger.info(source + " destroy in " + (System.currentTimeMillis() - s) + " ms");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.FINER, source.getClass() + " close DataSource erroneous", e);
|
||||
}
|
||||
}
|
||||
for (CacheSource source : cacheSources) {
|
||||
if (source == null) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
if (source instanceof Service) {
|
||||
long s = System.currentTimeMillis();
|
||||
((Service) source).destroy(Sncp.isSncpDyn((Service) source) ? Sncp.getResourceConf((Service) source) : null);
|
||||
logger.info(source + " destroy in " + (System.currentTimeMillis() - s) + " ms");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.FINER, source.getClass() + " close CacheSource erroneous", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private CacheSource loadCacheSource(final String sourceName, boolean autoMemory) {
|
||||
cacheSourceLock.lock();
|
||||
try {
|
||||
long st = System.currentTimeMillis();
|
||||
CacheSource old = resourceFactory.find(sourceName, CacheSource.class);
|
||||
if (old != null) {
|
||||
return old;
|
||||
}
|
||||
final AnyValue sourceConf = findSourceConfig(sourceName, "cachesource");
|
||||
if (sourceConf == null) {
|
||||
if (!autoMemory) {
|
||||
return null;
|
||||
}
|
||||
CacheSource source = new CacheMemorySource(sourceName);
|
||||
cacheSources.add(source);
|
||||
resourceFactory.register(sourceName, CacheSource.class, source);
|
||||
if (!application.isCompileMode() && source instanceof Service) {
|
||||
((Service) source).init(sourceConf);
|
||||
}
|
||||
logger.info("Load CacheSource resourceName = '" + sourceName + "', source = " + source + " in " + (System.currentTimeMillis() - st) + " ms");
|
||||
return source;
|
||||
}
|
||||
if (!sourceConf.getValue(AbstractCacheSource.CACHE_SOURCE_RESOURCE, "").isEmpty()) {
|
||||
CacheSource source = loadCacheSource(sourceConf.getValue(AbstractCacheSource.CACHE_SOURCE_RESOURCE), autoMemory);
|
||||
if (source != null) {
|
||||
resourceFactory.register(sourceName, CacheSource.class, source);
|
||||
}
|
||||
return source;
|
||||
}
|
||||
try {
|
||||
CacheSource source = AbstractCacheSource.createCacheSource(application.getServerClassLoader(),
|
||||
resourceFactory, sourceConf, sourceName, application.isCompileMode());
|
||||
|
||||
cacheSources.add(source);
|
||||
resourceFactory.register(sourceName, CacheSource.class, source);
|
||||
logger.info("Load CacheSource resourceName = '" + sourceName + "', source = " + source + " in " + (System.currentTimeMillis() - st) + " ms");
|
||||
return source;
|
||||
} catch (RuntimeException ex) {
|
||||
throw ex;
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "load application CaheSource error: " + sourceConf, e);
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
cacheSourceLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private DataSource loadDataSource(final String sourceName, boolean autoMemory) {
|
||||
dataSourceLock.lock();
|
||||
try {
|
||||
DataSource old = resourceFactory.find(sourceName, DataSource.class);
|
||||
if (old != null) {
|
||||
return old;
|
||||
}
|
||||
final AnyValue sourceConf = findSourceConfig(sourceName, "datasource");
|
||||
if (sourceConf == null) {
|
||||
if (!autoMemory) {
|
||||
return null;
|
||||
}
|
||||
DataSource source = new DataMemorySource(sourceName);
|
||||
if (!application.isCompileMode() && source instanceof Service) {
|
||||
resourceFactory.inject(sourceName, source);
|
||||
((Service) source).init(sourceConf);
|
||||
}
|
||||
dataSources.add(source);
|
||||
resourceFactory.register(sourceName, DataSource.class, source);
|
||||
logger.info("Load DataSource resourceName = '" + sourceName + "', source = " + source);
|
||||
return source;
|
||||
}
|
||||
if (!sourceConf.getValue(DataSources.DATA_SOURCE_RESOURCE, "").isEmpty()) {
|
||||
DataSource source = loadDataSource(sourceConf.getValue(DataSources.DATA_SOURCE_RESOURCE), autoMemory);
|
||||
if (source != null) {
|
||||
if (source instanceof DataMemorySource && source instanceof SearchSource) {
|
||||
resourceFactory.register(sourceName, SearchSource.class, source);
|
||||
} else {
|
||||
resourceFactory.register(sourceName, DataSource.class, source);
|
||||
if (source instanceof DataSqlSource) {
|
||||
resourceFactory.register(sourceName, DataSqlSource.class, source);
|
||||
}
|
||||
if (source instanceof DataJdbcSource) {
|
||||
resourceFactory.register(sourceName, DataJdbcSource.class, source);
|
||||
}
|
||||
}
|
||||
}
|
||||
return source;
|
||||
}
|
||||
try {
|
||||
DataSource source = DataSources.createDataSource(application.getServerClassLoader(),
|
||||
resourceFactory, sourceConf, sourceName, application.isCompileMode());
|
||||
|
||||
if (!application.isCompileMode() && source instanceof Service) {
|
||||
resourceFactory.inject(sourceName, source);
|
||||
((Service) source).init(sourceConf);
|
||||
}
|
||||
dataSources.add(source);
|
||||
if (source instanceof DataMemorySource && source instanceof SearchSource) {
|
||||
resourceFactory.register(sourceName, SearchSource.class, source);
|
||||
} else {
|
||||
resourceFactory.register(sourceName, DataSource.class, source);
|
||||
if (source instanceof DataSqlSource) {
|
||||
resourceFactory.register(sourceName, DataSqlSource.class, source);
|
||||
}
|
||||
if (source instanceof DataJdbcSource) {
|
||||
resourceFactory.register(sourceName, DataJdbcSource.class, source);
|
||||
}
|
||||
}
|
||||
logger.info("Load DataSource resourceName = '" + sourceName + "', source = " + source);
|
||||
return source;
|
||||
} catch (RuntimeException ex) {
|
||||
throw ex;
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "load application DataSource error: " + sourceConf, e);
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
dataSourceLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private AnyValue findSourceConfig(String sourceName, String sourceType) {
|
||||
AnyValue sourceNode = application.getAppConfig().getAnyValue(sourceType);
|
||||
if (sourceNode != null) {
|
||||
AnyValue confNode = sourceNode.getAnyValue(sourceName);
|
||||
if (confNode != null) { //必须要设置name属性
|
||||
((AnyValue.DefaultAnyValue) confNode).setValue("name", sourceName);
|
||||
}
|
||||
return confNode;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -372,7 +372,7 @@ public final class ResourceFactory {
|
||||
*
|
||||
* @return 旧资源对象
|
||||
*/
|
||||
public <A> A register(final String name, final A val) {
|
||||
public <A> A register(final String name, @Nonnull final A val) {
|
||||
return register(true, name, val);
|
||||
}
|
||||
|
||||
@@ -386,7 +386,7 @@ public final class ResourceFactory {
|
||||
*
|
||||
* @return 旧资源对象
|
||||
*/
|
||||
public <A> A register(final boolean autoSync, final String name, final A val) {
|
||||
public <A> A register(final boolean autoSync, final String name, @Nonnull final A val) {
|
||||
checkResourceName(name);
|
||||
final Class<?> claz = val.getClass();
|
||||
|
||||
@@ -422,7 +422,7 @@ public final class ResourceFactory {
|
||||
*
|
||||
* @return 旧资源对象
|
||||
*/
|
||||
public <A> A register(final String name, final Class<? extends A> clazz, final A val) {
|
||||
public <A> A register(final String name, final Class<? extends A> clazz, @Nullable final A val) {
|
||||
return register(true, name, clazz, val);
|
||||
}
|
||||
|
||||
@@ -436,7 +436,7 @@ public final class ResourceFactory {
|
||||
*
|
||||
* @return 旧资源对象
|
||||
*/
|
||||
public <A> A register(final String name, final Type clazz, final A val) {
|
||||
public <A> A register(final String name, final Type clazz, @Nullable final A val) {
|
||||
return register(true, name, clazz, val);
|
||||
}
|
||||
|
||||
@@ -1092,6 +1092,7 @@ public final class ResourceFactory {
|
||||
|
||||
public final String name;
|
||||
|
||||
@Nullable
|
||||
public final T value;
|
||||
|
||||
public final List<ResourceElement> elements;
|
||||
|
||||
Reference in New Issue
Block a user