ScheduledExecutorService(一)

ScheduledExecutorService(一)

除了ThreadPoolExecutor之外,Java执行器(Executor)框架还提供了可以在指定延迟之后执行一次或周期执行任务的接口ScheduledExecutorService,较 java.util.Timer而言,它是更好的选择。

线程池不同的是,用于计划执行的ScheduledThreadPoolExecutor使用ScheduledFutureTask作为任务,使用DelayedWorkQueue作为任务队列,以实现计划(周期)执行的目的。

xx

ScheduledThreadPoolExecutor继承关系图

ScheduledThreadPoolExecutor的继承关系图可以看到,其是ThreadPoolExecutor的导出类,其提交任务和执行任务以及关闭线程池的逻辑应和线程池相差无几,其重点差别在于任务对象以及任务队列的封装上,后文将会详述ScheduledThreadPoolExecutor的任务计划执行以及周期执行机制。

ScheduledExecutorService #

继承自ExecutorService接口,其方法定义了一个可以用于在指定延迟之后执行一次或周期执行的ExecutorService,它主要定义了如下4个方法:

// 继承自ExecutorService 和Executor的方法被省略

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
    /* 在给定的延迟之后执行Callable任务,立即返回ScheduledFuture<V>,
    其可以获取任务的结果或者取消任务*/

ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
     /* 在给定的延迟之后执行Runnable任务,
     立即返回ScheduledFuture<?>,其get()方法返回null*/

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                        long period, TimeUnit unit)
    /* 在给定的初始延迟initialDelay之后执行Runnable任务,
    接着在给定的时间间隔period之后再次执行任务,
    接着再间隔period之后再次执行任务...

    如果某次任务的执行耗时 > period,下次的计划执行将被延后,
    并不会同时执行多个任务

    如果某次执行抛出异常,那么接下来的执行将被中止。
    周期执行的任务只有线程池终止之后才会停止执行,也
    就是说周期任务永远不会主动完成

    返回值ScheduledFuture<?>代表将要执行的任务,
    取消任务时,其get()方法会抛出异常*/

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                            long delay, TimeUnit unit)
    /* 在给定的初始延迟之后执行Runnable任务,
    接着在任务完成之后延迟delay之后再次执行,接着在上一个
    任务完成之后延迟delay再次执行...

    如果某次执行抛出异常,那么接下来的执行将被中止。
    周期执行的任务只有线程池终止之后才会停止执行,也
    就是说周期任务永远不会主动完成

    返回值ScheduledFuture<?>代表将要执行的任务,
    取消任务时,其get()方法会抛出异常*/

ScheduledThreadPoolExecutor #

由于其是ThreadPoolExecutor的导出类,故其主要逻辑和其父类一致,本节的讨论着重于二者差异的部分。

构造器 #

ScheduledThreadPoolExecutor的构造器就不再赘述了,基本上是父类的构造参数中抽取了几个便于理解的构造器,将其分列如下:

 1public ScheduledThreadPoolExecutor(int corePoolSize) {
 2    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 3          new DelayedWorkQueue());
 4}
 5public ScheduledThreadPoolExecutor(int corePoolSize,
 6                               ThreadFactory threadFactory) {
 7    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 8          new DelayedWorkQueue(), threadFactory);
 9}
10public ScheduledThreadPoolExecutor(int corePoolSize,
11                                   RejectedExecutionHandler handler) {
12    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
13          new DelayedWorkQueue(), handler);
14}
15public ScheduledThreadPoolExecutor(int corePoolSize,
16                                   ThreadFactory threadFactory,
17                                   RejectedExecutionHandler handler) {
18    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
19          new DelayedWorkQueue(), threadFactory, handler);
20}

ScheduledThreadPoolExecutor的实例均使用Integer.MAX_VALUE作为最大线程池数,这是否意味着其可以使用无限制的线程去运行任务呢?答案是否定的,ScheduledThreadPoolExecutor保证了其池中的线程数不会超过corePoolSize1

#

除了构造器中指定的参数之外,ScheduledThreadPoolExecutor还有一些其他参数,这些参数都可以在ScheduledThreadPoolExecutor初始化完成之后再进行动态配置。

 1/** 线程池shutdown之后是否继续执行周期任务,true执行,默认为false*/
 2private volatile boolean continueExistingPeriodicTasksAfterShutdown;
 3
 4/** 线程池shutdown之后是否继续执行计划任务,true执行,默认为true*/
 5private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
 6
 7/** 取消任务时是否将任务从队列中移除,true移除,默认false*/
 8private volatile boolean removeOnCancel = false;
 9
10/** 任务添加的顺序,初始化ScheduledFutureTask时使用*/
11private static final AtomicLong sequencer = new AtomicLong();

方法 #

ScheduledThreadPoolExecutor使用最多的还是实现自ScheduledExecutorService接口的4个方法,用于计划(周期)执行任务,其中,作为线程池的execute和submit方法全部直接调用了scheduleXX方法。值得一提的是,ScheduledThreadPoolExecutor覆盖了ThreadPoolExecutoronShutdown()方法,用于关闭线程池时的额外操作,该方法在父类中是空方法。

由Executors构建 #

一般地,我们会使用Executors来获取线程池,Executors提供了2个基本方法(不包括重载方法)来获取计划执行任务的线程池。

 1/** 构造一个不可动态配置的ScheduledThreadPoolExecutor,其核心线程池数量为1*/
 2public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
 3    return new DelegatedScheduledExecutorService
 4        (new ScheduledThreadPoolExecutor(1));
 5}
 6
 7/** 构造一个核心线程池为1的ScheduledThreadPoolExecutor*/
 8public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
 9    return new ScheduledThreadPoolExecutor(corePoolSize);
10}

我们可以自定义线程工厂(ThreadFactory)来调用其重载方法以自定义线程信息。

接下来,介绍让ScheduledExecutorService按照计划执行任务的核心,ScheduledFutureTaskDelayedWorkQueue

ScheduledFutureTask #

1private class ScheduledFutureTask<V>
2            extends FutureTask<V> implements RunnableScheduledFuture<V>
3            {...}

提交给ScheduledExecutorService的任务都被包装成ScheduledFutureTask实例,相较FutureTask,其还实现了RunnableScheduledFuture接口,这个接口是RunnableFuture,ScheduledFuture的子接口,也就是Runnable,Future和Delay的实现类。

实现Delay接口是关键,它保证计划任务能够按时(周期)执行,并且任务能够按照执行顺序或者添加顺序被取出执行。

#

 1// 每一个实例都有一个“序号”,用来维持其在队列中的位置
 2private final long sequenceNumber;
 3
 4// 任务下一次执行的时间,纳秒表示
 5private long time;
 6
 7// 任务周期执行的“周期”,纳秒表示,正数表示固定频率执行;
 8// 负数表示固定延迟执行,0表示不是周期执行的任务
 9private final long period;
10
11// 用来重新插入队列中的任务 (周期执行的任务)
12RunnableScheduledFuture<V> outerTask = this;
13
14// 任务在队列中的索引(看出来是一个树)
15int heapIndex;

构造器 #

 1// 构造一个单次执行的任务
 2ScheduledFutureTask(Runnable r, V result, long ns) {
 3    super(r, result);
 4    this.time = ns;
 5    this.period = 0;
 6    this.sequenceNumber = sequencer.getAndIncrement();
 7}
 8// 构造单次执行的任务
 9ScheduledFutureTask(Callable<V> callable, long ns) {
10    super(callable);
11    this.time = ns;
12    this.period = 0;
13    this.sequenceNumber = sequencer.getAndIncrement();
14}
15// 构造周期执行的任务
16ScheduledFutureTask(Runnable r, V result, long ns, long period) {
17    super(r, result);
18    this.time = ns;
19    this.period = period;
20    this.sequenceNumber = sequencer.getAndIncrement();
21}

前2个构造器构造单次执行的任务,不过使用的任务不同罢了;第三个构造器构造周期执行的任务。每构造一个任务,任务的sequenceNumber便自增1。

方法 #

compareTo #

由于Delay接口实现了Comparable接口,因此实现此方法对任务进行排序,其排序规则是:

  • 先比较time,先执行的任务在前
  • time相等,再比较sequenceNumber,先添加的任务在

setNextRunTime #

设置周期任务下一次执行的时间

1private void setNextRunTime() {
2   long p = period;
3   if (p > 0)
4       //  固定周期执行,上一次执行时间+period即可
5       time += p;
6   else
7       // 固定delay执行
8       time = triggerTime(-p);
9}

run #

执行任务的核心方法

 1public void run() {
 2    // 检查是否周期任务
 3    boolean periodic = isPeriodic();
 4    if (!canRunInCurrentRunState(periodic))
 5        // 当前状态不允许运行任务
 6        cancel(false);
 7    else if (!periodic)
 8        // 执行单次任务
 9        ScheduledFutureTask.super.run();
10    // 执行周期任务使用了runAndReset方法
11    else if (ScheduledFutureTask.super.runAndReset()) {
12        // 周期任务执行完毕一次
13        // 设置下次执行的时间
14        setNextRunTime();
15        // 将任务添加到队列
16        reExecutePeriodic(outerTask);
17    }
18}
19
20// 将已经执行的任务再次放入任务队列中
21void reExecutePeriodic(RunnableScheduledFuture<?> task) {
22    if (canRunInCurrentRunState(true)) {
23        // 再次入队
24        super.getQueue().add(task);
25        // double check
26        if (!canRunInCurrentRunState(true) && remove(task))
27            // 取消任务
28            task.cancel(false);
29        else
30            // 创建(如果需要)worker,保证有线程执行任务
31            ensurePrestart();
32    }
33}

DelayedWorkQueue #

ScheduledThreadPoolExecutor使用DeleyedWorkQueue作为任务队列,它是一个特殊的delay queue,其维护一个有序的ScheduledFutureTask任务队列。在本节中,限于数据结构相关知识尚缺,将跳过叙述队列中的元素如何调整其在树中的位置,着重叙述任务入队及出队的逻辑。

1static class DelayedWorkQueue extends AbstractQueue<Runnable>
2        implements BlockingQueue<Runnable> {
3        }

该类中,有一个核心概念,它用一个私有域表示:

1// 这个域用来等待队列的队首元素出现
2private Thread leader = null;

delay queue 中,如果没有元素的delay超时,那么你将无法从队列中取出元素。当某个任务A的delay最先超时时,其将优先出队并执行,那么leader将被声明为执行任务A的线程TA,在该任务A超时之前,leader不会被重置,在这一段时间内,其他线程只能等待;若任务A超时出队,leader将被重置,此时线程TA将唤醒等待的其他线程,然后重复重置leader的过程。我们将在任务入队和出队时看到leader域的作用。

取消任务 #

默认情况下,如果取消一个任务的执行,该任务不会从队列中移除,不过我们可以动态地配置removeOnCancel域,在取消任务时同时将任务从队列中移除。被取消的任务不能继续执行,在线程池关闭的时候将从队列中移除。

 1void cancelSchedule() {
 2    // default false
 3    service.setRemoveOnCancelPolicy(false);
 4    // task to cancelled
 5    service.schedule(this::s, 10, TimeUnit.SECONDS);
 6    BlockingQueue<Runnable> queue = service.getQueue();
 7    Runnable task = queue.peek();
 8    if (task instanceof RunnableScheduledFuture) {
 9        ((FutureTask<?>) task).cancel(false);
10    }
11
12    service.schedule(this::s, 1, TimeUnit.SECONDS);
13    TimeUnit.SECONDS.sleep(2);
14    // should be 1
15    System.out.println("queue size: " + queue.size());
16
17    service.shutdown();
18    // removed by onShutdown hook method
19    System.out.println("queue size: " + queue.size());
20}
21
22public static void main(String[] args) {
23    TestScheduledPoolExecutor ts = new TestScheduledPoolExecutor(0);
24    ts.cancelSchedule();
25}
26/* output
27Thread[pool-1-thread-1,5,main] 1
28queue size: 1
29queue size: 0
30*///:~

上例中,可以看到提交了2个任务,只有一个任务执行。首先提交的任务随即被取消了,第一次获取队列大小时,执行完一个任务,但是队列不为空,被取消的任务还在队列中,在线程池shutdown之后,任务随即被移除。如果使用service.setRemoveOnCancelPolicy(true)替换示例中的设置,那么两次获取的队列大小都是0。

这样的设计有一个好处,如果刻意取消一个任务,特定条件下可以避免重复的销毁和创建工作线程。在前面的讨论中,我们知道,核心线程空闲时是不会被销毁的,它会在任务队列上阻塞;但是非核心线程就不同了,如果队列为空,非核心线程会在 CP1处结束运行,但是如果取消一个任务,并且任务没有从队列中移除的话,那么这个非核心线程就不会被销毁。

关闭线程池 #

除了继承ThreadPoolExecutor线程池关闭的逻辑之外,ScheduledThreadPoolExecutor关闭线程池和其基类还有些许差异,主要是其通过实现onShutdown方法,实现了新的关闭策略。

onShutDown方法 #

调用shutdownshutdownNow方法的基本逻辑和基类一致,不过shutdown过程中的onShutdown方法引入了新的关闭策略

关闭策略由2个布尔值域控制,分别是

  • executeExistingDelayedTasksAfterShutdown = true; shutdown之后默认执行计划(单次)任务
  • continueExistingPeriodicTasksAfterShutdown;shutdown之后默认不执行周期任务

这两个域可以在线程池初始化之后进行动态配置,默认情况下,调用shutdown方法之后,

  • 计划的(one-shot)任务将继续执行;
  • 如果是周期任务,将从任务队列中移除;
  • 已经取消的任务将会从队列中移除

调用shutdownNow方法的逻辑则完全和基类一致,其会中断所有任务,返回丢弃的任务列表

以下是onShutdown方法的具体实现:

 1@Override void onShutdown() {
 2        BlockingQueue<Runnable> q = super.getQueue();
 3        boolean keepDelayed =
 4            getExecuteExistingDelayedTasksAfterShutdownPolicy();
 5        boolean keepPeriodic =
 6            getContinueExistingPeriodicTasksAfterShutdownPolicy();
 7        // 如果shutdown之后既不执行计划任务也不执行周期任务
 8        if (!keepDelayed && !keepPeriodic) {
 9            // 那么取消所有任务的执行,并清空队列
10            for (Object e : q.toArray())
11                if (e instanceof RunnableScheduledFuture<?>)
12                    ((RunnableScheduledFuture<?>) e).cancel(false);
13            q.clear();
14        }
15        else {
16            // Traverse snapshot to avoid iterator exceptions
17            for (Object e : q.toArray()) {
18                if (e instanceof RunnableScheduledFuture) {
19                    RunnableScheduledFuture<?> t =
20                        (RunnableScheduledFuture<?>)e;
21                    // 不管是在shutdown之后执行计划任务或者周期任务,都移除已经取消的任务
22                    // 但是不移除计划执行的任务
23                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
24                        t.isCancelled()) { // also remove if already cancelled
25                        if (q.remove(t))
26                            t.cancel(false);
27                    }
28                }
29            }
30        }
31        tryTerminate();
32    }

下面的示例中,我们重新设置了线程池的关闭策略,以观察线程池在关闭时候的行为

 1@SneakyThrows
 2void shutdownPolicy() {
 3   // 如果任务在shutdown()之后仍在delay,那么将值设置为false可以取消任务的执行
 4   // 其默认值为true
 5   service.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 6   service.schedule(this::s, 1, TimeUnit.MILLISECONDS);
 7
 8   // 如果是周期执行的任务,将此值设置为true可以在调用shutdown()之后让其继续执行,否则结束执行
 9   // 其默认值为false
10   service.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
11   service.scheduleWithFixedDelay(this::s, 2, 1, TimeUnit.SECONDS);
12
13   service.shutdown();
14   TimeUnit.SECONDS.sleep(10);
15   // shutdownNow interrupt all tasks
16   service.shutdownNow();
17   // could be true or false
18   System.out.println(service.isTerminated());
19}

shutDown之后,周期任务仍会一直执行,所以要使用shutDownNow来中止任务的执行


  1. 特殊地,当corePoolSize = 0时,池中仅可允许一个线程执行任务 ↩︎