CountDownLatch

CountDownLatch

CountDownLatch #

在讨论线程的基本概念时,我们说过join()方法可使当前线程等待调用join方法的线程执行完,可以实现简单的 无锁同步,使用CountDownLatch可以更加简单的实现这一目的。毕竟,join()方法的语义“加入一个线程”不是很容易就能让人理解。相较于join()方法,CountDownLatch的语义就明确多了。

在有些文档上,将CountDownLatch译为"倒计时门闩【shuān】",其维护一个计数器,这个计数器在CountDownLatch初始化之后便不能重置。在CountDownLatch上调用countDown()方法来将计数值减1,调用这个方法并不会引起阻塞。不过,在这个计数器为0之前,任何调用CountDownLatchawait()方法的任务都将阻塞。

CountDownLatch的典型用法是将一个任务分割为n个可以独立解决的部分,并创建一个计数器值为n(n为线程数量)的CountDownLatch,在每个任务完成时,调用countDown()方法将计数器减1,在等待所有任务完成的线程上调用await()方法,将任务阻塞,直到计数器为0之后再继续运行。

下面的代码演示了CountdownLatch的用法

 1public class CountDownLatchDemo {
 2
 3    private static class TaskPortion implements Runnable {
 4        private static int counter = 0;
 5        private final int id = counter++;
 6        private static Random rand = new Random(47);
 7        private final CountDownLatch latch;
 8
 9        TaskPortion(CountDownLatch latch) {
10            this.latch = latch;
11        }
12
13        @Override
14        public void run() {
15            try {
16                doWork();
17            } catch (InterruptedException ex) {
18                // Acceptable way to exit
19            } finally {
20                latch.countDown();
21            }
22        }
23
24        void doWork() throws InterruptedException {
25            TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
26            System.out.println(this + "completed");
27        }
28
29        @Override
30        public String toString() {
31            return String.format("%1$-3d ", id);
32        }
33    }
34
35    /** Waits on the CountDownLatch: */
36    private static class WaitingTask implements Runnable {
37        private static int counter = 0;
38        private final int id = counter++;
39        private final CountDownLatch latch;
40
41        WaitingTask(CountDownLatch latch) {
42            this.latch = latch;
43        }
44
45        @Override
46        public void run() {
47            try {
48                latch.await();
49                System.out.println("Latch barrier passed for " + this);
50            } catch (InterruptedException ex) {
51                System.out.println(this + " interrupted");
52            }
53        }
54
55        @Override
56        public String toString() {
57            return String.format("WaitingTask %1$-3d ", id);
58        }
59    }
60
61    static final int SIZE = 10;
62
63    public static void main(String[] args) {
64        ExecutorService exec = Executors.newCachedThreadPool();
65        // 所有任务都必须使用同一个CountDownLatch对象
66        CountDownLatch latch = new CountDownLatch(SIZE);
67        exec.execute(new WaitingTask(latch));
68        for (int i = 0; i < SIZE; i++) {
69            exec.execute(new TaskPortion(latch));
70        }
71        System.out.println("Launched all tasks");
72        exec.shutdown(); // Quit when all tasks complete
73    }
74}
75/* output (sample)
76Launched all tasks
777   completed
789   completed
795   completed
808   completed
811   completed
822   completed
836   completed
844   completed
850   completed
863   completed
87Latch barrier passed for WaitingTask 0  
88*///:~

上面的示例中,WaitingTask将会阻塞,直到所有的TaskPortion执行完成,TaskPortion完成之后调用了countDown()方法,注意,countDown()方法是在finally块中调用的,这是为了防止TaskPortion出现异常而导致任务一直阻塞。当计数器为0后,我们看到WaitingTask成功执行。

await()还有一个重载方法await(long, TimeUnit),避免任务让线程一直等待。