PriorityBlockingQueue and DelayQueue

PriorityBlockingQueue and DelayQueue

PriorityBlockingQueue #

PriorityBlockingQueue就是一个基础的可阻塞的 优先级队列,当队列为空时,从队列中获取元素时被阻塞。其余特性和优先级队列是一致的。

下例展示了如何构建一个可以放入优先级队列的任务:

 1public class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
 2    protected static List<PrioritizedTask> sequence = new ArrayList<>();
 3    private Random rand = new Random(47);
 4    private static int counter = 0;
 5    private final int id = counter++;
 6    private final int priority;
 7
 8
 9    public PrioritizedTask(int priority) {
10        this.priority = priority;
11        sequence.add(this);
12    }
13
14    @Override
15    public int compareTo(PrioritizedTask arg) {
16        return priority < arg.priority ? 1 :
17            (priority > arg.priority ? -1 : 0);
18    }
19
20    @Override
21    public void run() {
22        try {
23            TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
24        } catch (InterruptedException e) {
25            // Acceptable way to exit
26        }
27        System.out.println(this);
28    }
29
30    @Override
31    public String toString() {
32        return String.format("[%1$-3d]", priority) +
33            " Task " + id;
34    }
35
36    public String summary() {
37        return "(" + id + ":" + priority + ")";
38    }
39
40    public static class EndSentinel extends PrioritizedTask {
41        private ExecutorService exec;
42
43        public EndSentinel(ExecutorService e) {
44            super(-1); // Lowest priority in this program
45            exec = e;
46        }
47
48        @Override
49        public void run() {
50            int count = 0;
51            for (PrioritizedTask pt : sequence) {
52                System.out.print(pt.summary());
53                if (++count % 5 == 0)
54                    System.out.println();
55            }
56            System.out.println();
57            System.out.println(this + " Calling shutdownNow()");
58            exec.shutdownNow();
59        }
60    }
61}

PrioritizedTask实现了Runnable和Comparable接口,有一个int型priority域,用来表示任务的优先级,在compareTo方法中的逻辑表示,优先级高的将会优先出队。其还有一个静态域,用来记录所有任务被置入队列的顺序。PrioritizedTask有一个静态内部类,也是其子类,它被称作“结束哨兵”,它的优先级为-1,代表它会最后出队,当执行这个任务时,代表任务所有的任务执行完毕,可以关闭线程池资源。

在接下来的示例中,将模拟生产者和消费者,执行PriorityBlockingQueue中的任务,我们可以从程序的输出观察优先级队列的出队(被执行)的顺序:

 1public class PriorityBlockingQueueDemo {
 2
 3    static class PrioritizedTaskProducer implements Runnable {
 4        private Random rand = new Random(47);
 5        private Queue<Runnable> queue;
 6        private ExecutorService exec;
 7
 8        public PrioritizedTaskProducer(
 9            Queue<Runnable> q, ExecutorService e) {
10            queue = q;
11            exec = e; // Used for EndSentinel
12        }
13
14        @Override
15        public void run() {
16            // Unbounded queue; never blocks.
17            // Fill it up fast with random priorities:
18            for (int i = 0; i < 20; i++) {
19                queue.add(new PrioritizedTask(rand.nextInt(10)));
20                Thread.yield();
21            }
22            // Trickle in highest-priority jobs:
23            try {
24                for (int i = 0; i < 10; i++) {
25                    TimeUnit.MILLISECONDS.sleep(250);
26                    queue.add(new PrioritizedTask(10));
27                }
28                // Add jobs, lowest priority first:
29                for (int i = 0; i < 10; i++)
30                    queue.add(new PrioritizedTask(i));
31                // A sentinel to stop all the tasks:
32                queue.add(new PrioritizedTask.EndSentinel(exec));
33            } catch (InterruptedException e) {
34                // Acceptable way to exit
35            }
36            System.out.println("Finished PrioritizedTaskProducer");
37        }
38    }
39
40    static class PrioritizedTaskConsumer implements Runnable {
41        private PriorityBlockingQueue<Runnable> q;
42
43        public PrioritizedTaskConsumer(
44            PriorityBlockingQueue<Runnable> q) {
45            this.q = q;
46        }
47
48        @Override
49        public void run() {
50            try {
51                while (!Thread.interrupted())
52                    // Use current thread to run the task:
53                    q.take().run();
54            } catch (InterruptedException e) {
55                // Acceptable way to exit
56            }
57            System.out.println("Finished PrioritizedTaskConsumer");
58        }
59    }
60
61    public static void main(String[] args) throws Exception {
62        ExecutorService exec = Executors.newCachedThreadPool();
63        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
64        exec.execute(new PrioritizedTaskProducer(queue, exec));
65        exec.execute(new PrioritizedTaskConsumer(queue));
66    }
67}
68/* output(partial)
69[9  ] Task 5
70[9  ] Task 13
71[9  ] Task 14
72[8  ] Task 10
73... other 15 tasks
74[0  ] Task 18
75[10 ] Task 20
76... other 7 tasks
77[10 ] Task 28
78Finished PrioritizedTaskProducer
79[10 ] Task 29
80... other 9 tasks
81[0  ] Task 30
82(0:8)(1:5)(2:3)(3:1)(4:1)
83(5:9)(6:8)(7:0)(8:2)(9:7)
84(10:8)(11:8)(12:1)(13:9)(14:9)
85(15:8)(16:8)(17:1)(18:0)(19:8)
86(20:10)(21:10)(22:10)(23:10)(24:10)
87(25:10)(26:10)(27:10)(28:10)(29:10)
88(30:0)(31:1)(32:2)(33:3)(34:4)
89(35:5)(36:6)(37:7)(38:8)(39:9)
90(40:-1)
91[-1 ] Task 40 Calling shutdownNow()
92Finished PrioritizedTaskConsumer
93*///:~

PrioritizedTaskProducer任务负责向队列添加40个任务,前20个任务不间断地添加进队,且随机0-10的优先级;后10个任务是间隔固定时间添加优先级为10的任务,最后10个任务不间断添加优先级递增到9的任务,最后添加"结束哨兵"任务,其将打印所有任务添加到队列的顺序。PrioritizedTaskConsumer则是不间断的尝试从队列中取出任务执行。从输出可以看到,队列中如果有优先级高的任务,它一定是先出队的。

这个例子不需要任何显式同步,因为阻塞队列提供了所需的同步。

DelayQueue #

DelayQueue是一个无界的阻塞队列,利用PriorityQueue实现,用于存放实现Delay接口1的对象,队列中的对象只能在其到期之后才能被取出。同时其还是一个有序队列,即队头的元素将最先到期,若没有任何元素到期,就不会有队头元素,poll()方法将返回null,因此DelayQueue不接受null作为元素。

实际上,在了解了ScheduledThreadPoolExecutor.ScheduledFutureTask出队规则之后,DelayQueue的出队的实现也就不言自明了——当leader被设置时,表明有任务即将出队,其他任务进入等待,该任务出队之后重置leader

 1// Delayqueue.take
 2public E take() throws InterruptedException {
 3    final ReentrantLock lock = this.lock;
 4    lock.lockInterruptibly();
 5    try {
 6        for (;;) {
 7            E first = q.peek();
 8            if (first == null)
 9                available.await();
10            else {
11                long delay = first.getDelay(NANOSECONDS);
12                if (delay <= 0)
13                    return q.poll();
14                first = null; // don't retain ref while waiting
15                if (leader != null)
16                    available.await();
17                else {
18                    Thread thisThread = Thread.currentThread();
19                    leader = thisThread;
20                    try {
21                        available.awaitNanos(delay);
22                    } finally {
23                        if (leader == thisThread)
24                            leader = null;
25                    }
26                }
27            }
28        }
29    } finally {
30        if (leader == null && q.peek() != null)
31            available.signal();
32        lock.unlock();
33    }
34}

下例展示了如何构造一个可以放入DelayQueue中的任务:

  1public class DelayQueueDemo {
  2
  3    private static class DelayedTask implements Runnable, Delayed {
  4        protected static List<DelayedTask> sequence = new ArrayList<>();
  5        private static int counter = 0;
  6        private final int id = counter++;
  7        private final int delta;
  8        /** 到期时间 */
  9        private final long trigger;
 10
 11        public DelayedTask(int delayInMilliseconds) {
 12            delta = delayInMilliseconds;
 13            trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
 14            sequence.add(this);
 15        }
 16
 17        @Override
 18        public long getDelay(TimeUnit unit) {
 19            return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
 20        }
 21
 22        @Override
 23        public int compareTo(Delayed arg) {
 24            DelayedTask that = (DelayedTask) arg;
 25            if (trigger < that.trigger) return -1;
 26            if (trigger > that.trigger) return 1;
 27            return 0;
 28        }
 29
 30        @Override
 31        public void run() {
 32            System.out.print(this + " ");
 33        }
 34
 35        @Override
 36        public String toString() {
 37            return String.format("[%1$-4d]", delta) + " Task " + id;
 38        }
 39
 40        public String summary() {
 41            return "(" + id + ":" + delta + ")";
 42        }
 43
 44        static class EndSentinel extends DelayedTask {
 45            private ExecutorService exec;
 46
 47            public EndSentinel(int delay, ExecutorService e) {
 48                super(delay);
 49                exec = e;
 50            }
 51
 52            @Override
 53            public void run() {
 54                System.out.println();
 55                for (DelayedTask pt : sequence) {
 56                    System.out.print(pt.summary() + " ");
 57                }
 58                System.out.println();
 59                System.out.println(this + " Calling shutdownNow()");
 60                exec.shutdownNow();
 61            }
 62        }
 63    }
 64
 65    static class DelayedTaskConsumer implements Runnable {
 66        private DelayQueue<DelayedTask> q;
 67
 68        public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
 69            this.q = q;
 70        }
 71
 72        @Override
 73        public void run() {
 74            try {
 75                while (!Thread.interrupted())
 76                    // Run task with the current thread
 77                    q.take().run();
 78            } catch (InterruptedException e) {
 79                // Acceptable way to exit
 80            }
 81            System.out.println("Finished DelayedTaskConsumer");
 82        }
 83    }
 84
 85    public static void main(String[] args) {
 86        Random rand = new Random(47);
 87        ExecutorService exec = Executors.newCachedThreadPool();
 88        DelayQueue<DelayedTask> queue = new DelayQueue<>();
 89        // Fill with tasks that have random delays:
 90        for (int i = 0; i < 20; i++)
 91            queue.put(new DelayedTask(rand.nextInt(5000)));
 92        // Set the stopping point
 93        queue.add(new DelayedTask.EndSentinel(5000, exec));
 94        exec.execute(new DelayedTaskConsumer(queue));
 95    }
 96}
 97/* output(sample)
 98[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6
 99(0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000)
100[5000] Task 20 Calling shutdownNow()
101Finished DelayedTaskConsumer
102*///:~

从输出可以看到,任务入队的顺序和任务出队的顺序没有任何关系,任务是按照超时先后出队的。



  1. Delay接口实际上继承了Comparable接口。 ↩︎