From a0da13bc4f8ce4cffedb33bb31fb6f2785f8177b Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 5 Dec 2023 16:59:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/module-info.java | 1 + .../org/redkale/annotation/Scheduled.java | 43 ++- .../java/org/redkale/boot/Application.java | 18 ++ .../java/org/redkale/boot/NodeServer.java | 2 + src/main/java/org/redkale/net/sncp/Sncp.java | 3 +- .../{util => scheduling}/CronExpression.java | 16 +- .../redkale/scheduling/ScheduledFactory.java | 306 ++++++++++++++++++ .../org/redkale/util/RedkaleClassLoader.java | 1 + 8 files changed, 384 insertions(+), 6 deletions(-) rename src/main/java/org/redkale/{util => scheduling}/CronExpression.java (98%) create mode 100644 src/main/java/org/redkale/scheduling/ScheduledFactory.java diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 861758890..2860eeaaf 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -29,6 +29,7 @@ module org.redkale { exports org.redkale.net.http; exports org.redkale.net.sncp; exports org.redkale.persistence; + exports org.redkale.scheduling; exports org.redkale.service; exports org.redkale.source; exports org.redkale.util; diff --git a/src/main/java/org/redkale/annotation/Scheduled.java b/src/main/java/org/redkale/annotation/Scheduled.java index 42b7e6b80..e2e3c793f 100644 --- a/src/main/java/org/redkale/annotation/Scheduled.java +++ b/src/main/java/org/redkale/annotation/Scheduled.java @@ -20,16 +20,57 @@ import java.util.concurrent.TimeUnit; @Retention(RetentionPolicy.RUNTIME) public @interface Scheduled { + /** + * 名称 + * + * @return 名称 + */ + String name() default ""; + + /** + * cron表达式 + * + * @return cron表达式 + */ String cron() default ""; + /** + * 时区 + * + * @return 时区 + */ String zone() default ""; + /** + * + * @return deplay时间 + */ String fixedDelay() default "-1"; + /** + * + * @return 周期时间 + */ String fixedRate() default "-1"; + /** + * + * @return 起始deplay时间 + */ String initialDelay() default "-1"; - TimeUnit timeUnit() default TimeUnit.MILLISECONDS; + /** + * 时间单元 + * + * @return 时间单元 + */ + TimeUnit timeUnit() default TimeUnit.SECONDS; + + /** + * 备注 + * + * @return 备注 + */ + String comment() default ""; } diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 65fcc1667..7ba97955b 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -30,6 +30,7 @@ import org.redkale.mq.*; import org.redkale.net.*; import org.redkale.net.http.*; import org.redkale.net.sncp.*; +import org.redkale.scheduling.ScheduledFactory; import org.redkale.service.Service; import org.redkale.source.*; import org.redkale.util.*; @@ -205,6 +206,9 @@ public final class Application { //全局根ResourceFactory final ResourceFactory resourceFactory = ResourceFactory.create(); + //全局ScheduledFactory + private final ScheduledFactory scheduledFactory; + //服务配置项 final AnyValue config; @@ -627,6 +631,10 @@ public final class Application { } } + { //设置ScheduledFactory + this.scheduledFactory = ScheduledFactory.create(this::getPropertyValue).disable(isCompileMode()); + } + { //加载原生sql解析器 Iterator it = ServiceLoader.load(DataNativeSqlParserProvider.class, classLoader).iterator(); RedkaleClassLoader.putServiceLoader(DataNativeSqlParserProvider.class); @@ -2159,6 +2167,14 @@ public final class Application { return val; } + public void schedule(Object service) { + this.scheduledFactory.schedule(service); + } + + public void unschedule(Object service) { + this.scheduledFactory.unschedule(service); + } + void updateEnvironmentProperties(String namespace, List events) { if (events == null || events.isEmpty()) { return; @@ -2622,6 +2638,8 @@ public final class Application { if (this.workExecutor != null) { this.workExecutor.shutdownNow(); } + this.scheduledFactory.destroy(); + long intms = System.currentTimeMillis() - f; String ms = String.valueOf(intms); int repeat = ms.length() > 7 ? 0 : (7 - ms.length()) / 2; diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 2487aa725..40f2c1649 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -612,6 +612,7 @@ public abstract class NodeServer { localServices.stream().forEach(y -> { long s = System.currentTimeMillis(); y.init(Sncp.getResourceConf(y)); + application.schedule(y); long e = System.currentTimeMillis() - s; if (slist != null) { String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength); @@ -876,6 +877,7 @@ public abstract class NodeServer { if (finest) { logger.finest(Sncp.getResourceType(y) + " is destroying"); } + application.unschedule(y); y.destroy(Sncp.getResourceConf(y)); if (finest) { logger.finest(Sncp.getResourceType(y) + " was destroyed"); diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index e0649c635..b31a43a56 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -135,9 +135,10 @@ public abstract class Sncp { } if (method.getAnnotation(Scheduled.class) != null) { if (Modifier.isStatic(method.getModifiers()) - || !Modifier.isProtected(method.getModifiers()) || method.getParameterCount() > 0) { + || method.getParameterCount() > 0) { throw new SncpException(Scheduled.class.getSimpleName() + " must be on protected and non-parameter method, but on " + method); } + RedkaleClassLoader.putReflectionMethod(serviceTypeOrImplClass.getName(), method); continue; } if (Modifier.isStatic(method.getModifiers())) { diff --git a/src/main/java/org/redkale/util/CronExpression.java b/src/main/java/org/redkale/scheduling/CronExpression.java similarity index 98% rename from src/main/java/org/redkale/util/CronExpression.java rename to src/main/java/org/redkale/scheduling/CronExpression.java index 0eb1befcd..e79cfaaac 100644 --- a/src/main/java/org/redkale/util/CronExpression.java +++ b/src/main/java/org/redkale/scheduling/CronExpression.java @@ -1,12 +1,19 @@ /* * */ -package org.redkale.util; +package org.redkale.scheduling; import java.time.DateTimeException; -import java.time.temporal.*; -import java.util.*; +import java.time.temporal.ChronoField; +import java.time.temporal.ChronoUnit; +import java.time.temporal.Temporal; +import java.time.temporal.ValueRange; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.redkale.annotation.Nullable; +import org.redkale.util.RedkaleException; +import org.redkale.util.Utility; /** * cron定时表达式解析器
代码复制于org.springframework.scheduling.support.CronExpression @@ -28,7 +35,8 @@ public class CronExpression { "@weekly", "0 0 0 * * 0", "@daily", "0 0 0 * * *", "@midnight", "0 0 0 * * *", - "@hourly", "0 0 * * * *" + "@hourly", "0 0 * * * *", + "@minutely", "0 0/1 * * * *" }; private final CronField[] fields; diff --git a/src/main/java/org/redkale/scheduling/ScheduledFactory.java b/src/main/java/org/redkale/scheduling/ScheduledFactory.java new file mode 100644 index 000000000..b514cf4e5 --- /dev/null +++ b/src/main/java/org/redkale/scheduling/ScheduledFactory.java @@ -0,0 +1,306 @@ +/* + * + */ +package org.redkale.scheduling; + +import java.lang.ref.WeakReference; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.UnaryOperator; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.redkale.annotation.Nullable; +import org.redkale.annotation.Scheduled; +import org.redkale.util.RedkaleClassLoader; +import org.redkale.util.RedkaleException; +import org.redkale.util.Utility; + +/** + * 定时任务工厂 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @since 2.8.0 + */ +public class ScheduledFactory { + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + private final ConcurrentHashMap> refTaskMap = new ConcurrentHashMap<>(); + + private final ReentrantLock lock = new ReentrantLock(); + + @Nullable + private final UnaryOperator propertyFunc; + + private final ScheduledThreadPoolExecutor scheduler; + + private boolean disable; + + protected ScheduledFactory(UnaryOperator propertyFunc) { + this.propertyFunc = propertyFunc; + this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s")); + } + + public static ScheduledFactory create(UnaryOperator propertyFunc) { + return new ScheduledFactory(propertyFunc); + } + + public ScheduledFactory disable(boolean val) { + this.disable = val; + return this; + } + + public void unschedule(Object service) { + lock.lock(); + try { + Map.Entry> entry = null; + for (Map.Entry> item : refTaskMap.entrySet()) { + if (item.getKey().get() == service) { + entry = item; + break; + } + } + if (entry == null) { + return; + } + refTaskMap.remove(entry.getKey()); + for (ScheduledTask task : entry.getValue()) { + task.cancel(); + } + } finally { + lock.unlock(); + } + } + + public void schedule(Object service) { + lock.lock(); + try { + for (WeakReference item : refTaskMap.keySet()) { + if (item.get() == service) { + logger.log(Level.WARNING, service + " repeat schedule"); + return; + } + } + Map tasks = new LinkedHashMap<>(); + Class clazz = service.getClass(); + WeakReference ref = new WeakReference(service); + do { + for (final Method method : clazz.getDeclaredMethods()) { + if (method.getAnnotation(Scheduled.class) == null) { + continue; + } + if (tasks.containsKey(method.getName())) { + continue; + } + if (method.getParameterCount() > 0) { + throw new RedkaleException("@" + Scheduled.class.getSimpleName() + " must be on non-parameter method, but on " + method); + } + ScheduledTask task = schedule(ref, method); + if (task == null) { + continue; //时间都没配置 + } + tasks.put(method.getName(), task); + RedkaleClassLoader.putReflectionMethod(clazz.getName(), method); + } + } while ((clazz = clazz.getSuperclass()) != Object.class); + //开始执行定时任务 + if (!disable && !tasks.isEmpty()) { + tasks.forEach((name, task) -> task.start()); + refTaskMap.put(ref, new ArrayList<>(tasks.values())); + } + } finally { + lock.unlock(); + } + } + + protected ScheduledTask schedule(WeakReference ref, Method method) { + Scheduled ann = method.getAnnotation(Scheduled.class); + String name = getProperty(ann.name()); + String cron = getProperty(ann.cron()); + String fixedDelay = getProperty(ann.fixedDelay()); + String fixedRate = getProperty(ann.fixedRate()); + String initialDelay = getProperty(ann.initialDelay()); + String zone = getProperty(ann.zone()); + TimeUnit timeUnit = ann.timeUnit(); + if ((cron.isEmpty() || "-".equals(cron)) && "-1".equals(fixedRate) && "-1".endsWith(fixedDelay)) { + return null; //时间都没配置 + } + ZoneId zoneId = Utility.isEmpty(zone) ? null : ZoneId.of(zone); + if (!cron.isEmpty() && !"-".equals(cron)) { + CronExpression cronExpr = CronExpression.parse(cron); + return new CronTask(ref, name, method, cronExpr, zoneId); + } else { + long fixedDelayLong = Long.parseLong(fixedDelay); + long fixedRateLong = Long.parseLong(fixedRate); + long initialDelayLong = Long.parseLong(initialDelay); + return new FixedTask(ref, name, method, fixedDelayLong, fixedRateLong, initialDelayLong, timeUnit); + } + } + + protected Runnable createRunnable(final WeakReference ref, Method method) { + if (!Modifier.isPublic(method.getModifiers())) { + method.setAccessible(true); + } + return () -> { + try { + Object obj = ref.get(); + if (obj != null) { + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "schedule task " + method.getDeclaringClass().getSimpleName() + "." + method.getName()); + } + method.invoke(obj); + } + } catch (Exception e) { + logger.log(Level.SEVERE, "schedule task error", e); + } + }; + } + + protected String getProperty(String value) { + if (propertyFunc == null || value.indexOf('}') < 0) { + return value; + } + return propertyFunc.apply(value); + } + + public void destroy() { + if (scheduler != null) { + scheduler.shutdown(); + } + } + + protected abstract class ScheduledTask implements Runnable { + + protected final WeakReference ref; + + protected final String name; + + protected final Method method; + + protected ScheduledFuture future; + + protected ScheduledTask(WeakReference ref, String name, Method method) { + Objects.requireNonNull(ref); + Objects.requireNonNull(name); + Objects.requireNonNull(method); + this.ref = ref; + this.name = name; + this.method = method; + } + + public abstract void start(); + + public void cancel() { + if (future != null) { + future.cancel(true); + } + } + + public Method method() { + return method; + } + + public String name() { + return name; + } + } + + protected class FixedTask extends ScheduledTask { + + private final Runnable delegate; + + private final long fixedDelay; + + private final long fixedRate; + + private final long initialDelay; + + private final TimeUnit timeUnit; + + public FixedTask(final WeakReference ref, String name, Method method, long fixedDelay, long fixedRate, long initialDelay, TimeUnit timeUnit) { + super(ref, name, method); + this.delegate = createRunnable(ref, method); + this.fixedDelay = fixedDelay; + this.fixedRate = fixedRate; + this.initialDelay = initialDelay; + this.timeUnit = timeUnit; + } + + @Override + public void run() { + try { + delegate.run(); + } catch (Throwable t) { + logger.log(Level.SEVERE, "schedule task error", t); + } + if (ref.get() == null) { + cancel(); + } + } + + @Override + public void start() { + if (fixedRate > 0) { + this.future = scheduler.scheduleAtFixedRate(this, initialDelay > 0 ? initialDelay : 0, fixedRate, timeUnit); + } else if (fixedDelay > 0) { + this.future = scheduler.scheduleWithFixedDelay(this, initialDelay, fixedDelay, timeUnit); + } else if (initialDelay > 0) { + this.future = scheduler.schedule(this, initialDelay, timeUnit); + } + } + } + + protected class CronTask extends ScheduledTask { + + private final Runnable delegate; + + private final CronExpression cron; + + @Nullable + private final ZoneId zoneId; + + public CronTask(WeakReference ref, String name, Method method, CronExpression cron, ZoneId zoneId) { + super(ref, name, method); + this.delegate = createRunnable(ref, method); + this.cron = cron; + this.zoneId = zoneId; + } + + @Override + public void run() { + try { + delegate.run(); + } catch (Throwable t) { + logger.log(Level.SEVERE, "schedule task error", t); + } + start(); + } + + @Override + public void start() { + if (ref.get() == null) { + return; + } + LocalDateTime now = zoneId == null ? LocalDateTime.now() : LocalDateTime.now(zoneId); + LocalDateTime next = cron.next(now); + Duration delay = Duration.between(now, next); + this.future = scheduler.schedule(this, delay.toNanos(), TimeUnit.NANOSECONDS); + } + } +} diff --git a/src/main/java/org/redkale/util/RedkaleClassLoader.java b/src/main/java/org/redkale/util/RedkaleClassLoader.java index 6004602e9..53a950b0c 100644 --- a/src/main/java/org/redkale/util/RedkaleClassLoader.java +++ b/src/main/java/org/redkale/util/RedkaleClassLoader.java @@ -59,6 +59,7 @@ public class RedkaleClassLoader extends URLClassLoader { "org.redkale.net.http", "org.redkale.net.sncp", "org.redkale.persistence", + "org.redkale.scheduling", "org.redkale.service", "org.redkale.source", "org.redkale.util",