优化
This commit is contained in:
@@ -160,6 +160,9 @@ public final class Application {
|
|||||||
//@since 2.8.0
|
//@since 2.8.0
|
||||||
final LoggingModule loggingModule = new LoggingModule(this);
|
final LoggingModule loggingModule = new LoggingModule(this);
|
||||||
|
|
||||||
|
//数据源组件
|
||||||
|
private final SourceModuleEngine sourceModule = new SourceModuleEngine(this);
|
||||||
|
|
||||||
//NodeServer 资源, 顺序必须是sncps, others, watchs
|
//NodeServer 资源, 顺序必须是sncps, others, watchs
|
||||||
final List<NodeServer> servers = new CopyOnWriteArrayList<>();
|
final List<NodeServer> servers = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
@@ -303,7 +306,7 @@ public final class Application {
|
|||||||
this.resourceFactory.register("protobufconvert", Convert.class, ProtobufFactory.root().getConvert());
|
this.resourceFactory.register("protobufconvert", Convert.class, ProtobufFactory.root().getConvert());
|
||||||
|
|
||||||
//系统内部模块组件
|
//系统内部模块组件
|
||||||
moduleEngines.add(new SourceModuleEngine(this));
|
moduleEngines.add(sourceModule); //放第一,很多module依赖于source
|
||||||
moduleEngines.add(new MessageModuleEngine(this));
|
moduleEngines.add(new MessageModuleEngine(this));
|
||||||
moduleEngines.add(new ClusterModuleEngine(this));
|
moduleEngines.add(new ClusterModuleEngine(this));
|
||||||
moduleEngines.add(new CacheModuleEngine(this));
|
moduleEngines.add(new CacheModuleEngine(this));
|
||||||
@@ -315,7 +318,7 @@ public final class Application {
|
|||||||
//打印基础信息日志
|
//打印基础信息日志
|
||||||
logger.log(Level.INFO, colorMessage(logger, 36, 1, "-------------------------------- Redkale " + Redkale.getDotedVersion() + " --------------------------------"));
|
logger.log(Level.INFO, colorMessage(logger, 36, 1, "-------------------------------- Redkale " + Redkale.getDotedVersion() + " --------------------------------"));
|
||||||
|
|
||||||
final String confDir = this.confDir.toString();
|
final String confDirStr = this.confDir.toString();
|
||||||
logger.log(Level.INFO, "APP_OS = " + System.getProperty("os.name") + " " + System.getProperty("os.version") + " " + System.getProperty("os.arch") + "\r\n"
|
logger.log(Level.INFO, "APP_OS = " + System.getProperty("os.name") + " " + System.getProperty("os.version") + " " + System.getProperty("os.arch") + "\r\n"
|
||||||
+ "APP_JAVA = " + System.getProperty("java.runtime.name", System.getProperty("org.graalvm.nativeimage.kind") != null ? "Nativeimage" : "")
|
+ "APP_JAVA = " + System.getProperty("java.runtime.name", System.getProperty("org.graalvm.nativeimage.kind") != null ? "Nativeimage" : "")
|
||||||
+ " " + System.getProperty("java.runtime.version", System.getProperty("java.vendor.version", System.getProperty("java.vm.version"))) + "\r\n" //graalvm.nativeimage 模式下无 java.runtime.xxx 属性
|
+ " " + System.getProperty("java.runtime.version", System.getProperty("java.vendor.version", System.getProperty("java.vm.version"))) + "\r\n" //graalvm.nativeimage 模式下无 java.runtime.xxx 属性
|
||||||
@@ -325,12 +328,12 @@ public final class Application {
|
|||||||
+ "APP_LOADER = " + this.classLoader.getClass().getSimpleName() + "\r\n"
|
+ "APP_LOADER = " + this.classLoader.getClass().getSimpleName() + "\r\n"
|
||||||
+ RESNAME_APP_ADDR + " = " + this.localAddress.getHostString() + ":" + this.localAddress.getPort() + "\r\n"
|
+ RESNAME_APP_ADDR + " = " + this.localAddress.getHostString() + ":" + this.localAddress.getPort() + "\r\n"
|
||||||
+ RESNAME_APP_HOME + " = " + this.home.getPath().replace('\\', '/') + "\r\n"
|
+ RESNAME_APP_HOME + " = " + this.home.getPath().replace('\\', '/') + "\r\n"
|
||||||
+ RESNAME_APP_CONF_DIR + " = " + confDir.substring(confDir.indexOf('!') + 1));
|
+ RESNAME_APP_CONF_DIR + " = " + confDirStr.substring(confDirStr.indexOf('!') + 1));
|
||||||
|
|
||||||
if (!compileMode && !(classLoader instanceof RedkaleClassLoader.RedkaleCacheClassLoader)) {
|
if (!compileMode && !(classLoader instanceof RedkaleClassLoader.RedkaleCacheClassLoader)) {
|
||||||
String lib = environment.getPropertyValue(config.getValue("lib", "${APP_HOME}/libs/*").trim());
|
String lib = environment.getPropertyValue(config.getValue("lib", "${APP_HOME}/libs/*").trim());
|
||||||
lib = Utility.isEmpty(lib) ? confDir : (lib + ";" + confDir);
|
lib = Utility.isEmpty(lib) ? confDirStr : (lib + ";" + confDirStr);
|
||||||
Server.loadLib(classLoader, logger, lib.isEmpty() ? confDir : (lib + ";" + confDir));
|
Server.loadLib(classLoader, logger, lib.isEmpty() ? confDirStr : (lib + ";" + confDirStr));
|
||||||
}
|
}
|
||||||
this.shutdownLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
|
this.shutdownLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
|
||||||
}
|
}
|
||||||
@@ -352,68 +355,6 @@ public final class Application {
|
|||||||
this.onAppPostInit();
|
this.onAppPostInit();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerResourceEnvs(boolean first, Properties... envs) {
|
|
||||||
for (Properties props : envs) {
|
|
||||||
props.forEach((k, v) -> {
|
|
||||||
String val = environment.getPropertyValue(v.toString(), envs);
|
|
||||||
if (k.toString().startsWith("system.property.")) {
|
|
||||||
String key = k.toString().substring("system.property.".length());
|
|
||||||
if (System.getProperty(key) == null || !first) {
|
|
||||||
System.setProperty(key, val);
|
|
||||||
}
|
|
||||||
resourceFactory.register(!first, k.toString(), val);
|
|
||||||
} else if (k.toString().startsWith("mimetype.property.")) {
|
|
||||||
MimeType.add(k.toString().substring("mimetype.property.".length()), val);
|
|
||||||
} else {
|
|
||||||
resourceFactory.register(!first, k.toString(), val);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 设置WorkExecutor
|
|
||||||
*/
|
|
||||||
private void initWorkExecutor() {
|
|
||||||
int bufferCapacity = 32 * 1024;
|
|
||||||
int bufferPoolSize = Utility.cpus() * 8;
|
|
||||||
final AnyValue executorConf = config.getAnyValue("executor", true);
|
|
||||||
StringBuilder executorLog = new StringBuilder();
|
|
||||||
|
|
||||||
final int workThreads = Math.max(Utility.cpus(), executorConf.getIntValue("threads", Utility.cpus() * 4));
|
|
||||||
//指定threads则不使用虚拟线程池
|
|
||||||
this.workExecutor = executorConf.getValue("threads") != null
|
|
||||||
? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s")
|
|
||||||
: WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s");
|
|
||||||
String executorName = this.workExecutor.getClass().getSimpleName();
|
|
||||||
executorLog.append("defaultWorkExecutor: {type=").append(executorName);
|
|
||||||
if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) {
|
|
||||||
executorLog.append(", threads=[virtual]}");
|
|
||||||
} else {
|
|
||||||
executorLog.append(", threads=").append(workThreads).append("}");
|
|
||||||
}
|
|
||||||
|
|
||||||
ExecutorService clientWorkExecutor = this.workExecutor;
|
|
||||||
if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) {
|
|
||||||
executorLog.append(", clientWorkExecutor: [workExecutor]");
|
|
||||||
} else {
|
|
||||||
//给所有client给一个新的默认ExecutorService
|
|
||||||
int clientThreads = executorConf.getIntValue("clients", Utility.cpus() * 4);
|
|
||||||
clientWorkExecutor = WorkThread.createWorkExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s");
|
|
||||||
executorLog.append(", threads=").append(clientThreads).append("}");
|
|
||||||
}
|
|
||||||
AsyncIOGroup ioGroup = new AsyncIOGroup("Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize).skipClose(true);
|
|
||||||
this.clientAsyncGroup = ioGroup.start();
|
|
||||||
|
|
||||||
if (executorLog.length() > 0) {
|
|
||||||
logger.log(Level.INFO, executorLog.toString());
|
|
||||||
}
|
|
||||||
this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor);
|
|
||||||
this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor);
|
|
||||||
this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup);
|
|
||||||
this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncIOGroup.class, this.clientAsyncGroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initResourceTypeLoader() {
|
private void initResourceTypeLoader() {
|
||||||
final Application application = this;
|
final Application application = this;
|
||||||
//只有WatchService才能加载Application、WatchFactory
|
//只有WatchService才能加载Application、WatchFactory
|
||||||
@@ -579,6 +520,68 @@ public final class Application {
|
|||||||
}, HttpRpcClient.class);
|
}, HttpRpcClient.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void registerResourceEnvs(boolean first, Properties... envs) {
|
||||||
|
for (Properties props : envs) {
|
||||||
|
props.forEach((k, v) -> {
|
||||||
|
String val = environment.getPropertyValue(v.toString(), envs);
|
||||||
|
if (k.toString().startsWith("system.property.")) {
|
||||||
|
String key = k.toString().substring("system.property.".length());
|
||||||
|
if (System.getProperty(key) == null || !first) {
|
||||||
|
System.setProperty(key, val);
|
||||||
|
}
|
||||||
|
resourceFactory.register(!first, k.toString(), val);
|
||||||
|
} else if (k.toString().startsWith("mimetype.property.")) {
|
||||||
|
MimeType.add(k.toString().substring("mimetype.property.".length()), val);
|
||||||
|
} else {
|
||||||
|
resourceFactory.register(!first, k.toString(), val);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设置WorkExecutor
|
||||||
|
*/
|
||||||
|
private void initWorkExecutor() {
|
||||||
|
int bufferCapacity = 32 * 1024;
|
||||||
|
int bufferPoolSize = Utility.cpus() * 8;
|
||||||
|
final AnyValue executorConf = config.getAnyValue("executor", true);
|
||||||
|
StringBuilder executorLog = new StringBuilder();
|
||||||
|
|
||||||
|
final int workThreads = Math.max(Utility.cpus(), executorConf.getIntValue("threads", Utility.cpus() * 4));
|
||||||
|
//指定threads则不使用虚拟线程池
|
||||||
|
this.workExecutor = executorConf.getValue("threads") != null
|
||||||
|
? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s")
|
||||||
|
: WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s");
|
||||||
|
String executorName = this.workExecutor.getClass().getSimpleName();
|
||||||
|
executorLog.append("defaultWorkExecutor: {type=").append(executorName);
|
||||||
|
if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) {
|
||||||
|
executorLog.append(", threads=[virtual]}");
|
||||||
|
} else {
|
||||||
|
executorLog.append(", threads=").append(workThreads).append("}");
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecutorService clientWorkExecutor = this.workExecutor;
|
||||||
|
if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) {
|
||||||
|
executorLog.append(", clientWorkExecutor: [workExecutor]");
|
||||||
|
} else {
|
||||||
|
//给所有client给一个新的默认ExecutorService
|
||||||
|
int clientThreads = executorConf.getIntValue("clients", Utility.cpus() * 4);
|
||||||
|
clientWorkExecutor = WorkThread.createWorkExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s");
|
||||||
|
executorLog.append(", threads=").append(clientThreads).append("}");
|
||||||
|
}
|
||||||
|
AsyncIOGroup ioGroup = new AsyncIOGroup("Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize).skipClose(true);
|
||||||
|
this.clientAsyncGroup = ioGroup.start();
|
||||||
|
|
||||||
|
if (executorLog.length() > 0) {
|
||||||
|
logger.log(Level.INFO, executorLog.toString());
|
||||||
|
}
|
||||||
|
this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor);
|
||||||
|
this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor);
|
||||||
|
this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup);
|
||||||
|
this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncIOGroup.class, this.clientAsyncGroup);
|
||||||
|
}
|
||||||
|
|
||||||
private void initAppListeners() throws Exception {
|
private void initAppListeners() throws Exception {
|
||||||
//------------------------------------------------------------------------
|
//------------------------------------------------------------------------
|
||||||
for (AnyValue conf : config.getAnyValues("group")) {
|
for (AnyValue conf : config.getAnyValues("group")) {
|
||||||
@@ -1065,129 +1068,6 @@ public final class Application {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void loadClassesByFilters(final ClassFilter... filters) throws IOException {
|
|
||||||
ClassFilter.Loader.load(getHome(), this.serverClassLoader, filters);
|
|
||||||
}
|
|
||||||
|
|
||||||
List<ModuleEngine> getModuleEngines() {
|
|
||||||
return moduleEngines;
|
|
||||||
}
|
|
||||||
|
|
||||||
//使用了nohup或使用了后台&,Runtime.getRuntime().addShutdownHook失效
|
|
||||||
private void signalShutdownHandle() {
|
|
||||||
Consumer<Consumer<String>> signalShutdownConsumer = Utility.signalShutdownConsumer();
|
|
||||||
if (signalShutdownConsumer == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
signalShutdownConsumer.accept(sig -> {
|
|
||||||
try {
|
|
||||||
long s = System.currentTimeMillis();
|
|
||||||
logger.info(Application.this.getClass().getSimpleName() + " shutdowning " + sig);
|
|
||||||
shutdown();
|
|
||||||
long e = System.currentTimeMillis() - s;
|
|
||||||
logger.info(Application.this.getClass().getSimpleName() + " shutdown in " + e + " ms");
|
|
||||||
} catch (Exception ex) {
|
|
||||||
logger.log(Level.INFO, "Shutdown fail", ex);
|
|
||||||
} finally {
|
|
||||||
shutdownLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void runServers(CountDownLatch serverCdl, final List<AnyValue> serverConfs) throws Exception {
|
|
||||||
CountDownLatch serviceCdl = new CountDownLatch(serverConfs.size());
|
|
||||||
CountDownLatch startCdl = new CountDownLatch(serverConfs.size());
|
|
||||||
final AtomicBoolean inited = new AtomicBoolean(false);
|
|
||||||
final ReentrantLock nodeLock = new ReentrantLock();
|
|
||||||
final Map<String, Class<? extends NodeServer>> nodeClasses = new HashMap<>();
|
|
||||||
for (final AnyValue serconf : serverConfs) {
|
|
||||||
Thread thread = new Thread() {
|
|
||||||
{
|
|
||||||
setName("Redkale-" + serconf.getValue("protocol", "Server").toUpperCase().replaceFirst("\\..+", "") + ":" + serconf.getIntValue("port") + "-Thread");
|
|
||||||
this.setDaemon(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
//Thread ctd = Thread.currentThread();
|
|
||||||
//ctd.setContextClassLoader(new URLClassLoader(new URL[0], ctd.getContextClassLoader()));
|
|
||||||
final String protocol = serconf.getValue("protocol", "").replaceFirst("\\..+", "").toUpperCase();
|
|
||||||
NodeServer server = null;
|
|
||||||
if ("SNCP".equals(protocol)) {
|
|
||||||
server = NodeSncpServer.createNodeServer(Application.this, serconf);
|
|
||||||
} else if ("WATCH".equalsIgnoreCase(protocol)) {
|
|
||||||
AnyValueWriter serconf2 = (AnyValueWriter) serconf;
|
|
||||||
AnyValueWriter rest = (AnyValueWriter) serconf2.getAnyValue("rest");
|
|
||||||
if (rest == null) {
|
|
||||||
rest = new AnyValueWriter();
|
|
||||||
serconf2.addValue("rest", rest);
|
|
||||||
}
|
|
||||||
rest.setValue("base", WatchServlet.class.getName());
|
|
||||||
server = new NodeWatchServer(Application.this, serconf);
|
|
||||||
} else if ("HTTP".equalsIgnoreCase(protocol)) {
|
|
||||||
server = new NodeHttpServer(Application.this, serconf);
|
|
||||||
} else {
|
|
||||||
if (!inited.get()) {
|
|
||||||
nodeLock.lock();
|
|
||||||
try {
|
|
||||||
if (!inited.getAndSet(true)) { //加载自定义的协议,如:SOCKS
|
|
||||||
ClassFilter profilter = new ClassFilter(classLoader, NodeProtocol.class, NodeServer.class, (Class[]) null);
|
|
||||||
ClassFilter.Loader.load(home, classLoader, profilter);
|
|
||||||
final Set<FilterEntry<NodeServer>> entrys = profilter.getFilterEntrys();
|
|
||||||
for (FilterEntry<NodeServer> entry : entrys) {
|
|
||||||
final Class<? extends NodeServer> type = entry.getType();
|
|
||||||
NodeProtocol pros = type.getAnnotation(NodeProtocol.class);
|
|
||||||
String p = pros.value().toUpperCase();
|
|
||||||
if ("SNCP".equals(p) || "HTTP".equals(p)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
final Class<? extends NodeServer> old = nodeClasses.get(p);
|
|
||||||
if (old != null && old != type) {
|
|
||||||
throw new RedkaleException("Protocol(" + p + ") had NodeServer-Class("
|
|
||||||
+ old.getName() + ") but repeat NodeServer-Class(" + type.getName() + ")");
|
|
||||||
}
|
|
||||||
nodeClasses.put(p, type);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
nodeLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Class<? extends NodeServer> nodeClass = nodeClasses.get(protocol);
|
|
||||||
if (nodeClass != null) {
|
|
||||||
server = NodeServer.create(nodeClass, Application.this, serconf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (server == null) {
|
|
||||||
logger.log(Level.SEVERE, "Not found Server Class for protocol({0})", serconf.getValue("protocol"));
|
|
||||||
Utility.sleep(100);
|
|
||||||
System.exit(1);
|
|
||||||
}
|
|
||||||
server.serviceCdl = serviceCdl;
|
|
||||||
server.init(serconf);
|
|
||||||
if (!singletonMode && !compileMode) {
|
|
||||||
server.start();
|
|
||||||
} else if (compileMode) {
|
|
||||||
server.getServer().getDispatcherServlet().init(server.getServer().getContext(), serconf);
|
|
||||||
}
|
|
||||||
servers.add(server);
|
|
||||||
serverCdl.countDown();
|
|
||||||
startCdl.countDown();
|
|
||||||
} catch (Exception ex) {
|
|
||||||
logger.log(Level.WARNING, serconf + " runServers error", ex);
|
|
||||||
Application.this.shutdownLatch.countDown();
|
|
||||||
Utility.sleep(100);
|
|
||||||
System.exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
thread.start();
|
|
||||||
}
|
|
||||||
startCdl.await();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 实例化单个Service
|
* 实例化单个Service
|
||||||
*
|
*
|
||||||
@@ -1317,22 +1197,6 @@ public final class Application {
|
|||||||
System.exit(0); //必须要有
|
System.exit(0); //必须要有
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String generateHelp() {
|
|
||||||
return ""
|
|
||||||
+ "Usage: redkale [command] [arguments]\r\n"
|
|
||||||
+ "Command: \r\n"
|
|
||||||
+ " start, startup start one process\r\n"
|
|
||||||
+ " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n"
|
|
||||||
+ " shutdown, stop shutdown one process\r\n"
|
|
||||||
+ " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n"
|
|
||||||
+ " restart restart one process\r\n"
|
|
||||||
+ " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n"
|
|
||||||
+ " apidoc generate apidoc\r\n"
|
|
||||||
+ " --api-skiprpc=[true|false] skip @RestService(rpcOnly=true) service or @RestMapping(rpcOnly=true) method, default is true\r\n"
|
|
||||||
+ " --api-host=[url] api root url, default is http://localhost\r\n"
|
|
||||||
+ " help, -h, --help show this help\r\n";
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<Object> command(String cmd, String[] params) {
|
public List<Object> command(String cmd, String[] params) {
|
||||||
List<NodeServer> localServers = new ArrayList<>(servers); //顺序sncps, others, watchs
|
List<NodeServer> localServers = new ArrayList<>(servers); //顺序sncps, others, watchs
|
||||||
List<Object> results = new ArrayList<>();
|
List<Object> results = new ArrayList<>();
|
||||||
@@ -1371,6 +1235,116 @@ public final class Application {
|
|||||||
LoggingBaseHandler.traceEnable = true;
|
LoggingBaseHandler.traceEnable = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String generateHelp() {
|
||||||
|
return ""
|
||||||
|
+ "Usage: redkale [command] [arguments]\r\n"
|
||||||
|
+ "Command: \r\n"
|
||||||
|
+ " start, startup start one process\r\n"
|
||||||
|
+ " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n"
|
||||||
|
+ " shutdown, stop shutdown one process\r\n"
|
||||||
|
+ " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n"
|
||||||
|
+ " restart restart one process\r\n"
|
||||||
|
+ " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n"
|
||||||
|
+ " apidoc generate apidoc\r\n"
|
||||||
|
+ " --api-skiprpc=[true|false] skip @RestService(rpcOnly=true) service or @RestMapping(rpcOnly=true) method, default is true\r\n"
|
||||||
|
+ " --api-host=[url] api root url, default is http://localhost\r\n"
|
||||||
|
+ " help, -h, --help show this help\r\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private void runServers(CountDownLatch serverCdl, final List<AnyValue> serverConfs) throws Exception {
|
||||||
|
CountDownLatch serviceCdl = new CountDownLatch(serverConfs.size());
|
||||||
|
CountDownLatch startCdl = new CountDownLatch(serverConfs.size());
|
||||||
|
final AtomicBoolean inited = new AtomicBoolean(false);
|
||||||
|
final ReentrantLock nodeLock = new ReentrantLock();
|
||||||
|
final Map<String, Class<? extends NodeServer>> nodeClasses = new HashMap<>();
|
||||||
|
for (final AnyValue serconf : serverConfs) {
|
||||||
|
Thread thread = new Thread() {
|
||||||
|
{
|
||||||
|
setName("Redkale-" + serconf.getValue("protocol", "Server").toUpperCase().replaceFirst("\\..+", "") + ":" + serconf.getIntValue("port") + "-Thread");
|
||||||
|
this.setDaemon(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
//Thread ctd = Thread.currentThread();
|
||||||
|
//ctd.setContextClassLoader(new URLClassLoader(new URL[0], ctd.getContextClassLoader()));
|
||||||
|
final String protocol = serconf.getValue("protocol", "").replaceFirst("\\..+", "").toUpperCase();
|
||||||
|
NodeServer server = null;
|
||||||
|
if ("SNCP".equals(protocol)) {
|
||||||
|
server = NodeSncpServer.createNodeServer(Application.this, serconf);
|
||||||
|
} else if ("WATCH".equalsIgnoreCase(protocol)) {
|
||||||
|
AnyValueWriter serconf2 = (AnyValueWriter) serconf;
|
||||||
|
AnyValueWriter rest = (AnyValueWriter) serconf2.getAnyValue("rest");
|
||||||
|
if (rest == null) {
|
||||||
|
rest = new AnyValueWriter();
|
||||||
|
serconf2.addValue("rest", rest);
|
||||||
|
}
|
||||||
|
rest.setValue("base", WatchServlet.class.getName());
|
||||||
|
server = new NodeWatchServer(Application.this, serconf);
|
||||||
|
} else if ("HTTP".equalsIgnoreCase(protocol)) {
|
||||||
|
server = new NodeHttpServer(Application.this, serconf);
|
||||||
|
} else {
|
||||||
|
if (!inited.get()) {
|
||||||
|
nodeLock.lock();
|
||||||
|
try {
|
||||||
|
if (!inited.getAndSet(true)) { //加载自定义的协议,如:SOCKS
|
||||||
|
ClassFilter profilter = new ClassFilter(classLoader, NodeProtocol.class, NodeServer.class, (Class[]) null);
|
||||||
|
ClassFilter.Loader.load(home, classLoader, profilter);
|
||||||
|
final Set<FilterEntry<NodeServer>> entrys = profilter.getFilterEntrys();
|
||||||
|
for (FilterEntry<NodeServer> entry : entrys) {
|
||||||
|
final Class<? extends NodeServer> type = entry.getType();
|
||||||
|
NodeProtocol pros = type.getAnnotation(NodeProtocol.class);
|
||||||
|
String p = pros.value().toUpperCase();
|
||||||
|
if ("SNCP".equals(p) || "HTTP".equals(p)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
final Class<? extends NodeServer> old = nodeClasses.get(p);
|
||||||
|
if (old != null && old != type) {
|
||||||
|
throw new RedkaleException("Protocol(" + p + ") had NodeServer-Class("
|
||||||
|
+ old.getName() + ") but repeat NodeServer-Class(" + type.getName() + ")");
|
||||||
|
}
|
||||||
|
nodeClasses.put(p, type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
nodeLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Class<? extends NodeServer> nodeClass = nodeClasses.get(protocol);
|
||||||
|
if (nodeClass != null) {
|
||||||
|
server = NodeServer.create(nodeClass, Application.this, serconf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (server == null) {
|
||||||
|
logger.log(Level.SEVERE, "Not found Server Class for protocol({0})", serconf.getValue("protocol"));
|
||||||
|
Utility.sleep(100);
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
server.serviceCdl = serviceCdl;
|
||||||
|
server.init(serconf);
|
||||||
|
if (!singletonMode && !compileMode) {
|
||||||
|
server.start();
|
||||||
|
} else if (compileMode) {
|
||||||
|
server.getServer().getDispatcherServlet().init(server.getServer().getContext(), serconf);
|
||||||
|
}
|
||||||
|
servers.add(server);
|
||||||
|
serverCdl.countDown();
|
||||||
|
startCdl.countDown();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
logger.log(Level.WARNING, serconf + " runServers error", ex);
|
||||||
|
Application.this.shutdownLatch.countDown();
|
||||||
|
Utility.sleep(100);
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
startCdl.await();
|
||||||
|
}
|
||||||
|
|
||||||
private void stopServers() {
|
private void stopServers() {
|
||||||
this.onServersPreStop();
|
this.onServersPreStop();
|
||||||
List<NodeServer> localServers = new ArrayList<>(servers); //顺序sncps, others, watchs
|
List<NodeServer> localServers = new ArrayList<>(servers); //顺序sncps, others, watchs
|
||||||
@@ -1387,6 +1361,43 @@ public final class Application {
|
|||||||
this.onServersPostStop();
|
this.onServersPostStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//使用了nohup或使用了后台&,Runtime.getRuntime().addShutdownHook失效
|
||||||
|
private void signalShutdownHandle() {
|
||||||
|
Consumer<Consumer<String>> signalShutdownConsumer = Utility.signalShutdownConsumer();
|
||||||
|
if (signalShutdownConsumer == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
signalShutdownConsumer.accept(sig -> {
|
||||||
|
try {
|
||||||
|
long s = System.currentTimeMillis();
|
||||||
|
logger.info(Application.this.getClass().getSimpleName() + " shutdowning " + sig);
|
||||||
|
shutdown();
|
||||||
|
long e = System.currentTimeMillis() - s;
|
||||||
|
logger.info(Application.this.getClass().getSimpleName() + " shutdown in " + e + " ms");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
logger.log(Level.INFO, "Shutdown fail", ex);
|
||||||
|
} finally {
|
||||||
|
shutdownLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void loadClassesByFilters(final ClassFilter... filters) throws IOException {
|
||||||
|
ClassFilter.Loader.load(getHome(), this.serverClassLoader, filters);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<ModuleEngine> getModuleEngines() {
|
||||||
|
return moduleEngines;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DataSource loadDataSource(final String sourceName, boolean autoMemory) {
|
||||||
|
return sourceModule.loadDataSource(sourceName, autoMemory);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CacheSource loadCacheSource(final String sourceName, boolean autoMemory) {
|
||||||
|
return sourceModule.loadCacheSource(sourceName, autoMemory);
|
||||||
|
}
|
||||||
|
|
||||||
public ExecutorService getWorkExecutor() {
|
public ExecutorService getWorkExecutor() {
|
||||||
return workExecutor;
|
return workExecutor;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -96,9 +96,9 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
this.enabled = conf.getBoolValue("enabled", true);
|
this.enabled = conf.getBoolValue("enabled", true);
|
||||||
if (this.enabled) {
|
if (this.enabled) {
|
||||||
this.localSource.init(conf);
|
this.localSource.init(conf);
|
||||||
String remoteSourceName = conf.getValue("source");
|
String remoteSourceName = conf.getValue("remote");
|
||||||
if (remoteSource == null && application != null && Utility.isNotBlank(remoteSourceName)) {
|
if (remoteSource == null && application != null && Utility.isNotBlank(remoteSourceName)) {
|
||||||
CacheSource source = application.getResourceFactory().find(remoteSourceName, CacheSource.class);
|
CacheSource source = application.loadCacheSource(remoteSourceName, false);
|
||||||
if (source == null) {
|
if (source == null) {
|
||||||
throw new RedkaleException("Not found CacheSource '" + remoteSourceName + "'");
|
throw new RedkaleException("Not found CacheSource '" + remoteSourceName + "'");
|
||||||
}
|
}
|
||||||
@@ -156,6 +156,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
*
|
*
|
||||||
* @return 数据值
|
* @return 数据值
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public <T> T localGet(final String hash, final String key, final Type type, boolean nullable, Duration expire, Supplier<T> supplier) {
|
public <T> T localGet(final String hash, final String key, final Type type, boolean nullable, Duration expire, Supplier<T> supplier) {
|
||||||
return get(localSource::hget, localSource::hset, hash, key, type, nullable, expire, supplier);
|
return get(localSource::hget, localSource::hset, hash, key, type, nullable, expire, supplier);
|
||||||
}
|
}
|
||||||
@@ -261,6 +262,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
*
|
*
|
||||||
* @return 数据值
|
* @return 数据值
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public <T> T remoteGet(final String hash, final String key, final Type type, boolean nullable, Duration expire, Supplier<T> supplier) {
|
public <T> T remoteGet(final String hash, final String key, final Type type, boolean nullable, Duration expire, Supplier<T> supplier) {
|
||||||
return get(remoteSource::hget, remoteSource::hset, hash, key, type, nullable, expire, supplier);
|
return get(remoteSource::hget, remoteSource::hset, hash, key, type, nullable, expire, supplier);
|
||||||
}
|
}
|
||||||
@@ -278,6 +280,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
*
|
*
|
||||||
* @return 数据值
|
* @return 数据值
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public <T> CompletableFuture<T> remoteGetAsync(String hash, String key, Type type, boolean nullable, Duration expire, Supplier<CompletableFuture<T>> supplier) {
|
public <T> CompletableFuture<T> remoteGetAsync(String hash, String key, Type type, boolean nullable, Duration expire, Supplier<CompletableFuture<T>> supplier) {
|
||||||
return getAsync(remoteSource::hgetAsync, remoteSource::hsetAsync, hash, key, type, nullable, expire, supplier);
|
return getAsync(remoteSource::hgetAsync, remoteSource::hsetAsync, hash, key, type, nullable, expire, supplier);
|
||||||
}
|
}
|
||||||
@@ -292,6 +295,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
* @param value 数据值
|
* @param value 数据值
|
||||||
* @param expire 过期时长,Duration.ZERO为永不过期
|
* @param expire 过期时长,Duration.ZERO为永不过期
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public <T> void remoteSet(final String hash, final String key, final Type type, final T value, Duration expire) {
|
public <T> void remoteSet(final String hash, final String key, final Type type, final T value, Duration expire) {
|
||||||
checkEnable();
|
checkEnable();
|
||||||
Objects.requireNonNull(expire);
|
Objects.requireNonNull(expire);
|
||||||
@@ -310,6 +314,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
* @param value 数据值
|
* @param value 数据值
|
||||||
* @param expire 过期时长,Duration.ZERO为永不过期
|
* @param expire 过期时长,Duration.ZERO为永不过期
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public <T> CompletableFuture<Void> remoteSetAsync(String hash, String key, Type type, T value, Duration expire) {
|
public <T> CompletableFuture<Void> remoteSetAsync(String hash, String key, Type type, T value, Duration expire) {
|
||||||
checkEnable();
|
checkEnable();
|
||||||
Objects.requireNonNull(expire);
|
Objects.requireNonNull(expire);
|
||||||
@@ -326,6 +331,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
*
|
*
|
||||||
* @return 删除数量
|
* @return 删除数量
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public long remoteDel(String hash, String key) {
|
public long remoteDel(String hash, String key) {
|
||||||
checkEnable();
|
checkEnable();
|
||||||
return remoteSource.hdel(hash, key);
|
return remoteSource.hdel(hash, key);
|
||||||
@@ -339,6 +345,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
*
|
*
|
||||||
* @return 删除数量
|
* @return 删除数量
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public CompletableFuture<Long> remoteDelAsync(String hash, String key) {
|
public CompletableFuture<Long> remoteDelAsync(String hash, String key) {
|
||||||
checkEnable();
|
checkEnable();
|
||||||
return remoteSource.hdelAsync(hash, key);
|
return remoteSource.hdelAsync(hash, key);
|
||||||
@@ -453,6 +460,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
* @param localExpire 本地过期时长,Duration.ZERO为永不过期,为null表示不本地缓存
|
* @param localExpire 本地过期时长,Duration.ZERO为永不过期,为null表示不本地缓存
|
||||||
* @param remoteExpire 远程过期时长,Duration.ZERO为永不过期,为null表示不远程缓存
|
* @param remoteExpire 远程过期时长,Duration.ZERO为永不过期,为null表示不远程缓存
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public <T> void bothSet(final String hash, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) {
|
public <T> void bothSet(final String hash, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) {
|
||||||
checkEnable();
|
checkEnable();
|
||||||
Type cacheType = loadCacheType(type, value);
|
Type cacheType = loadCacheType(type, value);
|
||||||
@@ -477,6 +485,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
*
|
*
|
||||||
* @return void
|
* @return void
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public <T> CompletableFuture<Void> bothSetAsync(String hash, String key, Type type, T value, Duration localExpire, Duration remoteExpire) {
|
public <T> CompletableFuture<Void> bothSetAsync(String hash, String key, Type type, T value, Duration localExpire, Duration remoteExpire) {
|
||||||
checkEnable();
|
checkEnable();
|
||||||
Type cacheType = loadCacheType(type, value);
|
Type cacheType = loadCacheType(type, value);
|
||||||
@@ -498,6 +507,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
*
|
*
|
||||||
* @return 删除数量
|
* @return 删除数量
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public long bothDel(String hash, String key) {
|
public long bothDel(String hash, String key) {
|
||||||
checkEnable();
|
checkEnable();
|
||||||
long v = localSource.hdel(hash, key);
|
long v = localSource.hdel(hash, key);
|
||||||
@@ -516,6 +526,7 @@ public class CacheManagerService implements CacheManager, Service {
|
|||||||
*
|
*
|
||||||
* @return 删除数量
|
* @return 删除数量
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public CompletableFuture<Long> bothDelAsync(String hash, String key) {
|
public CompletableFuture<Long> bothDelAsync(String hash, String key) {
|
||||||
checkEnable();
|
checkEnable();
|
||||||
long v = localSource.hdel(hash, key); //内存操作,无需异步
|
long v = localSource.hdel(hash, key); //内存操作,无需异步
|
||||||
|
|||||||
@@ -332,7 +332,7 @@ public class SourceModuleEngine extends ModuleEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private CacheSource loadCacheSource(final String sourceName, boolean autoMemory) {
|
public CacheSource loadCacheSource(final String sourceName, boolean autoMemory) {
|
||||||
cacheSourceLock.lock();
|
cacheSourceLock.lock();
|
||||||
try {
|
try {
|
||||||
long st = System.currentTimeMillis();
|
long st = System.currentTimeMillis();
|
||||||
@@ -380,7 +380,7 @@ public class SourceModuleEngine extends ModuleEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DataSource loadDataSource(final String sourceName, boolean autoMemory) {
|
public DataSource loadDataSource(final String sourceName, boolean autoMemory) {
|
||||||
dataSourceLock.lock();
|
dataSourceLock.lock();
|
||||||
try {
|
try {
|
||||||
DataSource old = resourceFactory.find(sourceName, DataSource.class);
|
DataSource old = resourceFactory.find(sourceName, DataSource.class);
|
||||||
|
|||||||
Reference in New Issue
Block a user