diff --git a/src/main/java/net/tccn/timer/queue/TimerQueue.java b/src/main/java/net/tccn/timer/queue/TimerQueue.java index cc189fd..1559a52 100644 --- a/src/main/java/net/tccn/timer/queue/TimerQueue.java +++ b/src/main/java/net/tccn/timer/queue/TimerQueue.java @@ -2,17 +2,18 @@ package net.tccn.timer.queue; import net.tccn.timer.task.Task; -import java.util.HashSet; import java.util.LinkedList; -import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * Created by liangxianyou at 2018/7/23 14:07. */ public class TimerQueue { - Object lock = new Object(); - LinkedList queue = new LinkedList(); - Set names = new HashSet<>(); + private ReentrantLock lock = new ReentrantLock(); + private Condition isEmpty = lock.newCondition(); + private LinkedList queue = new LinkedList(); /** * 新加调度任务 @@ -20,7 +21,8 @@ public class TimerQueue { * @param task */ public void push(Task task) { - synchronized (lock) { + try { + lock.lock(); remove(task.getName()); int inx = queue.size();//目标坐标 while (inx > 0 && queue.get(inx).theTime() > task.theTime()) { @@ -28,10 +30,9 @@ public class TimerQueue { } queue.add(inx, task); - - //size++; - names.add(task.getName()); - lock.notify(); + isEmpty.signal(); + } finally { + lock.unlock(); } } @@ -42,9 +43,10 @@ public class TimerQueue { * @throws InterruptedException */ public Task take() throws InterruptedException { - synchronized (lock) { + try { + lock.lock(); while (queue.size() == 0) { - lock.wait(10);//循环避免非put线程唤醒空异常 + isEmpty.await(); } long currentTime = System.currentTimeMillis(); @@ -53,9 +55,11 @@ public class TimerQueue { if (currentTime >= nextTime) { return queue.removeFirst(); } else { - lock.wait(nextTime - currentTime); + isEmpty.await(nextTime - currentTime, TimeUnit.MILLISECONDS); return take(); } + } finally { + lock.unlock(); } } @@ -80,11 +84,8 @@ public class TimerQueue { } private Task get(String name, boolean remove) { - synchronized (lock) { - if (!names.contains(name)) { - return null; - } - + try { + lock.lock(); Task take = null; for (int i = 0; i < queue.size(); i++) { if (name.equals(queue.get(i).getName())) { @@ -93,11 +94,12 @@ public class TimerQueue { } if (remove && take != null) { queue.remove(take); - names.remove(take.getName()); } - lock.notify(); + isEmpty.signal(); return take; + } finally { + lock.unlock(); } } } diff --git a/src/test/java/net/tccn/timer/TimerTest.java b/src/test/java/net/tccn/timer/TimerTest.java index 7b3bb98..fd4951b 100644 --- a/src/test/java/net/tccn/timer/TimerTest.java +++ b/src/test/java/net/tccn/timer/TimerTest.java @@ -4,8 +4,15 @@ import net.tccn.timer.scheduled.Scheduled; import net.tccn.timer.scheduled.ScheduledCycle; import net.tccn.timer.scheduled.ScheduledExpres; import net.tccn.timer.task.Task; +import org.junit.Test; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * t2 测试定时器加入任务的调度管理 @@ -212,5 +219,99 @@ public class TimerTest { } } + final ReentrantLock lock = new ReentrantLock(); + List list = new ArrayList<>(); + Condition isEmpty = lock.newCondition(); + //Condition conditionB = lock.newCondition(); + + //@Test + public void lockTest1() { + + new Thread(() -> { + try { + /*while (!lock.tryLock(1, TimeUnit.SECONDS)) { + conditionA.await(); + }*/ + lock.lock(); + + System.out.println("lock a"); + Thread.sleep(10_000); + System.out.println("lock b"); + + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); + //conditionB.signal(); + } + }).start(); + + + new Thread(() -> { + try { + /*while (!lock.tryLock(1, TimeUnit.SECONDS)) { + System.out.println(1); + conditionB.await(); + }*/ + lock.lock();//dcf3w2 + + System.out.println("lock A"); + Thread.sleep(10_000); + System.out.println("lock B"); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + }).start(); + + try { + Thread.sleep(22_000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + + } + + //@Test + public void lockTest2() { + new Thread(() -> { + try { + for (int i = 0; i < 10; i++) { + lock.lock(); + list.add(i); + isEmpty.signal(); + lock.unlock(); + Thread.sleep(1000); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + + new Thread(() ->{ + while (true) { + lock.lock(); + if (list.isEmpty()) { + try { + isEmpty.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + System.out.println(list.remove(0)); + lock.unlock(); + } + }).start(); + + try { + Thread.sleep(20_000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } }