终结任务
终结任务 #
一般地,如果程序运行良好,任务执行完所需操作后自然结束,任务终结。
如果任务执行时出现异常,任务也会终结。
在设计多个线程协同工作的任务时,需要判断任务终结的条件,以便合适地终结任务,这点尤为重要。
在本节中主要讨论在多线程协同工作的情况下,如何合适的终结任务。
响应中断 #
在讨论
Object超类的时候,我们曾通过“录音-播放”模型简单阐述线程之间的协同工作,在那个示例中,方便起见,我们通过System.exit(0);
来粗暴地结束程序的运行。这种方式在并发编程实践中是不被允许的。
接下来的示例中,我们再次以线程之间的协同工作为切点,讨论如何“合理地”终结任务的运行。
下例模拟汽车的“打蜡-抛光”过程,抛光必须在打蜡完成之后,同样的,打蜡之前汽车必须是抛光过的。
1public class Wax {
2
3 static class Car {
4 private boolean waxOn = false;
5
6 public synchronized void waxed() {
7 waxOn = true; // Ready to buff
8 notifyAll();
9 }
10
11 public synchronized void buffed() {
12 waxOn = false; // Ready for another coat of wax
13 notifyAll();
14 }
15
16 public synchronized void waitForWaxing()
17 throws InterruptedException {
18 // waxOn = false时一直等待
19 while (!waxOn)
20 wait();
21 }
22
23 public synchronized void waitForBuffing()
24 throws InterruptedException {
25 // waxOn = true时一直等待
26 while (waxOn)
27 wait();
28 }
29 }
30
31 static class WaxOn implements Runnable {
32 private Car car;
33
34 public WaxOn(Car c) {
35 car = c;
36 }
37
38 @Override
39 public void run() {
40 try {
41 while (!Thread.interrupted()) {
42 TimeUnit.MILLISECONDS.sleep(100);
43 System.out.print("Wax On! ");
44 car.waxed();
45 car.waitForBuffing();
46 }
47 } catch (InterruptedException e) {
48 System.out.println("WaxOn Exiting via interrupt");
49 }
50 System.out.println("Ending Wax On task");
51 }
52 }
53
54 static class BufferOn implements Runnable {
55 private Car car;
56
57 public BufferOn(Car c) {
58 car = c;
59 }
60
61 @Override
62 public void run() {
63 try {
64 while (!Thread.interrupted()) {
65 // 任务直接进入等待直到被唤醒, waxOn = true时得以执行
66 car.waitForWaxing();
67 System.out.print("Wax Off! ");
68 TimeUnit.MILLISECONDS.sleep(100);
69 car.buffed();
70 }
71 } catch (InterruptedException e) {
72 System.out.println("BufferOn Exiting via interrupt");
73 }
74 System.out.println("Ending Buffer On task");
75 }
76 }
77
78 public static void main(String[] args) throws Exception {
79 Car car = new Car();
80 ExecutorService exec = Executors.newCachedThreadPool();
81 exec.execute(new BufferOn(car));
82 exec.execute(new WaxOn(car));
83 TimeUnit.SECONDS.sleep(2); // Run for a while...
84 exec.shutdownNow(); // Interrupt all tasks
85 }
86}
87/* Output: (95% match)
88Wax On! Wax Off! BufferOn Exiting via interrupt
89WaxOn Exiting via interrupt
90Ending Wax On task
91Ending Buffer On task
92*///:~
因为两个任务是交互等待-执行的,调用wait()
方法而进入WAITING状态的线程可以被中断并抛出异常1,上面的输出显示BufferOn
任务先响应中断,这只是可能的情况之一,因为输出 Wax Off! 之后BufferOn
任务会进入等待,而正好被中断。
调用执行器的shutdownNow()
方法关闭提交的任务,shutdownNow()
方法会立即给已经提交的任务发送一个中断interrupt()
命令。调用shutdownNow()
之后,可以看到两个任务都抛出InterruptedException
。
⚠️注意: 两个任务都抛出中断异常和任务中的sleep方法有关,由于sleep和wait都可以被中断并抛出异常,所以异常的抛出是由谁引发的并不容易确定。虽然try块位于任务的最外层,但是
Thread.interrupted()
方法并不抛出异常。
上例实际上是利用了中断线程而出现的异常而终止线程的运行,然而,BLOCKED2状态下的线程无法响应中断。
无法中断 #
Thread提供了interrupt()
方法,用于设置线程的中断状态。为了调用此方法,你必须持有Thread对象。并发编程过程中一般避免显式创建线程,上例中使用了shutdownNow()
向任务发送interrup()
命令,同样地,Java提供一个带有类型参数的接口
Future<V>,它具有取消任务执行的能力。
但是,阻塞状态下的线程是否都能响应中断呢?
1public class Interrupting {
2 private static ExecutorService exec = Executors.newCachedThreadPool();
3
4 static void test(Runnable r) throws InterruptedException {
5 // 构造一个可中断的任务
6 Future<?> f = exec.submit(r);
7 TimeUnit.MILLISECONDS.sleep(100);
8 // 中断任务
9 System.out.println(r.getClass().getSimpleName()
10 + " Interrupt: " + f.cancel(true));
11 }
12
13 public static void main(String[] args) throws Exception {
14 test(new SleepBlocked());
15 test(new IOBlocked(System.in)); // 不能中断
16 test(new SynchronizedBlocked()); // 不能中断
17 TimeUnit.SECONDS.sleep(3);
18 System.exit(0); // ... since last 2 interrupts failed
19 }
20
21 /** sleep可以被中断 */
22 static class SleepBlocked implements Runnable {
23 @Override
24 public void run() {
25 try {
26 TimeUnit.SECONDS.sleep(100);
27 } catch (InterruptedException e) {
28 System.out.println("InterruptedException");
29 }
30 System.out.println("Exiting SleepBlocked.run()");
31 }
32 }
33
34 /** I/O不可被中断 */
35 static class IOBlocked implements Runnable {
36 private InputStream in;
37
38 public IOBlocked(InputStream is) {
39 in = is;
40 }
41
42 @Override
43 public void run() {
44 try {
45 System.out.println("Waiting for read():");
46 in.read();
47 } catch (Exception e) {
48 if (Thread.currentThread().isInterrupted()) {
49 System.out.println("Interrupted from blocked I/O");
50 } else {
51 throw new RuntimeException(e);
52 }
53 }
54 System.out.println("Exiting IOBlocked.run()");
55 }
56 }
57
58 /** 不可被中断 */
59 static class SynchronizedBlocked implements Runnable {
60 public synchronized void f() {
61 while (true) // Never releases lock
62 Thread.yield();
63 }
64
65 public SynchronizedBlocked() {
66 // 构造之后就获取锁而不释放
67 new Thread(() -> {
68 f(); // Lock acquired by this thread
69 }).start();
70 }
71
72 /** run()方法将一直阻塞 */
73 @Override
74 public void run() {
75 System.out.println("Trying to call f()");
76 f();
77 System.out.println("Exiting SynchronizedBlocked.run()");
78 }
79 }
80}
81/* output:
82InterruptedException
83SleepBlocked Interrupt: true
84Exiting SleepBlocked.run()
85Waiting for read():
86IOBlocked Interrupt: true
87Trying to call f()
88SynchronizedBlocked Interrupt: true
89 *///:~
由于Future的cancel(boolean)
方法也是向执行任务的线程发送interrupt()
命令,上例中3个任务,只有SleepBlocked
在休眠时被中断并退出运行,其他的两个任务IOBlocked
和SynchronizedBlocked
均没有被中断。实际上,在编码过程中我们也可以发现,只有sleep()
方法需要处理InterruptedException
异常,而无论时I/O还是尝试调用synchronized
方法,都不需要处理InterruptedException
。
对于I/O阻塞的情况,有一个简单的处理办法——即关闭任务在其上发生阻塞的资源:
1public class CloseResource {
2 public static void main(String[] args) throws Exception {
3 ExecutorService exec = Executors.newCachedThreadPool();
4 InputStream socketInput = new Socket("localhost", 8080).getInputStream();
5 exec.execute(new Interrupting.IOBlocked(socketInput));
6 exec.execute(new Interrupting.IOBlocked(System.in));
7 TimeUnit.MILLISECONDS.sleep(10);
8 System.out.println("Shutting down all threads");
9 // 两个任务都无法响应中断
10 exec.shutdownNow();
11 TimeUnit.SECONDS.sleep(1);
12 System.out.println("Closing " + socketInput.getClass().getName());
13 // 关闭资源可以使线程响应中断
14 socketInput.close(); // Releases blocked thread
15 TimeUnit.SECONDS.sleep(1);
16 System.out.println("Closing " + System.in.getClass().getName());
17 System.in.close(); // Releases blocked thread
18 }
19}
20/* Output: (85% match)
21Waiting for read():
22Waiting for read():
23Shutting down all threads
24Closing java.net.SocketInputStream
25Interrupted from blocked I/O
26Exiting IOBlocked.run()
27Closing java.io.BufferedInputStream
28Exiting IOBlocked.run()
29*///:~
上例中的2个任务都无法响应中断,但是一旦关闭资源,那么阻塞就被中断。
对于因获取锁失败而阻塞的情况,实际上,上例中的情况可以看作是死锁,由于任务无法获取对象的锁而一直阻塞。幸运的是,Java提供ReentrantLock锁,其具备在因获取锁而阻塞但是又能响应中断的能力。
1public class LockingInterrupt {
2
3 // 可重入锁获取锁的时候可以被中断
4 private Lock lock = new ReentrantLock();
5
6 public LockingInterrupt() {
7 // lock the instance once constructed
8 lock.lock();
9 }
10
11 void f() {
12 try {
13 // invoke can be interrupted
14 lock.lockInterruptibly();
15 System.out.println("acquire lock in f() success");
16 }catch (InterruptedException e){
17 System.out.println("Interrupted from acquire lock in f()");
18 }
19 }
20
21 static class MutexTask implements Runnable{
22 LockingInterrupt mbi = new LockingInterrupt();
23 @Override
24 public void run() {
25 System.out.println("waiting for f()");
26 mbi.f();
27 System.out.println("Broken out of blocked call");
28 }
29 }
30
31 public static void main(String[] args) throws InterruptedException {
32 Thread t = new Thread(new MutexTask());
33 t.start();
34 // 中断t,若不中断,t会一直阻塞
35 t.interrupt();
36 }
37}
38/*
39waiting for f()
40Interrupted from acquire lock in f()
41Broken out of blocked call
42*///:~
上例中,LockingInterrupt
初始化的时候就占用锁,并没有释放锁,而在运行f()
方法的时候再去获取锁时任务就被阻塞了,在调用interrupt()
方法中断的时候,lockInterruptibly()
响应了中断,任务结束程序退出。
惯用法 #
从上面的例子我们已经知道,可以通过检查线程的中断状态来结束任务的执行。下面的例子展示了一种惯用法,它使用try-finally块来紧跟资源,以应对任何时候任务出现中断时保证资源被释放:
1public class InterruptingIdiom {
2
3 /** 需要清理的资源类 */
4 static class NeedsCleanup {
5 private final int id;
6
7 public NeedsCleanup(int ident) {
8 id = ident;
9 System.out.println("NeedsCleanup " + id);
10 }
11
12 public void cleanup() {
13 System.out.println("Cleaning up " + id);
14 }
15 }
16
17 static class Blocked3 implements Runnable {
18 private volatile double d = 0.0;
19
20 @Override
21 public void run() {
22 try {
23 while (!Thread.interrupted()) {
24 NeedsCleanup n1 = new NeedsCleanup(1);
25 // 在n1之后紧跟try-finally块,保证资源被合理的清除
26 // node 1
27 try {
28 System.out.println("Sleeping");
29 TimeUnit.SECONDS.sleep(1);
30 NeedsCleanup n2 = new NeedsCleanup(2);
31 // 同理
32 // node2
33 try {
34 System.out.println("Calculating");
35 //耗时操作
36 for (int i = 1; i < 2500000; i++) {
37 d = d + (Math.PI + Math.E) / d;
38 }
39 // node3
40 System.out.println(
41 "Finished time-consuming operation");
42 } finally {
43 n2.cleanup();
44 }
45 } finally {
46 n1.cleanup();
47 }
48 }
49 System.out.println("Exiting via while() test");
50 } catch (InterruptedException e) {
51 System.out.println("Exiting via InterruptedException");
52 }
53 }
54 }
55
56 public static void main(String[] args) throws Exception {
57 if (args.length != 1) {
58 System.out.println("usage: java InterruptingIdiom delay-in-mS");
59 System.exit(1);
60 }
61 Thread t = new Thread(new Blocked3());
62 t.start();
63 TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
64 t.interrupt();
65 }
66}
67/* Output: (Sample)
68NeedsCleanup 1
69Sleeping
70NeedsCleanup 2
71Calculating
72Finished time-consuming operation
73Cleaning up 2
74Cleaning up 1
75NeedsCleanup 1
76Sleeping
77Cleaning up 1
78Exiting via InterruptedException
79*///:~
上例接收一个参数,表示程序中断之前的运行时间(ms),由于任务中有一段耗时的循环操作,当参数大小不同时,程序的输出会有所差异:
任务可能在node1和node2之间中断,因此其输出为:
NeedsCleanup 1
Sleeping
Cleaning up 1
Exiting via InterruptedException
当任务在node2和node3之间设置中断状态,再次进入循环时中断被监测到,程序退出,此时的输出为:
NeedsCleanup 1
Sleeping
NeedsCleanup 2
Calculating
Finished time-consuming operation
Cleaning up 2
Cleaning up 1
Exiting via while() test
总之,无论任务在何时被释放,其创建的资源都会被合适地释放。
TIJ第四版第21章并发(694页)在描述线程的状态时,将调用休眠/等待之后线程的状态称为阻塞。为避免混淆,本文采用 Thread.State中关于线程的描述,并认为其不应该被称为阻塞状态。 ↩︎
本博客约定此状态(等待锁)的线程才处于阻塞状态。 ↩︎