PriorityBlockingQueue and DelayQueue
PriorityBlockingQueue #
PriorityBlockingQueue
就是一个基础的可阻塞的
优先级队列,当队列为空时,从队列中获取元素时被阻塞。其余特性和优先级队列是一致的。
下例展示了如何构建一个可以放入优先级队列的任务:
1public class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
2 protected static List<PrioritizedTask> sequence = new ArrayList<>();
3 private Random rand = new Random(47);
4 private static int counter = 0;
5 private final int id = counter++;
6 private final int priority;
7
8
9 public PrioritizedTask(int priority) {
10 this.priority = priority;
11 sequence.add(this);
12 }
13
14 @Override
15 public int compareTo(PrioritizedTask arg) {
16 return priority < arg.priority ? 1 :
17 (priority > arg.priority ? -1 : 0);
18 }
19
20 @Override
21 public void run() {
22 try {
23 TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
24 } catch (InterruptedException e) {
25 // Acceptable way to exit
26 }
27 System.out.println(this);
28 }
29
30 @Override
31 public String toString() {
32 return String.format("[%1$-3d]", priority) +
33 " Task " + id;
34 }
35
36 public String summary() {
37 return "(" + id + ":" + priority + ")";
38 }
39
40 public static class EndSentinel extends PrioritizedTask {
41 private ExecutorService exec;
42
43 public EndSentinel(ExecutorService e) {
44 super(-1); // Lowest priority in this program
45 exec = e;
46 }
47
48 @Override
49 public void run() {
50 int count = 0;
51 for (PrioritizedTask pt : sequence) {
52 System.out.print(pt.summary());
53 if (++count % 5 == 0)
54 System.out.println();
55 }
56 System.out.println();
57 System.out.println(this + " Calling shutdownNow()");
58 exec.shutdownNow();
59 }
60 }
61}
PrioritizedTask
实现了Runnable和Comparable接口,有一个int型priority
域,用来表示任务的优先级,在compareTo
方法中的逻辑表示,优先级高的将会优先出队。其还有一个静态域,用来记录所有任务被置入队列的顺序。PrioritizedTask
有一个静态内部类,也是其子类,它被称作“结束哨兵”,它的优先级为-1,代表它会最后出队,当执行这个任务时,代表任务所有的任务执行完毕,可以关闭线程池资源。
在接下来的示例中,将模拟生产者和消费者,执行PriorityBlockingQueue中的任务,我们可以从程序的输出观察优先级队列的出队(被执行)的顺序:
1public class PriorityBlockingQueueDemo {
2
3 static class PrioritizedTaskProducer implements Runnable {
4 private Random rand = new Random(47);
5 private Queue<Runnable> queue;
6 private ExecutorService exec;
7
8 public PrioritizedTaskProducer(
9 Queue<Runnable> q, ExecutorService e) {
10 queue = q;
11 exec = e; // Used for EndSentinel
12 }
13
14 @Override
15 public void run() {
16 // Unbounded queue; never blocks.
17 // Fill it up fast with random priorities:
18 for (int i = 0; i < 20; i++) {
19 queue.add(new PrioritizedTask(rand.nextInt(10)));
20 Thread.yield();
21 }
22 // Trickle in highest-priority jobs:
23 try {
24 for (int i = 0; i < 10; i++) {
25 TimeUnit.MILLISECONDS.sleep(250);
26 queue.add(new PrioritizedTask(10));
27 }
28 // Add jobs, lowest priority first:
29 for (int i = 0; i < 10; i++)
30 queue.add(new PrioritizedTask(i));
31 // A sentinel to stop all the tasks:
32 queue.add(new PrioritizedTask.EndSentinel(exec));
33 } catch (InterruptedException e) {
34 // Acceptable way to exit
35 }
36 System.out.println("Finished PrioritizedTaskProducer");
37 }
38 }
39
40 static class PrioritizedTaskConsumer implements Runnable {
41 private PriorityBlockingQueue<Runnable> q;
42
43 public PrioritizedTaskConsumer(
44 PriorityBlockingQueue<Runnable> q) {
45 this.q = q;
46 }
47
48 @Override
49 public void run() {
50 try {
51 while (!Thread.interrupted())
52 // Use current thread to run the task:
53 q.take().run();
54 } catch (InterruptedException e) {
55 // Acceptable way to exit
56 }
57 System.out.println("Finished PrioritizedTaskConsumer");
58 }
59 }
60
61 public static void main(String[] args) throws Exception {
62 ExecutorService exec = Executors.newCachedThreadPool();
63 PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
64 exec.execute(new PrioritizedTaskProducer(queue, exec));
65 exec.execute(new PrioritizedTaskConsumer(queue));
66 }
67}
68/* output(partial)
69[9 ] Task 5
70[9 ] Task 13
71[9 ] Task 14
72[8 ] Task 10
73... other 15 tasks
74[0 ] Task 18
75[10 ] Task 20
76... other 7 tasks
77[10 ] Task 28
78Finished PrioritizedTaskProducer
79[10 ] Task 29
80... other 9 tasks
81[0 ] Task 30
82(0:8)(1:5)(2:3)(3:1)(4:1)
83(5:9)(6:8)(7:0)(8:2)(9:7)
84(10:8)(11:8)(12:1)(13:9)(14:9)
85(15:8)(16:8)(17:1)(18:0)(19:8)
86(20:10)(21:10)(22:10)(23:10)(24:10)
87(25:10)(26:10)(27:10)(28:10)(29:10)
88(30:0)(31:1)(32:2)(33:3)(34:4)
89(35:5)(36:6)(37:7)(38:8)(39:9)
90(40:-1)
91[-1 ] Task 40 Calling shutdownNow()
92Finished PrioritizedTaskConsumer
93*///:~
PrioritizedTaskProducer
任务负责向队列添加40个任务,前20个任务不间断地添加进队,且随机0-10的优先级;后10个任务是间隔固定时间添加优先级为10的任务,最后10个任务不间断添加优先级递增到9的任务,最后添加"结束哨兵"任务,其将打印所有任务添加到队列的顺序。PrioritizedTaskConsumer
则是不间断的尝试从队列中取出任务执行。从输出可以看到,队列中如果有优先级高的任务,它一定是先出队的。
这个例子不需要任何显式同步,因为阻塞队列提供了所需的同步。
DelayQueue #
DelayQueue
是一个无界的阻塞队列,利用PriorityQueue
实现,用于存放实现Delay
接口1的对象,队列中的对象只能在其到期之后才能被取出。同时其还是一个有序队列,即队头的元素将最先到期,若没有任何元素到期,就不会有队头元素,poll()
方法将返回null
,因此DelayQueue不接受null
作为元素。
实际上,在了解了ScheduledThreadPoolExecutor.ScheduledFutureTask
的
出队规则之后,DelayQueue
的出队的实现也就不言自明了——当leader
被设置时,表明有任务即将出队,其他任务进入等待,该任务出队之后重置leader
:
1// Delayqueue.take
2public E take() throws InterruptedException {
3 final ReentrantLock lock = this.lock;
4 lock.lockInterruptibly();
5 try {
6 for (;;) {
7 E first = q.peek();
8 if (first == null)
9 available.await();
10 else {
11 long delay = first.getDelay(NANOSECONDS);
12 if (delay <= 0)
13 return q.poll();
14 first = null; // don't retain ref while waiting
15 if (leader != null)
16 available.await();
17 else {
18 Thread thisThread = Thread.currentThread();
19 leader = thisThread;
20 try {
21 available.awaitNanos(delay);
22 } finally {
23 if (leader == thisThread)
24 leader = null;
25 }
26 }
27 }
28 }
29 } finally {
30 if (leader == null && q.peek() != null)
31 available.signal();
32 lock.unlock();
33 }
34}
下例展示了如何构造一个可以放入DelayQueue
中的任务:
1public class DelayQueueDemo {
2
3 private static class DelayedTask implements Runnable, Delayed {
4 protected static List<DelayedTask> sequence = new ArrayList<>();
5 private static int counter = 0;
6 private final int id = counter++;
7 private final int delta;
8 /** 到期时间 */
9 private final long trigger;
10
11 public DelayedTask(int delayInMilliseconds) {
12 delta = delayInMilliseconds;
13 trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
14 sequence.add(this);
15 }
16
17 @Override
18 public long getDelay(TimeUnit unit) {
19 return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
20 }
21
22 @Override
23 public int compareTo(Delayed arg) {
24 DelayedTask that = (DelayedTask) arg;
25 if (trigger < that.trigger) return -1;
26 if (trigger > that.trigger) return 1;
27 return 0;
28 }
29
30 @Override
31 public void run() {
32 System.out.print(this + " ");
33 }
34
35 @Override
36 public String toString() {
37 return String.format("[%1$-4d]", delta) + " Task " + id;
38 }
39
40 public String summary() {
41 return "(" + id + ":" + delta + ")";
42 }
43
44 static class EndSentinel extends DelayedTask {
45 private ExecutorService exec;
46
47 public EndSentinel(int delay, ExecutorService e) {
48 super(delay);
49 exec = e;
50 }
51
52 @Override
53 public void run() {
54 System.out.println();
55 for (DelayedTask pt : sequence) {
56 System.out.print(pt.summary() + " ");
57 }
58 System.out.println();
59 System.out.println(this + " Calling shutdownNow()");
60 exec.shutdownNow();
61 }
62 }
63 }
64
65 static class DelayedTaskConsumer implements Runnable {
66 private DelayQueue<DelayedTask> q;
67
68 public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
69 this.q = q;
70 }
71
72 @Override
73 public void run() {
74 try {
75 while (!Thread.interrupted())
76 // Run task with the current thread
77 q.take().run();
78 } catch (InterruptedException e) {
79 // Acceptable way to exit
80 }
81 System.out.println("Finished DelayedTaskConsumer");
82 }
83 }
84
85 public static void main(String[] args) {
86 Random rand = new Random(47);
87 ExecutorService exec = Executors.newCachedThreadPool();
88 DelayQueue<DelayedTask> queue = new DelayQueue<>();
89 // Fill with tasks that have random delays:
90 for (int i = 0; i < 20; i++)
91 queue.put(new DelayedTask(rand.nextInt(5000)));
92 // Set the stopping point
93 queue.add(new DelayedTask.EndSentinel(5000, exec));
94 exec.execute(new DelayedTaskConsumer(queue));
95 }
96}
97/* output(sample)
98[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6
99(0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000)
100[5000] Task 20 Calling shutdownNow()
101Finished DelayedTaskConsumer
102*///:~
从输出可以看到,任务入队的顺序和任务出队的顺序没有任何关系,任务是按照超时先后出队的。
Delay接口实际上继承了Comparable接口。 ↩︎