ThreadPoolExecutor(二)
前文说过,ThreadPoolExecutor
实例代表了Java线程池,前面我们介绍了ThreadPoolExecutor
的构造器和几个核心概念,在本节中,我们着重介绍线程池的执行过程以及线程池的关闭。
线程池的运行状态 #
线程池的运行状态表示了线程池的生命周期,在代码实现中它们使用用一个整数表示:
状态 | 描述 |
---|---|
RUNNING | 接受新任务的提交,执行队列中的任务 |
SHUTDOWN | 不接受新任务的提交,执行队列中的任务 |
STOP | 不接受新任务的提交,不执行队列中的任务,中断正在执行的任务 |
TIDYING | 所有任务终止,workerCount = 0 ,执行terminated()方法 |
TERMINATED | terminated()方法执行完毕 |
为了方便地判断线程池的运行状态,给上述线程池状态约定了单调的演化关系:
状态变化 | 条件 |
---|---|
RUNNING -> SHUTDOWN | 调用shutdown() 方法,或者隐式调用了finalize() 1 |
(RUNNING或SHUTDOWN) -> STOP | 调用shutdownNow() 方法 |
SHUTDOWN -> TIDYING | 当线程池和任务队列都为空时 |
STOP -> TIDYING | 线程池为空 |
TIDYING -> TERMINATED | 当terminated() 方法执行完成 |
可以看到,线程池的状态是单调演化的,除了RUNNING状态可以接受任务并执行外,其他的状态都将导致线程池资源关闭。ThreadPoolExecutor
类中有几个获取线程池状态的方法:
1/** 若线程池的状态不是RUNNING,那么该方法就返回true*/
2public boolean isShutdown() {
3 return ! isRunning(ctl.get());
4}
5
6/** 若线程池的状态不是RUNNING,并且状态没有还没有切换到TERMINATED,该方法就返回true
7这个方法返回true说明线程池正处于terminae的过程中*/
8public boolean isTerminating() {
9 int c = ctl.get();
10 return ! isRunning(c) && runStateLessThan(c, TERMINATED);
11}
12
13/** 若线程的状态为TERMINATED,该方法返回true*/
14public boolean isTerminated() {
15 return runStateAtLeast(ctl.get(), TERMINATED);
16}
线程池中任务的执行过程 #
了解了线程池的工作状态,接下来我们尝试去深入任务是如何在线程池中被执行的,以及线程池中核心线程,任务队列以及非核心线程之间是如何协同工作的。
在 任务队列中,我们阐述了任务队列与线程池之间存在交互关系,这种交互关系体现了线程池执行任务的重要过程。
线程池执行流程图
提交任务 #
在介绍
ExecutorService时我们提到了AbstractExecutorService
基类,它有两个重要的作用:
- 将所有的任务提交转变为执行一个
FutureTask
- 实现了
invokeAny/invokeAll
方法
了解到这一点之后,我们将线程池的任务执行重心放在ThreadPoolExecutor
的execute(Runnable)
方法上:
1public void execute(Runnable command) {
2 if (command == null)
3 throw new NullPointerException();
4
5 int c = ctl.get();
6 // 当前工作线程数 < corePoolSize
7 if (workerCountOf(c) < corePoolSize) {
8 // 直接添加新的工作线程执行之
9 if (addWorker(command, true))
10 return;
11 // 若新建失败,则表示rs >= shutdown,任务将会被拒绝
12 c = ctl.get();
13 }
14 // 否则将任务放入队列
15 if (isRunning(c) && workQueue.offer(command)) {
16 // 线程状态RUNNING,任务已放入队列
17 // double check
18 int recheck = ctl.get();
19 // 这里double-check的原因是:
20 if (! isRunning(recheck) && remove(command))
21 // 1. 线程池可能被shutdown了,这时候直接从队列移除任务并拒绝之
22 reject(command);
23 else if (workerCountOf(recheck) == 0)
24 // 2. 若corePoolSize = 0,而非核心线程都完成了任务
25 // 空闲线程超时被销毁之后,就可能出现workerCount = 0 的情况
26 // 此时添加一个非核心线程去执行队列中的任务
27 addWorker(null, false);
28 }
29 // 队列满了,则尝试新建一个非核心线程执行任务,否则拒绝之
30 else if (!addWorker(command, false))
31 reject(command);
32}
33
34/**使用Worker包装线程来执行任务*/
35private boolean addWorker(Runnable firstTask, boolean core) {
36 // 循环判断,直到满足新建Worker的条件为止
37 retry:
38 for (;;) {
39 int c = ctl.get();
40 int rs = runStateOf(c);
41
42 // Check if queue empty only if necessary.
43 // 解释一下这个return false的逻辑
44 /* 1. 若rs = runnning,继续添加worker
45 * 2. 若rs >= shutdown
46 * 2.1 rs >= stop 不新建worker(return false)
47 * 2.2 rs = shutdown,firstTask != null,
48 * 不新建worker (shutdown之后不接受新任务提交)
49 * 2.3 rs = shutdown,firstTask = null,workQueue为空,不新建worker
50 */
51 if (rs >= SHUTDOWN &&
52 ! (rs == SHUTDOWN &&
53 firstTask == null &&
54 ! workQueue.isEmpty()))
55 return false;
56
57 for (;;) {
58 int wc = workerCountOf(c);
59 if (wc >= CAPACITY ||
60 wc >= (core ? corePoolSize : maximumPoolSize))
61 // 线程数量超限
62 return false;
63 if (compareAndIncrementWorkerCount(c))
64 break retry;
65 c = ctl.get(); // Re-read ctl
66 if (runStateOf(c) != rs)
67 continue retry;
68 // else CAS failed due to workerCount change; retry inner loop
69 }
70 }
71
72 boolean workerStarted = false;
73 boolean workerAdded = false;
74 Worker w = null;
75 try {
76 w = new Worker(firstTask);
77 /*
78 Worker(Runnable firstTask) {
79 setState(-1); // inhibit interrupts until runWorker
80 this.firstTask = firstTask;
81 this.thread = getThreadFactory().newThread(this);
82 }
83 */
84 final Thread t = w.thread;
85 if (t != null) {
86 final ReentrantLock mainLock = this.mainLock;
87 mainLock.lock();
88 try {
89 // Recheck while holding lock.
90 // Back out on ThreadFactory failure or if
91 // shut down before lock acquired.
92 int rs = runStateOf(ctl.get());
93 // 状态为RUNNING时可以新建Worker执行任务
94 // 状态为SHUTDOWN时,任务必须为空(不可提交任务)
95 if (rs < SHUTDOWN ||
96 (rs == SHUTDOWN && firstTask == null)) {
97 if (t.isAlive()) // precheck that t is startable
98 throw new IllegalThreadStateException();
99 // 调整字段值
100 workers.add(w);
101 int s = workers.size();
102 if (s > largestPoolSize)
103 largestPoolSize = s;
104 workerAdded = true;
105 }
106 } finally {
107 mainLock.unlock();
108 }
109 // 运行任务
110 if (workerAdded) {
111 // 从Worker的构造器来看,线程t的构造器参数是Worker
112 // 因此start()实际上执行的是Worker的run()方法
113 t.start();
114 workerStarted = true;
115 }
116 }
117 } finally {
118 // 线程池创建线程失败,清理资源
119 if (! workerStarted)
120 addWorkerFailed(w);
121 }
122 // 返回true表示线程已创建并启动
123 // 根据调用参数的不同,启动的线程可能直接执行任务
124 // 也可能从队列中获取任务执行
125 return workerStarted;
126}

线程池添加worker的流程
创建空线程 #
前面介绍
核心概念的时候说到,线程池初始化成功之后,池中是没有活动线程的,不过线程池具有很好的灵活性,可以进行动态配置。使用prestartCoreThread()
和prestartAllCoreThreads()
方法可以向线程池中添加核心线程,这些线程并没有使用任务初始化,不过其会尝试去队列中获取任务执行,若队列为空,这些线程就会挂起(waiting)2。
1/** 创建一个核心线程*/
2public boolean prestartCoreThread() {
3 return workerCountOf(ctl.get()) < corePoolSize &&
4 addWorker(null, true);
5}
6/** 创建所有核心线程*/
7public int prestartAllCoreThreads() {
8 int n = 0;
9 while (addWorker(null, true))
10 ++n;
11 return n;
12}
执行任务 #
线程池创建线程是为了执行任务,addWorker()
方法成功时会启动线程,线程则会调用Worker的run()
方法。
1public void run() {
2 runWorker(this);
3}
4
5/**该方法会循环进行,并且在getTask()方法处阻塞*/
6final void runWorker(Worker w) {
7 Thread wt = Thread.currentThread();
8 // 任务即为创建Worker的入参
9 Runnable task = w.firstTask;
10 w.firstTask = null;
11 w.unlock(); // allow interrupts
12 boolean completedAbruptly = true;
13 try {
14 // 只要有任务提交或队列不为空,则一直执行
15 while (task != null || (task = getTask()) != null) {
16 w.lock();
17 // If pool is stopping, ensure thread is interrupted;
18 // if not, ensure thread is not interrupted. This
19 // requires a recheck in second case to deal with
20 // shutdownNow race while clearing interrupt
21 // 如果线程池状态为STOP(调用shutdownNow()),则中断线程
22 if ((runStateAtLeast(ctl.get(), STOP) ||
23 (Thread.interrupted() &&
24 runStateAtLeast(ctl.get(), STOP))) &&
25 !wt.isInterrupted())
26 wt.interrupt();
27 try {
28 // 可扩展方法
29 beforeExecute(wt, task);
30 Throwable thrown = null;
31 try {
32 task.run();
33 } catch (RuntimeException x) {
34 thrown = x; throw x;
35 } catch (Error x) {
36 thrown = x; throw x;
37 } catch (Throwable x) {
38 thrown = x; throw new Error(x);
39 } finally {
40 // 可扩展方法
41 afterExecute(task, thrown);
42 }
43 } finally {
44 task = null;
45 w.completedTasks++;
46 w.unlock();
47 }
48 }
49 completedAbruptly = false;
50 } finally {
51 // while循环结束后的动作
52 processWorkerExit(w, completedAbruptly);
53 }
54}
55
56/** 该方法从队列中获取任务,方法会被阻塞(核心线程)或超时阻塞(非核心线程)*/
57private Runnable getTask() {
58 boolean timedOut = false; // Did the last poll() time out?
59
60 for (;;) {
61 int c = ctl.get();
62 int rs = runStateOf(c);
63
64 // Check if queue empty only if necessary.
65 // 如果状态为SHUTDOWN,但队列不为空,仍从队列中执行任务
66 // 如果状态为STOP,则直接return null
67 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
68 // workerCount - 1
69 decrementWorkerCount();
70 return null;
71 }
72
73 int wc = workerCountOf(c);
74
75 // Are workers subject to culling?
76 // 当allowCoreThreadTimeOut被设置时,核心线程超时阻塞
77 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
78
79 if ((wc > maximumPoolSize || (timed && timedOut))
80 && (wc > 1 || workQueue.isEmpty())) {
81 if (compareAndDecrementWorkerCount(c))
82 return null;
83 continue;
84 }
85
86 try {
87 // 阻塞队列获取队头任务
88 Runnable r = timed ?
89 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
90 workQueue.take();
91 if (r != null)
92 return r;
93 // 超时未获取到任务 --> line 79 --> return null
94 timedOut = true;
95 } catch (InterruptedException retry) {
96 timedOut = false;
97 }
98 }
99}

线程池执行任务的流程
可以看到,线程池中的线程初始化之后,其执行任务的过程是阻塞的,也就是说,线程池中的线程一直处于“stand by”状态,除此之外,我们还可以得到以下信息:
- 如果没有设置
allowCoreThreadTimeOut
,核心线程执行任务的过程将一直进行 - 非核心线程的执行任务的过程将在超时之后,方法不返回,循环再次进行,将在try块之前的if语句块中返回null
- 当线程池状态为SHUTDOWN时,若队列不为空,仍会去队列中获取任务执行;若状态为STOP,将不会从队列中获取任务
当出现下列任一情况时,getTask()
会返回null结束线程运行:
- workerCount > maximumPoolSize,一般在动配置maximumPoolSize之后出现
- 线程池状态为STOP
- 线程池状态为SHUTDOWN,且队列为空
- 当线程获取队列中的任务超时,且该线程不是队列中的唯一线程或队列为空
前面3点都比较好理解,第4点有点难以理解,我们使用一个corePoolSize=0的线程池特例加以说明:
1void cachedPool(){
2 ThreadPoolExecutor service =
3 (ThreadPoolExecutor) Executors.newCachedThreadPool();
4
5 // service 5秒之后即关闭
6 service.setKeepAliveTime(5,TimeUnit.SECONDS);
7 service.submit(()->{
8 System.out.println("task done");
9 });
10}
我们知道,newCachedThreadPool
构建一个corePoolSize=0的线程池,因此池中所有的任务在空闲超时都会被超时销毁,我们不妨来看看这一过程是如何发生的;我们将keepAliveTime
重新设置为5s,并且向线程池中提交一个任务。
线程池首先会新建一个线程执行任务,调用的是addWorker(firstTask, false)方法;
在runWorker的第二次循环时,由于firstTask已经被执行,将调用
getTask()
方法去队列中获取任务。我们知道队列中没有任务,超时时间为5s,5s之后getTask()方法将timeout
置为true后进入第二次循环;注意此次循环:
1if ((wc > maximumPoolSize || (timed && timedOut)) 2 && (wc > 1 || workQueue.isEmpty())) { 3 if (compareAndDecrementWorkerCount(c)) 4 return null; 5 continue; 6}
不难看出来,第一次wc =1 并且timeout=false,显然是不满足if的条件;第二次则不同,timeout此时为true,workQueue.isEmpty为true,if条件满足;
此时将 wc-1,并且返回null
返回null之后,runWorker()方法的while循环也会结束,接下来会执行processWorkerExit(w, completedAbruptly)
方法:
1/**while循环正常结束,completedAbruptly为false*/
2private void processWorkerExit(Worker w, boolean completedAbruptly) {
3 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
4 decrementWorkerCount();
5
6 final ReentrantLock mainLock = this.mainLock;
7 mainLock.lock();
8 try {
9 // 统计已经完成的任务数
10 completedTaskCount += w.completedTasks;
11 // 将Worker从HashSet中移除
12 workers.remove(w);
13 } finally {
14 mainLock.unlock();
15 }
16
17 // 正如其名,「尝试」终止线程池
18 tryTerminate();
19
20 int c = ctl.get();
21 // 若线程池状态为RUNNING or SHUTDOWN
22 if (runStateLessThan(c, STOP)) {
23 if (!completedAbruptly) {
24 // 线程池中的最小线程数
25 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
26 if (min == 0 && ! workQueue.isEmpty())
27 //队列非空时,要保证池中有线程运行任务
28 min = 1;
29 if (workerCountOf(c) >= min)
30 // 池中还有线程,可以安心返回
31 return; // replacement not needed
32 }
33 // 否则,向池中加入一个线程
34 addWorker(null, false);
35 }
36}
在上面方法的最后if条件中,wc=min=0
,池中没有线程并且任务队列为空,线程成功完成使命,结束运行。
综上所述,被创建的线程除了执行被提交的任务之外,还会被阻塞执行队列中的任务,而核心线程和非核心线程在空闲时又会存在处理方式的差异。
值得一提的是,在上面的newFixedThreadPool()
的例子中,线程池提交完任务之后,并没有调用关闭方法,那么线程池能关闭么?
通过上面的分析,例子中的线程在执行完任务后超时被销毁,此时池中没有线程在运行,队列中也没有任务,那么就意味着所有的逻辑都已经完成,并没有发生阻塞,线程池中的线程数为0,任务队列为空,虽然如此,线程池的状态还是RUNNING!线程池并没有终止,其还可以继续提交任务运行,实际上,线程池回到了初始化 时的状态。
如何合理地关闭线程池 #
ThreadPoolExecutor
提供了2个关闭线程池的方法
1public void shutdown() {
2 final ReentrantLock mainLock = this.mainLock;
3 mainLock.lock();
4 try {
5 // 检查权限
6 checkShutdownAccess();
7 // 修改线程池状态为SHUTDOWN
8 advanceRunState(SHUTDOWN);
9 // 中断所有空闲(waiting)的线程
10 // 在condition.await()上阻塞的线程能够响应中断,
11 // 这就是线程池能够关闭而不阻塞的原因
12 // 阻塞的线程被中断唤醒后继续在getTask()上继续执行,
13 // 在线程池状态判断时return null而结束
14 interruptIdleWorkers();
15 onShutdown(); // hook for ScheduledThreadPoolExecutor
16 } finally {
17 mainLock.unlock();
18 }
19 // 执行terminated()(空)方法,将线程状态设置为TERMINATED
20 tryTerminate();
21}
22
23public List<Runnable> shutdownNow() {
24 List<Runnable> tasks;
25 final ReentrantLock mainLock = this.mainLock;
26 mainLock.lock();
27 try {
28 // 权限检查
29 checkShutdownAccess();
30 // 修改线程池状态为STOP
31 advanceRunState(STOP);
32 // 中断所有线程
33 interruptWorkers();
34 // 队列中未执行的任务
35 tasks = drainQueue();
36 } finally {
37 mainLock.unlock();
38 }
39 tryTerminate();
40 return tasks;
41}
42
43final void tryTerminate() {
44 for (;;) {
45 int c = ctl.get();
46 /*直接返回的条件:
47 * 1. 线程池状态为RUNNING
48 * 2. 线程池状态为 TIDYING 或 TERMINATED
49 * 3. 线程状态为 SHUTDOWN, 且队列不为空
50 */
51 if (isRunning(c) ||
52 runStateAtLeast(c, TIDYING) ||
53 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
54 return;
55 // 若工作线程数 > 0 , 中断一个空闲线程并返回
56 if (workerCountOf(c) != 0) { // Eligible to terminate
57 interruptIdleWorkers(ONLY_ONE);
58 return;
59 }
60
61 final ReentrantLock mainLock = this.mainLock;
62 mainLock.lock();
63 try {
64 // 设置线程池状态为TIDYING
65 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
66 try {
67 // 运行terminated()方法
68 terminated();
69 } finally {
70 // 设置线程状态为TERMINATED
71 ctl.set(ctlOf(TERMINATED, 0));
72 // 唤醒awaitTermination方法
73 termination.signalAll();
74 }
75 return;
76 }
77 } finally {
78 mainLock.unlock();
79 }
80 // else retry on failed CAS
81 }
82}
从上面的分析,我们可以清晰地看到shutdown()
和shutdownNow()
的区别,前者只中断了空闲线程,后者中断了所有线程;结合前文getTask()
方法的表述,前者未被中断的线程还可继续执行并从任务队列中获取任务执行,而后者已经无法从队列中获取任务执行了,这与本节开头对线程池的
运行状态的描述一致。
shutdown()
和shutdownNow()
方法都不能中断正在执行的任务,不过后者对正在执行的任务发送了中断命令,如果任务能够响应中断,即可以作出相应操作。如果想在shutdown()
或shutdownNow()
执行之后继续获取任务的返回值,只能使用awaitTermination()
方法愚蠢地等待。awaitTermination()
方法阻塞当前调用该方法的线程,直到任务执行完毕、超时、调用线程被中断3者任一条件发生。
需要说明的是,如果awaitTermination()
阻塞过程中线程池的状态变为TERNMINATD,说明任务执行完毕,返回true;否则返回false或抛出中断异常。
下面的示例代码演示了shutdown()
和shutdownNow()
方法的区别:
1public class ExecutorShutdown {
2
3 static int pointer = 0;
4 /** 容量为1的线程池,其能保证提交的任务都是序列化执行的 */
5 ThreadPoolExecutor service
6 = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
7
8 @SneakyThrows
9 public static void main(String[] args) {
10 ExecutorShutdown es = new ExecutorShutdown();
11 es.shutdown();
12// es.awaitTermination(1, TimeUnit.SECONDS);
13
14 }
15
16 void shutdown() {
17 service.execute(new ComplexTask());
18 // 对于newFixedThreadPool(1),EasyTask在任务队列中
19 service.execute(new EasyTask());
20 service.shutdown();
21 // shutdown之后,任务并没有执行完成,pointer的值还是0
22 System.out.println("pointer:" + pointer);
23
24 // 获取待任务队列
25 System.out.println("workQueue: " + service.getQueue());
26 // 判断该执行器是否被关闭
27 System.out.println("is executor shutdown? " + service.isShutdown());
28 // 执行器关闭之后所有任务是否都完成
29 // 如果没有调用shutdown()或shutdownNow()就直接调用isTerminated(),该方法必返回false
30 System.out.println("is executor terminated? " + service.isTerminated());
31 System.out.println("pointer:" + pointer);
32 }
33
34 void awaitTermination(int timeout, TimeUnit unit) {
35 service.execute(new ComplexTask());
36 service.execute(new EasyTask());
37 List<Runnable> tasks;
38 try {
39 if (service.awaitTermination(timeout, unit)) {
40 service.shutdown();
41 } else {
42 if(!(tasks = service.shutdownNow()).isEmpty()){
43 System.out.println("丢弃任务" + tasks);
44 }
45 }
46 } catch (InterruptedException e) {
47 e.printStackTrace();
48 }
49 System.out.println("workQueue: " + service.getQueue());
50 System.out.println("is executor shutdown? " + service.isShutdown());
51 System.out.println("is executor terminated? " + service.isTerminated());
52 }
53
54 abstract class Task {
55 @Override
56 public String toString() {
57 return getClass().getSimpleName() + "@" + Integer.toHexString(hashCode());
58 }
59 }
60
61 class ComplexTask extends Task implements Runnable {
62 @Override
63 public void run() {
64 // 响应中断,调用shutdownNow()可以结束任务
65 System.out.println("[" + Thread.currentThread()
66 + "@" + this + "],开始执行");
67 // never finish unless interrupted
68 for (; ; ) {
69 if (!Thread.interrupted()) {
70 pointer++;
71 } else {
72 System.out.println("[" + Thread.currentThread()
73 + "@" + this + "],被中断");
74 break;
75 }
76 }
77 }
78 }
79
80 class EasyTask extends Task implements Runnable {
81 @Override
82 public void run() {
83 System.out.println("[" + Thread.currentThread()
84 + "@" + this + "],开始执行");
85 pointer++;
86 System.out.println("[" + Thread.currentThread()
87 + "@" + this + "],执行完成");
88 }
89 }
90}
91/* output
92调用shutdown:
93[Thread[pool-1-thread-1,5,main]@ComplexTask@48d82c9c],开始执行
94pointer:0
95workQueue: [EasyTask@14ae5a5]
96is executor shutdown? true
97is executor terminated? false
98pointer:813
99
100调用awaitTermination:
101[Thread[pool-1-thread-1,5,main]@ComplexTask@7ac59a98],开始执行
102[Thread[pool-1-thread-1,5,main]@ComplexTask@7ac59a98],被中断
103丢弃任务[EasyTask@7f31245a]
104workQueue: []
105is executor shutdown? true
106is executor terminated? true
107*///:~
上例中我们设计了一个可以正常执行的任务EasyTask和一个无限循环执行的任务ComplexTask,后者响应中断,如果不中断线程,ComplexTask将一直运行下去。我们使用一个固定容量为1的线程池运行任务,并且先提交ComplexTask,ComplexTask无法结束运行,那么EasyTask将会放入队列中。
从运行的结果上来看,使用shutdown()
无法结束线程池的运行,虽然主线程结束,但线程池一直在后台运行,同时EasyTask也还在任务队列中,主线程结束后线程池的还没有终止,程序会一直在后台运行。
当调用awaitTermination(timeout, unit)
时,很明显这个方法将超时并返回false,最终执行shutdownNow()
,shutdownNow给ComplexTask任务发送中断命令,其在下一次循环检查到中断,结束执行。同时任务队列中的EasyTask被丢弃,任务队列为空,主线程结束后,线程池也成功终止。
如果ComplexTask在设计时,没有响应中断,而使用死循环执行任务,那么shutdownNow()
方法仍然无法终止线程池,这就是官方文档中关于shutdownNow()
方法描述的语义:
There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. This implementation cancels tasks via {@link Thread#interrupt}, so any task that fails to respond to interrupts may never terminate.