ScheduleEngine

This commit is contained in:
redkale
2023-12-11 18:24:15 +08:00
parent 8780365e17
commit d6b9402ce0
9 changed files with 100 additions and 74 deletions

View File

@@ -31,7 +31,7 @@ import org.redkale.mq.*;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
import org.redkale.scheduling.ScheduleFactory; import org.redkale.scheduling.ScheduleEngine;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.source.*; import org.redkale.source.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -208,8 +208,8 @@ public final class Application {
//全局根ResourceFactory //全局根ResourceFactory
final ResourceFactory resourceFactory = ResourceFactory.create(); final ResourceFactory resourceFactory = ResourceFactory.create();
//全局ScheduledFactory //全局定时任务管理器
private final ScheduleFactory scheduledFactory; private final ScheduleEngine scheduleEngine;
//服务配置项 //服务配置项
final AnyValue config; final AnyValue config;
@@ -625,8 +625,9 @@ public final class Application {
} }
} }
{ //设置ScheduledFactory { //设置定时管理
this.scheduledFactory = ScheduleFactory.create(this::getPropertyValue).disable(isCompileMode()); this.scheduleEngine = ScheduleEngine.create(this::getPropertyValue).enabled(!isCompileMode());
this.resourceFactory.register("", this.scheduleEngine);
} }
{ //加载原生sql解析器 { //加载原生sql解析器
@@ -2162,11 +2163,11 @@ public final class Application {
} }
public void schedule(Object service) { public void schedule(Object service) {
this.scheduledFactory.schedule(service); this.scheduleEngine.schedule(service);
} }
public void unschedule(Object service) { public void unschedule(Object service) {
this.scheduledFactory.unschedule(service); this.scheduleEngine.unschedule(service);
} }
void updateEnvironmentProperties(String namespace, List<ResourceEvent> events) { void updateEnvironmentProperties(String namespace, List<ResourceEvent> events) {
@@ -2632,7 +2633,7 @@ public final class Application {
if (this.workExecutor != null) { if (this.workExecutor != null) {
this.workExecutor.shutdownNow(); this.workExecutor.shutdownNow();
} }
this.scheduledFactory.destroy(); this.scheduleEngine.destroy();
long intms = System.currentTimeMillis() - f; long intms = System.currentTimeMillis() - f;
String ms = String.valueOf(intms); String ms = String.valueOf(intms);
@@ -2746,6 +2747,10 @@ public final class Application {
return clusterAgent; return clusterAgent;
} }
public ScheduleEngine getScheduleEngine() {
return this.scheduleEngine;
}
public MessageAgent getMessageAgent(String name) { public MessageAgent getMessageAgent(String name) {
if (messageAgents == null) { if (messageAgents == null) {
return null; return null;

View File

@@ -1,35 +0,0 @@
/*
*
*/
package org.redkale.caching;
import org.redkale.convert.json.JsonConvert;
/**
*
* 缓存配置
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public class CacheConfig {
private boolean enabled = true;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String toString() {
return JsonConvert.root().convertTo(this);
}
}

View File

@@ -5,13 +5,11 @@ package org.redkale.caching;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.time.Duration; import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import org.redkale.annotation.AutoLoad; import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Component; import org.redkale.annotation.Component;
import org.redkale.annotation.Nonnull;
import org.redkale.annotation.Nullable; import org.redkale.annotation.Nullable;
import org.redkale.annotation.ResourceType; import org.redkale.annotation.ResourceType;
import org.redkale.service.Local; import org.redkale.service.Local;
@@ -22,6 +20,7 @@ import org.redkale.util.AnyValue;
import org.redkale.util.TypeToken; import org.redkale.util.TypeToken;
/** /**
* 缓存管理器
* *
* @author zhangjx * @author zhangjx
*/ */
@@ -29,10 +28,10 @@ import org.redkale.util.TypeToken;
@Component @Component
@AutoLoad(false) @AutoLoad(false)
@ResourceType(CacheManager.class) @ResourceType(CacheManager.class)
public class CacheManagerService implements CacheManager, Service { public class CacheEngine implements CacheManager, Service {
//缓存配置项 //缓存配置项
protected final CacheConfig config; protected boolean enabled = true;
//数据类型与CacheValue泛型的对应关系 //数据类型与CacheValue泛型的对应关系
private final ConcurrentHashMap<Type, Type> cacheValueTypes = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Type, Type> cacheValueTypes = new ConcurrentHashMap<>();
@@ -46,26 +45,38 @@ public class CacheManagerService implements CacheManager, Service {
//远程缓存Source //远程缓存Source
protected CacheSource remoteSource; protected CacheSource remoteSource;
protected CacheManagerService(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) { protected CacheEngine(@Nullable CacheSource remoteSource) {
this.config = Objects.requireNonNull(config);
this.remoteSource = remoteSource; this.remoteSource = remoteSource;
} }
public static CacheManagerService create(@Nonnull CacheConfig config, @Nullable CacheSource remoteSource) { public static CacheEngine create(@Nullable CacheSource remoteSource) {
return new CacheManagerService(config, remoteSource); return new CacheEngine(remoteSource);
} }
@Override @Override
public void init(AnyValue conf) { public void init(AnyValue conf) {
this.localSource.init(conf); if (conf == null) {
conf = AnyValue.create();
}
this.enabled = conf.getBoolValue("enabled", true);
if (this.enabled) {
this.localSource.init(conf);
}
} }
@Override @Override
public void destroy(AnyValue conf) { public void destroy(AnyValue conf) {
this.localSource.destroy(conf); if (this.enabled) {
this.localSource.destroy(conf);
}
} }
public CacheManagerService addHash(String hash) { public boolean isEnabled() {
return enabled;
}
public CacheEngine addHash(String hash) {
this.hashNames.add(hash); this.hashNames.add(hash);
return this; return this;
} }
@@ -223,7 +234,7 @@ public class CacheManagerService implements CacheManager, Service {
public <T> T bothGet(final String hash, final String key, final Type type) { public <T> T bothGet(final String hash, final String key, final Type type) {
Type t = loadCacheType(type); Type t = loadCacheType(type);
CacheValue<T> val = localSource.hget(hash, key, t); CacheValue<T> val = localSource.hget(hash, key, t);
if (val != null && !val.isExpired()) { if (CacheValue.isValid(val)) {
return val.getValue(); return val.getValue();
} }
if (remoteSource != null) { if (remoteSource != null) {
@@ -246,7 +257,7 @@ public class CacheManagerService implements CacheManager, Service {
public <T> CompletableFuture<T> bothGetAsync(final String hash, final String key, final Type type) { public <T> CompletableFuture<T> bothGetAsync(final String hash, final String key, final Type type) {
Type t = loadCacheType(type); Type t = loadCacheType(type);
CacheValue<T> val = localSource.hget(hash, key, t); CacheValue<T> val = localSource.hget(hash, key, t);
if (val != null && !val.isExpired()) { if (CacheValue.isValid(val)) {
return CompletableFuture.completedFuture(val.getValue()); return CompletableFuture.completedFuture(val.getValue());
} }
if (remoteSource != null) { if (remoteSource != null) {

View File

@@ -8,7 +8,7 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
/** /**
* //TODO 待实现 * 缓存管理器
* *
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org

View File

@@ -9,7 +9,7 @@ import org.redkale.convert.json.JsonConvert;
/** /**
* *
* 缓存对象 * 内部缓存对象
* *
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org
@@ -36,8 +36,12 @@ public class CacheValue<T> extends CacheExpire {
return new CacheValue(value, expire); return new CacheValue(value, expire);
} }
public static boolean isValid(CacheValue val) {
return val != null && !val.isExpired();
}
public static <T> T get(CacheValue val) { public static <T> T get(CacheValue val) {
return val != null && !val.isExpired() ? (T) val.getValue() : null; return isValid(val) ? (T) val.getValue() : null;
} }
public T getValue() { public T getValue() {

View File

@@ -25,14 +25,19 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator; import java.util.function.UnaryOperator;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Component;
import org.redkale.annotation.Nullable; import org.redkale.annotation.Nullable;
import org.redkale.annotation.ResourceType;
import org.redkale.annotation.Scheduling;
import org.redkale.service.Local;
import org.redkale.service.Service;
import org.redkale.util.RedkaleClassLoader; import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.RedkaleException; import org.redkale.util.RedkaleException;
import org.redkale.util.Utility; import org.redkale.util.Utility;
import org.redkale.annotation.Scheduling;
/** /**
* 定时任务工厂 * 定时任务管理器
* *
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org
@@ -40,7 +45,11 @@ import org.redkale.annotation.Scheduling;
* @author zhangjx * @author zhangjx
* @since 2.8.0 * @since 2.8.0
*/ */
public class ScheduleFactory { @Local
@Component
@AutoLoad(false)
@ResourceType(ScheduleManager.class)
public class ScheduleEngine implements ScheduleManager, Service {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@@ -53,23 +62,24 @@ public class ScheduleFactory {
private final ScheduledThreadPoolExecutor scheduler; private final ScheduledThreadPoolExecutor scheduler;
private boolean disable; private boolean enabled = true;
protected ScheduleFactory(UnaryOperator<String> propertyFunc) { protected ScheduleEngine(UnaryOperator<String> propertyFunc) {
this.propertyFunc = propertyFunc; this.propertyFunc = propertyFunc;
this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s")); this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s"));
this.scheduler.setRemoveOnCancelPolicy(true); this.scheduler.setRemoveOnCancelPolicy(true);
} }
public static ScheduleFactory create(UnaryOperator<String> propertyFunc) { public static ScheduleEngine create(UnaryOperator<String> propertyFunc) {
return new ScheduleFactory(propertyFunc); return new ScheduleEngine(propertyFunc);
} }
public ScheduleFactory disable(boolean val) { public ScheduleEngine enabled(boolean val) {
this.disable = val; this.enabled = val;
return this; return this;
} }
@Override
public void unschedule(Object service) { public void unschedule(Object service) {
lock.lock(); lock.lock();
try { try {
@@ -92,6 +102,7 @@ public class ScheduleFactory {
} }
} }
@Override
public void schedule(Object service) { public void schedule(Object service) {
lock.lock(); lock.lock();
try { try {
@@ -124,7 +135,7 @@ public class ScheduleFactory {
} }
} while ((clazz = clazz.getSuperclass()) != Object.class); } while ((clazz = clazz.getSuperclass()) != Object.class);
//开始执行定时任务 //开始执行定时任务
if (!disable && !tasks.isEmpty()) { if (enabled && !tasks.isEmpty()) {
tasks.forEach((name, task) -> task.start()); tasks.forEach((name, task) -> task.start());
refTaskMap.put(ref, new ArrayList<>(tasks.values())); refTaskMap.put(ref, new ArrayList<>(tasks.values()));
} }

View File

@@ -0,0 +1,31 @@
/*
*
*/
package org.redkale.scheduling;
/**
* 定时管理器
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public interface ScheduleManager {
/**
* 开启宿主对象中所有的定时任务方法
*
* @param service 宿主对象
*/
public void schedule(Object service);
/**
* 关闭宿主对象中所有的定时任务方法
*
* @param service 宿主对象
*/
public void unschedule(Object service);
}

View File

@@ -6,8 +6,7 @@ package org.redkale.test.caching;
import java.time.Duration; import java.time.Duration;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redkale.caching.CacheConfig; import org.redkale.caching.CacheEngine;
import org.redkale.caching.CacheManagerService;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.source.CacheMemorySource; import org.redkale.source.CacheMemorySource;
import org.redkale.util.Utility; import org.redkale.util.Utility;
@@ -27,7 +26,7 @@ public class CachingTest {
public void run() throws Exception { public void run() throws Exception {
CacheMemorySource remoteSource = new CacheMemorySource("remote"); CacheMemorySource remoteSource = new CacheMemorySource("remote");
remoteSource.init(null); remoteSource.init(null);
CacheManagerService cache = CacheManagerService.create(new CacheConfig(), remoteSource); CacheEngine cache = CacheEngine.create(remoteSource);
cache.init(null); cache.init(null);
Duration expire = Duration.ofMillis(490); Duration expire = Duration.ofMillis(490);
cache.localSetString("user", "name:haha", "myha", expire); cache.localSetString("user", "name:haha", "myha", expire);

View File

@@ -4,7 +4,7 @@
package org.redkale.test.scheduling; package org.redkale.test.scheduling;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redkale.scheduling.ScheduleFactory; import org.redkale.scheduling.ScheduleEngine;
import org.redkale.util.Utility; import org.redkale.util.Utility;
/** /**
@@ -20,7 +20,7 @@ public class ScheduleTest {
@Test @Test
public void run() throws Exception { public void run() throws Exception {
ScheduleFactory factory = ScheduleFactory.create(null); ScheduleEngine factory = ScheduleEngine.create(null);
ScheduleService service = new ScheduleService(); ScheduleService service = new ScheduleService();
factory.schedule(service); factory.schedule(service);
Utility.sleep(3000); Utility.sleep(3000);