CompletionService
在提交单个任务时,使用submit()
或者execute()
方法或许能够满足要求,但如果需要控制多个任务时,依次提交的操作看起来“有些繁琐”,此时我们可以使用ExecutorService
提供的invokeAny/invokeAll
方法,在介绍CompletionService
接口时,我们不妨先看看这两个方法。
之前介绍AbstractExecutorService
时提到,这两个方法是在这个抽象类中实现的,其中前者在获取到一个任务的返回值时便取消其他(未执行或正在执行的任务)任务,而后者需要等待所有的任务执行完成之后才能对任务的返回进行处理,接下来我们分别来看:
invokeAll
会阻塞等待所有的任务执行完成。
1public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2 throws InterruptedException {
3 if (tasks == null)
4 throw new NullPointerException();
5 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
6 boolean done = false;
7 try {
8 for (Callable<T> t : tasks) {
9 RunnableFuture<T> f = newTaskFor(t);
10 futures.add(f);
11 execute(f);
12 }
13 // 有序迭代
14 for (int i = 0, size = futures.size(); i < size; i++) {
15 Future<T> f = futures.get(i);
16 if (!f.isDone()) {
17 try {
18 // 阻塞等待任务执行完成
19 f.get();
20 } catch (CancellationException ignore) {
21 } catch (ExecutionException ignore) {
22 }
23 }
24 }
25 done = true;
26 return futures;
27 } finally {
28 if (!done)
29 // 处理因异常而未正常执行的任务
30 for (int i = 0, size = futures.size(); i < size; i++)
31 futures.get(i).cancel(true);
32 }
33}
34
35// invokeAny
36public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
37 throws InterruptedException, ExecutionException {
38 try {
39 return doInvokeAny(tasks, false, 0);
40 } catch (TimeoutException cannotHappen) {
41 assert false;
42 return null;
43 }
44}
45private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
46 boolean timed, long nanos)
47 throws InterruptedException, ExecutionException, TimeoutException {
48 if (tasks == null)
49 throw new NullPointerException();
50 int ntasks = tasks.size();
51 if (ntasks == 0)
52 throw new IllegalArgumentException();
53 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
54 ExecutorCompletionService<T> ecs =
55 new ExecutorCompletionService<T>(this);
56
57 try {
58 // Record exceptions so that if we fail to obtain any
59 // result, we can throw the last exception we got.
60 ExecutionException ee = null;
61 final long deadline = timed ? System.nanoTime() + nanos : 0L;
62 Iterator<? extends Callable<T>> it = tasks.iterator();
63
64 // Start one task for sure; the rest incrementally
65 futures.add(ecs.submit(it.next()));
66 --ntasks;
67 int active = 1;
68
69 for (;;) {
70 // 并没阻塞第一个任务,此时可能第一个任务还未执行完
71 Future<T> f = ecs.poll();
72 if (f == null) {
73 if (ntasks > 0) {
74 --ntasks;
75 // 不等待上一个任务的结果,直接新执行一个任务
76 futures.add(ecs.submit(it.next()));
77 ++active;
78 }
79 else if (active == 0)
80 break;
81 else if (timed) {
82 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
83 if (f == null)
84 throw new TimeoutException();
85 nanos = deadline - System.nanoTime();
86 }
87 else
88 // 没有可执行的任务了,则等待一个结果
89 f = ecs.take();
90 }
91 // 有结果则返回
92 if (f != null) {
93 --active;
94 try {
95 return f.get();
96 } catch (ExecutionException eex) {
97 ee = eex;
98 } catch (RuntimeException rex) {
99 ee = new ExecutionException(rex);
100 }
101 }
102 }
103
104 if (ee == null)
105 ee = new ExecutionException();
106 throw ee;
107
108 } finally {
109 for (int i = 0, size = futures.size(); i < size; i++)
110 // 取消还未执行或者执行中的任务
111 // 中断任务
112 futures.get(i).cancel(true);
113 }
114}
可以看到,与invokeAll
不同的是,invokeAny
方法是在循环的启动任务,直到获取到任一任务的返回值为止,而未执行或正在执行的任务则会被中断。
下面的示例中,我们修改了
阻塞队列-查找关键字应用,让任务在成功搜寻到含有关键字的文件时就视为任务完成,取消其他任务的执行,这样一种场景之下,我们可以使用invokeAny
方法:
1public class Search1Keyword extends SearchKeyword {
2
3 String empty = "";
4
5 public static void main(String[] args) {
6 Search1Keyword s1k = new Search1Keyword();
7 s1k.find();
8 }
9
10 @Override
11 void find() {
12 // 带资源的try块
13 try (Scanner in = new Scanner(System.in)) {
14 System.out.print("Enter keyword (e.g. volatile): ");
15 keyword = in.nextLine();
16
17 Producer p = new Producer();
18 List<Callable<String>> tasks = new ArrayList<>();
19
20 ExecutorService pool = Executors.newCachedThreadPool();
21
22 for (int i = 1; i <= 10; i++) {
23 // run consumer
24 tasks.add(new Consumer1());
25 }
26 pool.execute(p);
27 // 此方法并不那么单纯,其结果只取一个,但是任务可能执行了多个
28 String res = pool.invokeAny(tasks);
29 System.out.println(res);
30 pool.shutdown();
31 } catch (Exception e) {
32 e.printStackTrace();
33 }
34 }
35
36 class Consumer1 implements Callable<String> {
37
38 @Override
39 public String call() throws Exception {
40 try {
41 while (!done) {
42 File file = queue.take();
43 if (file == DUMMY) {
44 done = true;
45 } else {
46 String s = search1(file, keyword);
47 if (s.length() > 0) {
48 return s;
49 }
50 }
51 }
52 } catch (Exception e) {
53 // ignore
54 }
55 return empty;
56 }
57 }
58
59 public String search1(File file, String keyword) throws FileNotFoundException {
60 StringBuilder sb = new StringBuilder("");
61
62 try (Scanner in = new Scanner(file, "UTF-8")) {
63 int lineNumber = 0;
64 while (in.hasNextLine()) {
65 if (!Thread.interrupted()) {
66 lineNumber++;
67 String line = in.nextLine();
68 if (line.contains(keyword)) {
69 sb.append("[").append(Thread.currentThread().getName()).append("]: ")
70 .append(file.getPath()).append(lineNumber).append(line).append("\n");
71 }
72 } else {
73 // thread interrupted by future.cancel()
74 System.out.printf("[%s] %s%n", Thread.currentThread().getName(), " interrupted");
75 return empty;
76 }
77 }
78 }
79 return sb.toString();
80 }
81}
82/* output (sample1)
83Enter keyword (e.g. volatile): take
84[pool-1-thread-5]: TestBlockingQueue.java39 LiftOff take() throws InterruptedException {
85[pool-1-thread-5]: TestBlockingQueue.java40 return rockets.take();
86[pool-1-thread-5]: TestBlockingQueue.java65 LiftOff rocket = take();
87[pool-1-thread-5]: TestBlockingQueue.java78 System.out.println("Interrupted during take()");
88
89[pool-1-thread-11] interrupted
90[pool-1-thread-10] interrupted
91[pool-1-thread-6] interrupted
92[pool-1-thread-4] interrupted
93[pool-1-thread-9] interrupted
94[pool-1-thread-3] interrupted
95[pool-1-thread-7] interrupted
96[pool-1-thread-8] interrupted
97
98(sample2)
99Enter keyword (e.g. volatile): take
100[pool-1-thread-4]: Search1Keyword.java66 File file = queue.take();
101
102[pool-1-thread-2] interrupted
103[pool-1-thread-10] interrupted
104[pool-1-thread-8] interrupted
105[pool-1-thread-5] interrupted
106[pool-1-thread-11] interrupted
107[pool-1-thread-7] interrupted
108[pool-1-thread-9] interrupted
109*/
我们将对一个包含关键字的文件进行的完整搜寻视为任务结束,虽然还可能有其他文件还有关键字,但是搜寻任务不再执行。从输出可以看到,输出的只包含一个文件的关键字信息。另外,我们使用10个任务,其中sample1中其他9个任务都被中断,而sample2中只有7个任务被interrupt,说明情况1中,所有的任务都开始执行了,而情况2中,还有未开始执行的任务(其永远不能执行了)。
试着思考一个问题,既然invokeAny
只需要获取一个任务的返回值即可,那为什么不直接启动第一个任务然后阻塞获取其返回值,而要启动(那么)多任务呢?启动一个任务不是更加简单么?
我们分析源码时,发现invokeAny使用了ExecutorCompletionService
,这个类继承自接口CompletionService
,可以用来管理任务提交之后的Future<T>
对象——将已经完成的Future
其放在一个阻塞队列中取用,这样我们就可以回答上面的问题了:
invokeAny
利用ExecutorCompletionService
提交任务,并管理任务的返回,这样可以避免单独启动一个任务而需要阻塞很长时间的弊端,启动的多个任务只要有一个任务完成,其放置已完成Future的阻塞队列将变得可用而使invokeAny快速结束。
ExecutorCompletionService
的快速用法为:
1ExecutorCompletionService<T> ecs = new ExecutorCompletionService<>(executor) ;
2for(Callable<T> task : tasks){
3 ecs.submit(task);
4}
5for (int i = 0; i < tasks.size() ; i++ ) {
6 // get return value
7 ecs.take().get();
8}