1、支持调度过程中,通过条件控制任务结束

2、优化代码逻辑
This commit is contained in:
lxyer 2018-12-09 22:08:09 +08:00
parent 54cff118c3
commit c3e2a47b7e
9 changed files with 170 additions and 77 deletions

View File

@ -1,13 +1,18 @@
package com.lxyer.timer;
import com.lxyer.timer.queue.TimerQueue;
import com.lxyer.timer.task.Task;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* @author: liangxianyou
*/
public class TimerExecutor {
private Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private TimerQueue queue = new TimerQueue();
private ExecutorService executor;
@ -19,14 +24,15 @@ public class TimerExecutor {
public void add(Task ... task){
for (Task t : task) {
t.setTimerExecutor(this);
queue.put(t);
queue.push(t);
logger.log(Level.INFO, "add new task : " + t.getName());
}
}
private void add(Task task, boolean upTime){
protected void add(Task task, boolean upTime){
task.setTimerExecutor(this);
if (upTime) task.nextTime();
queue.put(task);
queue.push(task);
}
public Task remove(String name){
@ -50,7 +56,7 @@ public class TimerExecutor {
//执行调度
executor.execute(take);
add(take, true);
//add(take, true); //继续添加任务到 队列
}catch (Exception e){
e.printStackTrace();
}

View File

@ -1,26 +1,34 @@
package com.lxyer.timer;
import com.lxyer.timer.scheduled.Scheduled;
import com.lxyer.timer.task.Job;
import com.lxyer.timer.task.Task;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Created by liangxianyou at 2018/7/23 14:33.
*/
public abstract class AbstractTask implements Task {
public class TimerTask implements Task {
private Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private long startTime = System.currentTimeMillis();
protected String name;
private long theTime;
private Scheduled scheduled;
private boolean isComplete;
private long startTime = System.currentTimeMillis();
private TimerExecutor timerExecutor;
private Job job;
public AbstractTask(String name, Scheduled scheduled) {
this.name = name;
this.scheduled = scheduled;
this.theTime = scheduled.theTime().toInstant(ZoneOffset.of("+8")).toEpochMilli();
public static Task by(String name, Scheduled scheduled, Job job) {
TimerTask task = new TimerTask();
task.name = name;
task.scheduled = scheduled;
task.job = job;
return task;
}
@Override
@ -71,5 +79,25 @@ public abstract class AbstractTask implements Task {
public long startTime() {
return startTime;
}
@Override
public void run() {
//没有完成任务继续执行返回true,表示完成
if (!isComplete) {
long start = System.currentTimeMillis();
StringBuilder buf = new StringBuilder();
buf.append("task [" + getName() + "] : ").append("not complete -> ");
long end;
if (!(isComplete = job.execute())) {
end = System.currentTimeMillis();
timerExecutor.add(this, true);
} else {
end = System.currentTimeMillis();
}
logger.log(Level.INFO, buf.append(isComplete ? "had complete" : "not complete;").append("time: ").append(end - start).append(" ms").toString());
}
}
}

View File

@ -1,4 +1,6 @@
package com.lxyer.timer;
package com.lxyer.timer.queue;
import com.lxyer.timer.task.Task;
import java.util.Arrays;
import java.util.HashSet;
@ -7,14 +9,17 @@ import java.util.Set;
/**
* Created by liangxianyou at 2018/7/23 14:07.
*/
@SuppressWarnings("Duplicates")
class TimerQueue{
public class TimerQueue{
Object lock = new Object();
Task[] queue = new Task[128];
Set<String> names = new HashSet<>();
int size=0;
void put(Task task) {
/**
* 新加调度任务
* @param task
*/
public void push(Task task) {
remove(task.getName());
synchronized (lock){
int inx = size;//目标坐标
@ -36,7 +41,12 @@ class TimerQueue{
}
}
Task take() throws InterruptedException {
/**
* 调度等待执行的任务
* @return
* @throws InterruptedException
*/
public Task take() throws InterruptedException {
synchronized (lock){
while (size == 0) lock.wait(10);//循环避免非put线程唤醒空异常
@ -58,11 +68,21 @@ class TimerQueue{
}
}
Task remove(String name){
/**
* 删除指定名称的任务
* @param name
* @return
*/
public Task remove(String name){
return get(name, true);
}
Task get(String name){
/**
* 返回指定名称的任务
* @param name
* @return
*/
public Task get(String name){
return get(name, false);
}

View File

@ -13,27 +13,46 @@ public class ScheduledCycle implements Scheduled {
private long period;
private TemporalUnit unit = ChronoUnit.MILLIS;
/**
* 构造方法仅限类内部使用为降低使用成本统一使用 of 的静态方法构建对象
* @param period
*/
@Deprecated
public ScheduledCycle(long period) {
this.theTime = LocalDateTime.now();
this.theTime = LocalDateTime.now().plus(period, ChronoUnit.MILLIS);
this.period = period;
}
@Deprecated
public ScheduledCycle(long period,TemporalUnit unit) {
this.theTime = LocalDateTime.now();
this.theTime = LocalDateTime.now().plus(period, unit);
this.period = period;
this.unit = unit;
}
@Deprecated
public ScheduledCycle(LocalDateTime startTime, long period) {
this.theTime = startTime;
this.period = period;
}
@Deprecated
public ScheduledCycle(LocalDateTime startTime, long period, TemporalUnit unit) {
this.theTime = startTime;
this.period = period;
this.unit = unit;
}
public static Scheduled of(long period) {
return new ScheduledCycle(period);
}
public static Scheduled of(long period,TemporalUnit unit) {
return new ScheduledCycle(period, unit);
}
public static Scheduled of(LocalDateTime startTime, long period) {
return new ScheduledCycle(startTime, period);
}
public static Scheduled of(LocalDateTime startTime, long period, TemporalUnit unit) {
return new ScheduledCycle(startTime, period, unit);
}
@Override
public LocalDateTime nextTime() {
return theTime = theTime.plus(period, unit);

View File

@ -26,19 +26,26 @@ public class ScheduledExpres implements Scheduled{
private String[] cfgArr;
private LocalDateTime theTime;
private int _y,_M,_d,_H,_m;
public ScheduledExpres(String cfg){
@Deprecated
private ScheduledExpres(String cfg){
this.cfg = cfg;
this.theTime = LocalDateTime.now();
initTheTime();
}
public ScheduledExpres(final LocalDateTime startTime, String cfg){
@Deprecated
private ScheduledExpres(final LocalDateTime startTime, String cfg){
LocalDateTime now = LocalDateTime.now();
this.theTime = now.isAfter(startTime)? now : startTime;
this.cfg = cfg;
initTheTime();
}
public static Scheduled of(String cfg) {
return new ScheduledExpres(cfg);
}
public static Scheduled of(final LocalDateTime startTime, String cfg) {
return new ScheduledExpres(startTime, cfg);
}
//寻找初始合法时间
public void initTheTime() {

View File

@ -0,0 +1,19 @@
package com.lxyer.timer.task;
/**
* @author: liangxianyou at 2018/12/8 17:24.
*/
@FunctionalInterface
public interface Job {
/**
* 任务执行的内容
* @return true:完成完成任务false未完成任务
*/
boolean execute();
/*default Job then(Job job) {
return job;
}*/
}

View File

@ -1,11 +1,12 @@
package com.lxyer.timer;
package com.lxyer.timer.task;
import com.lxyer.timer.TimerExecutor;
import com.lxyer.timer.scheduled.Scheduled;
/**
* @author: liangxianyou at 2018/8/5 19:32.
*/
public interface Task extends Runnable{
public interface Task extends Runnable {
/**
* 得到任务名称
@ -34,11 +35,6 @@ public interface Task extends Runnable{
*/
long theTime();
/**
* 执行任务
*/
void run();
/**
* 是否完成
* @return

View File

@ -1,30 +0,0 @@
package com.lxyer.timer;
import com.lxyer.timer.scheduled.Scheduled;
import java.text.SimpleDateFormat;
/**
* @author: liangxianyou
* @createtime: 2018/8/5 20:39.
*/
public class TaskImpl extends AbstractTask {
public TaskImpl(String name, Scheduled scheduled) {
super(name, scheduled);
}
@Override
public void run() {
ThreadLocal<SimpleDateFormat> local = new ThreadLocal<>();
SimpleDateFormat sdf = local.get();
if (sdf == null){
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
System.out.printf("执行任务:%s now:%s, %n", name, sdf.format(System.currentTimeMillis()));
}
@Override
public long startTime() {
return 0;
}
}

View File

@ -2,9 +2,9 @@ package com.lxyer.timer;
import com.lxyer.timer.scheduled.ScheduledCycle;
import com.lxyer.timer.scheduled.ScheduledExpres;
import com.lxyer.timer.task.Task;
import org.junit.Test;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
/**
@ -23,11 +23,15 @@ public class TimerTest {
public void t2() throws InterruptedException {
TimerExecutor timerExecutor = new TimerExecutor(1);
//Task t1 = new TaskImpl("a1", new ScheduledExpres("1-40 * * * *"));//1-40定时每分钟执行
TaskImpl t2 = new TaskImpl("a2", new ScheduledCycle(5000 * 1));
//TaskImpl t2 = new TaskImpl("a2", ScheduledCycle.of(5000 * 1));
Task task = TimerTask.by("A2", ScheduledCycle.of(1000 * 1), () -> {
System.out.println("xxxx");
return false;// false 继续执行 true结束任务
});
//timerExecutor.add(t2);
timerExecutor.add(task);
//60s后修改a1 每2s执行一次
//Thread.sleep(1000);
@ -145,14 +149,15 @@ public class TimerTest {
08 18 * 7,8 4
"task1", "1 22-23 * * 7"
*/
Task task = new TaskImpl("task1", new ScheduledExpres("1 22-23 * * 7")){
@Override
public void run() {
System.out.println("----");
System.out.println(new SimpleDateFormat("0: yyyy-MM-dd HH:mm:ss").format(theTime()));
System.out.println(new SimpleDateFormat("1: yyyy-MM-dd HH:mm:ss").format(nextTime()));
System.out.println(new SimpleDateFormat("2: yyyy-MM-dd HH:mm:ss").format(nextTime()));
/*System.out.println(new SimpleDateFormat("3: yyyy-MM-dd HH:mm:ss").format(nextTime()));
Task task = TimerTask.by("task1", ScheduledExpres.of("1 22-23 * * 7"), () -> {
//System.out.println("");
System.out.println("----");
//System.out.println(new SimpleDateFormat("0: yyyy-MM-dd HH:mm:ss").format(theTime()));
//System.out.println(new SimpleDateFormat("1: yyyy-MM-dd HH:mm:ss").format(nextTime()));
/*System.out.println(new SimpleDateFormat("2: yyyy-MM-dd HH:mm:ss").format(nextTime()));
System.out.println(new SimpleDateFormat("3: yyyy-MM-dd HH:mm:ss").format(nextTime()));
System.out.println(new SimpleDateFormat("4: yyyy-MM-dd HH:mm:ss").format(nextTime()));
System.out.println(new SimpleDateFormat("5: yyyy-MM-dd HH:mm:ss").format(nextTime()));
System.out.println(new SimpleDateFormat("6: yyyy-MM-dd HH:mm:ss").format(nextTime()));
@ -163,11 +168,34 @@ public class TimerTest {
System.out.println(new SimpleDateFormat("11: yyyy-MM-dd HH:mm:ss").format(nextTime()));
System.out.println(new SimpleDateFormat("12: yyyy-MM-dd HH:mm:ss").format(nextTime()));
System.out.println(new SimpleDateFormat("13: yyyy-MM-dd HH:mm:ss").format(nextTime()));*/
}
};
return true;
});
task.run();
task.setScheduled(new ScheduledCycle(1000 * 5));//定时每秒执行
task.setScheduled(ScheduledCycle.of(1000 * 5));//定时每秒执行
task.run();
}
@Test
public void t7() {
TimerExecutor executor = new TimerExecutor(1);
executor.add(TimerTask.by("A1", ScheduledCycle.of(1000 * 5), () -> {
try {
Thread.sleep(6);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task do..");
return true;
}));
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}