终结任务

终结任务

终结任务 #

一般地,如果程序运行良好,任务执行完所需操作后自然结束,任务终结。

如果任务执行时出现异常,任务也会终结。

在设计多个线程协同工作的任务时,需要判断任务终结的条件,以便合适地终结任务,这点尤为重要。

在本节中主要讨论在多线程协同工作的情况下,如何合适的终结任务。

响应中断 #

在讨论 Object超类的时候,我们曾通过“录音-播放”模型简单阐述线程之间的协同工作,在那个示例中,方便起见,我们通过System.exit(0);来粗暴地结束程序的运行。这种方式在并发编程实践中是不被允许的。

接下来的示例中,我们再次以线程之间的协同工作为切点,讨论如何“合理地”终结任务的运行。

下例模拟汽车的“打蜡-抛光”过程,抛光必须在打蜡完成之后,同样的,打蜡之前汽车必须是抛光过的。

 1public class Wax {
 2
 3    static class Car {
 4        private boolean waxOn = false;
 5
 6        public synchronized void waxed() {
 7            waxOn = true; // Ready to buff
 8            notifyAll();
 9        }
10
11        public synchronized void buffed() {
12            waxOn = false; // Ready for another coat of wax
13            notifyAll();
14        }
15
16        public synchronized void waitForWaxing()
17            throws InterruptedException {
18            // waxOn = false时一直等待
19            while (!waxOn)
20                wait();
21        }
22
23        public synchronized void waitForBuffing()
24            throws InterruptedException {
25            // waxOn = true时一直等待
26            while (waxOn)
27                wait();
28        }
29    }
30
31    static class WaxOn implements Runnable {
32        private Car car;
33
34        public WaxOn(Car c) {
35            car = c;
36        }
37
38        @Override
39        public void run() {
40            try {
41                while (!Thread.interrupted()) {
42                    TimeUnit.MILLISECONDS.sleep(100);
43                    System.out.print("Wax On! ");
44                    car.waxed();
45                    car.waitForBuffing();
46                }
47            } catch (InterruptedException e) {
48                System.out.println("WaxOn Exiting via interrupt");
49            }
50            System.out.println("Ending Wax On task");
51        }
52    }
53
54    static class BufferOn implements Runnable {
55        private Car car;
56
57        public BufferOn(Car c) {
58            car = c;
59        }
60
61        @Override
62        public void run() {
63            try {
64                while (!Thread.interrupted()) {
65                    // 任务直接进入等待直到被唤醒, waxOn = true时得以执行
66                    car.waitForWaxing();
67                    System.out.print("Wax Off! ");
68                    TimeUnit.MILLISECONDS.sleep(100);
69                    car.buffed();
70                }
71            } catch (InterruptedException e) {
72                System.out.println("BufferOn Exiting via interrupt");
73            }
74            System.out.println("Ending Buffer On task");
75        }
76    }
77
78    public static void main(String[] args) throws Exception {
79        Car car = new Car();
80        ExecutorService exec = Executors.newCachedThreadPool();
81        exec.execute(new BufferOn(car));
82        exec.execute(new WaxOn(car));
83        TimeUnit.SECONDS.sleep(2); // Run for a while...
84        exec.shutdownNow(); // Interrupt all tasks
85    }
86}
87/* Output: (95% match)
88Wax On! Wax Off!  BufferOn Exiting via interrupt
89WaxOn Exiting via interrupt
90Ending Wax On task
91Ending Buffer On task
92*///:~

因为两个任务是交互等待-执行的,调用wait()方法而进入WAITING状态的线程可以被中断并抛出异常1,上面的输出显示BufferOn任务先响应中断,这只是可能的情况之一,因为输出 Wax Off! 之后BufferOn任务会进入等待,而正好被中断。

调用执行器的shutdownNow()方法关闭提交的任务,shutdownNow()方法会立即给已经提交的任务发送一个中断interrupt()命令。调用shutdownNow()之后,可以看到两个任务都抛出InterruptedException

⚠️注意: 两个任务都抛出中断异常和任务中的sleep方法有关,由于sleep和wait都可以被中断并抛出异常,所以异常的抛出是由谁引发的并不容易确定。虽然try块位于任务的最外层,但是Thread.interrupted()方法并不抛出异常。

上例实际上是利用了中断线程而出现的异常而终止线程的运行,然而,BLOCKED2状态下的线程无法响应中断。

无法中断 #

Thread提供了interrupt()方法,用于设置线程的中断状态。为了调用此方法,你必须持有Thread对象。并发编程过程中一般避免显式创建线程,上例中使用了shutdownNow()向任务发送interrup()命令,同样地,Java提供一个带有类型参数的接口 Future<V>,它具有取消任务执行的能力。

但是,阻塞状态下的线程是否都能响应中断呢?

 1public class Interrupting {
 2    private static ExecutorService exec = Executors.newCachedThreadPool();
 3
 4    static void test(Runnable r) throws InterruptedException {
 5        // 构造一个可中断的任务
 6        Future<?> f = exec.submit(r);
 7        TimeUnit.MILLISECONDS.sleep(100);
 8        // 中断任务
 9        System.out.println(r.getClass().getSimpleName() 
10            + " Interrupt: " + f.cancel(true));
11    }
12
13    public static void main(String[] args) throws Exception {
14        test(new SleepBlocked());
15        test(new IOBlocked(System.in)); // 不能中断
16        test(new SynchronizedBlocked()); // 不能中断
17        TimeUnit.SECONDS.sleep(3);
18        System.exit(0); // ... since last 2 interrupts failed
19    }
20
21    /** sleep可以被中断 */
22    static class SleepBlocked implements Runnable {
23        @Override
24        public void run() {
25            try {
26                TimeUnit.SECONDS.sleep(100);
27            } catch (InterruptedException e) {
28                System.out.println("InterruptedException");
29            }
30            System.out.println("Exiting SleepBlocked.run()");
31        }
32    }
33
34    /** I/O不可被中断 */
35    static class IOBlocked implements Runnable {
36        private InputStream in;
37
38        public IOBlocked(InputStream is) {
39            in = is;
40        }
41
42        @Override
43        public void run() {
44            try {
45                System.out.println("Waiting for read():");
46                in.read();
47            } catch (Exception e) {
48                if (Thread.currentThread().isInterrupted()) {
49                    System.out.println("Interrupted from blocked I/O");
50                } else {
51                    throw new RuntimeException(e);
52                }
53            }
54            System.out.println("Exiting IOBlocked.run()");
55        }
56    }
57
58    /** 不可被中断 */
59    static class SynchronizedBlocked implements Runnable {
60        public synchronized void f() {
61            while (true) // Never releases lock
62                Thread.yield();
63        }
64
65        public SynchronizedBlocked() {
66            // 构造之后就获取锁而不释放
67            new Thread(() -> {
68                f(); // Lock acquired by this thread
69            }).start();
70        }
71
72        /** run()方法将一直阻塞 */
73        @Override
74        public void run() {
75            System.out.println("Trying to call f()");
76            f();
77            System.out.println("Exiting SynchronizedBlocked.run()");
78        }
79    }
80}
81/* output:
82InterruptedException
83SleepBlocked Interrupt: true
84Exiting SleepBlocked.run()
85Waiting for read():
86IOBlocked Interrupt: true
87Trying to call f()
88SynchronizedBlocked Interrupt: true
89 *///:~

由于Future的cancel(boolean)方法也是向执行任务的线程发送interrupt()命令,上例中3个任务,只有SleepBlocked在休眠时被中断并退出运行,其他的两个任务IOBlockedSynchronizedBlocked均没有被中断。实际上,在编码过程中我们也可以发现,只有sleep()方法需要处理InterruptedException异常,而无论时I/O还是尝试调用synchronized方法,都不需要处理InterruptedException

对于I/O阻塞的情况,有一个简单的处理办法——即关闭任务在其上发生阻塞的资源:

 1public class CloseResource {
 2    public static void main(String[] args) throws Exception {
 3        ExecutorService exec = Executors.newCachedThreadPool();
 4        InputStream socketInput = new Socket("localhost", 8080).getInputStream();
 5        exec.execute(new Interrupting.IOBlocked(socketInput));
 6        exec.execute(new Interrupting.IOBlocked(System.in));
 7        TimeUnit.MILLISECONDS.sleep(10);
 8        System.out.println("Shutting down all threads");
 9        // 两个任务都无法响应中断
10        exec.shutdownNow();
11        TimeUnit.SECONDS.sleep(1);
12        System.out.println("Closing " + socketInput.getClass().getName());
13        // 关闭资源可以使线程响应中断
14        socketInput.close(); // Releases blocked thread
15        TimeUnit.SECONDS.sleep(1);
16        System.out.println("Closing " + System.in.getClass().getName());
17        System.in.close(); // Releases blocked thread
18    }
19}
20/* Output: (85% match)
21Waiting for read():
22Waiting for read():
23Shutting down all threads
24Closing java.net.SocketInputStream
25Interrupted from blocked I/O
26Exiting IOBlocked.run()
27Closing java.io.BufferedInputStream
28Exiting IOBlocked.run()
29*///:~

上例中的2个任务都无法响应中断,但是一旦关闭资源,那么阻塞就被中断。

对于因获取锁失败而阻塞的情况,实际上,上例中的情况可以看作是死锁,由于任务无法获取对象的锁而一直阻塞。幸运的是,Java提供ReentrantLock锁,其具备在因获取锁而阻塞但是又能响应中断的能力。

 1public class LockingInterrupt {
 2
 3    // 可重入锁获取锁的时候可以被中断
 4    private Lock lock = new ReentrantLock();
 5
 6    public LockingInterrupt() {
 7        // lock the instance once constructed
 8        lock.lock();
 9    }
10
11    void f() {
12        try {
13            // invoke can be interrupted
14            lock.lockInterruptibly();
15            System.out.println("acquire lock in f() success");
16        }catch (InterruptedException e){
17            System.out.println("Interrupted from acquire lock in f()");
18        }
19    }
20
21    static class MutexTask implements Runnable{
22        LockingInterrupt mbi = new LockingInterrupt();
23        @Override
24        public void run() {
25            System.out.println("waiting for f()");
26            mbi.f();
27            System.out.println("Broken out of blocked call");
28        }
29    }
30
31    public static void main(String[] args) throws InterruptedException {
32        Thread t = new Thread(new MutexTask());
33        t.start();
34        // 中断t,若不中断,t会一直阻塞
35        t.interrupt();
36    }
37}
38/*
39waiting for f()
40Interrupted from acquire lock in f()
41Broken out of blocked call
42*///:~

上例中,LockingInterrupt初始化的时候就占用锁,并没有释放锁,而在运行f()方法的时候再去获取锁时任务就被阻塞了,在调用interrupt()方法中断的时候,lockInterruptibly()响应了中断,任务结束程序退出。

惯用法 #

从上面的例子我们已经知道,可以通过检查线程的中断状态来结束任务的执行。下面的例子展示了一种惯用法,它使用try-finally块来紧跟资源,以应对任何时候任务出现中断时保证资源被释放:

 1public class InterruptingIdiom {
 2
 3    /** 需要清理的资源类 */
 4    static class NeedsCleanup {
 5        private final int id;
 6
 7        public NeedsCleanup(int ident) {
 8            id = ident;
 9            System.out.println("NeedsCleanup " + id);
10        }
11
12        public void cleanup() {
13            System.out.println("Cleaning up " + id);
14        }
15    }
16
17    static class Blocked3 implements Runnable {
18        private volatile double d = 0.0;
19
20        @Override
21        public void run() {
22            try {
23                while (!Thread.interrupted()) {
24                    NeedsCleanup n1 = new NeedsCleanup(1);
25                    // 在n1之后紧跟try-finally块,保证资源被合理的清除
26                    // node 1
27                    try {
28                        System.out.println("Sleeping");
29                        TimeUnit.SECONDS.sleep(1);
30                        NeedsCleanup n2 = new NeedsCleanup(2);
31                        // 同理
32                        // node2
33                        try {
34                            System.out.println("Calculating");
35                            //耗时操作
36                            for (int i = 1; i < 2500000; i++) {
37                                d = d + (Math.PI + Math.E) / d;
38                            }
39                            // node3
40                            System.out.println(
41                                "Finished time-consuming operation");
42                        } finally {
43                            n2.cleanup();
44                        }
45                    } finally {
46                        n1.cleanup();
47                    }
48                }
49                System.out.println("Exiting via while() test");
50            } catch (InterruptedException e) {
51                System.out.println("Exiting via InterruptedException");
52            }
53        }
54    }
55
56    public static void main(String[] args) throws Exception {
57        if (args.length != 1) {
58            System.out.println("usage: java InterruptingIdiom delay-in-mS");
59            System.exit(1);
60        }
61        Thread t = new Thread(new Blocked3());
62        t.start();
63        TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
64        t.interrupt();
65    }
66}
67/* Output: (Sample)
68NeedsCleanup 1
69Sleeping
70NeedsCleanup 2
71Calculating
72Finished time-consuming operation
73Cleaning up 2
74Cleaning up 1
75NeedsCleanup 1
76Sleeping
77Cleaning up 1
78Exiting via InterruptedException
79*///:~

上例接收一个参数,表示程序中断之前的运行时间(ms),由于任务中有一段耗时的循环操作,当参数大小不同时,程序的输出会有所差异:

任务可能在node1和node2之间中断,因此其输出为:

NeedsCleanup 1
Sleeping
Cleaning up 1
Exiting via InterruptedException

当任务在node2和node3之间设置中断状态,再次进入循环时中断被监测到,程序退出,此时的输出为:

NeedsCleanup 1
Sleeping
NeedsCleanup 2
Calculating
Finished time-consuming operation
Cleaning up 2
Cleaning up 1
Exiting via while() test

总之,无论任务在何时被释放,其创建的资源都会被合适地释放。



  1. TIJ第四版第21章并发(694页)在描述线程的状态时,将调用休眠/等待之后线程的状态称为阻塞。为避免混淆,本文采用 Thread.State中关于线程的描述,并认为其不应该被称为阻塞状态。 ↩︎

  2. 本博客约定此状态(等待锁)的线程才处于阻塞状态。 ↩︎