From 9942a194a6287eb3b78b15d9c2f226dce9ba4c31 Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 19 Dec 2023 10:49:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 423 +++++++++--------- .../cache/spi/CacheManagerService.java | 15 +- .../redkale/source/SourceModuleEngine.java | 4 +- 3 files changed, 232 insertions(+), 210 deletions(-) diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 36ec3decf..acee8a788 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -160,6 +160,9 @@ public final class Application { //@since 2.8.0 final LoggingModule loggingModule = new LoggingModule(this); + //数据源组件 + private final SourceModuleEngine sourceModule = new SourceModuleEngine(this); + //NodeServer 资源, 顺序必须是sncps, others, watchs final List servers = new CopyOnWriteArrayList<>(); @@ -303,7 +306,7 @@ public final class Application { this.resourceFactory.register("protobufconvert", Convert.class, ProtobufFactory.root().getConvert()); //系统内部模块组件 - moduleEngines.add(new SourceModuleEngine(this)); + moduleEngines.add(sourceModule); //放第一,很多module依赖于source moduleEngines.add(new MessageModuleEngine(this)); moduleEngines.add(new ClusterModuleEngine(this)); moduleEngines.add(new CacheModuleEngine(this)); @@ -315,7 +318,7 @@ public final class Application { //打印基础信息日志 logger.log(Level.INFO, colorMessage(logger, 36, 1, "-------------------------------- Redkale " + Redkale.getDotedVersion() + " --------------------------------")); - final String confDir = this.confDir.toString(); + final String confDirStr = this.confDir.toString(); logger.log(Level.INFO, "APP_OS = " + System.getProperty("os.name") + " " + System.getProperty("os.version") + " " + System.getProperty("os.arch") + "\r\n" + "APP_JAVA = " + System.getProperty("java.runtime.name", System.getProperty("org.graalvm.nativeimage.kind") != null ? "Nativeimage" : "") + " " + System.getProperty("java.runtime.version", System.getProperty("java.vendor.version", System.getProperty("java.vm.version"))) + "\r\n" //graalvm.nativeimage 模式下无 java.runtime.xxx 属性 @@ -325,12 +328,12 @@ public final class Application { + "APP_LOADER = " + this.classLoader.getClass().getSimpleName() + "\r\n" + RESNAME_APP_ADDR + " = " + this.localAddress.getHostString() + ":" + this.localAddress.getPort() + "\r\n" + RESNAME_APP_HOME + " = " + this.home.getPath().replace('\\', '/') + "\r\n" - + RESNAME_APP_CONF_DIR + " = " + confDir.substring(confDir.indexOf('!') + 1)); + + RESNAME_APP_CONF_DIR + " = " + confDirStr.substring(confDirStr.indexOf('!') + 1)); if (!compileMode && !(classLoader instanceof RedkaleClassLoader.RedkaleCacheClassLoader)) { String lib = environment.getPropertyValue(config.getValue("lib", "${APP_HOME}/libs/*").trim()); - lib = Utility.isEmpty(lib) ? confDir : (lib + ";" + confDir); - Server.loadLib(classLoader, logger, lib.isEmpty() ? confDir : (lib + ";" + confDir)); + lib = Utility.isEmpty(lib) ? confDirStr : (lib + ";" + confDirStr); + Server.loadLib(classLoader, logger, lib.isEmpty() ? confDirStr : (lib + ";" + confDirStr)); } this.shutdownLatch = new CountDownLatch(config.getAnyValues("server").length + 1); } @@ -352,68 +355,6 @@ public final class Application { this.onAppPostInit(); } - private void registerResourceEnvs(boolean first, Properties... envs) { - for (Properties props : envs) { - props.forEach((k, v) -> { - String val = environment.getPropertyValue(v.toString(), envs); - if (k.toString().startsWith("system.property.")) { - String key = k.toString().substring("system.property.".length()); - if (System.getProperty(key) == null || !first) { - System.setProperty(key, val); - } - resourceFactory.register(!first, k.toString(), val); - } else if (k.toString().startsWith("mimetype.property.")) { - MimeType.add(k.toString().substring("mimetype.property.".length()), val); - } else { - resourceFactory.register(!first, k.toString(), val); - } - }); - } - } - - /** - * 设置WorkExecutor - */ - private void initWorkExecutor() { - int bufferCapacity = 32 * 1024; - int bufferPoolSize = Utility.cpus() * 8; - final AnyValue executorConf = config.getAnyValue("executor", true); - StringBuilder executorLog = new StringBuilder(); - - final int workThreads = Math.max(Utility.cpus(), executorConf.getIntValue("threads", Utility.cpus() * 4)); - //指定threads则不使用虚拟线程池 - this.workExecutor = executorConf.getValue("threads") != null - ? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s") - : WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s"); - String executorName = this.workExecutor.getClass().getSimpleName(); - executorLog.append("defaultWorkExecutor: {type=").append(executorName); - if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) { - executorLog.append(", threads=[virtual]}"); - } else { - executorLog.append(", threads=").append(workThreads).append("}"); - } - - ExecutorService clientWorkExecutor = this.workExecutor; - if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) { - executorLog.append(", clientWorkExecutor: [workExecutor]"); - } else { - //给所有client给一个新的默认ExecutorService - int clientThreads = executorConf.getIntValue("clients", Utility.cpus() * 4); - clientWorkExecutor = WorkThread.createWorkExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s"); - executorLog.append(", threads=").append(clientThreads).append("}"); - } - AsyncIOGroup ioGroup = new AsyncIOGroup("Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize).skipClose(true); - this.clientAsyncGroup = ioGroup.start(); - - if (executorLog.length() > 0) { - logger.log(Level.INFO, executorLog.toString()); - } - this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); - this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor); - this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup); - this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncIOGroup.class, this.clientAsyncGroup); - } - private void initResourceTypeLoader() { final Application application = this; //只有WatchService才能加载Application、WatchFactory @@ -579,6 +520,68 @@ public final class Application { }, HttpRpcClient.class); } + private void registerResourceEnvs(boolean first, Properties... envs) { + for (Properties props : envs) { + props.forEach((k, v) -> { + String val = environment.getPropertyValue(v.toString(), envs); + if (k.toString().startsWith("system.property.")) { + String key = k.toString().substring("system.property.".length()); + if (System.getProperty(key) == null || !first) { + System.setProperty(key, val); + } + resourceFactory.register(!first, k.toString(), val); + } else if (k.toString().startsWith("mimetype.property.")) { + MimeType.add(k.toString().substring("mimetype.property.".length()), val); + } else { + resourceFactory.register(!first, k.toString(), val); + } + }); + } + } + + /** + * 设置WorkExecutor + */ + private void initWorkExecutor() { + int bufferCapacity = 32 * 1024; + int bufferPoolSize = Utility.cpus() * 8; + final AnyValue executorConf = config.getAnyValue("executor", true); + StringBuilder executorLog = new StringBuilder(); + + final int workThreads = Math.max(Utility.cpus(), executorConf.getIntValue("threads", Utility.cpus() * 4)); + //指定threads则不使用虚拟线程池 + this.workExecutor = executorConf.getValue("threads") != null + ? WorkThread.createExecutor(workThreads, "Redkale-WorkThread-%s") + : WorkThread.createWorkExecutor(workThreads, "Redkale-WorkThread-%s"); + String executorName = this.workExecutor.getClass().getSimpleName(); + executorLog.append("defaultWorkExecutor: {type=").append(executorName); + if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) { + executorLog.append(", threads=[virtual]}"); + } else { + executorLog.append(", threads=").append(workThreads).append("}"); + } + + ExecutorService clientWorkExecutor = this.workExecutor; + if (executorName.contains("VirtualExecutor") || executorName.contains("PerTaskExecutor")) { + executorLog.append(", clientWorkExecutor: [workExecutor]"); + } else { + //给所有client给一个新的默认ExecutorService + int clientThreads = executorConf.getIntValue("clients", Utility.cpus() * 4); + clientWorkExecutor = WorkThread.createWorkExecutor(clientThreads, "Redkale-DefaultClient-WorkThread-%s"); + executorLog.append(", threads=").append(clientThreads).append("}"); + } + AsyncIOGroup ioGroup = new AsyncIOGroup("Redkale-DefaultClient-IOThread-%s", clientWorkExecutor, bufferCapacity, bufferPoolSize).skipClose(true); + this.clientAsyncGroup = ioGroup.start(); + + if (executorLog.length() > 0) { + logger.log(Level.INFO, executorLog.toString()); + } + this.resourceFactory.register(RESNAME_APP_EXECUTOR, Executor.class, this.workExecutor); + this.resourceFactory.register(RESNAME_APP_EXECUTOR, ExecutorService.class, this.workExecutor); + this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncGroup.class, this.clientAsyncGroup); + this.resourceFactory.register(RESNAME_APP_CLIENT_ASYNCGROUP, AsyncIOGroup.class, this.clientAsyncGroup); + } + private void initAppListeners() throws Exception { //------------------------------------------------------------------------ for (AnyValue conf : config.getAnyValues("group")) { @@ -1065,129 +1068,6 @@ public final class Application { } } - void loadClassesByFilters(final ClassFilter... filters) throws IOException { - ClassFilter.Loader.load(getHome(), this.serverClassLoader, filters); - } - - List getModuleEngines() { - return moduleEngines; - } - - //使用了nohup或使用了后台&,Runtime.getRuntime().addShutdownHook失效 - private void signalShutdownHandle() { - Consumer> signalShutdownConsumer = Utility.signalShutdownConsumer(); - if (signalShutdownConsumer == null) { - return; - } - signalShutdownConsumer.accept(sig -> { - try { - long s = System.currentTimeMillis(); - logger.info(Application.this.getClass().getSimpleName() + " shutdowning " + sig); - shutdown(); - long e = System.currentTimeMillis() - s; - logger.info(Application.this.getClass().getSimpleName() + " shutdown in " + e + " ms"); - } catch (Exception ex) { - logger.log(Level.INFO, "Shutdown fail", ex); - } finally { - shutdownLatch.countDown(); - } - }); - } - - @SuppressWarnings("unchecked") - private void runServers(CountDownLatch serverCdl, final List serverConfs) throws Exception { - CountDownLatch serviceCdl = new CountDownLatch(serverConfs.size()); - CountDownLatch startCdl = new CountDownLatch(serverConfs.size()); - final AtomicBoolean inited = new AtomicBoolean(false); - final ReentrantLock nodeLock = new ReentrantLock(); - final Map> nodeClasses = new HashMap<>(); - for (final AnyValue serconf : serverConfs) { - Thread thread = new Thread() { - { - setName("Redkale-" + serconf.getValue("protocol", "Server").toUpperCase().replaceFirst("\\..+", "") + ":" + serconf.getIntValue("port") + "-Thread"); - this.setDaemon(true); - } - - @Override - public void run() { - try { - //Thread ctd = Thread.currentThread(); - //ctd.setContextClassLoader(new URLClassLoader(new URL[0], ctd.getContextClassLoader())); - final String protocol = serconf.getValue("protocol", "").replaceFirst("\\..+", "").toUpperCase(); - NodeServer server = null; - if ("SNCP".equals(protocol)) { - server = NodeSncpServer.createNodeServer(Application.this, serconf); - } else if ("WATCH".equalsIgnoreCase(protocol)) { - AnyValueWriter serconf2 = (AnyValueWriter) serconf; - AnyValueWriter rest = (AnyValueWriter) serconf2.getAnyValue("rest"); - if (rest == null) { - rest = new AnyValueWriter(); - serconf2.addValue("rest", rest); - } - rest.setValue("base", WatchServlet.class.getName()); - server = new NodeWatchServer(Application.this, serconf); - } else if ("HTTP".equalsIgnoreCase(protocol)) { - server = new NodeHttpServer(Application.this, serconf); - } else { - if (!inited.get()) { - nodeLock.lock(); - try { - if (!inited.getAndSet(true)) { //加载自定义的协议,如:SOCKS - ClassFilter profilter = new ClassFilter(classLoader, NodeProtocol.class, NodeServer.class, (Class[]) null); - ClassFilter.Loader.load(home, classLoader, profilter); - final Set> entrys = profilter.getFilterEntrys(); - for (FilterEntry entry : entrys) { - final Class type = entry.getType(); - NodeProtocol pros = type.getAnnotation(NodeProtocol.class); - String p = pros.value().toUpperCase(); - if ("SNCP".equals(p) || "HTTP".equals(p)) { - continue; - } - final Class old = nodeClasses.get(p); - if (old != null && old != type) { - throw new RedkaleException("Protocol(" + p + ") had NodeServer-Class(" - + old.getName() + ") but repeat NodeServer-Class(" + type.getName() + ")"); - } - nodeClasses.put(p, type); - } - } - } finally { - nodeLock.unlock(); - } - } - Class nodeClass = nodeClasses.get(protocol); - if (nodeClass != null) { - server = NodeServer.create(nodeClass, Application.this, serconf); - } - } - if (server == null) { - logger.log(Level.SEVERE, "Not found Server Class for protocol({0})", serconf.getValue("protocol")); - Utility.sleep(100); - System.exit(1); - } - server.serviceCdl = serviceCdl; - server.init(serconf); - if (!singletonMode && !compileMode) { - server.start(); - } else if (compileMode) { - server.getServer().getDispatcherServlet().init(server.getServer().getContext(), serconf); - } - servers.add(server); - serverCdl.countDown(); - startCdl.countDown(); - } catch (Exception ex) { - logger.log(Level.WARNING, serconf + " runServers error", ex); - Application.this.shutdownLatch.countDown(); - Utility.sleep(100); - System.exit(1); - } - } - }; - thread.start(); - } - startCdl.await(); - } - /** * 实例化单个Service * @@ -1317,22 +1197,6 @@ public final class Application { System.exit(0); //必须要有 } - private static String generateHelp() { - return "" - + "Usage: redkale [command] [arguments]\r\n" - + "Command: \r\n" - + " start, startup start one process\r\n" - + " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n" - + " shutdown, stop shutdown one process\r\n" - + " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n" - + " restart restart one process\r\n" - + " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n" - + " apidoc generate apidoc\r\n" - + " --api-skiprpc=[true|false] skip @RestService(rpcOnly=true) service or @RestMapping(rpcOnly=true) method, default is true\r\n" - + " --api-host=[url] api root url, default is http://localhost\r\n" - + " help, -h, --help show this help\r\n"; - } - public List command(String cmd, String[] params) { List localServers = new ArrayList<>(servers); //顺序sncps, others, watchs List results = new ArrayList<>(); @@ -1371,6 +1235,116 @@ public final class Application { LoggingBaseHandler.traceEnable = true; } + private static String generateHelp() { + return "" + + "Usage: redkale [command] [arguments]\r\n" + + "Command: \r\n" + + " start, startup start one process\r\n" + + " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n" + + " shutdown, stop shutdown one process\r\n" + + " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n" + + " restart restart one process\r\n" + + " --conf-file=[file] application config file, eg. application.xml、application.properties\r\n" + + " apidoc generate apidoc\r\n" + + " --api-skiprpc=[true|false] skip @RestService(rpcOnly=true) service or @RestMapping(rpcOnly=true) method, default is true\r\n" + + " --api-host=[url] api root url, default is http://localhost\r\n" + + " help, -h, --help show this help\r\n"; + } + + @SuppressWarnings("unchecked") + private void runServers(CountDownLatch serverCdl, final List serverConfs) throws Exception { + CountDownLatch serviceCdl = new CountDownLatch(serverConfs.size()); + CountDownLatch startCdl = new CountDownLatch(serverConfs.size()); + final AtomicBoolean inited = new AtomicBoolean(false); + final ReentrantLock nodeLock = new ReentrantLock(); + final Map> nodeClasses = new HashMap<>(); + for (final AnyValue serconf : serverConfs) { + Thread thread = new Thread() { + { + setName("Redkale-" + serconf.getValue("protocol", "Server").toUpperCase().replaceFirst("\\..+", "") + ":" + serconf.getIntValue("port") + "-Thread"); + this.setDaemon(true); + } + + @Override + public void run() { + try { + //Thread ctd = Thread.currentThread(); + //ctd.setContextClassLoader(new URLClassLoader(new URL[0], ctd.getContextClassLoader())); + final String protocol = serconf.getValue("protocol", "").replaceFirst("\\..+", "").toUpperCase(); + NodeServer server = null; + if ("SNCP".equals(protocol)) { + server = NodeSncpServer.createNodeServer(Application.this, serconf); + } else if ("WATCH".equalsIgnoreCase(protocol)) { + AnyValueWriter serconf2 = (AnyValueWriter) serconf; + AnyValueWriter rest = (AnyValueWriter) serconf2.getAnyValue("rest"); + if (rest == null) { + rest = new AnyValueWriter(); + serconf2.addValue("rest", rest); + } + rest.setValue("base", WatchServlet.class.getName()); + server = new NodeWatchServer(Application.this, serconf); + } else if ("HTTP".equalsIgnoreCase(protocol)) { + server = new NodeHttpServer(Application.this, serconf); + } else { + if (!inited.get()) { + nodeLock.lock(); + try { + if (!inited.getAndSet(true)) { //加载自定义的协议,如:SOCKS + ClassFilter profilter = new ClassFilter(classLoader, NodeProtocol.class, NodeServer.class, (Class[]) null); + ClassFilter.Loader.load(home, classLoader, profilter); + final Set> entrys = profilter.getFilterEntrys(); + for (FilterEntry entry : entrys) { + final Class type = entry.getType(); + NodeProtocol pros = type.getAnnotation(NodeProtocol.class); + String p = pros.value().toUpperCase(); + if ("SNCP".equals(p) || "HTTP".equals(p)) { + continue; + } + final Class old = nodeClasses.get(p); + if (old != null && old != type) { + throw new RedkaleException("Protocol(" + p + ") had NodeServer-Class(" + + old.getName() + ") but repeat NodeServer-Class(" + type.getName() + ")"); + } + nodeClasses.put(p, type); + } + } + } finally { + nodeLock.unlock(); + } + } + Class nodeClass = nodeClasses.get(protocol); + if (nodeClass != null) { + server = NodeServer.create(nodeClass, Application.this, serconf); + } + } + if (server == null) { + logger.log(Level.SEVERE, "Not found Server Class for protocol({0})", serconf.getValue("protocol")); + Utility.sleep(100); + System.exit(1); + } + server.serviceCdl = serviceCdl; + server.init(serconf); + if (!singletonMode && !compileMode) { + server.start(); + } else if (compileMode) { + server.getServer().getDispatcherServlet().init(server.getServer().getContext(), serconf); + } + servers.add(server); + serverCdl.countDown(); + startCdl.countDown(); + } catch (Exception ex) { + logger.log(Level.WARNING, serconf + " runServers error", ex); + Application.this.shutdownLatch.countDown(); + Utility.sleep(100); + System.exit(1); + } + } + }; + thread.start(); + } + startCdl.await(); + } + private void stopServers() { this.onServersPreStop(); List localServers = new ArrayList<>(servers); //顺序sncps, others, watchs @@ -1387,6 +1361,43 @@ public final class Application { this.onServersPostStop(); } + //使用了nohup或使用了后台&,Runtime.getRuntime().addShutdownHook失效 + private void signalShutdownHandle() { + Consumer> signalShutdownConsumer = Utility.signalShutdownConsumer(); + if (signalShutdownConsumer == null) { + return; + } + signalShutdownConsumer.accept(sig -> { + try { + long s = System.currentTimeMillis(); + logger.info(Application.this.getClass().getSimpleName() + " shutdowning " + sig); + shutdown(); + long e = System.currentTimeMillis() - s; + logger.info(Application.this.getClass().getSimpleName() + " shutdown in " + e + " ms"); + } catch (Exception ex) { + logger.log(Level.INFO, "Shutdown fail", ex); + } finally { + shutdownLatch.countDown(); + } + }); + } + + void loadClassesByFilters(final ClassFilter... filters) throws IOException { + ClassFilter.Loader.load(getHome(), this.serverClassLoader, filters); + } + + List getModuleEngines() { + return moduleEngines; + } + + public DataSource loadDataSource(final String sourceName, boolean autoMemory) { + return sourceModule.loadDataSource(sourceName, autoMemory); + } + + public CacheSource loadCacheSource(final String sourceName, boolean autoMemory) { + return sourceModule.loadCacheSource(sourceName, autoMemory); + } + public ExecutorService getWorkExecutor() { return workExecutor; } diff --git a/src/main/java/org/redkale/cache/spi/CacheManagerService.java b/src/main/java/org/redkale/cache/spi/CacheManagerService.java index 3197b7f2c..ae28e2980 100644 --- a/src/main/java/org/redkale/cache/spi/CacheManagerService.java +++ b/src/main/java/org/redkale/cache/spi/CacheManagerService.java @@ -96,9 +96,9 @@ public class CacheManagerService implements CacheManager, Service { this.enabled = conf.getBoolValue("enabled", true); if (this.enabled) { this.localSource.init(conf); - String remoteSourceName = conf.getValue("source"); + String remoteSourceName = conf.getValue("remote"); if (remoteSource == null && application != null && Utility.isNotBlank(remoteSourceName)) { - CacheSource source = application.getResourceFactory().find(remoteSourceName, CacheSource.class); + CacheSource source = application.loadCacheSource(remoteSourceName, false); if (source == null) { throw new RedkaleException("Not found CacheSource '" + remoteSourceName + "'"); } @@ -156,6 +156,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return 数据值 */ + @Override public T localGet(final String hash, final String key, final Type type, boolean nullable, Duration expire, Supplier supplier) { return get(localSource::hget, localSource::hset, hash, key, type, nullable, expire, supplier); } @@ -261,6 +262,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return 数据值 */ + @Override public T remoteGet(final String hash, final String key, final Type type, boolean nullable, Duration expire, Supplier supplier) { return get(remoteSource::hget, remoteSource::hset, hash, key, type, nullable, expire, supplier); } @@ -278,6 +280,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return 数据值 */ + @Override public CompletableFuture remoteGetAsync(String hash, String key, Type type, boolean nullable, Duration expire, Supplier> supplier) { return getAsync(remoteSource::hgetAsync, remoteSource::hsetAsync, hash, key, type, nullable, expire, supplier); } @@ -292,6 +295,7 @@ public class CacheManagerService implements CacheManager, Service { * @param value 数据值 * @param expire 过期时长,Duration.ZERO为永不过期 */ + @Override public void remoteSet(final String hash, final String key, final Type type, final T value, Duration expire) { checkEnable(); Objects.requireNonNull(expire); @@ -310,6 +314,7 @@ public class CacheManagerService implements CacheManager, Service { * @param value 数据值 * @param expire 过期时长,Duration.ZERO为永不过期 */ + @Override public CompletableFuture remoteSetAsync(String hash, String key, Type type, T value, Duration expire) { checkEnable(); Objects.requireNonNull(expire); @@ -326,6 +331,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return 删除数量 */ + @Override public long remoteDel(String hash, String key) { checkEnable(); return remoteSource.hdel(hash, key); @@ -339,6 +345,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return 删除数量 */ + @Override public CompletableFuture remoteDelAsync(String hash, String key) { checkEnable(); return remoteSource.hdelAsync(hash, key); @@ -453,6 +460,7 @@ public class CacheManagerService implements CacheManager, Service { * @param localExpire 本地过期时长,Duration.ZERO为永不过期,为null表示不本地缓存 * @param remoteExpire 远程过期时长,Duration.ZERO为永不过期,为null表示不远程缓存 */ + @Override public void bothSet(final String hash, final String key, final Type type, final T value, Duration localExpire, Duration remoteExpire) { checkEnable(); Type cacheType = loadCacheType(type, value); @@ -477,6 +485,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return void */ + @Override public CompletableFuture bothSetAsync(String hash, String key, Type type, T value, Duration localExpire, Duration remoteExpire) { checkEnable(); Type cacheType = loadCacheType(type, value); @@ -498,6 +507,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return 删除数量 */ + @Override public long bothDel(String hash, String key) { checkEnable(); long v = localSource.hdel(hash, key); @@ -516,6 +526,7 @@ public class CacheManagerService implements CacheManager, Service { * * @return 删除数量 */ + @Override public CompletableFuture bothDelAsync(String hash, String key) { checkEnable(); long v = localSource.hdel(hash, key); //内存操作,无需异步 diff --git a/src/main/java/org/redkale/source/SourceModuleEngine.java b/src/main/java/org/redkale/source/SourceModuleEngine.java index abd3a1728..1197571cb 100644 --- a/src/main/java/org/redkale/source/SourceModuleEngine.java +++ b/src/main/java/org/redkale/source/SourceModuleEngine.java @@ -332,7 +332,7 @@ public class SourceModuleEngine extends ModuleEngine { } } - private CacheSource loadCacheSource(final String sourceName, boolean autoMemory) { + public CacheSource loadCacheSource(final String sourceName, boolean autoMemory) { cacheSourceLock.lock(); try { long st = System.currentTimeMillis(); @@ -380,7 +380,7 @@ public class SourceModuleEngine extends ModuleEngine { } } - private DataSource loadDataSource(final String sourceName, boolean autoMemory) { + public DataSource loadDataSource(final String sourceName, boolean autoMemory) { dataSourceLock.lock(); try { DataSource old = resourceFactory.find(sourceName, DataSource.class);