生产者-消费者与阻塞队列
在讨论线程协作的时候,已经讨论了生产者与消费者雏形,比如录音是生产者,而播放则是消费者;同样的,在汽车打蜡的模型中,打蜡可看作生产者,抛光可看作消费者;只是它们的关系是简单的生产-消费关系。
除了简单的线程协同之外,Java提供了同步队列来解决线程的协同问题,本节重点讨论这部分的内容。
...在讨论线程协作的时候,已经讨论了生产者与消费者雏形,比如录音是生产者,而播放则是消费者;同样的,在汽车打蜡的模型中,打蜡可看作生产者,抛光可看作消费者;只是它们的关系是简单的生产-消费关系。
除了简单的线程协同之外,Java提供了同步队列来解决线程的协同问题,本节重点讨论这部分的内容。
...上一篇文章介绍了juc的几种主要阻塞队列。
本文使用2个例子,演示了阻塞队列在Java中的应用。
...Executors
可以称作执行器。Java并发系列的文章到目前为止,虽然没有特别说明,但是使用执行器(Executor(s))的次数已经难以计数了,Executors
提供了一些非常方便的静态方法,可以根据需要创建不同的ExecutorService
,然后调用其execute(Runnable)
或submit(Callable<T>)
方法。
在并发条件下,执行器还有一个非常明显的优势,它使用线程池管理线程,减少了系统创建和销毁线程的开销。在一般的Java并发过程中,也建议使用执行器完成任务而非显式地创建线程。
本文将从执行器开始,阐述Java中的线程池。
...前文就已经提过,Executors
执行器创建的线程池包括不同实现,可以应对不同的场景,那么Java中包含哪些实现呢?
本问就来讨论这些实现。
...前文说过,ThreadPoolExecutor
实例代表了Java线程池,前面我们介绍了ThreadPoolExecutor
的构造器和几个核心概念,在本节中,我们着重介绍线程池的执行过程以及线程池的关闭。
在提交单个任务时,使用submit()
或者execute()
方法或许能够满足要求,但如果需要控制多个任务时,依次提交的操作看起来“有些繁琐”,此时我们可以使用ExecutorService
提供的invokeAny/invokeAll
方法,在介绍CompletionService
接口时,我们不妨先看看这两个方法。
之前介绍AbstractExecutorService
时提到,这两个方法是在这个抽象类中实现的,其中前者在获取到一个任务的返回值时便取消其他(未执行或正在执行的任务)任务,而后者需要等待所有的任务执行完成之后才能对任务的返回进行处理,接下来我们分别来看:
invokeAll
会阻塞等待所有的任务执行完成。
1public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2 throws InterruptedException {
3 if (tasks == null)
4 throw new NullPointerException();
5 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
6 boolean done = false;
7 try {
8 for (Callable<T> t : tasks) {
9 RunnableFuture<T> f = newTaskFor(t);
10 futures.add(f);
11 execute(f);
12 }
13 // 有序迭代
14 for (int i = 0, size = futures.size(); i < size; i++) {
15 Future<T> f = futures.get(i);
16 if (!f.isDone()) {
17 try {
18 // 阻塞等待任务执行完成
19 f.get();
20 } catch (CancellationException ignore) {
21 } catch (ExecutionException ignore) {
22 }
23 }
24 }
25 done = true;
26 return futures;
27 } finally {
28 if (!done)
29 // 处理因异常而未正常执行的任务
30 for (int i = 0, size = futures.size(); i < size; i++)
31 futures.get(i).cancel(true);
32 }
33}
34
35// invokeAny
36public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
37 throws InterruptedException, ExecutionException {
38 try {
39 return doInvokeAny(tasks, false, 0);
40 } catch (TimeoutException cannotHappen) {
41 assert false;
42 return null;
43 }
44}
45private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
46 boolean timed, long nanos)
47 throws InterruptedException, ExecutionException, TimeoutException {
48 if (tasks == null)
49 throw new NullPointerException();
50 int ntasks = tasks.size();
51 if (ntasks == 0)
52 throw new IllegalArgumentException();
53 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
54 ExecutorCompletionService<T> ecs =
55 new ExecutorCompletionService<T>(this);
56
57 try {
58 // Record exceptions so that if we fail to obtain any
59 // result, we can throw the last exception we got.
60 ExecutionException ee = null;
61 final long deadline = timed ? System.nanoTime() + nanos : 0L;
62 Iterator<? extends Callable<T>> it = tasks.iterator();
63
64 // Start one task for sure; the rest incrementally
65 futures.add(ecs.submit(it.next()));
66 --ntasks;
67 int active = 1;
68
69 for (;;) {
70 // 并没阻塞第一个任务,此时可能第一个任务还未执行完
71 Future<T> f = ecs.poll();
72 if (f == null) {
73 if (ntasks > 0) {
74 --ntasks;
75 // 不等待上一个任务的结果,直接新执行一个任务
76 futures.add(ecs.submit(it.next()));
77 ++active;
78 }
79 else if (active == 0)
80 break;
81 else if (timed) {
82 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
83 if (f == null)
84 throw new TimeoutException();
85 nanos = deadline - System.nanoTime();
86 }
87 else
88 // 没有可执行的任务了,则等待一个结果
89 f = ecs.take();
90 }
91 // 有结果则返回
92 if (f != null) {
93 --active;
94 try {
95 return f.get();
96 } catch (ExecutionException eex) {
97 ee = eex;
98 } catch (RuntimeException rex) {
99 ee = new ExecutionException(rex);
100 }
101 }
102 }
103
104 if (ee == null)
105 ee = new ExecutionException();
106 throw ee;
107
108 } finally {
109 for (int i = 0, size = futures.size(); i < size; i++)
110 // 取消还未执行或者执行中的任务
111 // 中断任务
112 futures.get(i).cancel(true);
113 }
114}
可以看到,与invokeAll
不同的是,invokeAny
方法是在循环的启动任务,直到获取到任一任务的返回值为止,而未执行或正在执行的任务则会被中断。
除了ThreadPoolExecutor
之外,Java执行器(Executor)框架还提供了可以在指定延迟之后执行一次或周期执行任务的接口ScheduledExecutorService
,较
java.util.Timer而言,它是更好的选择。
与
线程池不同的是,用于计划执行的ScheduledThreadPoolExecutor
使用ScheduledFutureTask
作为任务,使用DelayedWorkQueue
作为任务队列,以实现计划(周期)执行的目的。
前文介绍了ScheduledFutureTask
和DeleyedWorkQueue
这么多,都是为了更好地理解任务执行的流程,在这之前,我们不妨先看如下示例:
在讨论线程的基本概念时,我们说过join()
方法可使当前线程等待调用join方法的线程执行完,可以实现简单的
无锁同步,使用CountDownLatch
可以更加简单的实现这一目的。毕竟,join()
方法的语义“加入一个线程”不是很容易就能让人理解。相较于join()
方法,CountDownLatch
的语义就明确多了。
CyclicBarrier
被称为“同步屏障”,事实上就可以把它理解为一个屏障,多个任务调用屏障的await()
方法将被阻塞,直到所有的任务都进入阻塞,那么屏障开启,所有任务继续执行。这看起来和CountDownLatch
非常像,不过CountDownLatch
只能触发一次,而CyclicBarrier
可以多次重用,这是它们的主要区别之一。
和CountDownLatch
一样,CyclicBarrier
接受一个整型参数,表示可限制的线程数。除此之外,CyclicBarrier
还可以接受一个Runnable
作为参数,这个参数称作barrierAction
,barrierAction
在所有线程到达屏障之后即开始执行,其他任务只能等待barrierAction
执行完毕之后才能继续执行,这是CyclicBarrier
和CountDownLatch
的区别之二。