ThreadPoolExecutor(一)

ThreadPoolExecutor(一)

前文就已经提过,Executors执行器创建的线程池包括不同实现,可以应对不同的场景,那么Java中包含哪些实现呢?

本问就来讨论这些实现。

ThreadPoolExecutor #

该类是执行器(线程池)的核心类,一般来讲,Java的线程池,指的就是ThreadPoolExecutor实例。

构造器 #

ThreadPoolExecutor提供了4个构造器用来构造线程池实例:

 1
 2public ThreadPoolExecutor(int corePoolSize,
 3                          int maximumPoolSize,
 4                          long keepAliveTime,
 5                          TimeUnit unit,
 6                          BlockingQueue<Runnable> workQueue) {
 7   ...
 8}
 9
10public ThreadPoolExecutor(int corePoolSize,
11                          int maximumPoolSize,
12                          long keepAliveTime,
13                          TimeUnit unit,
14                          BlockingQueue<Runnable> workQueue,
15                          ThreadFactory threadFactory) {
16    ...
17}
18
19public ThreadPoolExecutor(int corePoolSize,
20                          int maximumPoolSize,
21                          long keepAliveTime,
22                          TimeUnit unit,
23                          BlockingQueue<Runnable> workQueue,
24                          RejectedExecutionHandler handler) {
25    ...
26}
27
28public ThreadPoolExecutor(int corePoolSize,
29                          int maximumPoolSize,
30                          long keepAliveTime,
31                          TimeUnit unit,
32                          BlockingQueue<Runnable> workQueue,
33                          ThreadFactory threadFactory,
34                          RejectedExecutionHandler handler) {
35    ...
36}

从构造器来看呢,要构建一个线程池实例,至少需要提供5个参数,另外2个参数不提供则可以使用默认配置1,这些参数分别是:

参数描述
corePoolSize核心线程池大小
maximumPoolSize最大线程池大小
keepAliveTime非核心线程执行完任务后最长的空间等待时间,超时则销毁线程
unitkeepAliveTime的单位
workQueue用于保存待执行任务的队列
threadFactory用于创建线程的线程工厂
handler线程池满载(队列无空间,且不能新建线程)后,处理新提交任务的拒绝策略

这些构造器参数就是线程池的核心概念,理解这几个参数在线程池运行过程中的意义便理解了线程池的大半。

核心概念 #

核心线程池与最大线程池 #

线程池的getPoolSize()方法返回的线程数不应该超过线程池的核心线程池大小(corePoolSize) 最大线程池大小(maximumPoolSize)。线程池中的工作线程数不可能超过最大线程池大小。若想获得当前的正在执行任务的线程数,需使用getActiveCount()方法。

当一个任务被提交至线程池后,若:

  • 当前工作线程数 < corePoolSize,新建一个线程来完成任务——尽管可能有空闲核心线程。 (当工作线程数 < corePoolSize时,任务队列一定是空的)
  • corePoolSize < 当前工作线程数 < maximumPoolSize,并且任务队列已满,那么新建一个非核心线程来完成任务。

当设置corePoolSize=maximumPoolSize时,你将获得一个固定容量的线程池;当将maxPoolSize设置为Integer.MAX_VALUE时,线程数没有限制,这有可能造成内存泄漏

本文约定当前工作线程指代线程池中存在的线程(getPoolSize()方法的返回值),其中可能存在部分空闲线程。当工作线程数少于核心线程数时:

1)当前线程池中的线程全是核心线程;

2)任务队列一定是空的;

3)当前某个线程可能是空闲的(执行完任务,在等待队列中的任务(runWorker方法阻塞))。

尽管在构建线程池实例时要指定corePoolSizemaximumPoolSize,在获得实例之后还可以通过setCorePoolSize(int)setMaximumPoolSize(int)来对其进行修改。

类似地,存活时间,线程工厂,拒绝策略其他参数都可以在线程池初始化之后再进行设置。

默认情况下,当线程池初始化成功之后,池中是没有任何线程的。不过,可以调用prestartCoreThread()prestartAllCoreThreads()来向线程池中添加一个或所有核心线程。如果你使用一个非空的任务队列初始化线程池,这样做是有用的。

 1@SneakyThrows
 2void initPoolWithNonEmptyQueue() {
 3    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2) {{
 4        add(() -> {
 5            System.out.println("1st task done");
 6        });
 7        add(()->{
 8            System.out.println("2nd task done");
 9        });
10    }};
11
12    ThreadPoolExecutor.AbortPolicy abortPolicy = 
13        new ThreadPoolExecutor.AbortPolicy();
14    ThreadPoolExecutor poolExecutor = 
15        new ThreadPoolExecutor(1, 1, 0, 
16                    TimeUnit.MILLISECONDS, queue, abortPolicy);
17
18    poolExecutor.prestartCoreThread();
19    poolExecutor.shutdown();
20
21}
22/* output
231st task done
242nd task done
25*///:~

使用prestartCoreThread()还有一个好处,它可以保证队列中的任务顺序执行。

线程工厂 #

线程池中的线程使用线程工厂ThreadFactory创建,如果没有指定,将使用Executors.defaultThreadFactory。如果线程工厂在创建线程时失败而返回null,那么线程池将无法执行任何任务。

存活时间 #

keepAliveTime针对的是非核心线程,非核心线程处理完任务后,若在keepAliveTime内没有新任务添加到队列并被其获取并运行,其将被销毁。这是一种资源保护策略,如果线程池的任务突然增多,可能又会创建非核心线程来完成任务。当corePoolSize = maximumPoolSize时,线程池无法创建非核心线程,此时keepAliveTime参数可能没有意义,一般将其设置为0。

但凡事并非绝对,ThreadPoolExecutor维护一个布尔型变量allowCoreThreadTimeOut,其默认值是false,用来控制核心线程池的“生命”:

1/**
2 * If false (default), core threads stay alive even when idle.
3 * If true, core threads use keepAliveTime to time out waiting
4 * for work.
5 */
6private volatile boolean allowCoreThreadTimeOut;

这个变量的值由allowCoreThreadTimeOut(boolean value)方法修改

 1public void allowCoreThreadTimeOut(boolean value) {
 2    if (value && keepAliveTime <= 0)
 3        throw new IllegalArgumentException(
 4            "Core threads must have nonzero keep alive times");
 5    if (value != allowCoreThreadTimeOut) {
 6        allowCoreThreadTimeOut = value;
 7        if (value)
 8            interruptIdleWorkers();
 9    }
10}

可以看到,如果将变量allowCoreThreadTimeOut的值设置为true,那么空闲的核心线程池也将会在keepAliveTime超时之后被销毁(如果没有任务让其执行)。

任务队列 #

任务队列是一个阻塞队列,一个线程池中只有一个任务队列。任务队列用于存放当前尚没有线程可执行之的任务,其和线程池之间存在如下的交互关系:

  • 如果当前工作线程 < corePoolSize,线程池将创建新线程执行任务而非将任务放入队列
  • 如果当前工作线程 > corePoolSize,线程池倾向于将任务放入队列而非创建新线程执行之
  • 如果任务无法放入队列(满),并且当前工作线程 < maximumPoolSize,将创建新线程执行之,否则任务将被拒绝

任务队列有3种常见实现:

  1. 直接运行(direct handoffs),这种情形的任务队列一般由 SynchronousQueue实现,这种队列的实现对线程池的要求严苛,如果没有可用的线程即刻执行任务,那么将任务放入队列将失败。在此情形下,一般将maximumPoolSize设置为Integer.MAX_ VALUE以防止线程池拒绝任务。这种实现可能会导致内存泄漏。

  2. 无界任务队列, 一般由 LinkedBlockingQueue实现,这种情形下,当当前工作线程达到corePoolSize之后,所有新提交的任务都会放入队列中,由于队列无界,就不会再创建新线程了,也不会拒绝任务。因此maximumPoolSize这一设置将无意义。如果任务源源不断地提交,有可能任务积压导致内存泄漏。

  3. 有界队列,一般由 ArrayBlockingQueue实现,使用有界队列可以避免资源耗尽,但是也增加了配置的难度,是应该配置更多的线程数更小的队列还是应该配置更大的队列更少的线程数,往往需要根据具体的任务来考量。

拒绝策略 #

前面提到,如果线程池满,新提交的任务就会被线程池拒绝执行;同样的,如果线程池关闭了,提交任务也会被拒绝。线程池通过调用RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)来拒绝任务,ThreadPoolExecutor内建了4种不同的拒绝策略:

  1. ThreadPoolExecutor.AbortPolicy,也是默认的拒绝策略,该策略直接抛出RejectedExecutionException的运行时异常
1public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2    throw new RejectedExecutionException("Task " + r.toString() +
3                                         " rejected from " +
4                                         e.toString());
5}
  1. ThreadPoolExecutor.CallerRunsPolicy,如果线程池未关闭,该策略直接在执行execute()方法的线程上运行任务,否则该任务被丢弃
1public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2    if (!e.isShutdown()) {
3        r.run();
4    }
5}
  1. ThreadPoolExecutor.DiscardPolicy,该策略直接丢弃不能被执行的任务
1public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2}
  1. ThreadPoolExecutor.DiscardOldestPolicy,如果线程池未关闭,则将队列头部的任务丢弃,然后继续执行execute(Runnable)方法
1public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2    if (!e.isShutdown()) {
3        e.getQueue().poll();
4        e.execute(r);
5    }
6}

Executors构建的实例 #

Executors的三个方法(没有包含重载方法)返回该类的实例:

 1public static ExecutorService newFixedThreadPool(int nThreads) {
 2        return new ThreadPoolExecutor(nThreads, nThreads,
 3                                      0L, TimeUnit.MILLISECONDS,
 4                                      new LinkedBlockingQueue<Runnable>());
 5}
 6/* 构建一个固定容量的线程池,该线程池的线程都是核心线程,任务队列使用无界队列;当线程数达到
 7corePoolSize时,新提交的任务都将放入队列,这个线程池不会拒绝任务*/
 8
 9
10public static ExecutorService newCachedThreadPool() {
11        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
12                                      60L, TimeUnit.SECONDS,
13                                      new SynchronousQueue<Runnable>());
14}
15/* 构建一个corePoolSize为0,maximumPoolSize无限制的线程池,线程池中的线程都是非核心线程,
16当线程空闲超过60s后即被销毁,这个线程池的任务队列使用的是SynchronousQueue,因此一旦提交任务,
17即会创建一个线程去执行之*/
18
19public static ExecutorService newSingleThreadExecutor() {
20        return new FinalizableDelegatedExecutorService
21            (new ThreadPoolExecutor(1, 1,
22                                    0L, TimeUnit.MILLISECONDS,
23                                    new LinkedBlockingQueue<Runnable>()));
24}
25/* 构建一个corePoolSize = maximumPoolSize = 1的线程池,该线程池只有一个核心线程,任务
26队列为无界队列,因此当核心线程已被创建后,所有提交的任务都放入队列,这个线程池不会拒绝任务。与
27第一个静态方法不同的是,由于其使用FinalizableDelegatedExecutorService包装
28ThreadPoolExecutor,这个线程池一旦初始化,不允许再进行动态配置*/

如上所示,前2个静态方法构造的都是特殊的ThreadPoolExecutor实例,初始化成功之后,都是可以通过ThreadPoolExecutor的实例方法进行动态配置的。

第3个静态方法有所不同,其生成了一个容量为1且不可改变的线程池,严格来说,它返回的不是ThreadPoolExecutor实例,而是由ThreadPoolExecutor包装的FinalizableDelegatedExecutorService实例。

FinalizableDelegatedExecutorService是Executors类(仅具有包访问权限)的内部类,FinalizableDelegatedExecutorService类继自DelegatedExecutorService,这是一个仅仅有ExecutorService接口方法的包装类,因此,当我们调用newSingleThreadExecutor()方法时,仅可以将其声明为ExecutorService

1ExecutorService service = Executors.newSingleThreadExecutor();
2
3// !非法,不能强制类型转换
4ThreadPoolExecutor pool = (ThreadPoolExecutor)Executors.newSingleThreadExecutor();

正因为其是一个仅仅可以执行ExecutorService接口方法的包装类,其无法在线程池初始化之后再动态配置。

扩展阅读: ThreadPoolExecutor jdk1.8 Javadoc


  1. 须调用合适的构造器,实际上所有参数必须提供,不过有些由构造器默认提供。 ↩︎