获取任务的返回值
获取任务的返回值 #
要创建一个任务,通常实现Runnable
接口。不幸的是,Runnable
接口的run()
方法返回void
,因此,其并不适合处理计算任务。
考虑一个经典的问题:用多线程分段计算0-100的加和,我们需要把每个线程计算的值汇总,然后再求和,那么应该怎样获取每个任务返回值呢?
Java提供了Callable
和Future
接口,使任务有提供返回值的能力。
Callable 接口 #
V call() throws Exception
Callable
接口只有一个方法call()
,和Runnable
接口不同的是call()
方法有返回值并且抛出受查异常(checked exception)。
利用Callable接口,上述问题可以轻松解决:
1public class DividedCalculate {
2 static class Task implements Callable<Integer> {
3 int min;
4 int max;
5 public Task(int min, int max) {
6 this.min = min;
7 this.max = max;
8 }
9
10 @Override
11 public Integer call() {
12 int sum = 0;
13 for (int i = min; i < max; i++) {
14 sum += i;
15 }
16 return sum;
17 }
18 }
19
20 @SneakyThrows
21 public static void main(String[] args) {
22 ExecutorService pool = Executors.newCachedThreadPool();
23
24 Future<Integer> s3 = pool.submit(new Task(51, 76));
25 Future<Integer> s2 = pool.submit(new Task(26, 51));
26 Future<Integer> s4 = pool.submit(new Task(76, 101));
27 Future<Integer> s1 = pool.submit(new Task(1, 26));
28
29 pool.shutdown();
30 System.out.printf("%d + %d + %d + %d = %d",
31 s1.get(), s2.get(), s3.get(), s4.get(),
32 s1.get() + s2.get() + s3.get() + s4.get());
33 }
34}
35
36/* output:
37325 + 950 + 1575 + 2200 = 5050
38*///:~
我们使用执行器提交任务,执行器的submit()
方法返回一个带有返回值参数类型的Future<T>
对象:
<T> Future<T> submit(Callable<T> task)
Future 接口 #
pubic interface Future<V>
上面的示例中我们使用Future<Integer>
来接收任务的返回值,由此可见接口声明的类型参数就是Callable
接口的返回类型。
Future
封装了异步计算的返回结果。除此之外,Future
还提供了一些实用方法来判断任务的执行状态。
Future接口支持的方法 #
boolean cancel(boolean mayInterruptIfRunning)
尝试取消任务的执行,实际上是向任务发送一个中断(interrupt())信号。
布尔值参数为true表示向这个任务发送中断信号,false则不发送中断信号。
这个方法返回之后调用isDone()总是返回true;
如果此方法返回true,调用isCanceled()总是返回true。
返回false的情形:
- 任务已经执行完毕;
- 任务已经被取消;
- 由于某些原因不能被取消;
返回true表示任务被成功取消。
boolean isCancelled()
如果任务**正常**完成之前被取消则返回true。
boolean isDone()
如果任务完成则返回true。
注意任务可能是正常执行完成,抛出异常而终止,或者通过isCancel()方法被取消。
上述3种情况任意一种都会导致此方法返回true。
V get() throws InterruptedException,ExecutionException
等待任务执行完成并获取返回值。
调用此方法会抛出异常
- 若方法被取消,抛出CancellationException;
- 若方法执行异常,抛出ExecutionException;
- 若方法在等待过程中被中断,则抛出InterruptedException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,TimeoutException
在指定超时限制内等待任务执行并获取返回值。
抛出异常和get()方法一样,除外多了一个TimeOutException,超时异常。
特别地,如果只想使用Future
的可取消任务的特性,而不需要任务返回值,那么可以将Future
声明为Future<?>
并且将任务返回null
。
1public class CancelableTask {
2 static class Cancelable<V> implements Callable<V> {
3 @Override
4 public V call() throws Exception {
5 System.out.println("---");
6 int i = 0;
7 while (true) {
8 i++;
9 if (i > 100000) {
10 break;
11 }
12 }
13 return null;
14 }
15 }
16
17 public static void main(String[] args) {
18 ExecutorService service = Executors.newSingleThreadExecutor();
19 Future<?> submit = service.submit(new Cancelable<>());
20 System.out.println(submit.cancel(true));
21 System.out.println(submit.isCancelled());
22 System.out.println(submit.isDone());
23 service.shutdown();
24 }
25}
26
27/* output:
28true
29true
30true
31*///~
由于Callable
实例无法通过Thread
类运行(Thread
类是Runnable
接口的实现,并且只能通过Runnable
初始化),于是我们在之前的分步计算中使用了执行器的submit()
方法来获取任务的返回值。
Java提供了另一个有用的类FutureTask
,用来包装Callable
或Runnable
实例。由于其实现了Future
接口,其能够实现Future
接口的功能;又由于其实现了Runnable
接口,其又能被显示线程或者执行器执行。
FutureTask 类 #
public class FutureTask<V> extends Object implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
从类继承关系可以看到FutureTask
类同时实现了Future
和Runnable
接口,因此FutureTask
实例是一个可以取消的异步任务,同步也能够使用Future<V>
获取任务返回值。从灵活性上来说,其可以用Thread
类包装运行或者直接提交(submit)给执行器。
FutureTask构造器 #
FutureTask(Callable<V> callable)
FutureTask(Runnable runnable, V result)
result是返回类型,如果不需要,可以使用如下形式:
Future<?> f = new FutureTask<Void>(runnable, null)
FutureTask方法 #
// 实现Future的方法
public boolean isCancelled()
public boolean isDone()
public boolean cancel(boolean mayInterruptIfRunning)
public V get() throws InterruptedException, ExecutionException
public V get(long timeout,TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
// 实现Runnable的方法
public void run()
// protected方法
protected void done()
这个方法在任务执行(正常执行或抛出异常)完成之后被调用。默认实现不执行任何操作,
导出类可以覆盖这个方法并执行相关操作。覆盖方法可以查询任务状态去判断任务是否被取消。
protected void set(V v)
若任务没有返回值或已取消执行,为Future设置返回值。这个方法在任务成功执行完成之前
被run()方法调用。
protected void setException(Throwable t)
若任务没有设置异常或已取消执行,为任务设置任务执行时抛出的异常(ExecutionException)。
这个方法在任务执行失败时被run()方法调用。
protected boolean runAndReset()
这个方法为那些需要多次执行的任务设计。此方法执行任务但是不设置返回值,并将Future设置
为初始状态。若任务出现异常或被取消或已经执行完成,则此方法执行失败。
下面的代码示例展示了FutureTask类中run()
和runAndReset()
方法的区别:
1public class FutureTaskImpl<V> extends FutureTask<V> {
2 private int runTime = 0;
3 private boolean isDone = false;
4
5 public FutureTaskImpl(Callable<V> callable) {
6 super(callable);
7 }
8 public FutureTaskImpl(Runnable runnable, V result) {
9 super(runnable, result);
10 }
11
12 @Override
13 protected void done() {
14 if (isCancelled()) {
15 System.out.println("task is canceled");
16 return;
17 }
18 isDone = true;
19 runTime++;
20 }
21
22 @Override
23 protected boolean runAndReset() {
24 if (super.runAndReset()) {
25 runTime++;
26 } else {
27 return false;
28 }
29 return true;
30 }
31
32 static class Task implements Runnable {
33 @Override
34 public void run() {
35 // do something
36 }
37 }
38
39 static class Task2 implements Callable<Integer> {
40
41 @Override
42 public Integer call() throws Exception {
43 int sum = 0;
44 for (int i = 0; i < 100; i++) {
45 sum += i;
46 }
47 return sum;
48 }
49 }
50
51 /**
52 * 先执行{@link FutureTask#run()}再执行{@link #runAndReset()}
53 * <p>
54 * 任务不可执行
55 */
56 void resetAfterRun() {
57 run();
58 System.out.println(runAndReset()); // false
59 System.out.println("runTime:" + runTime);
60 System.out.println("isDone:" + isDone);
61 }
62
63 /**
64 * 先执行{@link #runAndReset()}再执行{@link FutureTask#run()}
65 * <p>
66 * 任务可以再次执行
67 *
68 * 对于有返回值的任务,执行{@link #runAndReset()}之后
69 * 调用{@link FutureTask#get()}
70 * 方法获取返回值会造成阻塞
71 */
72 @SneakyThrows
73 void runAfterReset() {
74 for (; ; ) {
75 runAndReset();
76 if (runTime > 1) break;
77 }
78// V v = get(); // blocked
79 System.out.println("isDone: " + isDone); // false
80 run();
81 System.out.println("runTime: " + runTime);
82 V v1 = get();
83 System.out.println("result: " + v1);
84 System.out.println("isDone: " + isDone); // true
85 }
86
87 public static void main(String[] args) {
88 // 构造一个没有返回值的FutureTask
89 FutureTaskImpl<?> ft = new FutureTaskImpl<>(new Task(), null);
90 FutureTaskImpl<?> ft2 = new FutureTaskImpl<>(new Task2());
91 ft2.runAfterReset();
92// ft.resetAfterRun();
93 }
94}
95/* output:
96isDone: false
97runTime: 3
98result: 4950
99isDone: true
100*///~
可以看到,我们计划在循环中让任务执行runAndReset()
2次,之后尝试去调用get()
方法,发现进程会一直阻塞,这也和api文档中描述的一致(without setting its result, and then resets this future to initial state),说明任务没有执行完成而且是处于初始状态。
接下来的isDone()
方法返回false也验证了这点,接着调用run()
方法再次运行任务,最后获取任务的返回值,看到任务共执行了3次,最后的结果是最后一次run()
方法返回的结果,接着的isDone()
方法返回true,说明任务执行完成。
相反地,如果先运行run()
方法,再尝试运行runAndReset()
,后者直接返回false
。
应用示例 #
在
抢票问题中,为了获取每个线程抢到的票数,我们使用了ThreadLocal
来存放当前线程和其抢到的票(自定义bean)的信息,并在任务执行完成之后将其返回,以便程序完成之后明确地知道每个线程抢到的票数。
之所以使用自定义bean使任务包含线程信息而不使任务直接返回其抢到的票数,是因为线程池无法操作线程,更加无法在线程池的维度获取当前运行任务的线程信息。
利用FutureTask
对象,我们则可以通过显示的构造线程来简化任务的代码:
1public class TicketIssueWithFutureTask extends TicketIssue {
2
3 private final HashMap<Thread, Future<Integer>> resultMap =
4 new HashMap<>();
5
6 static class Purchase implements Callable<Integer> {
7
8 // 线程抢到的票计数器
9 // 线程内部存储一般声明为static
10 private static ThreadLocal<Integer> tl =
11 ThreadLocal.withInitial(() -> 0);
12
13 private final Tick tick;
14
15 Purchase(Tick tick) {
16 this.tick = tick;
17 }
18
19 @Override
20 public Integer call() {
21 while (true) {
22 synchronized (tick) {
23 if (tick.getTick()) {
24 tl.set(tl.get() + 1);
25 try {
26 // 给其他线程机会
27 tick.wait(10);
28 } catch (InterruptedException e) {
29 e.printStackTrace();
30 }
31 } else {
32 if (!tick.isTickSupply) break;
33 }
34 }
35 }
36 return tl.get();
37 }
38 }
39
40 @Override
41 void multiPurchase(int threadCount)
42 throws ExecutionException, InterruptedException {
43 for (int i = 0; i < threadCount; i++) {
44 // FutureTask实现了Runnable,可以在显式线程执行之后再通过其获取返回值
45 // 当然,也可以通过执行器执行
46 FutureTask<Integer> ft = new FutureTask<>(new Purchase(tick));
47 Thread t = new Thread(ft);
48 t.start();
49 resultMap.put(t, ft);
50 }
51 int sum = 0;
52 for (Map.Entry<Thread, Future<Integer>> entry :
53 resultMap.entrySet()) {
54 System.out.println(entry.getKey().getName()
55 + " 抢到票:" + entry.getValue().get() + "张");
56 s um = sum + entry.getValue().get();
57 }
58 System.out.println("已购票数:" + sum);
59 }
60
61
62 public static void main(String[] args) throws Exception {
63 TicketIssueWithFutureTask ti = new TicketIssueWithFutureTask();
64
65 ti.singleSupply(10);
66 ti.multiPurchase(12);
67 }
68}
使用FutureTask
之后,使用显式的线程对应每个线程的返回值,就可以获得想要的信息。