ScheduledManager增加execute方法

This commit is contained in:
redkale
2024-08-01 08:03:10 +08:00
parent d6cd97c798
commit f1f8f653e4
5 changed files with 64 additions and 20 deletions

View File

@@ -3,6 +3,8 @@
*/
package org.redkale.scheduled;
import java.util.List;
/**
* 定时管理器
*
@@ -29,6 +31,16 @@ public interface ScheduledManager {
*/
public int start(String scheduleName);
/**
* 执行所有宿主对象中指定的任务名
*
* @see org.redkale.scheduled.Scheduled#name()
* @param scheduleName 定时任务名称
* @param all 是执行所有同名任务还是只执行其中任意一个
* @return 结果集合
*/
public List<Object> execute(String scheduleName, boolean all);
/**
* 关闭宿主对象中所有的定时任务方法
*

View File

@@ -307,6 +307,27 @@ public class ScheduleManagerService implements ScheduledManager, Service {
return c;
}
@Override
public List<Object> execute(String scheduleName, boolean all) {
List<Object> rs = new ArrayList<>();
lock.lock();
try {
for (Map.Entry<WeakReference, List<ScheduledTask>> item : refTaskMap.entrySet()) {
for (ScheduledTask task : item.getValue()) {
if (Objects.equals(task.name(), scheduleName)) {
rs.add(task.execute());
if (!all) {
return rs;
}
}
}
}
} finally {
lock.unlock();
}
return rs;
}
@Override
public int stop(String scheduleName) {
int c = 0;
@@ -362,6 +383,21 @@ public class ScheduleManagerService implements ScheduledManager, Service {
public abstract void start();
protected abstract Function<ScheduledEvent, Object> delegate();
public Object execute() {
Object rs = null;
doing.set(true);
try {
rs = delegate().apply(event);
} catch (Throwable t) {
logger.log(Level.SEVERE, "ScheduledTask[" + name() + "] schedule error", t);
} finally {
doing.set(false);
}
return rs;
}
public void stop() {
if (future != null) {
future.cancel(true);
@@ -415,16 +451,14 @@ public class ScheduleManagerService implements ScheduledManager, Service {
this.timeUnit = timeUnit;
}
@Override
protected Function<ScheduledEvent, Object> delegate() {
return delegate;
}
@Override
public void run() {
doing.set(true);
try {
delegate.apply(event);
} catch (Throwable t) {
logger.log(Level.SEVERE, "schedule task error", t);
} finally {
doing.set(false);
}
super.execute();
if (ref.get() == null) {
super.stop();
}
@@ -461,16 +495,14 @@ public class ScheduleManagerService implements ScheduledManager, Service {
this.zoneId = zoneId;
}
@Override
protected Function<ScheduledEvent, Object> delegate() {
return delegate;
}
@Override
public void run() {
doing.set(true);
try {
delegate.apply(event);
} catch (Throwable t) {
logger.log(Level.SEVERE, "schedule task error", t);
} finally {
doing.set(false);
}
super.execute();
schedule();
}

View File

@@ -325,7 +325,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
for (int i = 0; i < keyVals.length; i += 2) {
String key = keyVals[i].toString();
Object val = keyVals[i + 1];
set0(key.toString(), 0, null, null, val);
set0(key, 0, null, null, val);
}
}

View File

@@ -59,7 +59,7 @@ public class CacheScoredValue implements Serializable, Comparable<CacheScoredVal
@Override
public int compareTo(CacheScoredValue o) {
return ((Comparable) this.score).compareTo((Number) o.getScore());
return ((Comparable) this.score).compareTo(o.getScore());
}
@Override

View File

@@ -113,7 +113,7 @@ public interface CacheSource extends Resourcable {
}
default <T> CompletableFuture<Integer> publishAsync(String topic, Convert convert, Type messageType, T message) {
if (message instanceof byte[]) {
if (message == null || message instanceof byte[]) {
return publishAsync(topic, (byte[]) message);
}
if (messageType == String.class && (convert == null || convert instanceof TextConvert)) {
@@ -1337,7 +1337,7 @@ public interface CacheSource extends Resourcable {
}
default CompletableFuture<Long> getSetLongAsync(String key, long value, long defValue) {
return getSetAsync(key, Long.class, value).thenApply(v -> v == null ? defValue : (Long) v);
return getSetAsync(key, Long.class, value).thenApply(v -> v == null ? defValue : v);
}
default CompletableFuture<Long> getSetLongAsync(String key, long value) {