获取任务的返回值

获取任务的返回值

获取任务的返回值 #

要创建一个任务,通常实现Runnable接口。不幸的是,Runnable接口的run()方法返回void,因此,其并不适合处理计算任务。

考虑一个经典的问题:用多线程分段计算0-100的加和,我们需要把每个线程计算的值汇总,然后再求和,那么应该怎样获取每个任务返回值呢?

Java提供了CallableFuture接口,使任务有提供返回值的能力。

Callable 接口 #

V call() throws Exception

Callable接口只有一个方法call(),和Runnable接口不同的是call()方法有返回值并且抛出受查异常(checked exception)。

利用Callable接口,上述问题可以轻松解决:

 1public class DividedCalculate {
 2    static class Task implements Callable<Integer> {
 3        int min;
 4        int max;
 5        public Task(int min, int max) {
 6            this.min = min;
 7            this.max = max;
 8        }
 9
10        @Override
11        public Integer call() {
12            int sum = 0;
13            for (int i = min; i < max; i++) {
14                sum += i;
15            }
16            return sum;
17        }
18    }
19
20    @SneakyThrows
21    public static void main(String[] args) {
22        ExecutorService pool = Executors.newCachedThreadPool();
23
24        Future<Integer> s3 = pool.submit(new Task(51, 76));
25        Future<Integer> s2 = pool.submit(new Task(26, 51));
26        Future<Integer> s4 = pool.submit(new Task(76, 101));
27        Future<Integer> s1 = pool.submit(new Task(1, 26));
28
29        pool.shutdown();
30        System.out.printf("%d + %d + %d + %d = %d",
31        s1.get(), s2.get(), s3.get(), s4.get(),
32        s1.get() + s2.get() + s3.get() + s4.get());
33    }
34}
35
36/* output:
37325 + 950 + 1575 + 2200 = 5050
38*///:~

我们使用执行器提交任务,执行器的submit()方法返回一个带有返回值参数类型的Future<T>对象:

<T> Future<T> submit(Callable<T> task)

Future 接口 #

pubic interface Future<V>

上面的示例中我们使用Future<Integer>来接收任务的返回值,由此可见接口声明的类型参数就是Callable接口的返回类型。

Future封装了异步计算的返回结果。除此之外,Future还提供了一些实用方法来判断任务的执行状态。

Future接口支持的方法 #

boolean cancel(boolean mayInterruptIfRunning)
    尝试取消任务的执行,实际上是向任务发送一个中断(interrupt())信号。
    布尔值参数为true表示向这个任务发送中断信号,false则不发送中断信号。
    这个方法返回之后调用isDone()总是返回true;
    如果此方法返回true,调用isCanceled()总是返回true。

    返回false的情形:
    - 任务已经执行完毕;
    - 任务已经被取消;
    - 由于某些原因不能被取消;

    返回true表示任务被成功取消。

boolean isCancelled()
    如果任务**正常**完成之前被取消则返回true。

boolean isDone()
    如果任务完成则返回true。
    注意任务可能是正常执行完成,抛出异常而终止,或者通过isCancel()方法被取消。
    上述3种情况任意一种都会导致此方法返回true。

V get() throws InterruptedException,ExecutionException
    等待任务执行完成并获取返回值。
    调用此方法会抛出异常
        - 若方法被取消,抛出CancellationException;
        - 若方法执行异常,抛出ExecutionException;
        - 若方法在等待过程中被中断,则抛出InterruptedException;

V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException,TimeoutException
    在指定超时限制内等待任务执行并获取返回值。
    抛出异常和get()方法一样,除外多了一个TimeOutException,超时异常。

特别地,如果只想使用Future的可取消任务的特性,而不需要任务返回值,那么可以将Future声明为Future<?>并且将任务返回null

 1public class CancelableTask {
 2    static class Cancelable<V> implements Callable<V> {
 3        @Override
 4        public V call() throws Exception {
 5            System.out.println("---");
 6            int i = 0;
 7            while (true) {
 8                i++;
 9                if (i > 100000) {
10                    break;
11                }
12            }
13            return null;
14        }
15    }
16
17    public static void main(String[] args) {
18        ExecutorService service = Executors.newSingleThreadExecutor();
19        Future<?> submit = service.submit(new Cancelable<>());
20        System.out.println(submit.cancel(true));
21        System.out.println(submit.isCancelled());
22        System.out.println(submit.isDone());
23        service.shutdown();
24    }
25}
26
27/* output:
28true
29true
30true
31*///~

由于Callable实例无法通过Thread类运行(Thread类是Runnable接口的实现,并且只能通过Runnable初始化),于是我们在之前的分步计算中使用了执行器的submit()方法来获取任务的返回值。

Java提供了另一个有用的类FutureTask,用来包装CallableRunnable实例。由于其实现了Future接口,其能够实现Future接口的功能;又由于其实现了Runnable接口,其又能被显示线程或者执行器执行。

FutureTask 类 #

public class FutureTask<V> extends Object implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable, Future<V>

从类继承关系可以看到FutureTask类同时实现了FutureRunnable接口,因此FutureTask实例是一个可以取消的异步任务,同步也能够使用Future<V>获取任务返回值。从灵活性上来说,其可以用Thread类包装运行或者直接提交(submit)给执行器。

FutureTask构造器 #

FutureTask(Callable<V> callable)

FutureTask(Runnable runnable, V result)
    result是返回类型,如果不需要,可以使用如下形式:
    Future<?> f = new FutureTask<Void>(runnable, null)

FutureTask方法 #

// 实现Future的方法
public boolean isCancelled()
public boolean isDone()
public boolean cancel(boolean mayInterruptIfRunning)
public V get() throws InterruptedException, ExecutionException
public V get(long timeout,TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException

// 实现Runnable的方法
public void run()

// protected方法
protected void done()
    这个方法在任务执行(正常执行或抛出异常)完成之后被调用。默认实现不执行任何操作,
    导出类可以覆盖这个方法并执行相关操作。覆盖方法可以查询任务状态去判断任务是否被取消。

protected void set(V v)
    若任务没有返回值或已取消执行,为Future设置返回值。这个方法在任务成功执行完成之前
    被run()方法调用。

protected void setException(Throwable t)
    若任务没有设置异常或已取消执行,为任务设置任务执行时抛出的异常(ExecutionException)。
    这个方法在任务执行失败时被run()方法调用。

protected boolean runAndReset()
    这个方法为那些需要多次执行的任务设计。此方法执行任务但是不设置返回值,并将Future设置
    为初始状态。若任务出现异常或被取消或已经执行完成,则此方法执行失败。

下面的代码示例展示了FutureTask类中run()runAndReset()方法的区别:

  1public class FutureTaskImpl<V> extends FutureTask<V> {
  2    private int runTime = 0;
  3    private boolean isDone = false;
  4
  5    public FutureTaskImpl(Callable<V> callable) {
  6        super(callable);
  7    }
  8    public FutureTaskImpl(Runnable runnable, V result) {
  9        super(runnable, result);
 10    }
 11
 12    @Override
 13    protected void done() {
 14        if (isCancelled()) {
 15            System.out.println("task is canceled");
 16            return;
 17        }
 18        isDone = true;
 19        runTime++;
 20    }
 21
 22    @Override
 23    protected boolean runAndReset() {
 24        if (super.runAndReset()) {
 25            runTime++;
 26        } else {
 27            return false;
 28        }
 29        return true;
 30    }
 31
 32    static class Task implements Runnable {
 33        @Override
 34        public void run() {
 35            // do something
 36        }
 37    }
 38
 39    static class Task2 implements Callable<Integer> {
 40
 41        @Override
 42        public Integer call() throws Exception {
 43            int sum = 0;
 44            for (int i = 0; i < 100; i++) {
 45                sum += i;
 46            }
 47            return sum;
 48        }
 49    }
 50
 51    /**
 52     * 先执行{@link FutureTask#run()}再执行{@link #runAndReset()}
 53     * <p>
 54     * 任务不可执行
 55     */
 56    void resetAfterRun() {
 57        run();
 58        System.out.println(runAndReset()); // false
 59        System.out.println("runTime:" + runTime);
 60        System.out.println("isDone:" + isDone);
 61    }
 62
 63    /**
 64     * 先执行{@link #runAndReset()}再执行{@link FutureTask#run()}
 65     * <p>
 66     * 任务可以再次执行
 67     *
 68     * 对于有返回值的任务,执行{@link #runAndReset()}之后
 69     * 调用{@link FutureTask#get()}
 70     * 方法获取返回值会造成阻塞
 71     */
 72    @SneakyThrows
 73    void runAfterReset() {
 74        for (; ; ) {
 75            runAndReset();
 76            if (runTime > 1) break;
 77        }
 78//        V v = get(); // blocked
 79        System.out.println("isDone: " + isDone); // false
 80        run();
 81        System.out.println("runTime: " + runTime);
 82        V v1 = get();
 83        System.out.println("result: " + v1);
 84        System.out.println("isDone: " + isDone); // true
 85    }
 86
 87    public static void main(String[] args) {
 88        // 构造一个没有返回值的FutureTask
 89        FutureTaskImpl<?> ft = new FutureTaskImpl<>(new Task(), null);
 90        FutureTaskImpl<?> ft2 = new FutureTaskImpl<>(new Task2());
 91        ft2.runAfterReset();
 92//        ft.resetAfterRun();
 93    }
 94}
 95/* output:
 96isDone: false
 97runTime: 3
 98result: 4950
 99isDone: true
100*///~

可以看到,我们计划在循环中让任务执行runAndReset()2次,之后尝试去调用get()方法,发现进程会一直阻塞,这也和api文档中描述的一致(without setting its result, and then resets this future to initial state),说明任务没有执行完成而且是处于初始状态。

接下来的isDone()方法返回false也验证了这点,接着调用run()方法再次运行任务,最后获取任务的返回值,看到任务共执行了3次,最后的结果是最后一次run()方法返回的结果,接着的isDone()方法返回true,说明任务执行完成。

相反地,如果先运行run()方法,再尝试运行runAndReset(),后者直接返回false

应用示例 #

抢票问题中,为了获取每个线程抢到的票数,我们使用了ThreadLocal来存放当前线程和其抢到的票(自定义bean)的信息,并在任务执行完成之后将其返回,以便程序完成之后明确地知道每个线程抢到的票数。

之所以使用自定义bean使任务包含线程信息而不使任务直接返回其抢到的票数,是因为线程池无法操作线程,更加无法在线程池的维度获取当前运行任务的线程信息。

利用FutureTask对象,我们则可以通过显示的构造线程来简化任务的代码:

 1public class TicketIssueWithFutureTask extends TicketIssue {
 2
 3    private final HashMap<Thread, Future<Integer>> resultMap = 
 4        new HashMap<>();
 5
 6    static class Purchase implements Callable<Integer> {
 7
 8        // 线程抢到的票计数器
 9        // 线程内部存储一般声明为static
10        private static ThreadLocal<Integer> tl = 
11            ThreadLocal.withInitial(() -> 0);
12
13        private final Tick tick;
14
15        Purchase(Tick tick) {
16            this.tick = tick;
17        }
18
19        @Override
20        public Integer call() {
21            while (true) {
22                synchronized (tick) {
23                    if (tick.getTick()) {
24                        tl.set(tl.get() + 1);
25                        try {
26                            // 给其他线程机会
27                            tick.wait(10);
28                        } catch (InterruptedException e) {
29                            e.printStackTrace();
30                        }
31                    } else {
32                        if (!tick.isTickSupply) break;
33                    }
34                }
35            }
36            return tl.get();
37        }
38    }
39
40    @Override
41    void multiPurchase(int threadCount) 
42        throws ExecutionException, InterruptedException {
43        for (int i = 0; i < threadCount; i++) {
44            // FutureTask实现了Runnable,可以在显式线程执行之后再通过其获取返回值
45            // 当然,也可以通过执行器执行
46            FutureTask<Integer> ft = new FutureTask<>(new Purchase(tick));
47            Thread t = new Thread(ft);
48            t.start();
49            resultMap.put(t, ft);
50        }
51        int sum = 0;
52        for (Map.Entry<Thread, Future<Integer>> entry :
53            resultMap.entrySet()) {
54                System.out.println(entry.getKey().getName() 
55                + " 抢到票:" + entry.getValue().get() + "张");
56            s   um = sum + entry.getValue().get();
57        }
58        System.out.println("已购票数:" + sum);
59    }
60
61
62    public static void main(String[] args) throws Exception {
63        TicketIssueWithFutureTask ti = new TicketIssueWithFutureTask();
64
65        ti.singleSupply(10);
66        ti.multiPurchase(12);
67    }
68}

使用FutureTask之后,使用显式的线程对应每个线程的返回值,就可以获得想要的信息。