并发

生产者-消费者与阻塞队列

在讨论线程协作的时候,已经讨论了生产者与消费者雏形,比如录音是生产者,而播放则是消费者;同样的,在汽车打蜡的模型中,打蜡可看作生产者,抛光可看作消费者;只是它们的关系是简单的生产-消费关系。

除了简单的线程协同之外,Java提供了同步队列来解决线程的协同问题,本节重点讨论这部分的内容。

...

Executors与Executor框架

Executors可以称作执行器。Java并发系列的文章到目前为止,虽然没有特别说明,但是使用执行器(Executor(s))的次数已经难以计数了,Executors提供了一些非常方便的静态方法,可以根据需要创建不同的ExecutorService,然后调用其execute(Runnable)submit(Callable<T>)方法。

在并发条件下,执行器还有一个非常明显的优势,它使用线程池管理线程,减少了系统创建和销毁线程的开销。在一般的Java并发过程中,也建议使用执行器完成任务而非显式地创建线程。

本文将从执行器开始,阐述Java中的线程池。

...

CompletionService

在提交单个任务时,使用submit()或者execute()方法或许能够满足要求,但如果需要控制多个任务时,依次提交的操作看起来“有些繁琐”,此时我们可以使用ExecutorService提供的invokeAny/invokeAll方法,在介绍CompletionService接口时,我们不妨先看看这两个方法。

之前介绍AbstractExecutorService时提到,这两个方法是在这个抽象类中实现的,其中前者在获取到一个任务的返回值时便取消其他(未执行或正在执行的任务)任务,而后者需要等待所有的任务执行完成之后才能对任务的返回进行处理,接下来我们分别来看:

invokeAll会阻塞等待所有的任务执行完成。

  1public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  2    throws InterruptedException {
  3    if (tasks == null)
  4        throw new NullPointerException();
  5    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  6    boolean done = false;
  7    try {
  8        for (Callable<T> t : tasks) {
  9            RunnableFuture<T> f = newTaskFor(t);
 10            futures.add(f);
 11            execute(f);
 12        }
 13        // 有序迭代
 14        for (int i = 0, size = futures.size(); i < size; i++) {
 15            Future<T> f = futures.get(i);
 16            if (!f.isDone()) {
 17                try {
 18                    // 阻塞等待任务执行完成
 19                    f.get();
 20                } catch (CancellationException ignore) {
 21                } catch (ExecutionException ignore) {
 22                }
 23            }
 24        }
 25        done = true;
 26        return futures;
 27    } finally {
 28        if (!done)
 29            // 处理因异常而未正常执行的任务
 30            for (int i = 0, size = futures.size(); i < size; i++)
 31                futures.get(i).cancel(true);
 32    }
 33}
 34
 35// invokeAny
 36public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 37    throws InterruptedException, ExecutionException {
 38    try {
 39        return doInvokeAny(tasks, false, 0);
 40    } catch (TimeoutException cannotHappen) {
 41        assert false;
 42        return null;
 43    }
 44}
 45private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 46                          boolean timed, long nanos)
 47    throws InterruptedException, ExecutionException, TimeoutException {
 48    if (tasks == null)
 49        throw new NullPointerException();
 50    int ntasks = tasks.size();
 51    if (ntasks == 0)
 52        throw new IllegalArgumentException();
 53    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
 54    ExecutorCompletionService<T> ecs =
 55        new ExecutorCompletionService<T>(this);
 56
 57    try {
 58        // Record exceptions so that if we fail to obtain any
 59        // result, we can throw the last exception we got.
 60        ExecutionException ee = null;
 61        final long deadline = timed ? System.nanoTime() + nanos : 0L;
 62        Iterator<? extends Callable<T>> it = tasks.iterator();
 63
 64        // Start one task for sure; the rest incrementally
 65        futures.add(ecs.submit(it.next()));
 66        --ntasks;
 67        int active = 1;
 68
 69        for (;;) {
 70            // 并没阻塞第一个任务,此时可能第一个任务还未执行完
 71            Future<T> f = ecs.poll();
 72            if (f == null) {
 73                if (ntasks > 0) {
 74                    --ntasks;
 75                    // 不等待上一个任务的结果,直接新执行一个任务
 76                    futures.add(ecs.submit(it.next()));
 77                    ++active;
 78                }
 79                else if (active == 0)
 80                    break;
 81                else if (timed) {
 82                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
 83                    if (f == null)
 84                        throw new TimeoutException();
 85                    nanos = deadline - System.nanoTime();
 86                }
 87                else
 88                    // 没有可执行的任务了,则等待一个结果
 89                    f = ecs.take();
 90            }
 91            // 有结果则返回
 92            if (f != null) {
 93                --active;
 94                try {
 95                    return f.get();
 96                } catch (ExecutionException eex) {
 97                    ee = eex;
 98                } catch (RuntimeException rex) {
 99                    ee = new ExecutionException(rex);
100                }
101            }
102        }
103
104        if (ee == null)
105            ee = new ExecutionException();
106        throw ee;
107
108    } finally {
109        for (int i = 0, size = futures.size(); i < size; i++)
110            // 取消还未执行或者执行中的任务
111            // 中断任务
112            futures.get(i).cancel(true);
113    }
114}

可以看到,与invokeAll不同的是,invokeAny方法是在循环的启动任务,直到获取到任一任务的返回值为止,而未执行或正在执行的任务则会被中断。

...

ScheduledExecutorService(一)

除了ThreadPoolExecutor之外,Java执行器(Executor)框架还提供了可以在指定延迟之后执行一次或周期执行任务的接口ScheduledExecutorService,较 java.util.Timer而言,它是更好的选择。

线程池不同的是,用于计划执行的ScheduledThreadPoolExecutor使用ScheduledFutureTask作为任务,使用DelayedWorkQueue作为任务队列,以实现计划(周期)执行的目的。

...

CountDownLatch

CountDownLatch #

在讨论线程的基本概念时,我们说过join()方法可使当前线程等待调用join方法的线程执行完,可以实现简单的 无锁同步,使用CountDownLatch可以更加简单的实现这一目的。毕竟,join()方法的语义“加入一个线程”不是很容易就能让人理解。相较于join()方法,CountDownLatch的语义就明确多了。

...

CyclicBarrier

CyclicBarrier #

CyclicBarrier被称为“同步屏障”,事实上就可以把它理解为一个屏障,多个任务调用屏障的await()方法将被阻塞,直到所有的任务都进入阻塞,那么屏障开启,所有任务继续执行。这看起来和CountDownLatch非常像,不过CountDownLatch只能触发一次,而CyclicBarrier可以多次重用,这是它们的主要区别之一。

CountDownLatch一样,CyclicBarrier接受一个整型参数,表示可限制的线程数。除此之外,CyclicBarrier还可以接受一个Runnable作为参数,这个参数称作barrierActionbarrierAction在所有线程到达屏障之后即开始执行,其他任务只能等待barrierAction执行完毕之后才能继续执行,这是CyclicBarrierCountDownLatch的区别之二。

...