CompletionService

CompletionService

在提交单个任务时,使用submit()或者execute()方法或许能够满足要求,但如果需要控制多个任务时,依次提交的操作看起来“有些繁琐”,此时我们可以使用ExecutorService提供的invokeAny/invokeAll方法,在介绍CompletionService接口时,我们不妨先看看这两个方法。

之前介绍AbstractExecutorService时提到,这两个方法是在这个抽象类中实现的,其中前者在获取到一个任务的返回值时便取消其他(未执行或正在执行的任务)任务,而后者需要等待所有的任务执行完成之后才能对任务的返回进行处理,接下来我们分别来看:

invokeAll会阻塞等待所有的任务执行完成。

  1public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  2    throws InterruptedException {
  3    if (tasks == null)
  4        throw new NullPointerException();
  5    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  6    boolean done = false;
  7    try {
  8        for (Callable<T> t : tasks) {
  9            RunnableFuture<T> f = newTaskFor(t);
 10            futures.add(f);
 11            execute(f);
 12        }
 13        // 有序迭代
 14        for (int i = 0, size = futures.size(); i < size; i++) {
 15            Future<T> f = futures.get(i);
 16            if (!f.isDone()) {
 17                try {
 18                    // 阻塞等待任务执行完成
 19                    f.get();
 20                } catch (CancellationException ignore) {
 21                } catch (ExecutionException ignore) {
 22                }
 23            }
 24        }
 25        done = true;
 26        return futures;
 27    } finally {
 28        if (!done)
 29            // 处理因异常而未正常执行的任务
 30            for (int i = 0, size = futures.size(); i < size; i++)
 31                futures.get(i).cancel(true);
 32    }
 33}
 34
 35// invokeAny
 36public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 37    throws InterruptedException, ExecutionException {
 38    try {
 39        return doInvokeAny(tasks, false, 0);
 40    } catch (TimeoutException cannotHappen) {
 41        assert false;
 42        return null;
 43    }
 44}
 45private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 46                          boolean timed, long nanos)
 47    throws InterruptedException, ExecutionException, TimeoutException {
 48    if (tasks == null)
 49        throw new NullPointerException();
 50    int ntasks = tasks.size();
 51    if (ntasks == 0)
 52        throw new IllegalArgumentException();
 53    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
 54    ExecutorCompletionService<T> ecs =
 55        new ExecutorCompletionService<T>(this);
 56
 57    try {
 58        // Record exceptions so that if we fail to obtain any
 59        // result, we can throw the last exception we got.
 60        ExecutionException ee = null;
 61        final long deadline = timed ? System.nanoTime() + nanos : 0L;
 62        Iterator<? extends Callable<T>> it = tasks.iterator();
 63
 64        // Start one task for sure; the rest incrementally
 65        futures.add(ecs.submit(it.next()));
 66        --ntasks;
 67        int active = 1;
 68
 69        for (;;) {
 70            // 并没阻塞第一个任务,此时可能第一个任务还未执行完
 71            Future<T> f = ecs.poll();
 72            if (f == null) {
 73                if (ntasks > 0) {
 74                    --ntasks;
 75                    // 不等待上一个任务的结果,直接新执行一个任务
 76                    futures.add(ecs.submit(it.next()));
 77                    ++active;
 78                }
 79                else if (active == 0)
 80                    break;
 81                else if (timed) {
 82                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
 83                    if (f == null)
 84                        throw new TimeoutException();
 85                    nanos = deadline - System.nanoTime();
 86                }
 87                else
 88                    // 没有可执行的任务了,则等待一个结果
 89                    f = ecs.take();
 90            }
 91            // 有结果则返回
 92            if (f != null) {
 93                --active;
 94                try {
 95                    return f.get();
 96                } catch (ExecutionException eex) {
 97                    ee = eex;
 98                } catch (RuntimeException rex) {
 99                    ee = new ExecutionException(rex);
100                }
101            }
102        }
103
104        if (ee == null)
105            ee = new ExecutionException();
106        throw ee;
107
108    } finally {
109        for (int i = 0, size = futures.size(); i < size; i++)
110            // 取消还未执行或者执行中的任务
111            // 中断任务
112            futures.get(i).cancel(true);
113    }
114}

可以看到,与invokeAll不同的是,invokeAny方法是在循环的启动任务,直到获取到任一任务的返回值为止,而未执行或正在执行的任务则会被中断。

下面的示例中,我们修改了 阻塞队列-查找关键字应用,让任务在成功搜寻到含有关键字的文件时就视为任务完成,取消其他任务的执行,这样一种场景之下,我们可以使用invokeAny方法:

  1public class Search1Keyword extends SearchKeyword {
  2
  3    String empty = "";
  4
  5    public static void main(String[] args) {
  6        Search1Keyword s1k = new Search1Keyword();
  7        s1k.find();
  8    }
  9
 10    @Override
 11    void find() {
 12        // 带资源的try块
 13        try (Scanner in = new Scanner(System.in)) {
 14            System.out.print("Enter keyword (e.g. volatile): ");
 15            keyword = in.nextLine();
 16
 17            Producer p = new Producer();
 18            List<Callable<String>> tasks = new ArrayList<>();
 19
 20            ExecutorService pool = Executors.newCachedThreadPool();
 21
 22            for (int i = 1; i <= 10; i++) {
 23                // run consumer
 24                tasks.add(new Consumer1());
 25            }
 26            pool.execute(p);
 27            // 此方法并不那么单纯,其结果只取一个,但是任务可能执行了多个
 28            String res = pool.invokeAny(tasks);
 29            System.out.println(res);
 30            pool.shutdown();
 31        } catch (Exception e) {
 32            e.printStackTrace();
 33        }
 34    }
 35
 36    class Consumer1 implements Callable<String> {
 37
 38        @Override
 39        public String call() throws Exception {
 40            try {
 41                while (!done) {
 42                    File file = queue.take();
 43                    if (file == DUMMY) {
 44                        done = true;
 45                    } else {
 46                        String s = search1(file, keyword);
 47                        if (s.length() > 0) {
 48                            return s;
 49                        }
 50                    }
 51                }
 52            } catch (Exception e) {
 53                // ignore
 54            }
 55            return empty;
 56        }
 57    }
 58
 59    public String search1(File file, String keyword) throws FileNotFoundException {
 60        StringBuilder sb = new StringBuilder("");
 61
 62        try (Scanner in = new Scanner(file, "UTF-8")) {
 63            int lineNumber = 0;
 64            while (in.hasNextLine()) {
 65                if (!Thread.interrupted()) {
 66                    lineNumber++;
 67                    String line = in.nextLine();
 68                    if (line.contains(keyword)) {
 69                        sb.append("[").append(Thread.currentThread().getName()).append("]: ")
 70                            .append(file.getPath()).append(lineNumber).append(line).append("\n");
 71                    }
 72                } else {
 73                    // thread interrupted by future.cancel()
 74                    System.out.printf("[%s] %s%n", Thread.currentThread().getName(), " interrupted");
 75                    return empty;
 76                }
 77            }
 78        }
 79        return sb.toString();
 80    }
 81}
 82/* output (sample1)
 83Enter keyword (e.g. volatile): take
 84[pool-1-thread-5]: TestBlockingQueue.java39    LiftOff take() throws InterruptedException {
 85[pool-1-thread-5]: TestBlockingQueue.java40        return rockets.take();
 86[pool-1-thread-5]: TestBlockingQueue.java65                    LiftOff rocket = take();
 87[pool-1-thread-5]: TestBlockingQueue.java78                System.out.println("Interrupted during take()");
 88
 89[pool-1-thread-11]  interrupted
 90[pool-1-thread-10]  interrupted
 91[pool-1-thread-6]  interrupted
 92[pool-1-thread-4]  interrupted
 93[pool-1-thread-9]  interrupted
 94[pool-1-thread-3]  interrupted
 95[pool-1-thread-7]  interrupted
 96[pool-1-thread-8]  interrupted
 97
 98(sample2)
 99Enter keyword (e.g. volatile): take
100[pool-1-thread-4]: Search1Keyword.java66                    File file = queue.take();
101
102[pool-1-thread-2]  interrupted
103[pool-1-thread-10]  interrupted
104[pool-1-thread-8]  interrupted
105[pool-1-thread-5]  interrupted
106[pool-1-thread-11]  interrupted
107[pool-1-thread-7]  interrupted
108[pool-1-thread-9]  interrupted
109*/

我们将对一个包含关键字的文件进行的完整搜寻视为任务结束,虽然还可能有其他文件还有关键字,但是搜寻任务不再执行。从输出可以看到,输出的只包含一个文件的关键字信息。另外,我们使用10个任务,其中sample1中其他9个任务都被中断,而sample2中只有7个任务被interrupt,说明情况1中,所有的任务都开始执行了,而情况2中,还有未开始执行的任务(其永远不能执行了)。

试着思考一个问题,既然invokeAny只需要获取一个任务的返回值即可,那为什么不直接启动第一个任务然后阻塞获取其返回值,而要启动(那么)多任务呢?启动一个任务不是更加简单么?

我们分析源码时,发现invokeAny使用了ExecutorCompletionService,这个类继承自接口CompletionService,可以用来管理任务提交之后的Future<T>对象——将已经完成的Future其放在一个阻塞队列中取用,这样我们就可以回答上面的问题了:

invokeAny利用ExecutorCompletionService提交任务,并管理任务的返回,这样可以避免单独启动一个任务而需要阻塞很长时间的弊端,启动的多个任务只要有一个任务完成,其放置已完成Future的阻塞队列将变得可用而使invokeAny快速结束。

ExecutorCompletionService的快速用法为:

1ExecutorCompletionService<T> ecs = new ExecutorCompletionService<>(executor) ;
2for(Callable<T> task : tasks){
3    ecs.submit(task);
4}
5for (int i = 0; i < tasks.size() ; i++ ) {
6    // get return value
7    ecs.take().get();
8}