From 34a03b0e263af395364e5e9a1348ec7666c5f331 Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 2 Jan 2024 15:48:26 +0800 Subject: [PATCH] =?UTF-8?q?ScheduleManager=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/datasource.md | 9 ++ .../java/org/redkale/boot/NodeServer.java | 9 +- .../org/redkale/schedule/ScheduleManager.java | 25 +++- .../schedule/spi/ScheduleManagerService.java | 125 ++++++++++++------ 4 files changed, 123 insertions(+), 45 deletions(-) diff --git a/docs/datasource.md b/docs/datasource.md index 60436f940..b380c7cc8 100644 --- a/docs/datasource.md +++ b/docs/datasource.md @@ -180,5 +180,14 @@ public class Account {   查询实体对象: ```java + //主键查询 + //等价sql: SELECT * FROM t_account WHERE account_id = 'account1'; + Account account = source.find(Account.class, "account1"); + + //等价sql: SELECT * FROM t_account WHERE account_name = 'Hello' AND age = 18 LIMIT 1; + Account one = source.find(Account.class, FilterNodes.eq(Account::getAccountName, "Hello").and(Account::getAge, 18)); + + //等价sql: SELECT * FROM t_account WHERE account_name = 'Hello' OR age = 18; + List list = source.queryList(Account.class, FilterNodes.eq(Account::getAccountName, "Hello").or(Account::getAge, 18)); ``` \ No newline at end of file diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index b1aff532a..2f6a79860 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -535,7 +535,8 @@ public abstract class NodeServer { long e = System.currentTimeMillis() - s; if (slist != null) { String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength); - slist.add(new StringBuilder().append(serstr).append(" load and init in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString()); + slist.add(new StringBuilder().append(serstr).append(" load and init in ") + .append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString()); } }); localServices.stream().forEach(y -> { @@ -545,7 +546,8 @@ public abstract class NodeServer { long e = System.currentTimeMillis() - s; if (rs && slist != null) { String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength); - slist.add(new StringBuilder().append(serstr).append(" component-start in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString()); + slist.add(new StringBuilder().append(serstr).append(" component-start in ") + .append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString()); } } }); @@ -645,7 +647,8 @@ public abstract class NodeServer { } protected ClassFilter createServiceClassFilter() { - return createClassFilter(this.sncpGroup, null, Service.class, (!isSNCP() && application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service"); + return createClassFilter(this.sncpGroup, null, Service.class, + (!isSNCP() && application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service"); } protected ClassFilter createClassFilter(final String localGroup, Class ref, diff --git a/src/main/java/org/redkale/schedule/ScheduleManager.java b/src/main/java/org/redkale/schedule/ScheduleManager.java index 836ddf3a5..4d5c0d238 100644 --- a/src/main/java/org/redkale/schedule/ScheduleManager.java +++ b/src/main/java/org/redkale/schedule/ScheduleManager.java @@ -21,9 +21,19 @@ public interface ScheduleManager { * * @param service 宿主对象 * - * @return 存在定时任务方法返回true,否则返回false */ - public boolean schedule(Object service); + public void schedule(Object service); + + /** + * 开启所有宿主对象中指定的任务名 + * + * @see org.redkale.schedule.Scheduled#name() + * + * @param scheduleName 定时任务名称 + * + * @return 返回任务数量 + */ + public int start(String scheduleName); /** * 关闭宿主对象中所有的定时任务方法 @@ -31,4 +41,15 @@ public interface ScheduleManager { * @param service 宿主对象 */ public void unschedule(Object service); + + /** + * 关闭所有宿主对象中指定的任务名 + * + * @see org.redkale.schedule.Scheduled#name() + * + * @param scheduleName 定时任务名称 + * + * @return 返回任务数量 + */ + public int stop(String scheduleName); } diff --git a/src/main/java/org/redkale/schedule/spi/ScheduleManagerService.java b/src/main/java/org/redkale/schedule/spi/ScheduleManagerService.java index e40a89f11..980ccbf77 100644 --- a/src/main/java/org/redkale/schedule/spi/ScheduleManagerService.java +++ b/src/main/java/org/redkale/schedule/spi/ScheduleManagerService.java @@ -23,8 +23,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import java.util.function.UnaryOperator; import java.util.logging.Level; import java.util.logging.Logger; @@ -122,20 +123,18 @@ public class ScheduleManagerService implements ScheduleManager, Service { } @Override - public boolean schedule(Object service) { + public void schedule(Object service) { lock.lock(); try { boolean remoteMode = service instanceof Service && Sncp.isRemote((Service) service); for (WeakReference item : refTaskMap.keySet()) { if (item.get() == service) { logger.log(Level.WARNING, service + " repeat schedule"); - return false; } } Map tasks = new LinkedHashMap<>(); Class clazz = service.getClass(); WeakReference ref = new WeakReference(service); - AtomicInteger taskCount = new AtomicInteger(); Set methodKeys = new HashSet<>(); do { for (final Method method : clazz.getDeclaredMethods()) { @@ -153,7 +152,7 @@ public class ScheduleManagerService implements ScheduleManager, Service { throw new RedkaleException("@" + Scheduled.class.getSimpleName() + " must be on non-parameter or " + ScheduleEvent.class.getSimpleName() + "-parameter method, but on " + method); } - ScheduledTask task = schedule(ref, method, remoteMode, taskCount); + ScheduledTask task = schedule(ref, method, remoteMode); //时间没配置: task=null if (task != null) { tasks.put(method.getName(), task); @@ -166,7 +165,6 @@ public class ScheduleManagerService implements ScheduleManager, Service { tasks.forEach((name, task) -> task.start()); refTaskMap.put(ref, new ArrayList<>(tasks.values())); } - return taskCount.get() > 0; } finally { lock.unlock(); } @@ -180,7 +178,7 @@ public class ScheduleManagerService implements ScheduleManager, Service { if (item.getKey().get() == service) { refTaskMap.remove(item.getKey()); for (ScheduledTask task : item.getValue()) { - task.cancel(); + task.stop(); } } } @@ -189,7 +187,7 @@ public class ScheduleManagerService implements ScheduleManager, Service { } } - protected ScheduledTask schedule(WeakReference ref, Method method, boolean remoteMode, AtomicInteger taskCount) { + protected ScheduledTask schedule(WeakReference ref, Method method, boolean remoteMode) { Scheduled ann = method.getAnnotation(Scheduled.class); if (!LoadMode.matches(remoteMode, ann.mode())) { return null; @@ -201,16 +199,14 @@ public class ScheduleManagerService implements ScheduleManager, Service { String initialDelay = getProperty(ann.initialDelay()); String zone = getProperty(ann.zone()); TimeUnit timeUnit = ann.timeUnit(); - return scheduleTask(ref, method, taskCount, name, cron, fixedDelay, fixedRate, initialDelay, zone, timeUnit); + return scheduleTask(ref, method, name, cron, fixedDelay, fixedRate, initialDelay, zone, timeUnit); } - protected ScheduledTask scheduleTask(WeakReference ref, Method method, AtomicInteger taskCount, - String name, String cron, String fixedDelay, String fixedRate, - String initialDelay, String zone, TimeUnit timeUnit) { + protected ScheduledTask scheduleTask(WeakReference ref, Method method, String name, + String cron, String fixedDelay, String fixedRate, String initialDelay, String zone, TimeUnit timeUnit) { if ((cron.isEmpty() || "-".equals(cron)) && "-1".equals(fixedRate) && "-1".endsWith(fixedDelay)) { - return createdOnlyNameTask(ref, method, taskCount, name, cron, fixedDelay, fixedRate, initialDelay, zone, timeUnit); //时间都没配置 + return createdOnlyNameTask(ref, method, name, cron, fixedDelay, fixedRate, initialDelay, zone, timeUnit); //时间都没配置 } - taskCount.incrementAndGet(); ZoneId zoneId = Utility.isEmpty(zone) ? null : ZoneId.of(zone); if (!cron.isEmpty() && !"-".equals(cron)) { CronExpression cronExpr = CronExpression.parse(cron); @@ -223,32 +219,32 @@ public class ScheduleManagerService implements ScheduleManager, Service { } } - protected ScheduledTask createdOnlyNameTask(WeakReference ref, Method method, AtomicInteger taskCount, - String name, String cron, String fixedDelay, String fixedRate, - String initialDelay, String zone, TimeUnit timeUnit) { + protected ScheduledTask createdOnlyNameTask(WeakReference ref, Method method, String name, + String cron, String fixedDelay, String fixedRate, String initialDelay, String zone, TimeUnit timeUnit) { return null; } - protected Runnable createRunnable(final WeakReference ref, Method method) { + protected Function createFuncJob(final WeakReference ref, Method method) { try { if (!Modifier.isPublic(method.getModifiers())) { method.setAccessible(true); } MethodHandle mh = MethodHandles.lookup().unreflect(method); - ScheduleEvent event = method.getParameterCount() == 1 ? new ScheduleEvent() : null; - return () -> { + return event -> { + Object rs = null; try { Object obj = ref.get(); if (obj != null) { if (event == null) { - mh.invoke(obj); + rs = mh.invoke(obj); } else { - mh.invoke(obj, event.clear()); + rs = mh.invoke(obj, event.clear()); } } } catch (Throwable t) { logger.log(Level.SEVERE, "schedule task error", t); } + return rs; }; } catch (IllegalAccessException e) { throw new RedkaleException(e); @@ -302,6 +298,44 @@ public class ScheduleManagerService implements ScheduleManager, Service { } } + @Override + public int start(String scheduleName) { + int c = 0; + lock.lock(); + try { + for (Map.Entry> item : refTaskMap.entrySet()) { + for (ScheduledTask task : item.getValue()) { + if (Objects.equals(task.name(), scheduleName)) { + c++; + task.start(); + } + } + } + } finally { + lock.unlock(); + } + return c; + } + + @Override + public int stop(String scheduleName) { + int c = 0; + lock.lock(); + try { + for (Map.Entry> item : refTaskMap.entrySet()) { + for (ScheduledTask task : item.getValue()) { + if (Objects.equals(task.name(), scheduleName)) { + c++; + task.stop(); + } + } + } + } finally { + lock.unlock(); + } + return c; + } + protected abstract class ScheduledTask implements Runnable { protected final WeakReference ref; @@ -310,8 +344,12 @@ public class ScheduleManagerService implements ScheduleManager, Service { protected final Method method; + protected final AtomicBoolean started = new AtomicBoolean(); + protected ScheduledFuture future; + protected final ScheduleEvent event; + protected ScheduledTask(WeakReference ref, String name, Method method) { Objects.requireNonNull(ref); Objects.requireNonNull(name); @@ -319,14 +357,17 @@ public class ScheduleManagerService implements ScheduleManager, Service { this.ref = ref; this.name = name; this.method = method; + this.event = method.getParameterCount() == 0 ? null : new ScheduleEvent(); } public abstract void start(); - public void cancel() { + public void stop() { if (future != null) { future.cancel(true); + future = null; } + this.started.set(false); } public Method method() { @@ -340,7 +381,7 @@ public class ScheduleManagerService implements ScheduleManager, Service { protected class FixedTask extends ScheduledTask { - private final Runnable delegate; + private final Function delegate; private final long fixedDelay; @@ -352,7 +393,7 @@ public class ScheduleManagerService implements ScheduleManager, Service { public FixedTask(final WeakReference ref, String name, Method method, long fixedDelay, long fixedRate, long initialDelay, TimeUnit timeUnit) { super(ref, name, method); - this.delegate = createRunnable(ref, method); + this.delegate = createFuncJob(ref, method); this.fixedDelay = fixedDelay; this.fixedRate = fixedRate; this.initialDelay = initialDelay; @@ -362,30 +403,32 @@ public class ScheduleManagerService implements ScheduleManager, Service { @Override public void run() { try { - delegate.run(); + delegate.apply(event); } catch (Throwable t) { logger.log(Level.SEVERE, "schedule task error", t); } if (ref.get() == null) { - cancel(); + stop(); } } @Override public void start() { - if (fixedRate > 0) { - this.future = scheduler.scheduleAtFixedRate(this, initialDelay > 0 ? initialDelay : 0, fixedRate, timeUnit); - } else if (fixedDelay > 0) { - this.future = scheduler.scheduleWithFixedDelay(this, initialDelay, fixedDelay, timeUnit); - } else if (initialDelay > 0) { - this.future = scheduler.schedule(this, initialDelay, timeUnit); + if (started.compareAndSet(false, true)) { + if (fixedRate > 0) { + this.future = scheduler.scheduleAtFixedRate(this, initialDelay > 0 ? initialDelay : 0, fixedRate, timeUnit); + } else if (fixedDelay > 0) { + this.future = scheduler.scheduleWithFixedDelay(this, initialDelay, fixedDelay, timeUnit); + } else if (initialDelay > 0) { + this.future = scheduler.schedule(this, initialDelay, timeUnit); + } } } } protected class CronTask extends ScheduledTask { - private final Runnable delegate; + private final Function delegate; private final CronExpression cron; @@ -394,7 +437,7 @@ public class ScheduleManagerService implements ScheduleManager, Service { public CronTask(WeakReference ref, String name, Method method, CronExpression cron, ZoneId zoneId) { super(ref, name, method); - this.delegate = createRunnable(ref, method); + this.delegate = createFuncJob(ref, method); this.cron = cron; this.zoneId = zoneId; } @@ -402,7 +445,7 @@ public class ScheduleManagerService implements ScheduleManager, Service { @Override public void run() { try { - delegate.run(); + delegate.apply(event); } catch (Throwable t) { logger.log(Level.SEVERE, "schedule task error", t); } @@ -414,10 +457,12 @@ public class ScheduleManagerService implements ScheduleManager, Service { if (ref.get() == null) { return; } - LocalDateTime now = zoneId == null ? LocalDateTime.now() : LocalDateTime.now(zoneId); - LocalDateTime next = cron.next(now); - Duration delay = Duration.between(now, next); - this.future = scheduler.schedule(this, delay.toNanos(), TimeUnit.NANOSECONDS); + if (started.compareAndSet(false, true)) { + LocalDateTime now = zoneId == null ? LocalDateTime.now() : LocalDateTime.now(zoneId); + LocalDateTime next = cron.next(now); + Duration delay = Duration.between(now, next); + this.future = scheduler.schedule(this, delay.toNanos(), TimeUnit.NANOSECONDS); + } } } }