diff --git a/src/main/java/org/redkale/scheduled/ScheduledManager.java b/src/main/java/org/redkale/scheduled/ScheduledManager.java index 91ec20416..956ace53f 100644 --- a/src/main/java/org/redkale/scheduled/ScheduledManager.java +++ b/src/main/java/org/redkale/scheduled/ScheduledManager.java @@ -3,6 +3,8 @@ */ package org.redkale.scheduled; +import java.util.List; + /** * 定时管理器 * @@ -29,6 +31,16 @@ public interface ScheduledManager { */ public int start(String scheduleName); + /** + * 执行所有宿主对象中指定的任务名 + * + * @see org.redkale.scheduled.Scheduled#name() + * @param scheduleName 定时任务名称 + * @param all 是执行所有同名任务还是只执行其中任意一个 + * @return 结果集合 + */ + public List execute(String scheduleName, boolean all); + /** * 关闭宿主对象中所有的定时任务方法 * diff --git a/src/main/java/org/redkale/scheduled/spi/ScheduleManagerService.java b/src/main/java/org/redkale/scheduled/spi/ScheduleManagerService.java index 19f949b2d..95edae04d 100644 --- a/src/main/java/org/redkale/scheduled/spi/ScheduleManagerService.java +++ b/src/main/java/org/redkale/scheduled/spi/ScheduleManagerService.java @@ -307,6 +307,27 @@ public class ScheduleManagerService implements ScheduledManager, Service { return c; } + @Override + public List execute(String scheduleName, boolean all) { + List rs = new ArrayList<>(); + lock.lock(); + try { + for (Map.Entry> item : refTaskMap.entrySet()) { + for (ScheduledTask task : item.getValue()) { + if (Objects.equals(task.name(), scheduleName)) { + rs.add(task.execute()); + if (!all) { + return rs; + } + } + } + } + } finally { + lock.unlock(); + } + return rs; + } + @Override public int stop(String scheduleName) { int c = 0; @@ -362,6 +383,21 @@ public class ScheduleManagerService implements ScheduledManager, Service { public abstract void start(); + protected abstract Function delegate(); + + public Object execute() { + Object rs = null; + doing.set(true); + try { + rs = delegate().apply(event); + } catch (Throwable t) { + logger.log(Level.SEVERE, "ScheduledTask[" + name() + "] schedule error", t); + } finally { + doing.set(false); + } + return rs; + } + public void stop() { if (future != null) { future.cancel(true); @@ -415,16 +451,14 @@ public class ScheduleManagerService implements ScheduledManager, Service { this.timeUnit = timeUnit; } + @Override + protected Function delegate() { + return delegate; + } + @Override public void run() { - doing.set(true); - try { - delegate.apply(event); - } catch (Throwable t) { - logger.log(Level.SEVERE, "schedule task error", t); - } finally { - doing.set(false); - } + super.execute(); if (ref.get() == null) { super.stop(); } @@ -461,16 +495,14 @@ public class ScheduleManagerService implements ScheduledManager, Service { this.zoneId = zoneId; } + @Override + protected Function delegate() { + return delegate; + } + @Override public void run() { - doing.set(true); - try { - delegate.apply(event); - } catch (Throwable t) { - logger.log(Level.SEVERE, "schedule task error", t); - } finally { - doing.set(false); - } + super.execute(); schedule(); } diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 394042e8f..f296159e7 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -325,7 +325,7 @@ public final class CacheMemorySource extends AbstractCacheSource { for (int i = 0; i < keyVals.length; i += 2) { String key = keyVals[i].toString(); Object val = keyVals[i + 1]; - set0(key.toString(), 0, null, null, val); + set0(key, 0, null, null, val); } } diff --git a/src/main/java/org/redkale/source/CacheScoredValue.java b/src/main/java/org/redkale/source/CacheScoredValue.java index 2ce43432e..b720f1551 100644 --- a/src/main/java/org/redkale/source/CacheScoredValue.java +++ b/src/main/java/org/redkale/source/CacheScoredValue.java @@ -59,7 +59,7 @@ public class CacheScoredValue implements Serializable, Comparable CompletableFuture publishAsync(String topic, Convert convert, Type messageType, T message) { - if (message instanceof byte[]) { + if (message == null || message instanceof byte[]) { return publishAsync(topic, (byte[]) message); } if (messageType == String.class && (convert == null || convert instanceof TextConvert)) { @@ -1337,7 +1337,7 @@ public interface CacheSource extends Resourcable { } default CompletableFuture getSetLongAsync(String key, long value, long defValue) { - return getSetAsync(key, Long.class, value).thenApply(v -> v == null ? defValue : (Long) v); + return getSetAsync(key, Long.class, value).thenApply(v -> v == null ? defValue : v); } default CompletableFuture getSetLongAsync(String key, long value) {