增加定时任务功能

This commit is contained in:
redkale
2023-12-05 16:59:13 +08:00
parent dbd4a79589
commit a0da13bc4f
8 changed files with 384 additions and 6 deletions

View File

@@ -29,6 +29,7 @@ module org.redkale {
exports org.redkale.net.http;
exports org.redkale.net.sncp;
exports org.redkale.persistence;
exports org.redkale.scheduling;
exports org.redkale.service;
exports org.redkale.source;
exports org.redkale.util;

View File

@@ -20,16 +20,57 @@ import java.util.concurrent.TimeUnit;
@Retention(RetentionPolicy.RUNTIME)
public @interface Scheduled {
/**
* 名称
*
* @return 名称
*/
String name() default "";
/**
* cron表达式
*
* @return cron表达式
*/
String cron() default "";
/**
* 时区
*
* @return 时区
*/
String zone() default "";
/**
*
* @return deplay时间
*/
String fixedDelay() default "-1";
/**
*
* @return 周期时间
*/
String fixedRate() default "-1";
/**
*
* @return 起始deplay时间
*/
String initialDelay() default "-1";
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
/**
* 时间单元
*
* @return 时间单元
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
/**
* 备注
*
* @return 备注
*/
String comment() default "";
}

View File

@@ -30,6 +30,7 @@ import org.redkale.mq.*;
import org.redkale.net.*;
import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
import org.redkale.scheduling.ScheduledFactory;
import org.redkale.service.Service;
import org.redkale.source.*;
import org.redkale.util.*;
@@ -205,6 +206,9 @@ public final class Application {
//全局根ResourceFactory
final ResourceFactory resourceFactory = ResourceFactory.create();
//全局ScheduledFactory
private final ScheduledFactory scheduledFactory;
//服务配置项
final AnyValue config;
@@ -627,6 +631,10 @@ public final class Application {
}
}
{ //设置ScheduledFactory
this.scheduledFactory = ScheduledFactory.create(this::getPropertyValue).disable(isCompileMode());
}
{ //加载原生sql解析器
Iterator<DataNativeSqlParserProvider> it = ServiceLoader.load(DataNativeSqlParserProvider.class, classLoader).iterator();
RedkaleClassLoader.putServiceLoader(DataNativeSqlParserProvider.class);
@@ -2159,6 +2167,14 @@ public final class Application {
return val;
}
public void schedule(Object service) {
this.scheduledFactory.schedule(service);
}
public void unschedule(Object service) {
this.scheduledFactory.unschedule(service);
}
void updateEnvironmentProperties(String namespace, List<ResourceEvent> events) {
if (events == null || events.isEmpty()) {
return;
@@ -2622,6 +2638,8 @@ public final class Application {
if (this.workExecutor != null) {
this.workExecutor.shutdownNow();
}
this.scheduledFactory.destroy();
long intms = System.currentTimeMillis() - f;
String ms = String.valueOf(intms);
int repeat = ms.length() > 7 ? 0 : (7 - ms.length()) / 2;

View File

@@ -612,6 +612,7 @@ public abstract class NodeServer {
localServices.stream().forEach(y -> {
long s = System.currentTimeMillis();
y.init(Sncp.getResourceConf(y));
application.schedule(y);
long e = System.currentTimeMillis() - s;
if (slist != null) {
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
@@ -876,6 +877,7 @@ public abstract class NodeServer {
if (finest) {
logger.finest(Sncp.getResourceType(y) + " is destroying");
}
application.unschedule(y);
y.destroy(Sncp.getResourceConf(y));
if (finest) {
logger.finest(Sncp.getResourceType(y) + " was destroyed");

View File

@@ -135,9 +135,10 @@ public abstract class Sncp {
}
if (method.getAnnotation(Scheduled.class) != null) {
if (Modifier.isStatic(method.getModifiers())
|| !Modifier.isProtected(method.getModifiers()) || method.getParameterCount() > 0) {
|| method.getParameterCount() > 0) {
throw new SncpException(Scheduled.class.getSimpleName() + " must be on protected and non-parameter method, but on " + method);
}
RedkaleClassLoader.putReflectionMethod(serviceTypeOrImplClass.getName(), method);
continue;
}
if (Modifier.isStatic(method.getModifiers())) {

View File

@@ -1,12 +1,19 @@
/*
*
*/
package org.redkale.util;
package org.redkale.scheduling;
import java.time.DateTimeException;
import java.time.temporal.*;
import java.util.*;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.time.temporal.ValueRange;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.redkale.annotation.Nullable;
import org.redkale.util.RedkaleException;
import org.redkale.util.Utility;
/**
* cron定时表达式解析器 <br> 代码复制于org.springframework.scheduling.support.CronExpression
@@ -28,7 +35,8 @@ public class CronExpression {
"@weekly", "0 0 0 * * 0",
"@daily", "0 0 0 * * *",
"@midnight", "0 0 0 * * *",
"@hourly", "0 0 * * * *"
"@hourly", "0 0 * * * *",
"@minutely", "0 0/1 * * * *"
};
private final CronField[] fields;

View File

@@ -0,0 +1,306 @@
/*
*
*/
package org.redkale.scheduling;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.redkale.annotation.Nullable;
import org.redkale.annotation.Scheduled;
import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.RedkaleException;
import org.redkale.util.Utility;
/**
* 定时任务工厂
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @since 2.8.0
*/
public class ScheduledFactory {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private final ConcurrentHashMap<WeakReference, List<ScheduledTask>> refTaskMap = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock();
@Nullable
private final UnaryOperator<String> propertyFunc;
private final ScheduledThreadPoolExecutor scheduler;
private boolean disable;
protected ScheduledFactory(UnaryOperator<String> propertyFunc) {
this.propertyFunc = propertyFunc;
this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s"));
}
public static ScheduledFactory create(UnaryOperator<String> propertyFunc) {
return new ScheduledFactory(propertyFunc);
}
public ScheduledFactory disable(boolean val) {
this.disable = val;
return this;
}
public void unschedule(Object service) {
lock.lock();
try {
Map.Entry<WeakReference, List<ScheduledTask>> entry = null;
for (Map.Entry<WeakReference, List<ScheduledTask>> item : refTaskMap.entrySet()) {
if (item.getKey().get() == service) {
entry = item;
break;
}
}
if (entry == null) {
return;
}
refTaskMap.remove(entry.getKey());
for (ScheduledTask task : entry.getValue()) {
task.cancel();
}
} finally {
lock.unlock();
}
}
public void schedule(Object service) {
lock.lock();
try {
for (WeakReference item : refTaskMap.keySet()) {
if (item.get() == service) {
logger.log(Level.WARNING, service + " repeat schedule");
return;
}
}
Map<String, ScheduledTask> tasks = new LinkedHashMap<>();
Class clazz = service.getClass();
WeakReference ref = new WeakReference(service);
do {
for (final Method method : clazz.getDeclaredMethods()) {
if (method.getAnnotation(Scheduled.class) == null) {
continue;
}
if (tasks.containsKey(method.getName())) {
continue;
}
if (method.getParameterCount() > 0) {
throw new RedkaleException("@" + Scheduled.class.getSimpleName() + " must be on non-parameter method, but on " + method);
}
ScheduledTask task = schedule(ref, method);
if (task == null) {
continue; //时间都没配置
}
tasks.put(method.getName(), task);
RedkaleClassLoader.putReflectionMethod(clazz.getName(), method);
}
} while ((clazz = clazz.getSuperclass()) != Object.class);
//开始执行定时任务
if (!disable && !tasks.isEmpty()) {
tasks.forEach((name, task) -> task.start());
refTaskMap.put(ref, new ArrayList<>(tasks.values()));
}
} finally {
lock.unlock();
}
}
protected ScheduledTask schedule(WeakReference ref, Method method) {
Scheduled ann = method.getAnnotation(Scheduled.class);
String name = getProperty(ann.name());
String cron = getProperty(ann.cron());
String fixedDelay = getProperty(ann.fixedDelay());
String fixedRate = getProperty(ann.fixedRate());
String initialDelay = getProperty(ann.initialDelay());
String zone = getProperty(ann.zone());
TimeUnit timeUnit = ann.timeUnit();
if ((cron.isEmpty() || "-".equals(cron)) && "-1".equals(fixedRate) && "-1".endsWith(fixedDelay)) {
return null; //时间都没配置
}
ZoneId zoneId = Utility.isEmpty(zone) ? null : ZoneId.of(zone);
if (!cron.isEmpty() && !"-".equals(cron)) {
CronExpression cronExpr = CronExpression.parse(cron);
return new CronTask(ref, name, method, cronExpr, zoneId);
} else {
long fixedDelayLong = Long.parseLong(fixedDelay);
long fixedRateLong = Long.parseLong(fixedRate);
long initialDelayLong = Long.parseLong(initialDelay);
return new FixedTask(ref, name, method, fixedDelayLong, fixedRateLong, initialDelayLong, timeUnit);
}
}
protected Runnable createRunnable(final WeakReference ref, Method method) {
if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true);
}
return () -> {
try {
Object obj = ref.get();
if (obj != null) {
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "schedule task " + method.getDeclaringClass().getSimpleName() + "." + method.getName());
}
method.invoke(obj);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "schedule task error", e);
}
};
}
protected String getProperty(String value) {
if (propertyFunc == null || value.indexOf('}') < 0) {
return value;
}
return propertyFunc.apply(value);
}
public void destroy() {
if (scheduler != null) {
scheduler.shutdown();
}
}
protected abstract class ScheduledTask implements Runnable {
protected final WeakReference ref;
protected final String name;
protected final Method method;
protected ScheduledFuture future;
protected ScheduledTask(WeakReference ref, String name, Method method) {
Objects.requireNonNull(ref);
Objects.requireNonNull(name);
Objects.requireNonNull(method);
this.ref = ref;
this.name = name;
this.method = method;
}
public abstract void start();
public void cancel() {
if (future != null) {
future.cancel(true);
}
}
public Method method() {
return method;
}
public String name() {
return name;
}
}
protected class FixedTask extends ScheduledTask {
private final Runnable delegate;
private final long fixedDelay;
private final long fixedRate;
private final long initialDelay;
private final TimeUnit timeUnit;
public FixedTask(final WeakReference ref, String name, Method method, long fixedDelay, long fixedRate, long initialDelay, TimeUnit timeUnit) {
super(ref, name, method);
this.delegate = createRunnable(ref, method);
this.fixedDelay = fixedDelay;
this.fixedRate = fixedRate;
this.initialDelay = initialDelay;
this.timeUnit = timeUnit;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
logger.log(Level.SEVERE, "schedule task error", t);
}
if (ref.get() == null) {
cancel();
}
}
@Override
public void start() {
if (fixedRate > 0) {
this.future = scheduler.scheduleAtFixedRate(this, initialDelay > 0 ? initialDelay : 0, fixedRate, timeUnit);
} else if (fixedDelay > 0) {
this.future = scheduler.scheduleWithFixedDelay(this, initialDelay, fixedDelay, timeUnit);
} else if (initialDelay > 0) {
this.future = scheduler.schedule(this, initialDelay, timeUnit);
}
}
}
protected class CronTask extends ScheduledTask {
private final Runnable delegate;
private final CronExpression cron;
@Nullable
private final ZoneId zoneId;
public CronTask(WeakReference ref, String name, Method method, CronExpression cron, ZoneId zoneId) {
super(ref, name, method);
this.delegate = createRunnable(ref, method);
this.cron = cron;
this.zoneId = zoneId;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
logger.log(Level.SEVERE, "schedule task error", t);
}
start();
}
@Override
public void start() {
if (ref.get() == null) {
return;
}
LocalDateTime now = zoneId == null ? LocalDateTime.now() : LocalDateTime.now(zoneId);
LocalDateTime next = cron.next(now);
Duration delay = Duration.between(now, next);
this.future = scheduler.schedule(this, delay.toNanos(), TimeUnit.NANOSECONDS);
}
}
}

View File

@@ -59,6 +59,7 @@ public class RedkaleClassLoader extends URLClassLoader {
"org.redkale.net.http",
"org.redkale.net.sncp",
"org.redkale.persistence",
"org.redkale.scheduling",
"org.redkale.service",
"org.redkale.source",
"org.redkale.util",