修改:锁控制

This commit is contained in:
lxy
2020-09-02 01:50:52 +08:00
parent 84cd121ac3
commit f201a8fe64
2 changed files with 123 additions and 20 deletions

View File

@@ -2,17 +2,18 @@ package net.tccn.timer.queue;
import net.tccn.timer.task.Task; import net.tccn.timer.task.Task;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Set; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* Created by liangxianyou at 2018/7/23 14:07. * Created by liangxianyou at 2018/7/23 14:07.
*/ */
public class TimerQueue { public class TimerQueue {
Object lock = new Object(); private ReentrantLock lock = new ReentrantLock();
LinkedList<Task> queue = new LinkedList(); private Condition isEmpty = lock.newCondition();
Set<String> names = new HashSet<>(); private LinkedList<Task> queue = new LinkedList();
/** /**
* 新加调度任务 * 新加调度任务
@@ -20,7 +21,8 @@ public class TimerQueue {
* @param task * @param task
*/ */
public void push(Task task) { public void push(Task task) {
synchronized (lock) { try {
lock.lock();
remove(task.getName()); remove(task.getName());
int inx = queue.size();//目标坐标 int inx = queue.size();//目标坐标
while (inx > 0 && queue.get(inx).theTime() > task.theTime()) { while (inx > 0 && queue.get(inx).theTime() > task.theTime()) {
@@ -28,10 +30,9 @@ public class TimerQueue {
} }
queue.add(inx, task); queue.add(inx, task);
isEmpty.signal();
//size++; } finally {
names.add(task.getName()); lock.unlock();
lock.notify();
} }
} }
@@ -42,9 +43,10 @@ public class TimerQueue {
* @throws InterruptedException * @throws InterruptedException
*/ */
public Task take() throws InterruptedException { public Task take() throws InterruptedException {
synchronized (lock) { try {
lock.lock();
while (queue.size() == 0) { while (queue.size() == 0) {
lock.wait(10);//循环避免非put线程唤醒空异常 isEmpty.await();
} }
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
@@ -53,9 +55,11 @@ public class TimerQueue {
if (currentTime >= nextTime) { if (currentTime >= nextTime) {
return queue.removeFirst(); return queue.removeFirst();
} else { } else {
lock.wait(nextTime - currentTime); isEmpty.await(nextTime - currentTime, TimeUnit.MILLISECONDS);
return take(); return take();
} }
} finally {
lock.unlock();
} }
} }
@@ -80,11 +84,8 @@ public class TimerQueue {
} }
private Task get(String name, boolean remove) { private Task get(String name, boolean remove) {
synchronized (lock) { try {
if (!names.contains(name)) { lock.lock();
return null;
}
Task take = null; Task take = null;
for (int i = 0; i < queue.size(); i++) { for (int i = 0; i < queue.size(); i++) {
if (name.equals(queue.get(i).getName())) { if (name.equals(queue.get(i).getName())) {
@@ -93,11 +94,12 @@ public class TimerQueue {
} }
if (remove && take != null) { if (remove && take != null) {
queue.remove(take); queue.remove(take);
names.remove(take.getName());
} }
lock.notify(); isEmpty.signal();
return take; return take;
} finally {
lock.unlock();
} }
} }
} }

View File

@@ -4,8 +4,15 @@ import net.tccn.timer.scheduled.Scheduled;
import net.tccn.timer.scheduled.ScheduledCycle; import net.tccn.timer.scheduled.ScheduledCycle;
import net.tccn.timer.scheduled.ScheduledExpres; import net.tccn.timer.scheduled.ScheduledExpres;
import net.tccn.timer.task.Task; import net.tccn.timer.task.Task;
import org.junit.Test;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* t2 测试定时器加入任务的调度管理 * t2 测试定时器加入任务的调度管理
@@ -212,5 +219,99 @@ public class TimerTest {
} }
} }
final ReentrantLock lock = new ReentrantLock();
List<Integer> list = new ArrayList<>();
Condition isEmpty = lock.newCondition();
//Condition conditionB = lock.newCondition();
//@Test
public void lockTest1() {
new Thread(() -> {
try {
/*while (!lock.tryLock(1, TimeUnit.SECONDS)) {
conditionA.await();
}*/
lock.lock();
System.out.println("lock a");
Thread.sleep(10_000);
System.out.println("lock b");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
//conditionB.signal();
}
}).start();
new Thread(() -> {
try {
/*while (!lock.tryLock(1, TimeUnit.SECONDS)) {
System.out.println(1);
conditionB.await();
}*/
lock.lock();//dcf3w2
System.out.println("lock A");
Thread.sleep(10_000);
System.out.println("lock B");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
try {
Thread.sleep(22_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//@Test
public void lockTest2() {
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
lock.lock();
list.add(i);
isEmpty.signal();
lock.unlock();
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() ->{
while (true) {
lock.lock();
if (list.isEmpty()) {
try {
isEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(list.remove(0));
lock.unlock();
}
}).start();
try {
Thread.sleep(20_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }