CyclicBarrier
CyclicBarrier #
CyclicBarrier
被称为“同步屏障”,事实上就可以把它理解为一个屏障,多个任务调用屏障的await()
方法将被阻塞,直到所有的任务都进入阻塞,那么屏障开启,所有任务继续执行。这看起来和CountDownLatch
非常像,不过CountDownLatch
只能触发一次,而CyclicBarrier
可以多次重用,这是它们的主要区别之一。
和CountDownLatch
一样,CyclicBarrier
接受一个整型参数,表示可限制的线程数。除此之外,CyclicBarrier
还可以接受一个Runnable
作为参数,这个参数称作barrierAction
,barrierAction
在所有线程到达屏障之后即开始执行,其他任务只能等待barrierAction
执行完毕之后才能继续执行,这是CyclicBarrier
和CountDownLatch
的区别之二。
1public class TestCyclicBarrier {
2
3 private static StringBuffer sb = new StringBuffer();
4 /** CyclicBarrier的构造器任务总是会先执行完毕 */
5 static CyclicBarrier c = new CyclicBarrier(2, () -> {
6 sb.append(3);
7 });
8 private static final int ASSERT_VALUE = 312;
9
10 static int run() {
11 Thread t = new Thread(() -> {
12 try {
13 c.await();
14 } catch (Exception e) {
15 // ignore;
16 }
17 sb.append(1);
18 });
19 t.start();
20 try {
21 c.await();
22 sb.append(2);
23 t.join();
24 } catch (Exception e) {
25 // ignore
26 }
27 return Integer.parseInt(sb.toString()) | (sb.delete(0, sb.length())).length();
28 }
29
30 public static void main(String[] args) {
31
32 for (; ; ) {
33 int r;
34 if ((r = run()) != ASSERT_VALUE) {
35 // should be 321
36 System.out.println(r);
37 return;
38 }
39 }
40 }
41}
上例中,barrier有一个barrierAction
和2个“屏障任务”,main方法的输出大概率为312,小概率为321,不会出现其他结果,所以main方法无论执行多长时间,其总会结束。由于barrierAction
总是先执行,故结果总是3xx1,其先执行完毕的原因在源码中很容易找到:
1//...
2// 所有任务到达屏障
3if (index == 0) { // tripped
4boolean ranAction = false;
5try {
6 final Runnable command = barrierCommand;
7 //直接在当前线程调用command的run方法
8 if (command != null)
9 command.run();
10 ranAction = true;
11 nextGeneration(); // 唤醒所有线程
12 return 0;
13 } finally {
14 if (!ranAction)
15 breakBarrier();
16 }
17}
18//...
不过,屏障开启后,任务的执行顺序完全是由cpu调度的。同时,本例中的CyclicBarrier
是静态域,在main方法重复执行时,并不会重新初始化,因此也直接证明了CyclicBarrier
的可重用性——屏障开启后,任务继续执行后调用屏障的await()
方法同样会阻塞而等待所有任务到达屏障,依次循环。
下例的“赛马游戏”2完美地阐述了CyclicBarrier
可以多次重用的特点,马每次跑一步,不过不同的马步长不同,等待所有的马都“跑出这一步”后,屏障开启,先确定是否有马到达终点,如有则结束赛跑,否则继续下一轮,直到有马越过终点线,下面是示例代码:
1public class HorseRace {
2
3 static class Horse implements Runnable {
4 private static int counter = 0;
5 private final int id = counter++;
6 private int strides = 0;
7 private static Random rand = new Random(47);
8 private static CyclicBarrier barrier;
9
10 public Horse(CyclicBarrier b) {
11 barrier = b;
12 }
13
14 public int getStrides() {
15 return strides;
16 }
17
18 @Override
19 public void run() {
20 try {
21 while (!Thread.interrupted()) {
22 strides += rand.nextInt(3); // Produces 0, 1 or 2
23 barrier.await();
24 }
25 } catch (InterruptedException e) {
26 // A legitimate way to exit
27 } catch (BrokenBarrierException e) {
28 // This one we want to know about
29 throw new RuntimeException(e);
30 }
31 }
32
33 @Override
34 public String toString() {
35 return "Horse " + id + " ";
36 }
37
38 public String tracks() {
39 StringBuilder s = new StringBuilder();
40 for (int i = 0; i < getStrides(); i++) {
41 s.append("*");
42 }
43 s.append(id);
44 return s.toString();
45 }
46 }
47
48 static final int FINISH_LINE = 20;
49 private List<Horse> horses = new ArrayList<>();
50 private ExecutorService exec = Executors.newCachedThreadPool();
51 private CyclicBarrier barrier;
52
53 /** 这是一构造器 */
54 public HorseRace(int nHorses, final int pause) {
55 barrier = new CyclicBarrier(nHorses, () -> {
56 StringBuilder s = new StringBuilder();
57 for (int i = 0; i < FINISH_LINE; i++) {
58 s.append("="); // The fence on the racetrack
59 }
60 System.out.println(s);
61 for (Horse horse : horses) {
62 System.out.println(horse.tracks());
63 }
64 for (Horse horse : horses) {
65 if (horse.getStrides() >= FINISH_LINE) {
66 System.out.println(horse + "won!");
67 exec.shutdownNow();
68 return;
69 }
70 }
71 try {
72 TimeUnit.MILLISECONDS.sleep(pause);
73 } catch (InterruptedException e) {
74 System.out.println("barrier-action sleep interrupted");
75 }
76 });
77 for (int i = 0; i < nHorses; i++) {
78 Horse horse = new Horse(barrier);
79 horses.add(horse);
80 exec.execute(horse);
81 }
82 }
83
84 public static void main(String[] args) {
85 int nHorses = 7;
86 int pause = 200;
87 if (args.length > 0) { // Optional argument
88 int n = new Integer(args[0]);
89 nHorses = n > 0 ? n : nHorses;
90 }
91 if (args.length > 1) { // Optional argument
92 int p = new Integer(args[1]);
93 pause = p > -1 ? p : pause;
94 }
95 new HorseRace(nHorses, pause);
96 }
97}
实际上程序通过获取每匹马的strides
域来判断马是否到达终点。在TIJ原书中,对strides
域的有关操作做了同步处理,而本例中移除了这些同步,这是否安全?虽然CyclicBarrier的barrierAction
和HorseRace
都访问了strides
域,不过,二者访问域的时间一定是错开的:前者在所有马都到达屏障后开始访问,而此时的马处于阻塞状态,而马获得访问权时,barrierAction
一定没在执行3。因此本例中,不使用同步也是安全的。
CyclicBarrier
还有一些特殊方法:
public void reset();
这个方法将CyclicBarrier重置到初始状态
注意,这个方法会导致已经在屏障处等待的线程抛出BrokenBarrierException
如果确实需要一个新的CyclicBarrier来执行操作,新建一个实例是更好的选择
public int getNumberWaiting() ;
这个方法获取在屏障等待的线程数
public int getParties() ;
这个方法获取所有的线程数(用来构建CyclicBarrier实例的int入参)
下面的例子展示了在任务执行时重置
CyclicBarrier
的操作,这个示例只是为了展示上面几个方法的用法,千万不要在执行任务时贸然去做这样的操作!如果处理不得当将很大可能引发阻塞或其他并发问题。
笔者本意是计划执行批量任务,这些任务有一个域来计算其运行次数,并可能在某个任务上调用
CyclicBarrier
的reset()
方法,在reset()
调用之前和之后的任务其运行次数会有差别,通过这个运行差异在barrierAction
中来终结线程池。事实上这个预想完全落空了,reset()
之后,如果不再次使所有线程重新到达屏障处等待,barrierAction
就不可能执行。
1public class ResetCyclicBarrier {
2 static void reSetBarrierIf(int parties, int bound) {
3 TaskMayFail[] tasks = new TaskMayFail[parties];
4 ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newCachedThreadPool();
5 exec.setKeepAliveTime(0, TimeUnit.SECONDS);
6 AtomicInteger ai = new AtomicInteger();
7 CyclicBarrier c2 = new CyclicBarrier(parties, () -> {
8 // if reset barrier while task is running, the
9 // barrier action can not reach in this cycle
10 // until relaunch all parties to reach at barrier
11 // in next round
12 int i = 0;
13 int r = tasks[i].runtime;
14 while (i < parties) {
15 if (r != tasks[i].runtime) {
16 System.out.println(tasks[i] + ":" + tasks[i].runtime + ": " + r);
17 exec.shutdownNow();
18 return;
19 }
20 r = tasks[i].runtime;
21 i++;
22 }
23 });
24
25 for (int i = 0; i < parties; i++) {
26 TaskMayFail taskMayFail = new TaskMayFail(c2, ai, bound);
27 tasks[i] = taskMayFail;
28 exec.execute(taskMayFail);
29 }
30 }
31
32 private static class TaskMayFail implements Runnable {
33 static Random rand = new Random();
34 static int count = 1;
35 final CyclicBarrier cb;
36 final AtomicInteger reSetCount;
37 final int bound;
38 final int id = count++;
39 int runtime = 0;
40
41
42 public TaskMayFail(CyclicBarrier cb, AtomicInteger reSetCount, int bound) {
43 this.cb = cb;
44 this.reSetCount = reSetCount;
45 this.bound = bound;
46 }
47
48 @Override
49 public String toString() {
50 return "[TaskMayFail-" + id + "-runtime-" + runtime + "]";
51 }
52
53 @Override
54 public void run() {
55 try {
56 while (!Thread.currentThread().isInterrupted()) {
57 if (rand.nextBoolean()) {
58 // bound值可调整reset的概率
59 if (rand.nextInt(bound) == 0) {
60 throw new ArithmeticException();
61 }
62 }
63 runtime++;
64 cb.await();
65 }
66 } catch (ArithmeticException ae) {
67 reSetCount.incrementAndGet();
68 while (cb.getNumberWaiting() < (cb.getParties() - reSetCount.intValue())) {
69 // waiting for all parties reach at barrier
70 // or all parties throws exception
71 }
72 // reset barrier
73 cb.reset();
74 System.out.printf("%s-%s reset %s%n",
75 Thread.currentThread().getName(),
76 this,
77 cb);
78 } catch (InterruptedException | BrokenBarrierException ae) {
79 reSetCount.incrementAndGet();
80 // once barrier reset, other parties wait on barrier
81 // will throw BrokenBarrierException
82 System.out.printf("%s-%s return by broken barrier.%n",
83 Thread.currentThread().getName(),
84 this);
85 } finally {
86 Thread.currentThread().interrupt();
87 }
88 }
89
90 public static void main(String[] args) {
91 reSetBarrierIf(13, 100);
92 }
93 }
94}
95/* output (sample)
96pool-1-thread-3-[TaskMayFail-3-runtime-19] reset java.util.concurrent.CyclicBarrier@618bfe9a
97pool-1-thread-4-[TaskMayFail-4-runtime-20] return by broken barrier.
98pool-1-thread-9-[TaskMayFail-9-runtime-20] return by broken barrier.
99pool-1-thread-8-[TaskMayFail-8-runtime-20] return by broken barrier.
100pool-1-thread-5-[TaskMayFail-5-runtime-20] return by broken barrier.
101pool-1-thread-12-[TaskMayFail-12-runtime-20] return by broken barrier.
102pool-1-thread-7-[TaskMayFail-7-runtime-20] return by broken barrier.
103pool-1-thread-13-[TaskMayFail-13-runtime-20] return by broken barrier.
104pool-1-thread-1-[TaskMayFail-1-runtime-20] return by broken barrier.
105pool-1-thread-11-[TaskMayFail-11-runtime-20] return by broken barrier.
106pool-1-thread-2-[TaskMayFail-2-runtime-20] return by broken barrier.
107pool-1-thread-10-[TaskMayFail-10-runtime-20] return by broken barrier.
108pool-1-thread-6-[TaskMayFail-6-runtime-19] reset java.util.concurrent.CyclicBarrier@618bfe9a
109*///:~
从输出可以看到,CyclicBarrier
可以重复使用。上例的设计很巧妙,因为屏障在开启之后,任务可能很快就抛出ArithmeticException
而进入reset
流程,而此时其他任务可能在屏障处等待或者还未执行,若此时贸然reset
,那些等待的线程会抛出BrokenBarrierException
并退出,但是未执行的线程并未意识到reset
的发生(可以这么表述),依然进入阻塞,如果没有再次任务进入reset
流程,程序很快将因为没有足够多的线程到达屏障而阻塞4。
所以,上例引入一个原子变量,用于跟踪进入reset
和已经退出的任务数,那么剩余的线程应该就是到达屏障的线程数,利用这个限制来保证所有的线程都得到处理,以简化问题的复杂性,一旦确定所有的线程都被处理,就可以执行reset()
方法。同时reset()
之后,barrierAction
便无法执行。
这个示例演化自 《Java并发编程的艺术》方腾飞等.著,第八章8.2节代码清单8-4。不过该书中关于这段代码的运行解释是不正确的,本例也证明了这一点。 ↩︎
《Thinking in Java》 4th Edition, 第21章21.7.2示例代码。 ↩︎
虽然是这样,但上述代码并不能保证对
strides
的内存可见性,main线程获取的可能不是最新值,使用volatail
关键字修饰strides
域可解决问题。 ↩︎就算有任务再次进入了
reset
流程,也依然可能存在上面描述的问题,这仅仅增加了程序运行的不稳定性。 ↩︎