Java

Executors与Executor框架

Executors可以称作执行器。Java并发系列的文章到目前为止,虽然没有特别说明,但是使用执行器(Executor(s))的次数已经难以计数了,Executors提供了一些非常方便的静态方法,可以根据需要创建不同的ExecutorService,然后调用其execute(Runnable)submit(Callable<T>)方法。

在并发条件下,执行器还有一个非常明显的优势,它使用线程池管理线程,减少了系统创建和销毁线程的开销。在一般的Java并发过程中,也建议使用执行器完成任务而非显式地创建线程。

本文将从执行器开始,阐述Java中的线程池。

...

CompletionService

在提交单个任务时,使用submit()或者execute()方法或许能够满足要求,但如果需要控制多个任务时,依次提交的操作看起来“有些繁琐”,此时我们可以使用ExecutorService提供的invokeAny/invokeAll方法,在介绍CompletionService接口时,我们不妨先看看这两个方法。

之前介绍AbstractExecutorService时提到,这两个方法是在这个抽象类中实现的,其中前者在获取到一个任务的返回值时便取消其他(未执行或正在执行的任务)任务,而后者需要等待所有的任务执行完成之后才能对任务的返回进行处理,接下来我们分别来看:

invokeAll会阻塞等待所有的任务执行完成。

  1public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  2    throws InterruptedException {
  3    if (tasks == null)
  4        throw new NullPointerException();
  5    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  6    boolean done = false;
  7    try {
  8        for (Callable<T> t : tasks) {
  9            RunnableFuture<T> f = newTaskFor(t);
 10            futures.add(f);
 11            execute(f);
 12        }
 13        // 有序迭代
 14        for (int i = 0, size = futures.size(); i < size; i++) {
 15            Future<T> f = futures.get(i);
 16            if (!f.isDone()) {
 17                try {
 18                    // 阻塞等待任务执行完成
 19                    f.get();
 20                } catch (CancellationException ignore) {
 21                } catch (ExecutionException ignore) {
 22                }
 23            }
 24        }
 25        done = true;
 26        return futures;
 27    } finally {
 28        if (!done)
 29            // 处理因异常而未正常执行的任务
 30            for (int i = 0, size = futures.size(); i < size; i++)
 31                futures.get(i).cancel(true);
 32    }
 33}
 34
 35// invokeAny
 36public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 37    throws InterruptedException, ExecutionException {
 38    try {
 39        return doInvokeAny(tasks, false, 0);
 40    } catch (TimeoutException cannotHappen) {
 41        assert false;
 42        return null;
 43    }
 44}
 45private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 46                          boolean timed, long nanos)
 47    throws InterruptedException, ExecutionException, TimeoutException {
 48    if (tasks == null)
 49        throw new NullPointerException();
 50    int ntasks = tasks.size();
 51    if (ntasks == 0)
 52        throw new IllegalArgumentException();
 53    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
 54    ExecutorCompletionService<T> ecs =
 55        new ExecutorCompletionService<T>(this);
 56
 57    try {
 58        // Record exceptions so that if we fail to obtain any
 59        // result, we can throw the last exception we got.
 60        ExecutionException ee = null;
 61        final long deadline = timed ? System.nanoTime() + nanos : 0L;
 62        Iterator<? extends Callable<T>> it = tasks.iterator();
 63
 64        // Start one task for sure; the rest incrementally
 65        futures.add(ecs.submit(it.next()));
 66        --ntasks;
 67        int active = 1;
 68
 69        for (;;) {
 70            // 并没阻塞第一个任务,此时可能第一个任务还未执行完
 71            Future<T> f = ecs.poll();
 72            if (f == null) {
 73                if (ntasks > 0) {
 74                    --ntasks;
 75                    // 不等待上一个任务的结果,直接新执行一个任务
 76                    futures.add(ecs.submit(it.next()));
 77                    ++active;
 78                }
 79                else if (active == 0)
 80                    break;
 81                else if (timed) {
 82                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
 83                    if (f == null)
 84                        throw new TimeoutException();
 85                    nanos = deadline - System.nanoTime();
 86                }
 87                else
 88                    // 没有可执行的任务了,则等待一个结果
 89                    f = ecs.take();
 90            }
 91            // 有结果则返回
 92            if (f != null) {
 93                --active;
 94                try {
 95                    return f.get();
 96                } catch (ExecutionException eex) {
 97                    ee = eex;
 98                } catch (RuntimeException rex) {
 99                    ee = new ExecutionException(rex);
100                }
101            }
102        }
103
104        if (ee == null)
105            ee = new ExecutionException();
106        throw ee;
107
108    } finally {
109        for (int i = 0, size = futures.size(); i < size; i++)
110            // 取消还未执行或者执行中的任务
111            // 中断任务
112            futures.get(i).cancel(true);
113    }
114}

可以看到,与invokeAll不同的是,invokeAny方法是在循环的启动任务,直到获取到任一任务的返回值为止,而未执行或正在执行的任务则会被中断。

...

ScheduledExecutorService(一)

除了ThreadPoolExecutor之外,Java执行器(Executor)框架还提供了可以在指定延迟之后执行一次或周期执行任务的接口ScheduledExecutorService,较 java.util.Timer而言,它是更好的选择。

线程池不同的是,用于计划执行的ScheduledThreadPoolExecutor使用ScheduledFutureTask作为任务,使用DelayedWorkQueue作为任务队列,以实现计划(周期)执行的目的。

...

CountDownLatch

CountDownLatch #

在讨论线程的基本概念时,我们说过join()方法可使当前线程等待调用join方法的线程执行完,可以实现简单的 无锁同步,使用CountDownLatch可以更加简单的实现这一目的。毕竟,join()方法的语义“加入一个线程”不是很容易就能让人理解。相较于join()方法,CountDownLatch的语义就明确多了。

...

CyclicBarrier

CyclicBarrier #

CyclicBarrier被称为“同步屏障”,事实上就可以把它理解为一个屏障,多个任务调用屏障的await()方法将被阻塞,直到所有的任务都进入阻塞,那么屏障开启,所有任务继续执行。这看起来和CountDownLatch非常像,不过CountDownLatch只能触发一次,而CyclicBarrier可以多次重用,这是它们的主要区别之一。

CountDownLatch一样,CyclicBarrier接受一个整型参数,表示可限制的线程数。除此之外,CyclicBarrier还可以接受一个Runnable作为参数,这个参数称作barrierActionbarrierAction在所有线程到达屏障之后即开始执行,其他任务只能等待barrierAction执行完毕之后才能继续执行,这是CyclicBarrierCountDownLatch的区别之二。

...

Semaphore

Semaphore #

无论是显式锁还是通过synchronized关键字获取的隐式锁,其在任一时刻都只能让一个任务访问资源,而Semaphore(计数信号量)允许多个任务同时访问资源。可以把Semaphore看作是持有对象访问许可(permits)的“security”。访问对象时,须先通过acquire()获取许可,若此时没有许可可用,那么acquire()将阻塞,否则获取许可,可用许可数-1;使用完资源后,通过release()方法返还许可。事实上,并没有实际上的许可证对象,Semaphore通过协同各个线程工作,来达到目的。

Semaphore的构造器接受一个“公平性参数”。不传入此参数或传入false时,线程获取许可的顺序无法保证,即使线程阻塞了很久,其仍然可能被刚调用acquire()方法的线程“抢走”许可,这可能会导致线程“饿死”。当传入true时,Semaphore保证线程获取许可的顺序和其调用acquire()方法之后被执行的顺序一致1,也就是先执行的任务先获取许可(FIFO)。需要说明的是,tryAcquire()方法不遵循公平性原则,如果有许可可用,它直接获取之。在使用Semaphore时,一般将其设置为公平

Semaphore通常用于限制访问资源的线程数量,典型的例子就是控制“池”的并发访问量。下例中使用Semaphore控制池中的对象方法,当需要使用时,可以将它们“签出”(checkout),使用完毕之后再将其“签入”(checkin),使用泛型类封装功能2

 1class Pool<T> {
 2    private final int size;
 3    final List<T> items = new ArrayList<>();
 4    private final boolean[] checkedOut;
 5    private final Semaphore available;
 6
 7    public Pool(Class<T> classObject, int size) {
 8        this.size = size;
 9        checkedOut = new boolean[size];
10        available = new Semaphore(size, true);
11        // Load pool with objects that can be checked out:
12        for (int i = 0; i < size; ++i) {
13            try {
14                // Assumes a default constructor:
15                items.add(classObject.newInstance());
16            } catch (Exception e) {
17                throw new RuntimeException(e);
18            }
19        }
20    }
21
22    T checkOut() throws InterruptedException {
23        available.acquire();
24        return getItem();
25    }
26
27    void checkIn(T x) {
28        if (releaseItem(x)) {
29            available.release();
30            System.out.println("release " + x);
31        }
32    }
33
34    void checkAllIn() {
35        available.release(releaseAll());
36    }
37
38    private synchronized T getItem() {
39        for (int i = 0; i < size; ++i) {
40            if (!checkedOut[i]) {
41                checkedOut[i] = true;
42                return items.get(i);
43            }
44        }
45        // Semaphore prevents reaching here
46        return null;
47    }
48
49    private synchronized boolean releaseItem(T item) {
50        int index = items.indexOf(item);
51        if (index == -1) {
52            return false; // Not in the list
53        }
54        if (checkedOut[index]) {
55            checkedOut[index] = false;
56
57            return true;
58        }
59        // Wasn't checked out
60        return false;
61    }
62
63    private synchronized int releaseAll() {
64        int r = 0;
65        for (int i = 0; i < items.size(); i++) {
66            if (checkedOut[i]) {
67                checkedOut[i] = false;
68                ++r;
69            }
70        }
71        return r;
72    }
73}

这个池使用checkoutcheckIn方法来签出和签入对象,在签出对象之前调用acquire(),如果没有可用对象,那么checkOut将阻塞。由于Semaphore的机制,checkOut方法并不需要使用同步,但是getItem方法则需要同步了,Semaphore协同多线程对资源的访问,但是并不能保证多线程对资源修改的并发安全,这是两回事3checkIn方法则判断给定对象是否被使用,是则签入之,否则不做任何操作,同样的,releaseItem方法也需要使用同步。

...

Exchanger

Exchanger #

Exchanger是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时,各自拥有一个对象,离开时交换它们拥有的对象。栅栏可以用来设计缓存对象,2个任务分别来使用和清空缓存,当缓存空间满时,则在Exchanger上交换缓存,缓存得以重复使用1

...