This commit is contained in:
redkale
2023-12-11 22:00:10 +08:00
parent 53f029052b
commit a2f6578503
6 changed files with 142 additions and 41 deletions

View File

@@ -22,6 +22,8 @@ import java.util.logging.*;
import org.redkale.annotation.Nonnull; import org.redkale.annotation.Nonnull;
import org.redkale.annotation.Resource; import org.redkale.annotation.Resource;
import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.cache.CacheManager;
import org.redkale.cache.support.CacheManagerService;
import org.redkale.cluster.*; import org.redkale.cluster.*;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.convert.bson.BsonFactory; import org.redkale.convert.bson.BsonFactory;
@@ -31,6 +33,7 @@ import org.redkale.mq.*;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
import org.redkale.schedule.ScheduleManager;
import org.redkale.schedule.support.ScheduleManagerService; import org.redkale.schedule.support.ScheduleManagerService;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.source.*; import org.redkale.source.*;
@@ -211,6 +214,9 @@ public final class Application {
//全局定时任务管理器 //全局定时任务管理器
private final ScheduleManagerService scheduleManager; private final ScheduleManagerService scheduleManager;
//全局缓存管理器
private final CacheManagerService cacheManager;
//服务配置项 //服务配置项
final AnyValue config; final AnyValue config;
@@ -625,9 +631,12 @@ public final class Application {
} }
} }
{ //设置定时管理 { //设置定时管理
this.scheduleManager = ScheduleManagerService.create(this::getPropertyValue).enabled(!isCompileMode()); this.scheduleManager = ScheduleManagerService.create(null).enabled(false);
this.resourceFactory.register("", this.scheduleManager); }
{ //设置缓存管理器
this.cacheManager = CacheManagerService.create(null).enabled(false);
} }
{ //加载原生sql解析器 { //加载原生sql解析器
@@ -1212,6 +1221,25 @@ public final class Application {
} }
logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms");
} }
{ //设置定时管理器
final AnyValue scheduleConf = config.getAnyValue("scheduling", true);
this.resourceFactory.inject(this.scheduleManager);
if (!isCompileMode()) {
this.scheduleManager.init(scheduleConf);
}
this.resourceFactory.register("", this.scheduleManager);
}
{ //设置缓存管理器
final AnyValue cacheConf = config.getAnyValue("caching");
this.resourceFactory.inject(this.cacheManager);
if (!isCompileMode() && cacheConf != null) {
this.cacheManager.init(cacheConf);
}
this.resourceFactory.register("", this.cacheManager);
}
//------------------------------------ 注册 ResourceProducer MessageProducer ------------------------------------ //------------------------------------ 注册 ResourceProducer MessageProducer ------------------------------------
resourceFactory.register(new ResourceAnnotationProvider<ResourceProducer>() { resourceFactory.register(new ResourceAnnotationProvider<ResourceProducer>() {
@Override @Override
@@ -1316,7 +1344,7 @@ public final class Application {
return null; return null;
} }
CacheSource loadCacheSource(final String sourceName, boolean autoMemory) { public CacheSource loadCacheSource(final String sourceName, boolean autoMemory) {
cacheSourceLock.lock(); cacheSourceLock.lock();
try { try {
long st = System.currentTimeMillis(); long st = System.currentTimeMillis();
@@ -2633,7 +2661,12 @@ public final class Application {
if (this.workExecutor != null) { if (this.workExecutor != null) {
this.workExecutor.shutdownNow(); this.workExecutor.shutdownNow();
} }
this.scheduleManager.destroy(); if (!isCompileMode()) {
this.scheduleManager.destroy(this.scheduleManager.getConfig());
}
if (!isCompileMode()) {
this.cacheManager.destroy(this.cacheManager.getConfig());
}
long intms = System.currentTimeMillis() - f; long intms = System.currentTimeMillis() - f;
String ms = String.valueOf(intms); String ms = String.valueOf(intms);
@@ -2747,10 +2780,14 @@ public final class Application {
return clusterAgent; return clusterAgent;
} }
public ScheduleManagerService getScheduleManager() { public ScheduleManager getScheduleManager() {
return this.scheduleManager; return this.scheduleManager;
} }
public CacheManager getCacheManager() {
return this.cacheManager;
}
public MessageAgent getMessageAgent(String name) { public MessageAgent getMessageAgent(String name) {
if (messageAgents == null) { if (messageAgents == null) {
return null; return null;

View File

@@ -11,14 +11,18 @@ import java.util.concurrent.ConcurrentSkipListSet;
import org.redkale.annotation.AutoLoad; import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Component; import org.redkale.annotation.Component;
import org.redkale.annotation.Nullable; import org.redkale.annotation.Nullable;
import org.redkale.annotation.Resource;
import org.redkale.annotation.ResourceType; import org.redkale.annotation.ResourceType;
import org.redkale.boot.Application;
import org.redkale.cache.CacheManager; import org.redkale.cache.CacheManager;
import org.redkale.service.Local; import org.redkale.service.Local;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.source.CacheMemorySource; import org.redkale.source.CacheMemorySource;
import org.redkale.source.CacheSource; import org.redkale.source.CacheSource;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleException;
import org.redkale.util.TypeToken; import org.redkale.util.TypeToken;
import org.redkale.util.Utility;
/** /**
* 缓存管理器 * 缓存管理器
@@ -34,6 +38,9 @@ public class CacheManagerService implements CacheManager, Service {
//缓存配置项 //缓存配置项
protected boolean enabled = true; protected boolean enabled = true;
//配置
protected AnyValue config;
//数据类型与CacheValue泛型的对应关系 //数据类型与CacheValue泛型的对应关系
private final ConcurrentHashMap<Type, Type> cacheValueTypes = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Type, Type> cacheValueTypes = new ConcurrentHashMap<>();
@@ -43,6 +50,9 @@ public class CacheManagerService implements CacheManager, Service {
//缓存hash集合, 用于定时遍历删除过期数据 //缓存hash集合, 用于定时遍历删除过期数据
protected final ConcurrentSkipListSet<String> hashNames = new ConcurrentSkipListSet<>(); protected final ConcurrentSkipListSet<String> hashNames = new ConcurrentSkipListSet<>();
@Resource(required = false)
protected Application application;
//远程缓存Source //远程缓存Source
protected CacheSource remoteSource; protected CacheSource remoteSource;
@@ -50,18 +60,41 @@ public class CacheManagerService implements CacheManager, Service {
this.remoteSource = remoteSource; this.remoteSource = remoteSource;
} }
//一般用于独立组件
public static CacheManagerService create(@Nullable CacheSource remoteSource) { public static CacheManagerService create(@Nullable CacheSource remoteSource) {
return new CacheManagerService(remoteSource); return new CacheManagerService(remoteSource);
} }
public boolean enabled() {
return this.enabled;
}
public CacheManagerService enabled(boolean val) {
this.enabled = val;
return this;
}
public AnyValue getConfig() {
return config;
}
@Override @Override
public void init(AnyValue conf) { public void init(AnyValue conf) {
this.config = conf;
if (conf == null) { if (conf == null) {
conf = AnyValue.create(); conf = AnyValue.create();
} }
this.enabled = conf.getBoolValue("enabled", true); this.enabled = conf.getBoolValue("enabled", true);
if (this.enabled) { if (this.enabled) {
this.localSource.init(conf); this.localSource.init(conf);
String remoteSourceName = conf.getValue("remoteSource");
if (remoteSource == null && application != null && Utility.isNotBlank(remoteSourceName)) {
CacheSource source = application.loadCacheSource(remoteSourceName, false);
if (source == null) {
throw new RedkaleException("Not found CacheSource '" + remoteSourceName + "'");
}
this.remoteSource = source;
}
} }
} }

View File

@@ -28,11 +28,14 @@ import java.util.logging.Logger;
import org.redkale.annotation.AutoLoad; import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Component; import org.redkale.annotation.Component;
import org.redkale.annotation.Nullable; import org.redkale.annotation.Nullable;
import org.redkale.annotation.Resource;
import org.redkale.annotation.ResourceType; import org.redkale.annotation.ResourceType;
import org.redkale.boot.Application;
import org.redkale.schedule.ScheduleManager; import org.redkale.schedule.ScheduleManager;
import org.redkale.schedule.Scheduling; import org.redkale.schedule.Scheduling;
import org.redkale.service.Local; import org.redkale.service.Local;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import org.redkale.util.RedkaleClassLoader; import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.RedkaleException; import org.redkale.util.RedkaleException;
import org.redkale.util.Utility; import org.redkale.util.Utility;
@@ -58,28 +61,64 @@ public class ScheduleManagerService implements ScheduleManager, Service {
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
@Nullable @Resource(required = false)
private final UnaryOperator<String> propertyFunc; protected Application application;
private final ScheduledThreadPoolExecutor scheduler; @Nullable
private UnaryOperator<String> propertyFunc;
private ScheduledThreadPoolExecutor scheduler;
private boolean enabled = true; private boolean enabled = true;
private AnyValue config;
protected ScheduleManagerService(UnaryOperator<String> propertyFunc) { protected ScheduleManagerService(UnaryOperator<String> propertyFunc) {
this.propertyFunc = propertyFunc; this.propertyFunc = propertyFunc;
this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s"));
this.scheduler.setRemoveOnCancelPolicy(true);
} }
//一般用于独立组件
public static ScheduleManagerService create(UnaryOperator<String> propertyFunc) { public static ScheduleManagerService create(UnaryOperator<String> propertyFunc) {
return new ScheduleManagerService(propertyFunc); return new ScheduleManagerService(propertyFunc);
} }
public boolean enabled() {
return this.enabled;
}
public ScheduleManagerService enabled(boolean val) { public ScheduleManagerService enabled(boolean val) {
this.enabled = val; this.enabled = val;
return this; return this;
} }
public AnyValue getConfig() {
return config;
}
@Override
public void init(AnyValue conf) {
this.config = conf;
if (conf == null) {
conf = AnyValue.create();
}
this.enabled = conf.getBoolValue("enabled", true);
if (this.enabled) {
if (this.propertyFunc == null && application != null) {
UnaryOperator<String> func = application::getPropertyValue;
this.propertyFunc = func;
}
this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s"));
this.scheduler.setRemoveOnCancelPolicy(true);
}
}
@Override
public void destroy(AnyValue conf) {
if (scheduler != null) {
scheduler.shutdown();
}
}
@Override @Override
public void unschedule(Object service) { public void unschedule(Object service) {
lock.lock(); lock.lock();
@@ -179,9 +218,6 @@ public class ScheduleManagerService implements ScheduleManager, Service {
try { try {
Object obj = ref.get(); Object obj = ref.get();
if (obj != null) { if (obj != null) {
// if (logger.isLoggable(Level.FINEST)) {
// logger.log(Level.FINEST, "schedule task " + method.getDeclaringClass().getSimpleName() + "." + method.getName());
// }
mh.invoke(obj); mh.invoke(obj);
} }
} catch (Throwable t) { } catch (Throwable t) {
@@ -240,12 +276,6 @@ public class ScheduleManagerService implements ScheduleManager, Service {
} }
} }
public void destroy() {
if (scheduler != null) {
scheduler.shutdown();
}
}
protected abstract class ScheduledTask implements Runnable { protected abstract class ScheduledTask implements Runnable {
protected final WeakReference ref; protected final WeakReference ref;

View File

@@ -401,10 +401,10 @@ public final class ResourceFactory {
} }
} }
if (rt == null) { if (rt == null) {
return (A) register(autoSync, name, claz, val); return register(autoSync, name, claz, val);
} else { } else {
A old = null; A old = null;
A t = (A) register(autoSync, name, rt, val); A t = register(autoSync, name, rt, val);
if (t != null) { if (t != null) {
old = t; old = t;
} }
@@ -744,7 +744,7 @@ public final class ResourceFactory {
return null; return null;
} }
public <T> boolean inject(final Object srcObj) { public boolean inject(final Object srcObj) {
return inject(srcObj, null); return inject(srcObj, null);
} }
@@ -752,7 +752,7 @@ public final class ResourceFactory {
return inject(srcObj, attachment, null); return inject(srcObj, attachment, null);
} }
public <T> boolean inject(final Object srcObj, final BiConsumer<Object, Field> consumer) { public boolean inject(final Object srcObj, final BiConsumer<Object, Field> consumer) {
return inject(srcObj, null, consumer); return inject(srcObj, null, consumer);
} }
@@ -760,7 +760,7 @@ public final class ResourceFactory {
return inject(null, srcObj, attachment, consumer, new ArrayList()); return inject(null, srcObj, attachment, consumer, new ArrayList());
} }
public <T> boolean inject(final String srcResourceName, final Object srcObj) { public boolean inject(final String srcResourceName, final Object srcObj) {
return inject(srcResourceName, srcObj, null); return inject(srcResourceName, srcObj, null);
} }
@@ -768,7 +768,7 @@ public final class ResourceFactory {
return inject(srcResourceName, srcObj, attachment, null); return inject(srcResourceName, srcObj, attachment, null);
} }
public <T> boolean inject(final String srcResourceName, final Object srcObj, final BiConsumer<Object, Field> consumer) { public boolean inject(final String srcResourceName, final Object srcObj, final BiConsumer<Object, Field> consumer) {
return inject(srcResourceName, srcObj, null, consumer); return inject(srcResourceName, srcObj, null, consumer);
} }

View File

@@ -26,24 +26,24 @@ public class CachingTest {
public void run() throws Exception { public void run() throws Exception {
CacheMemorySource remoteSource = new CacheMemorySource("remote"); CacheMemorySource remoteSource = new CacheMemorySource("remote");
remoteSource.init(null); remoteSource.init(null);
CacheManagerService cache = CacheManagerService.create(remoteSource); CacheManagerService manager = CacheManagerService.create(remoteSource);
cache.init(null); manager.init(null);
Duration expire = Duration.ofMillis(490); Duration expire = Duration.ofMillis(490);
cache.localSetString("user", "name:haha", "myha", expire); manager.localSetString("user", "name:haha", "myha", expire);
Assertions.assertEquals(cache.localGetString("user", "name:haha"), "myha"); Assertions.assertEquals(manager.localGetString("user", "name:haha"), "myha");
Utility.sleep(500); Utility.sleep(500);
Assertions.assertTrue(cache.localGetString("user", "name:haha") == null); Assertions.assertTrue(manager.localGetString("user", "name:haha") == null);
CachingBean bean = new CachingBean(); CachingBean bean = new CachingBean();
bean.setName("tom"); bean.setName("tom");
bean.setRemark("这是名字备注"); bean.setRemark("这是名字备注");
String json = bean.toString(); String json = bean.toString();
cache.localSet("user", bean.getName(), CachingBean.class, bean, expire); manager.localSet("user", bean.getName(), CachingBean.class, bean, expire);
Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json); Assertions.assertEquals(manager.localGet("user", bean.getName(), CachingBean.class).toString(), json);
bean.setRemark(bean.getRemark() + "-新备注"); bean.setRemark(bean.getRemark() + "-新备注");
Assertions.assertEquals(cache.localGet("user", bean.getName(), CachingBean.class).toString(), json); Assertions.assertEquals(manager.localGet("user", bean.getName(), CachingBean.class).toString(), json);
cache.destroy(null); manager.destroy(null);
} }
public static class CachingBean { public static class CachingBean {

View File

@@ -11,21 +11,22 @@ import org.redkale.util.Utility;
* *
* @author zhangjx * @author zhangjx
*/ */
public class ScheduleTest { public class SchedulingTest {
public static void main(String[] args) throws Throwable { public static void main(String[] args) throws Throwable {
ScheduleTest test = new ScheduleTest(); SchedulingTest test = new SchedulingTest();
test.run(); test.run();
} }
@Test @Test
public void run() throws Exception { public void run() throws Exception {
ScheduleManagerService factory = ScheduleManagerService.create(null); ScheduleManagerService manager = ScheduleManagerService.create(null);
manager.init(null);
ScheduleService service = new ScheduleService(); ScheduleService service = new ScheduleService();
factory.schedule(service); manager.schedule(service);
Utility.sleep(3000); Utility.sleep(3000);
factory.unschedule(service); manager.unschedule(service);
factory.destroy(); manager.destroy(null);
} }
} }