CyclicBarrier

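CyclicBarrier is known as a "synchronization barrier", and it really can be thought of as a barrier: each task that calls the barrier's await() method blocks until all tasks have blocked, at which point the barrier opens and every task continues. This looks very similar to CountDownLatch, but a CountDownLatch can fire only once, whereas a CyclicBarrier can be reused many times. That is one of the main differences between them.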
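The difference in reusability can be sketched as follows. This is a minimal illustration, not code from the original example: the class name LatchVsBarrier and its two helper methods are made up for this sketch, and it uses only the documented java.util.concurrent API.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, introduced only for this sketch.
class LatchVsBarrier {

    /** Runs `rounds` cycles on one two-party barrier and returns how many cycles completed. */
    static int barrierRounds(int rounds) {
        CyclicBarrier barrier = new CyclicBarrier(2);
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await(); // blocks until the calling thread arrives too
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e); // not expected in this sketch
            }
        });
        worker.start();
        int completed = 0;
        try {
            for (int i = 0; i < rounds; i++) {
                barrier.await(); // the same barrier instance is reused every round
                completed++;
            }
            worker.join();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
        return completed;
    }

    /** A latch that has reached zero stays open and cannot be re-armed. */
    static long latchCountAfterOpen() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        try {
            latch.await(); // returns at once, the count is already 0
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        latch.countDown(); // no effect, the count stays at 0
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("barrier cycles completed: " + barrierRounds(3));
        System.out.println("latch count after opening: " + latchCountAfterOpen());
    }
}
```

The barrier blocks again in every round, while the latch, once open, lets every later await() through.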

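Like CountDownLatch, CyclicBarrier takes an int argument, which is the number of threads (parties) the barrier holds back. In addition, CyclicBarrier can take a Runnable argument, called the barrierAction. The barrierAction starts running as soon as all threads have reached the barrier, and the other tasks can continue only after the barrierAction has finished. This is the second difference between CyclicBarrier and CountDownLatch.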

import java.util.concurrent.CyclicBarrier;

public class TestCyclicBarrier {

    private static StringBuffer sb = new StringBuffer();
    /** The barrier action passed to the constructor always completes first */
    static CyclicBarrier c = new CyclicBarrier(2, () -> {
        sb.append(3);
    });
    private static final int ASSERT_VALUE = 312;

    static int run() {
        Thread t = new Thread(() -> {
            try {
                c.await();
            } catch (Exception e) {
                // ignore
            }
            sb.append(1);
        });
        t.start();
        try {
            c.await();
            sb.append(2);
            t.join();
        } catch (Exception e) {
            // ignore
        }
        // Parse the digits first, then clear the buffer for the next round.
        // The cleared buffer has length 0, so the bitwise OR leaves the value unchanged.
        return Integer.parseInt(sb.toString()) | (sb.delete(0, sb.length())).length();
    }

    public static void main(String[] args) {

        for (; ; ) {
            int r;
            if ((r = run()) != ASSERT_VALUE) {
                // should be 321
                System.out.println(r);
                return;
            }
        }
    }
}

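In the example above, the barrier has one barrierAction and two "barrier tasks". Each call to run() most likely yields 312 and occasionally 321, and no other result is possible, so however long main runs, it ends the first time run() returns 321. Because the barrierAction always runs first, the result always has the form 3xx [1]. The reason it completes first is easy to find in the source code: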

//...
// all tasks have reached the barrier
if (index == 0) {  // tripped
    boolean ranAction = false;
    try {
        final Runnable command = barrierCommand;
        // call command's run method directly in the current thread
        if (command != null)
            command.run();
        ranAction = true;
        nextGeneration();  // wake up all threads
        return 0;
    } finally {
        if (!ranAction)
            breakBarrier();
    }
}
//...
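Because command.run() is called directly, the barrierAction executes in whichever thread arrives last, that is, the thread whose await() returns index 0. The sketch below checks this. The class name ActionThreadDemo, the method actionRunsInLastArriver, and the thread names are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, introduced only for this sketch.
class ActionThreadDemo {

    /** Returns true if the barrier action ran in the thread whose await() returned 0. */
    static boolean actionRunsInLastArriver(int parties) {
        AtomicReference<String> actionThread = new AtomicReference<>();
        Map<String, Integer> arrivalIndex = new ConcurrentHashMap<>();
        CyclicBarrier barrier = new CyclicBarrier(parties,
                () -> actionThread.set(Thread.currentThread().getName()));

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // await() returns the arrival index: parties - 1 for the
                    // first thread to arrive, 0 for the last one.
                    int index = barrier.await();
                    arrivalIndex.put(Thread.currentThread().getName(), index);
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e); // not expected in this sketch
                }
            }, "party-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        Integer index = arrivalIndex.get(actionThread.get());
        return index != null && index == 0;
    }

    public static void main(String[] args) {
        System.out.println("action ran in last arriver: " + actionRunsInLastArriver(4));
    }
}
```

Which thread arrives last varies from run to run, but the action always runs in that thread, before nextGeneration() releases the others.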

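Once the barrier opens, however, the order in which the tasks proceed is entirely up to CPU scheduling. Note also that the CyclicBarrier in this example is a static field, which is not reinitialized as main calls run() over and over. This directly demonstrates that a CyclicBarrier is reusable: after the barrier opens and the tasks continue, calling await() again blocks until all tasks have reached the barrier once more, and so on in a cycle.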

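The "horse race" example below [2] illustrates nicely how a CyclicBarrier can be reused. Each horse takes one step per round, and different horses have different stride lengths. Once every horse has taken its step, the barrier opens and the barrierAction checks whether any horse has reached the finish line. If so, the race ends; otherwise the next round begins, until some horse crosses the line. Here is the code: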

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HorseRace {

    static class Horse implements Runnable {
        private static int counter = 0;
        private final int id = counter++;
        private int strides = 0;
        private static Random rand = new Random(47);
        private static CyclicBarrier barrier;

        public Horse(CyclicBarrier b) {
            barrier = b;
        }

        public int getStrides() {
            return strides;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    strides += rand.nextInt(3); // Produces 0, 1 or 2
                    barrier.await();
                }
            } catch (InterruptedException e) {
                // A legitimate way to exit
            } catch (BrokenBarrierException e) {
                // This one we want to know about
                throw new RuntimeException(e);
            }
        }

        @Override
        public String toString() {
            return "Horse " + id + " ";
        }

        public String tracks() {
            StringBuilder s = new StringBuilder();
            for (int i = 0; i < getStrides(); i++) {
                s.append("*");
            }
            s.append(id);
            return s.toString();
        }
    }

    static final int FINISH_LINE = 20;
    private List<Horse> horses = new ArrayList<>();
    private ExecutorService exec = Executors.newCachedThreadPool();
    private CyclicBarrier barrier;

    /** Builds the barrier with its barrier action, then starts all horses */
    public HorseRace(int nHorses, final int pause) {
        barrier = new CyclicBarrier(nHorses, () -> {
            StringBuilder s = new StringBuilder();
            for (int i = 0; i < FINISH_LINE; i++) {
                s.append("="); // The fence on the racetrack
            }
            System.out.println(s);
            for (Horse horse : horses) {
                System.out.println(horse.tracks());
            }
            for (Horse horse : horses) {
                if (horse.getStrides() >= FINISH_LINE) {
                    System.out.println(horse + "won!");
                    exec.shutdownNow();
                    return;
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(pause);
            } catch (InterruptedException e) {
                System.out.println("barrier-action sleep interrupted");
            }
        });
        for (int i = 0; i < nHorses; i++) {
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);
        }
    }

    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        if (args.length > 0) { // Optional argument
            int n = Integer.parseInt(args[0]);
            nHorses = n > 0 ? n : nHorses;
        }
        if (args.length > 1) { // Optional argument
            int p = Integer.parseInt(args[1]);
            pause = p > -1 ? p : pause;
        }
        new HorseRace(nHorses, pause);
    }
}

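The program decides whether a horse has reached the finish line by reading each horse's strides field. In the original TIJ code, operations on strides are synchronized, while this example removes that synchronization. Is that safe? Both the barrierAction and the Horse tasks in HorseRace access strides, but their accesses never overlap in time: the barrierAction reads the field only after every horse has reached the barrier, when the horses are all blocked, and by the time a horse gets to touch the field again, the barrierAction is certainly not running [3]. So in this example it is safe to go without synchronization.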
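The same access pattern can be reduced to a small sketch: workers update plain, unsynchronized slots and then wait, and the barrier action reads all slots while the workers are blocked. It relies on the memory consistency effects documented in the CyclicBarrier Javadoc (actions before await() happen-before the barrier action). The class BarrierVisibilityDemo and its method sumsPerRound are hypothetical names used only here.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, introduced only for this sketch.
class BarrierVisibilityDemo {

    /** Worker i adds i + 1 to its own slot each round; the barrier action records the total. */
    static int[] sumsPerRound(int workers, int rounds) {
        int[] slots = new int[workers]; // plain ints, one slot per worker
        int[] sums = new int[rounds];   // written only by the barrier action
        int[] round = {0};              // current round, used only by the barrier action

        CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            int sum = 0;
            for (int v : slots) {
                sum += v;               // every worker is blocked in await() right now
            }
            sums[round[0]++] = sum;
        });

        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        slots[id] += id + 1; // unsynchronized write to this worker's own slot
                        barrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e); // not expected in this sketch
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sumsPerRound(3, 3))); // prints [6, 12, 18]
    }
}
```

With three workers the slots grow by 1, 2 and 3 per round, so the recorded totals are 6, 12 and 18.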

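CyclicBarrier also has a few special methods:

public void reset();
    Resets the CyclicBarrier to its initial state.
    Note that this causes threads already waiting at the barrier to throw BrokenBarrierException.
    If you really need a fresh CyclicBarrier to work with, creating a new instance is the better choice.

public int getNumberWaiting();
    Returns the number of threads currently waiting at the barrier.

public int getParties();
    Returns the total number of parties (the int argument used to construct the CyclicBarrier).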
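A minimal sketch of these three methods in isolation, assuming a two-party barrier with a single waiting thread. The class name BarrierResetDemo and the trace format are invented for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, introduced only for this sketch.
class BarrierResetDemo {

    /** Returns a short trace of the barrier state before and after reset(). */
    static String trace() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean sawBroken = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                barrier.await(); // only one of two parties, so this blocks
            } catch (BrokenBarrierException e) {
                sawBroken.set(true); // caused by the reset() call below
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        // Poll until the waiter is really blocked at the barrier.
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield();
        }
        StringBuilder sb = new StringBuilder();
        sb.append("parties=").append(barrier.getParties());
        sb.append(" waiting=").append(barrier.getNumberWaiting());

        barrier.reset(); // wakes the waiter with BrokenBarrierException
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        sb.append(" waiterSawBroken=").append(sawBroken.get());
        sb.append(" waitingAfterReset=").append(barrier.getNumberWaiting());
        sb.append(" broken=").append(barrier.isBroken());
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: parties=2 waiting=1 waiterSawBroken=true waitingAfterReset=0 broken=false
        System.out.println(trace());
    }
}
```

After reset() the barrier is back in its initial, unbroken state, but the thread that was waiting has already been thrown out of await().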

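The next example resets a CyclicBarrier while tasks are running. It exists only to show how the methods above are used. Never do this casually while tasks are executing! Handled badly, it is very likely to cause threads to block forever or to trigger other concurrency problems.

My original plan was to run a batch of tasks, each with a field counting how many times it has run, and to have some task possibly call the CyclicBarrier's reset() method. Tasks before and after the reset() call would then differ in their run counts, and the barrierAction would use that difference to shut down the thread pool. In fact this plan failed completely: after reset(), the barrierAction cannot run unless all threads reach the barrier and wait there again.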

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ResetCyclicBarrier {
    static void reSetBarrierIf(int parties, int bound) {
        TaskMayFail[] tasks = new TaskMayFail[parties];
        ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        exec.setKeepAliveTime(0, TimeUnit.SECONDS);
        AtomicInteger ai = new AtomicInteger();
        CyclicBarrier c2 = new CyclicBarrier(parties, () -> {
            // if reset barrier while task is running, the
            // barrier action can not reach in this cycle
            // until relaunch all parties to reach at barrier
            // in next round
            int i = 0;
            int r = tasks[i].runtime;
            while (i < parties) {
                if (r != tasks[i].runtime) {
                    System.out.println(tasks[i] + ":" + tasks[i].runtime + ": " + r);
                    exec.shutdownNow();
                    return;
                }
                r = tasks[i].runtime;
                i++;
            }
        });

        for (int i = 0; i < parties; i++) {
            TaskMayFail taskMayFail = new TaskMayFail(c2, ai, bound);
            tasks[i] = taskMayFail;
            exec.execute(taskMayFail);
        }
    }

    private static class TaskMayFail implements Runnable {
        static Random rand = new Random();
        static int count = 1;
        final CyclicBarrier cb;
        final AtomicInteger reSetCount;
        final int bound;
        final int id = count++;
        int runtime = 0;


        public TaskMayFail(CyclicBarrier cb, AtomicInteger reSetCount, int bound) {
            this.cb = cb;
            this.reSetCount = reSetCount;
            this.bound = bound;
        }

        @Override
        public String toString() {
            return "[TaskMayFail-" + id + "-runtime-" + runtime + "]";
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    if (rand.nextBoolean()) {
                        // bound tunes the probability of a reset
                        if (rand.nextInt(bound) == 0) {
                            throw new ArithmeticException();
                        }
                    }
                    runtime++;
                    cb.await();
                }
            } catch (ArithmeticException ae) {
                reSetCount.incrementAndGet();
                while (cb.getNumberWaiting() < (cb.getParties() - reSetCount.intValue())) {
                    // waiting for all parties reach at barrier
                    // or all parties throws exception
                }
                // reset barrier
                cb.reset();
                System.out.printf("%s-%s reset %s%n",
                    Thread.currentThread().getName(),
                    this,
                    cb);
            } catch (InterruptedException | BrokenBarrierException ae) {
                reSetCount.incrementAndGet();
                // once barrier reset, other parties wait on barrier
                // will throw BrokenBarrierException
                System.out.printf("%s-%s return by broken barrier.%n",
                    Thread.currentThread().getName(),
                    this);
            } finally {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Entry point kept on the outer class so the example runs as `java ResetCyclicBarrier`
    public static void main(String[] args) {
        reSetBarrierIf(13, 100);
    }
}
/* output (sample)
pool-1-thread-3-[TaskMayFail-3-runtime-19] reset java.util.concurrent.CyclicBarrier@618bfe9a
pool-1-thread-4-[TaskMayFail-4-runtime-20] return by broken barrier.
pool-1-thread-9-[TaskMayFail-9-runtime-20] return by broken barrier.
pool-1-thread-8-[TaskMayFail-8-runtime-20] return by broken barrier.
pool-1-thread-5-[TaskMayFail-5-runtime-20] return by broken barrier.
pool-1-thread-12-[TaskMayFail-12-runtime-20] return by broken barrier.
pool-1-thread-7-[TaskMayFail-7-runtime-20] return by broken barrier.
pool-1-thread-13-[TaskMayFail-13-runtime-20] return by broken barrier.
pool-1-thread-1-[TaskMayFail-1-runtime-20] return by broken barrier.
pool-1-thread-11-[TaskMayFail-11-runtime-20] return by broken barrier.
pool-1-thread-2-[TaskMayFail-2-runtime-20] return by broken barrier.
pool-1-thread-10-[TaskMayFail-10-runtime-20] return by broken barrier.
pool-1-thread-6-[TaskMayFail-6-runtime-19] reset java.util.concurrent.CyclicBarrier@618bfe9a
*///:~

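The output shows that a CyclicBarrier can be reused. The design of the example above is subtle. Right after the barrier opens, a task may very quickly throw ArithmeticException and enter the reset flow while other tasks are either waiting at the barrier or have not run yet. If reset() is called rashly at that moment, the waiting threads throw BrokenBarrierException and exit, but the threads that have not yet reached the barrier are unaware of the reset (so to speak) and still go on to block at it. If no other task enters the reset flow afterwards, the program soon hangs because too few threads ever reach the barrier [4].

That is why the example introduces an atomic variable to track the number of tasks that have entered the reset flow or have already exited. The remaining threads should then be exactly the ones waiting at the barrier. This constraint ensures that every thread is accounted for, which simplifies the problem: once all threads are accounted for, reset() can be called. Note also that once reset() is called, the barrierAction does not run for that cycle.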
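The last point can be checked with a small sketch that counts barrierAction executions around a reset(). It assumes a two-party barrier; the class name ResetSkipsActionDemo and its helper methods are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, introduced only for this sketch.
class ResetSkipsActionDemo {

    /** Starts a thread that waits at the barrier once and tolerates a broken barrier. */
    static Thread startWaiter(CyclicBarrier barrier) {
        Thread t = new Thread(() -> {
            try {
                barrier.await();
            } catch (BrokenBarrierException e) {
                // expected when reset() is called while this thread waits
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    /** Returns {action runs after a reset cycle, action runs after a complete cycle}. */
    static int[] actionRuns() {
        AtomicInteger runs = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(2, runs::incrementAndGet);
        try {
            // Cycle 1: only one party arrives, then the barrier is reset.
            Thread first = startWaiter(barrier);
            while (barrier.getNumberWaiting() < 1) {
                Thread.yield();
            }
            barrier.reset();
            first.join();
            int afterReset = runs.get(); // the action never ran in this cycle

            // Cycle 2: both parties arrive, so the barrier trips normally.
            Thread second = startWaiter(barrier);
            barrier.await();
            second.join();
            int afterFullCycle = runs.get();
            return new int[]{afterReset, afterFullCycle};
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e); // not expected in this sketch
        }
    }

    public static void main(String[] args) {
        int[] r = actionRuns();
        System.out.println("after reset: " + r[0] + ", after full cycle: " + r[1]);
    }
}
```

The action count stays at 0 for the cycle that was reset and becomes 1 only once all parties arrive at the barrier again.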



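  1. This example is adapted from 《Java并发编程的艺术》 (The Art of Java Concurrency Programming) by Fang Tengfei (方腾飞) et al., Chapter 8, Section 8.2, Listing 8-4. The book's explanation of how this code runs is incorrect, however, and this example demonstrates that as well.

  2. 《Thinking in Java》 4th Edition, Chapter 21, Section 21.7.2, example code.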

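  3. Memory visibility of strides is covered as well. The barrierAction runs in the last thread to reach the barrier, not in the main thread, and the CyclicBarrier Javadoc documents its memory consistency effects: actions in a thread before calling await() happen-before the actions of the barrierAction, which in turn happen-before actions following a successful return from await() in the other threads. The barrierAction therefore sees the latest value of strides, and the field does not need to be declared volatile.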

  4. Even if another task does enter the reset flow again, the problem described above can still occur. It only makes the program's behavior less stable.