ScheduledExecutorService(一)
除了ThreadPoolExecutor
之外,Java执行器(Executor)框架还提供了可以在指定延迟之后执行一次或周期执行任务的接口ScheduledExecutorService
,较
java.util.Timer而言,它是更好的选择。
与
线程池不同的是,用于计划执行的ScheduledThreadPoolExecutor
使用ScheduledFutureTask
作为任务,使用DelayedWorkQueue
作为任务队列,以实现计划(周期)执行的目的。
ScheduledThreadPoolExecutor继承关系图
从ScheduledThreadPoolExecutor
的继承关系图可以看到,其是ThreadPoolExecutor
的导出类,其提交任务和执行任务以及关闭线程池的逻辑应和线程池相差无几,其重点差别在于任务对象以及任务队列的封装上,后文将会详述ScheduledThreadPoolExecutor
的任务计划执行以及周期执行机制。
ScheduledExecutorService #
继承自ExecutorService
接口,其方法定义了一个可以用于在指定延迟之后执行一次或周期执行的ExecutorService,它主要定义了如下4个方法:
// 继承自ExecutorService 和Executor的方法被省略
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
/* 在给定的延迟之后执行Callable任务,立即返回ScheduledFuture<V>,
其可以获取任务的结果或者取消任务*/
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
/* 在给定的延迟之后执行Runnable任务,
立即返回ScheduledFuture<?>,其get()方法返回null*/
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit)
/* 在给定的初始延迟initialDelay之后执行Runnable任务,
接着在给定的时间间隔period之后再次执行任务,
接着再间隔period之后再次执行任务...
如果某次任务的执行耗时 > period,下次的计划执行将被延后,
并不会同时执行多个任务
如果某次执行抛出异常,那么接下来的执行将被中止。
周期执行的任务只有线程池终止之后才会停止执行,也
就是说周期任务永远不会主动完成
返回值ScheduledFuture<?>代表将要执行的任务,
取消任务时,其get()方法会抛出异常*/
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit)
/* 在给定的初始延迟之后执行Runnable任务,
接着在任务完成之后延迟delay之后再次执行,接着在上一个
任务完成之后延迟delay再次执行...
如果某次执行抛出异常,那么接下来的执行将被中止。
周期执行的任务只有线程池终止之后才会停止执行,也
就是说周期任务永远不会主动完成
返回值ScheduledFuture<?>代表将要执行的任务,
取消任务时,其get()方法会抛出异常*/
ScheduledThreadPoolExecutor #
由于其是ThreadPoolExecutor
的导出类,故其主要逻辑和其父类一致,本节的讨论着重于二者差异的部分。
构造器 #
ScheduledThreadPoolExecutor
的构造器就不再赘述了,基本上是父类的构造参数中抽取了几个便于理解的构造器,将其分列如下:
1public ScheduledThreadPoolExecutor(int corePoolSize) {
2 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
3 new DelayedWorkQueue());
4}
5public ScheduledThreadPoolExecutor(int corePoolSize,
6 ThreadFactory threadFactory) {
7 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
8 new DelayedWorkQueue(), threadFactory);
9}
10public ScheduledThreadPoolExecutor(int corePoolSize,
11 RejectedExecutionHandler handler) {
12 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
13 new DelayedWorkQueue(), handler);
14}
15public ScheduledThreadPoolExecutor(int corePoolSize,
16 ThreadFactory threadFactory,
17 RejectedExecutionHandler handler) {
18 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
19 new DelayedWorkQueue(), threadFactory, handler);
20}
ScheduledThreadPoolExecutor
的实例均使用Integer.MAX_VALUE作为最大线程池数,这是否意味着其可以使用无限制的线程去运行任务呢?答案是否定的,ScheduledThreadPoolExecutor
保证了其池中的线程数不会超过corePoolSize
1。
域 #
除了构造器中指定的参数之外,ScheduledThreadPoolExecutor
还有一些其他参数,这些参数都可以在ScheduledThreadPoolExecutor
初始化完成之后再进行动态配置。
1/** 线程池shutdown之后是否继续执行周期任务,true执行,默认为false*/
2private volatile boolean continueExistingPeriodicTasksAfterShutdown;
3
4/** 线程池shutdown之后是否继续执行计划任务,true执行,默认为true*/
5private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
6
7/** 取消任务时是否将任务从队列中移除,true移除,默认false*/
8private volatile boolean removeOnCancel = false;
9
10/** 任务添加的顺序,初始化ScheduledFutureTask时使用*/
11private static final AtomicLong sequencer = new AtomicLong();
方法 #
ScheduledThreadPoolExecutor
使用最多的还是实现自ScheduledExecutorService
接口的4个方法,用于计划(周期)执行任务,其中,作为线程池的execute和submit方法全部直接调用了scheduleXX方法。值得一提的是,ScheduledThreadPoolExecutor
覆盖了ThreadPoolExecutor
的onShutdown()
方法,用于关闭线程池时的额外操作,该方法在父类中是空方法。
由Executors构建 #
一般地,我们会使用Executors
来获取线程池,Executors
提供了2个基本方法(不包括重载方法)来获取计划执行任务的线程池。
1/** 构造一个不可动态配置的ScheduledThreadPoolExecutor,其核心线程池数量为1*/
2public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
3 return new DelegatedScheduledExecutorService
4 (new ScheduledThreadPoolExecutor(1));
5}
6
7/** 构造一个核心线程池为1的ScheduledThreadPoolExecutor*/
8public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
9 return new ScheduledThreadPoolExecutor(corePoolSize);
10}
我们可以自定义线程工厂(ThreadFactory)来调用其重载方法以自定义线程信息。
接下来,介绍让ScheduledExecutorService按照计划执行任务的核心,ScheduledFutureTask
和 DelayedWorkQueue
。
ScheduledFutureTask #
1private class ScheduledFutureTask<V>
2 extends FutureTask<V> implements RunnableScheduledFuture<V>
3 {...}
提交给ScheduledExecutorService
的任务都被包装成ScheduledFutureTask
实例,相较FutureTask,其还实现了RunnableScheduledFuture
接口,这个接口是RunnableFuture,ScheduledFuture的子接口,也就是Runnable,Future和Delay的实现类。
实现Delay接口是关键,它保证计划任务能够按时(周期)执行,并且任务能够按照执行顺序或者添加顺序被取出执行。
域 #
1// 每一个实例都有一个“序号”,用来维持其在队列中的位置
2private final long sequenceNumber;
3
4// 任务下一次执行的时间,纳秒表示
5private long time;
6
7// 任务周期执行的“周期”,纳秒表示,正数表示固定频率执行;
8// 负数表示固定延迟执行,0表示不是周期执行的任务
9private final long period;
10
11// 用来重新插入队列中的任务 (周期执行的任务)
12RunnableScheduledFuture<V> outerTask = this;
13
14// 任务在队列中的索引(看出来是一个树)
15int heapIndex;
构造器 #
1// 构造一个单次执行的任务
2ScheduledFutureTask(Runnable r, V result, long ns) {
3 super(r, result);
4 this.time = ns;
5 this.period = 0;
6 this.sequenceNumber = sequencer.getAndIncrement();
7}
8// 构造单次执行的任务
9ScheduledFutureTask(Callable<V> callable, long ns) {
10 super(callable);
11 this.time = ns;
12 this.period = 0;
13 this.sequenceNumber = sequencer.getAndIncrement();
14}
15// 构造周期执行的任务
16ScheduledFutureTask(Runnable r, V result, long ns, long period) {
17 super(r, result);
18 this.time = ns;
19 this.period = period;
20 this.sequenceNumber = sequencer.getAndIncrement();
21}
前2个构造器构造单次执行的任务,不过使用的任务不同罢了;第三个构造器构造周期执行的任务。每构造一个任务,任务的sequenceNumber
便自增1。
方法 #
compareTo #
由于Delay接口实现了Comparable接口,因此实现此方法对任务进行排序,其排序规则是:
- 先比较
time
,先执行的任务在前 - 若
time
相等,再比较sequenceNumber
,先添加的任务在
setNextRunTime #
设置周期任务下一次执行的时间
1private void setNextRunTime() {
2 long p = period;
3 if (p > 0)
4 // 固定周期执行,上一次执行时间+period即可
5 time += p;
6 else
7 // 固定delay执行
8 time = triggerTime(-p);
9}
run #
执行任务的核心方法
1public void run() {
2 // 检查是否周期任务
3 boolean periodic = isPeriodic();
4 if (!canRunInCurrentRunState(periodic))
5 // 当前状态不允许运行任务
6 cancel(false);
7 else if (!periodic)
8 // 执行单次任务
9 ScheduledFutureTask.super.run();
10 // 执行周期任务使用了runAndReset方法
11 else if (ScheduledFutureTask.super.runAndReset()) {
12 // 周期任务执行完毕一次
13 // 设置下次执行的时间
14 setNextRunTime();
15 // 将任务添加到队列
16 reExecutePeriodic(outerTask);
17 }
18}
19
20// 将已经执行的任务再次放入任务队列中
21void reExecutePeriodic(RunnableScheduledFuture<?> task) {
22 if (canRunInCurrentRunState(true)) {
23 // 再次入队
24 super.getQueue().add(task);
25 // double check
26 if (!canRunInCurrentRunState(true) && remove(task))
27 // 取消任务
28 task.cancel(false);
29 else
30 // 创建(如果需要)worker,保证有线程执行任务
31 ensurePrestart();
32 }
33}
DelayedWorkQueue #
ScheduledThreadPoolExecutor
使用DeleyedWorkQueue
作为任务队列,它是一个特殊的delay queue,其维护一个有序的ScheduledFutureTask
任务队列。在本节中,限于数据结构相关知识尚缺,将跳过叙述队列中的元素如何调整其在树中的位置,着重叙述任务入队及出队的逻辑。
1static class DelayedWorkQueue extends AbstractQueue<Runnable>
2 implements BlockingQueue<Runnable> {
3 }
该类中,有一个核心概念,它用一个私有域表示:
1// 这个域用来等待队列的队首元素出现
2private Thread leader = null;
在delay queue 中,如果没有元素的delay超时,那么你将无法从队列中取出元素。当某个任务A的delay最先超时时,其将优先出队并执行,那么leader
将被声明为执行任务A的线程TA,在该任务A超时之前,leader不会被重置,在这一段时间内,其他线程只能等待;若任务A超时出队,leader将被重置,此时线程TA将唤醒等待的其他线程,然后重复重置leader的过程。我们将在任务入队和出队时看到leader
域的作用。
取消任务 #
默认情况下,如果取消一个任务的执行,该任务不会从队列中移除,不过我们可以动态地配置removeOnCancel
域,在取消任务时同时将任务从队列中移除。被取消的任务不能继续执行,在线程池关闭的时候将从队列中移除。
1void cancelSchedule() {
2 // default false
3 service.setRemoveOnCancelPolicy(false);
4 // task to cancelled
5 service.schedule(this::s, 10, TimeUnit.SECONDS);
6 BlockingQueue<Runnable> queue = service.getQueue();
7 Runnable task = queue.peek();
8 if (task instanceof RunnableScheduledFuture) {
9 ((FutureTask<?>) task).cancel(false);
10 }
11
12 service.schedule(this::s, 1, TimeUnit.SECONDS);
13 TimeUnit.SECONDS.sleep(2);
14 // should be 1
15 System.out.println("queue size: " + queue.size());
16
17 service.shutdown();
18 // removed by onShutdown hook method
19 System.out.println("queue size: " + queue.size());
20}
21
22public static void main(String[] args) {
23 TestScheduledPoolExecutor ts = new TestScheduledPoolExecutor(0);
24 ts.cancelSchedule();
25}
26/* output
27Thread[pool-1-thread-1,5,main] 1
28queue size: 1
29queue size: 0
30*///:~
上例中,可以看到提交了2个任务,只有一个任务执行。首先提交的任务随即被取消了,第一次获取队列大小时,执行完一个任务,但是队列不为空,被取消的任务还在队列中,在线程池shutdown之后,任务随即被移除。如果使用service.setRemoveOnCancelPolicy(true)
替换示例中的设置,那么两次获取的队列大小都是0。
这样的设计有一个好处,如果刻意取消一个任务,特定条件下可以避免重复的销毁和创建工作线程。在前面的讨论中,我们知道,核心线程空闲时是不会被销毁的,它会在任务队列上阻塞;但是非核心线程就不同了,如果队列为空,非核心线程会在 CP1处结束运行,但是如果取消一个任务,并且任务没有从队列中移除的话,那么这个非核心线程就不会被销毁。
关闭线程池 #
除了继承ThreadPoolExecutor
的
线程池关闭的逻辑之外,ScheduledThreadPoolExecutor
关闭线程池和其基类还有些许差异,主要是其通过实现onShutdown
方法,实现了新的关闭策略。
onShutDown方法 #
调用shutdown
和shutdownNow
方法的基本逻辑和基类一致,不过shutdown
过程中的onShutdown
方法引入了新的关闭策略
关闭策略由2个布尔值域控制,分别是
- executeExistingDelayedTasksAfterShutdown = true; shutdown之后默认执行计划(单次)任务
- continueExistingPeriodicTasksAfterShutdown;shutdown之后默认不执行周期任务
这两个域可以在线程池初始化之后进行动态配置,默认情况下,调用shutdown
方法之后,
- 计划的(one-shot)任务将继续执行;
- 如果是周期任务,将从任务队列中移除;
- 已经取消的任务将会从队列中移除
调用shutdownNow
方法的逻辑则完全和基类一致,其会中断所有任务,返回丢弃的任务列表
以下是onShutdown
方法的具体实现:
1@Override void onShutdown() {
2 BlockingQueue<Runnable> q = super.getQueue();
3 boolean keepDelayed =
4 getExecuteExistingDelayedTasksAfterShutdownPolicy();
5 boolean keepPeriodic =
6 getContinueExistingPeriodicTasksAfterShutdownPolicy();
7 // 如果shutdown之后既不执行计划任务也不执行周期任务
8 if (!keepDelayed && !keepPeriodic) {
9 // 那么取消所有任务的执行,并清空队列
10 for (Object e : q.toArray())
11 if (e instanceof RunnableScheduledFuture<?>)
12 ((RunnableScheduledFuture<?>) e).cancel(false);
13 q.clear();
14 }
15 else {
16 // Traverse snapshot to avoid iterator exceptions
17 for (Object e : q.toArray()) {
18 if (e instanceof RunnableScheduledFuture) {
19 RunnableScheduledFuture<?> t =
20 (RunnableScheduledFuture<?>)e;
21 // 不管是在shutdown之后执行计划任务或者周期任务,都移除已经取消的任务
22 // 但是不移除计划执行的任务
23 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
24 t.isCancelled()) { // also remove if already cancelled
25 if (q.remove(t))
26 t.cancel(false);
27 }
28 }
29 }
30 }
31 tryTerminate();
32 }
下面的示例中,我们重新设置了线程池的关闭策略,以观察线程池在关闭时候的行为
1@SneakyThrows
2void shutdownPolicy() {
3 // 如果任务在shutdown()之后仍在delay,那么将值设置为false可以取消任务的执行
4 // 其默认值为true
5 service.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
6 service.schedule(this::s, 1, TimeUnit.MILLISECONDS);
7
8 // 如果是周期执行的任务,将此值设置为true可以在调用shutdown()之后让其继续执行,否则结束执行
9 // 其默认值为false
10 service.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
11 service.scheduleWithFixedDelay(this::s, 2, 1, TimeUnit.SECONDS);
12
13 service.shutdown();
14 TimeUnit.SECONDS.sleep(10);
15 // shutdownNow interrupt all tasks
16 service.shutdownNow();
17 // could be true or false
18 System.out.println(service.isTerminated());
19}
在shutDown
之后,周期任务仍会一直执行,所以要使用shutDownNow
来中止任务的执行
特殊地,当
corePoolSize = 0
时,池中仅可允许一个线程执行任务 ↩︎