ScheduleManager优化

This commit is contained in:
redkale
2024-01-02 15:48:26 +08:00
parent cad219d7e1
commit 34a03b0e26
4 changed files with 123 additions and 45 deletions

View File

@@ -180,5 +180,14 @@ public class Account {
  查询实体对象:   查询实体对象:
```java ```java
//主键查询
//等价sql: SELECT * FROM t_account WHERE account_id = 'account1';
Account account = source.find(Account.class, "account1");
//等价sql: SELECT * FROM t_account WHERE account_name = 'Hello' AND age = 18 LIMIT 1;
Account one = source.find(Account.class, FilterNodes.eq(Account::getAccountName, "Hello").and(Account::getAge, 18));
//等价sql: SELECT * FROM t_account WHERE account_name = 'Hello' OR age = 18;
List<Account> list = source.queryList(Account.class, FilterNodes.eq(Account::getAccountName, "Hello").or(Account::getAge, 18));
``` ```

View File

@@ -535,7 +535,8 @@ public abstract class NodeServer {
long e = System.currentTimeMillis() - s; long e = System.currentTimeMillis() - s;
if (slist != null) { if (slist != null) {
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength); String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
slist.add(new StringBuilder().append(serstr).append(" load and init in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString()); slist.add(new StringBuilder().append(serstr).append(" load and init in ")
.append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString());
} }
}); });
localServices.stream().forEach(y -> { localServices.stream().forEach(y -> {
@@ -545,7 +546,8 @@ public abstract class NodeServer {
long e = System.currentTimeMillis() - s; long e = System.currentTimeMillis() - s;
if (rs && slist != null) { if (rs && slist != null) {
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength); String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
slist.add(new StringBuilder().append(serstr).append(" component-start in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString()); slist.add(new StringBuilder().append(serstr).append(" component-start in ")
.append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString());
} }
} }
}); });
@@ -645,7 +647,8 @@ public abstract class NodeServer {
} }
protected ClassFilter<Service> createServiceClassFilter() { protected ClassFilter<Service> createServiceClassFilter() {
return createClassFilter(this.sncpGroup, null, Service.class, (!isSNCP() && application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service"); return createClassFilter(this.sncpGroup, null, Service.class,
(!isSNCP() && application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service");
} }
protected ClassFilter createClassFilter(final String localGroup, Class<? extends Annotation> ref, protected ClassFilter createClassFilter(final String localGroup, Class<? extends Annotation> ref,

View File

@@ -21,9 +21,19 @@ public interface ScheduleManager {
* *
* @param service 宿主对象 * @param service 宿主对象
* *
* @return 存在定时任务方法返回true否则返回false
*/ */
public boolean schedule(Object service); public void schedule(Object service);
/**
* 开启所有宿主对象中指定的任务名
*
* @see org.redkale.schedule.Scheduled#name()
*
* @param scheduleName 定时任务名称
*
* @return 返回任务数量
*/
public int start(String scheduleName);
/** /**
* 关闭宿主对象中所有的定时任务方法 * 关闭宿主对象中所有的定时任务方法
@@ -31,4 +41,15 @@ public interface ScheduleManager {
* @param service 宿主对象 * @param service 宿主对象
*/ */
public void unschedule(Object service); public void unschedule(Object service);
/**
* 关闭所有宿主对象中指定的任务名
*
* @see org.redkale.schedule.Scheduled#name()
*
* @param scheduleName 定时任务名称
*
* @return 返回任务数量
*/
public int stop(String scheduleName);
} }

View File

@@ -23,8 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.UnaryOperator; import java.util.function.UnaryOperator;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -122,20 +123,18 @@ public class ScheduleManagerService implements ScheduleManager, Service {
} }
@Override @Override
public boolean schedule(Object service) { public void schedule(Object service) {
lock.lock(); lock.lock();
try { try {
boolean remoteMode = service instanceof Service && Sncp.isRemote((Service) service); boolean remoteMode = service instanceof Service && Sncp.isRemote((Service) service);
for (WeakReference item : refTaskMap.keySet()) { for (WeakReference item : refTaskMap.keySet()) {
if (item.get() == service) { if (item.get() == service) {
logger.log(Level.WARNING, service + " repeat schedule"); logger.log(Level.WARNING, service + " repeat schedule");
return false;
} }
} }
Map<String, ScheduledTask> tasks = new LinkedHashMap<>(); Map<String, ScheduledTask> tasks = new LinkedHashMap<>();
Class clazz = service.getClass(); Class clazz = service.getClass();
WeakReference ref = new WeakReference(service); WeakReference ref = new WeakReference(service);
AtomicInteger taskCount = new AtomicInteger();
Set<String> methodKeys = new HashSet<>(); Set<String> methodKeys = new HashSet<>();
do { do {
for (final Method method : clazz.getDeclaredMethods()) { for (final Method method : clazz.getDeclaredMethods()) {
@@ -153,7 +152,7 @@ public class ScheduleManagerService implements ScheduleManager, Service {
throw new RedkaleException("@" + Scheduled.class.getSimpleName() + " must be on non-parameter or " throw new RedkaleException("@" + Scheduled.class.getSimpleName() + " must be on non-parameter or "
+ ScheduleEvent.class.getSimpleName() + "-parameter method, but on " + method); + ScheduleEvent.class.getSimpleName() + "-parameter method, but on " + method);
} }
ScheduledTask task = schedule(ref, method, remoteMode, taskCount); ScheduledTask task = schedule(ref, method, remoteMode);
//时间没配置: task=null //时间没配置: task=null
if (task != null) { if (task != null) {
tasks.put(method.getName(), task); tasks.put(method.getName(), task);
@@ -166,7 +165,6 @@ public class ScheduleManagerService implements ScheduleManager, Service {
tasks.forEach((name, task) -> task.start()); tasks.forEach((name, task) -> task.start());
refTaskMap.put(ref, new ArrayList<>(tasks.values())); refTaskMap.put(ref, new ArrayList<>(tasks.values()));
} }
return taskCount.get() > 0;
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@@ -180,7 +178,7 @@ public class ScheduleManagerService implements ScheduleManager, Service {
if (item.getKey().get() == service) { if (item.getKey().get() == service) {
refTaskMap.remove(item.getKey()); refTaskMap.remove(item.getKey());
for (ScheduledTask task : item.getValue()) { for (ScheduledTask task : item.getValue()) {
task.cancel(); task.stop();
} }
} }
} }
@@ -189,7 +187,7 @@ public class ScheduleManagerService implements ScheduleManager, Service {
} }
} }
protected ScheduledTask schedule(WeakReference ref, Method method, boolean remoteMode, AtomicInteger taskCount) { protected ScheduledTask schedule(WeakReference ref, Method method, boolean remoteMode) {
Scheduled ann = method.getAnnotation(Scheduled.class); Scheduled ann = method.getAnnotation(Scheduled.class);
if (!LoadMode.matches(remoteMode, ann.mode())) { if (!LoadMode.matches(remoteMode, ann.mode())) {
return null; return null;
@@ -201,16 +199,14 @@ public class ScheduleManagerService implements ScheduleManager, Service {
String initialDelay = getProperty(ann.initialDelay()); String initialDelay = getProperty(ann.initialDelay());
String zone = getProperty(ann.zone()); String zone = getProperty(ann.zone());
TimeUnit timeUnit = ann.timeUnit(); TimeUnit timeUnit = ann.timeUnit();
return scheduleTask(ref, method, taskCount, name, cron, fixedDelay, fixedRate, initialDelay, zone, timeUnit); return scheduleTask(ref, method, name, cron, fixedDelay, fixedRate, initialDelay, zone, timeUnit);
} }
protected ScheduledTask scheduleTask(WeakReference ref, Method method, AtomicInteger taskCount, protected ScheduledTask scheduleTask(WeakReference ref, Method method, String name,
String name, String cron, String fixedDelay, String fixedRate, String cron, String fixedDelay, String fixedRate, String initialDelay, String zone, TimeUnit timeUnit) {
String initialDelay, String zone, TimeUnit timeUnit) {
if ((cron.isEmpty() || "-".equals(cron)) && "-1".equals(fixedRate) && "-1".endsWith(fixedDelay)) { if ((cron.isEmpty() || "-".equals(cron)) && "-1".equals(fixedRate) && "-1".endsWith(fixedDelay)) {
return createdOnlyNameTask(ref, method, taskCount, name, cron, fixedDelay, fixedRate, initialDelay, zone, timeUnit); //时间都没配置 return createdOnlyNameTask(ref, method, name, cron, fixedDelay, fixedRate, initialDelay, zone, timeUnit); //时间都没配置
} }
taskCount.incrementAndGet();
ZoneId zoneId = Utility.isEmpty(zone) ? null : ZoneId.of(zone); ZoneId zoneId = Utility.isEmpty(zone) ? null : ZoneId.of(zone);
if (!cron.isEmpty() && !"-".equals(cron)) { if (!cron.isEmpty() && !"-".equals(cron)) {
CronExpression cronExpr = CronExpression.parse(cron); CronExpression cronExpr = CronExpression.parse(cron);
@@ -223,32 +219,32 @@ public class ScheduleManagerService implements ScheduleManager, Service {
} }
} }
protected ScheduledTask createdOnlyNameTask(WeakReference ref, Method method, AtomicInteger taskCount, protected ScheduledTask createdOnlyNameTask(WeakReference ref, Method method, String name,
String name, String cron, String fixedDelay, String fixedRate, String cron, String fixedDelay, String fixedRate, String initialDelay, String zone, TimeUnit timeUnit) {
String initialDelay, String zone, TimeUnit timeUnit) {
return null; return null;
} }
protected Runnable createRunnable(final WeakReference ref, Method method) { protected Function<ScheduleEvent, Object> createFuncJob(final WeakReference ref, Method method) {
try { try {
if (!Modifier.isPublic(method.getModifiers())) { if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true); method.setAccessible(true);
} }
MethodHandle mh = MethodHandles.lookup().unreflect(method); MethodHandle mh = MethodHandles.lookup().unreflect(method);
ScheduleEvent event = method.getParameterCount() == 1 ? new ScheduleEvent() : null; return event -> {
return () -> { Object rs = null;
try { try {
Object obj = ref.get(); Object obj = ref.get();
if (obj != null) { if (obj != null) {
if (event == null) { if (event == null) {
mh.invoke(obj); rs = mh.invoke(obj);
} else { } else {
mh.invoke(obj, event.clear()); rs = mh.invoke(obj, event.clear());
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
logger.log(Level.SEVERE, "schedule task error", t); logger.log(Level.SEVERE, "schedule task error", t);
} }
return rs;
}; };
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
throw new RedkaleException(e); throw new RedkaleException(e);
@@ -302,6 +298,44 @@ public class ScheduleManagerService implements ScheduleManager, Service {
} }
} }
@Override
public int start(String scheduleName) {
int c = 0;
lock.lock();
try {
for (Map.Entry<WeakReference, List<ScheduledTask>> item : refTaskMap.entrySet()) {
for (ScheduledTask task : item.getValue()) {
if (Objects.equals(task.name(), scheduleName)) {
c++;
task.start();
}
}
}
} finally {
lock.unlock();
}
return c;
}
@Override
public int stop(String scheduleName) {
int c = 0;
lock.lock();
try {
for (Map.Entry<WeakReference, List<ScheduledTask>> item : refTaskMap.entrySet()) {
for (ScheduledTask task : item.getValue()) {
if (Objects.equals(task.name(), scheduleName)) {
c++;
task.stop();
}
}
}
} finally {
lock.unlock();
}
return c;
}
protected abstract class ScheduledTask implements Runnable { protected abstract class ScheduledTask implements Runnable {
protected final WeakReference ref; protected final WeakReference ref;
@@ -310,8 +344,12 @@ public class ScheduleManagerService implements ScheduleManager, Service {
protected final Method method; protected final Method method;
protected final AtomicBoolean started = new AtomicBoolean();
protected ScheduledFuture future; protected ScheduledFuture future;
protected final ScheduleEvent event;
protected ScheduledTask(WeakReference ref, String name, Method method) { protected ScheduledTask(WeakReference ref, String name, Method method) {
Objects.requireNonNull(ref); Objects.requireNonNull(ref);
Objects.requireNonNull(name); Objects.requireNonNull(name);
@@ -319,14 +357,17 @@ public class ScheduleManagerService implements ScheduleManager, Service {
this.ref = ref; this.ref = ref;
this.name = name; this.name = name;
this.method = method; this.method = method;
this.event = method.getParameterCount() == 0 ? null : new ScheduleEvent();
} }
public abstract void start(); public abstract void start();
public void cancel() { public void stop() {
if (future != null) { if (future != null) {
future.cancel(true); future.cancel(true);
future = null;
} }
this.started.set(false);
} }
public Method method() { public Method method() {
@@ -340,7 +381,7 @@ public class ScheduleManagerService implements ScheduleManager, Service {
protected class FixedTask extends ScheduledTask { protected class FixedTask extends ScheduledTask {
private final Runnable delegate; private final Function<ScheduleEvent, Object> delegate;
private final long fixedDelay; private final long fixedDelay;
@@ -352,7 +393,7 @@ public class ScheduleManagerService implements ScheduleManager, Service {
public FixedTask(final WeakReference ref, String name, Method method, long fixedDelay, long fixedRate, long initialDelay, TimeUnit timeUnit) { public FixedTask(final WeakReference ref, String name, Method method, long fixedDelay, long fixedRate, long initialDelay, TimeUnit timeUnit) {
super(ref, name, method); super(ref, name, method);
this.delegate = createRunnable(ref, method); this.delegate = createFuncJob(ref, method);
this.fixedDelay = fixedDelay; this.fixedDelay = fixedDelay;
this.fixedRate = fixedRate; this.fixedRate = fixedRate;
this.initialDelay = initialDelay; this.initialDelay = initialDelay;
@@ -362,30 +403,32 @@ public class ScheduleManagerService implements ScheduleManager, Service {
@Override @Override
public void run() { public void run() {
try { try {
delegate.run(); delegate.apply(event);
} catch (Throwable t) { } catch (Throwable t) {
logger.log(Level.SEVERE, "schedule task error", t); logger.log(Level.SEVERE, "schedule task error", t);
} }
if (ref.get() == null) { if (ref.get() == null) {
cancel(); stop();
} }
} }
@Override @Override
public void start() { public void start() {
if (fixedRate > 0) { if (started.compareAndSet(false, true)) {
this.future = scheduler.scheduleAtFixedRate(this, initialDelay > 0 ? initialDelay : 0, fixedRate, timeUnit); if (fixedRate > 0) {
} else if (fixedDelay > 0) { this.future = scheduler.scheduleAtFixedRate(this, initialDelay > 0 ? initialDelay : 0, fixedRate, timeUnit);
this.future = scheduler.scheduleWithFixedDelay(this, initialDelay, fixedDelay, timeUnit); } else if (fixedDelay > 0) {
} else if (initialDelay > 0) { this.future = scheduler.scheduleWithFixedDelay(this, initialDelay, fixedDelay, timeUnit);
this.future = scheduler.schedule(this, initialDelay, timeUnit); } else if (initialDelay > 0) {
this.future = scheduler.schedule(this, initialDelay, timeUnit);
}
} }
} }
} }
protected class CronTask extends ScheduledTask { protected class CronTask extends ScheduledTask {
private final Runnable delegate; private final Function<ScheduleEvent, Object> delegate;
private final CronExpression cron; private final CronExpression cron;
@@ -394,7 +437,7 @@ public class ScheduleManagerService implements ScheduleManager, Service {
public CronTask(WeakReference ref, String name, Method method, CronExpression cron, ZoneId zoneId) { public CronTask(WeakReference ref, String name, Method method, CronExpression cron, ZoneId zoneId) {
super(ref, name, method); super(ref, name, method);
this.delegate = createRunnable(ref, method); this.delegate = createFuncJob(ref, method);
this.cron = cron; this.cron = cron;
this.zoneId = zoneId; this.zoneId = zoneId;
} }
@@ -402,7 +445,7 @@ public class ScheduleManagerService implements ScheduleManager, Service {
@Override @Override
public void run() { public void run() {
try { try {
delegate.run(); delegate.apply(event);
} catch (Throwable t) { } catch (Throwable t) {
logger.log(Level.SEVERE, "schedule task error", t); logger.log(Level.SEVERE, "schedule task error", t);
} }
@@ -414,10 +457,12 @@ public class ScheduleManagerService implements ScheduleManager, Service {
if (ref.get() == null) { if (ref.get() == null) {
return; return;
} }
LocalDateTime now = zoneId == null ? LocalDateTime.now() : LocalDateTime.now(zoneId); if (started.compareAndSet(false, true)) {
LocalDateTime next = cron.next(now); LocalDateTime now = zoneId == null ? LocalDateTime.now() : LocalDateTime.now(zoneId);
Duration delay = Duration.between(now, next); LocalDateTime next = cron.next(now);
this.future = scheduler.schedule(this, delay.toNanos(), TimeUnit.NANOSECONDS); Duration delay = Duration.between(now, next);
this.future = scheduler.schedule(this, delay.toNanos(), TimeUnit.NANOSECONDS);
}
} }
} }
} }