线程与任务(二)

线程与任务(二)

第一篇文章中,讨论了线程与任务的概念,以及利用任务(Runnable接口)来创建线程。

同时,讨论了线程的生命周期。此外,介绍了线程的优先级以及守护线程这两个实用性不高的概念。

最后,讨论了线程的中断状态这个概念。线程的中断状态以及如何响应中断,对于理解线程的运行机制很重要。

这一篇,继续讨论几个线程相关的概念,包括:

  • 线程的让步
  • 等待线程加入
  • 自管理线程
  • 处理线程的异常

线程让步 #

Thread.yield()是一个静态方法,可以给线程调度器一个暗示:当前线程的run()方法已经完成的差不多了,可以让别的线程(相同优先级)使用CPU了。注意,没有任何机制保证这个暗示一定会采纳。

这个方法并不能控制线程的运行,不要误用此方法

线程休眠 #

通常调用sleep()可以使线程中止一段时间,此时线程让出CPU时间给其他线程使用。

Java SE 5 之后,可以使用 TimeUnit来执行这个行为1

调用sleep()可能引发 中断异常Interrupted Exception )。

需要说明的是,不同于Object.wait(),在使用同步时,线程的休眠并不会释放锁。

加入线程(join) #

可以在一个线程( A )中调用另一个线程( B )的join()方法,其效果是A线程会进入等待(挂起),等待B线程执行完毕后再继续执行join()方法可以接受一个时间参数,表示最长等待时间,若超时仍未返回,A线程继续执行。

join()方法可以被中断,中断发生的情况和休眠一致。

下面的代码演示了 interruptsleepjoin方法所执行的操作:

 1public class JoinAndSleep {
 2    public static void main(String[] args) {
 3        Slepper sa = new Slepper("sa",100);
 4        Slepper sb = new Slepper("sb",100);
 5        Joiner ja = new Joiner("ja",sa);
 6        Joiner jb = new Joiner("jb",sb);
 7        sa.interrupt();
 8    }
 9}
10// 继承Thread来创建线程
11class Slepper extends Thread{
12    private int duration;
13
14    public Slepper(String name, int duration) {
15        super(name);
16        this.duration = duration;
17        start();
18    }
19
20    @Override
21    public void run() {
22        try{
23            sleep(duration);
24        }catch (InterruptedException e){
25            System.out.println(currentThread() 
26            + "is interrupted ? " + isInterrupted());
27            return;
28        }
29        System.out.println(currentThread() + " has awakened.");
30    }
31}
32
33class Joiner extends Thread{
34    private Slepper slepper;
35
36    public Joiner(String name, Slepper slepper) {
37        super(name);
38        this.slepper = slepper;
39        start();
40    }
41
42    @Override
43    public void run() {
44        try {
45            slepper.join();
46        } catch (InterruptedException e) {
47            System.out.println(currentThread() + " interrupted()");
48            return;
49        }
50        System.out.println(currentThread() + "join completed.");
51    }
52}
53/* output:
54Thread[sa,5,main]is interrupted ? false
55Thread[ja,5,main]join completed.
56Thread[sb,5,main] has awakened.
57Thread[jb,5,main]join completed.
58*///:~

上例中,jajb总是会等待sasb完成,samain()中被设置中断状态,因此在sarun()方法执行sleep()会抛出异常,同时清除中断状态,因此中断状态为false

可以使不同的线程中断查看程序的状态

若上例在main()方法中使ja中断,那么可能的输出结果是:

1/*
2Thread[ja,5,main] interrupted()
3Thread[sa,5,main] has awakened.
4Thread[sb,5,main] has awakened.
5Thread[jb,5,main]join completed.
6*/

此时的情况是jarun()方法中执行join()抛出异常,此时ja直接结束而不等待sa运行结束。

上例中,还有一个关注点:在构造器中直接调用了start()方法,这种方式称为自管理线程

简单的无锁同步 #

不难理解,“加入一个线程”含有让线程有序执行的语义,利用这个性质,可以实现简单的无锁同步

 1public class SyncWithoutSynchronized {
 2
 3    private int sum;
 4
 5    void increase() {
 6        sum+=;
 7        System.out.println(Thread.currentThread() + ": " + sum);
 8    }
 9
10    /** 单线程模式 */
11    void singleThread() throws InterruptedException {
12        Thread task = new Thread(() -> {
13            for (int i = 0; i < 10; i++) {
14                increase();
15            }
16        });
17        task.start();
18        // 等待task执行完成
19        task.join();
20        System.out.println(sum);
21    }
22
23    void multiThread() throws InterruptedException {
24        for (int i = 0; i < 10; i++) {
25            Thread thread = new Thread(() -> {
26                for (int j = 0; j < 1; j++) {
27                    increase();
28                }
29            });
30            thread.start();
31            // 使用join()保证有序性,此时可以不需要同步
32            // join() 保证了happens-before原则
33            thread.join();
34        }
35        // 主线程等待所有的子线程结束
36        System.out.println(sum);
37    }
38
39    public static void main(String[] args) throws InterruptedException {
40        SyncWithoutSynchronized va = new SyncWithoutSynchronized();
41    //        va.singleThread();
42        va.multiThread();
43    }
44}
45/* output:
46Thread[Thread-0,5,main]: 1
47Thread[Thread-1,5,main]: 2
48Thread[Thread-2,5,main]: 3
49Thread[Thread-3,5,main]: 4
50Thread[Thread-4,5,main]: 5
51Thread[Thread-5,5,main]: 6
52Thread[Thread-6,5,main]: 7
53Thread[Thread-7,5,main]: 8
54Thread[Thread-8,5,main]: 9
55Thread[Thread-9,5,main]: 10
5610
57*///:~

上例使用后台任务自增共享变量sum的值,主线程总是等待后台任务执行完成之后再返回。在multiThread()方法中,额外开启了10个线程,每一个线程都在主线程上调用join()方法,从输出来看,线程0-9是顺序执行的,最终的结果不会出现讹误,这种情况下,实现了无锁同步,而共享变量sum不需要额外处理。

自管理线程 #

除了实现Runnable接口之外,还可以通过继承Thread类来创建线程:

 1public class SelfManageThread {
 2
 3    public static void main(String[] args) {
 4        for (int i = 0; i <5 ; i++) {
 5            new SelfManaged();
 6//            new SlefRunnable();
 7        }
 8    }
 9
10    static class SelfManaged extends Thread {
11        private static int count = 0;
12        private final int id = count;
13
14        public SelfManaged() {
15            super(String.valueOf(++count));
16          	// 在构造器中调用start()
17            start();
18        }
19
20        @Override
21        public String toString() {
22            return "#" + getName() + "(" + id + "), ";
23        }
24
25        @Override
26        public void run() {
27            System.out.print(this);
28            Thread.yield();
29        }
30    }
31}
32/* output: (sample)
33 * #1(1), #4(4), #5(5), #3(3), #2(2),
34 *///:~

上面的示例中,对象创建时顺便创建并启动线程

一般地,任务都实现自Runnable接口,同样可以利用Runnable实现自管理线程:

 1static class SlefRunnable implements Runnable{
 2  private static int count = 0;
 3  private final int id = ++count;
 4  private Thread t = new Thread(this, String.valueOf(id));
 5
 6  public SlefRunnable() {
 7    t.start();
 8  }
 9
10  @Override
11  public String toString() {
12    return "#" + t.getName() + "(" + id + "), ";
13  }
14
15  @Override
16  public void run() {
17    System.out.print(this);
18    Thread.yield();
19  }
20}

实现Runnable的好处是其可以再继承自某个类(如果需要的话)。

由于示例比较简单,因此在构造器中启动线程可能是安全的。但是,并不建议在构造器中启动线程,这样可能会存在风险:另一个任务可能在实例初始化完成之前开始执行,这意味着访问处于不稳定的状态。

自管理线程的惯用法 #

有时候,把线程以内部类的形式实现可能会很有用,就像上面的示例那样,甚至可以使用匿名内部类:

 1// form1 比较常用
 2//...
 3Thread thread = new Thread(new Runnable() {
 4  private int count = 5;
 5
 6  @Override
 7  public String toString() {
 8    return "#" + Thread.currentThread().getName() + "(" + count + "), ";
 9  }
10
11  @Override
12  public void run() {
13    while (--count > 0) {
14      System.out.print(this);
15      Thread.yield();
16    }
17  }
18});
19thread.start();
20//...
21
22// form2
23public class SelfManageThread {
24    Thread thread;
25    public SelfManageThread() {
26        thread= new Thread(new Runnable() {
27            private int count = 5;
28
29            @Override
30            public String toString() {
31                return "#" + Thread.currentThread().getName() 
32                + "(" + count + "), ";
33            }
34
35            @Override
36            public void run() {
37                while (--count > 0) {
38                    System.out.print(this);
39                    Thread.yield();
40                }
41            }
42        });
43        thread.start();
44    }
45}

需要说明的是,本节讨论的都是显式创建线程的方式,这种方式在部分编程规范2里已经不再推荐了,尤其在很多线程协同的场景下,创建并维护线程的成本以及上下文切换的成本会非常高,此时,线程池将是更好的选择。

捕获线程的异常 #

从线程中逃逸的异常不能被捕获,一旦线程中的异常逃逸到run()方法外部,那么它将会传播到控制台,这种情况下,线程就终止了。

 1public class ExceptionThread {
 2
 3    public static void main(String[] args) {
 4        Thread t = new Thread(new ExceptionT());
 5        try {
 6            t.start();
 7        } catch (Exception x) {
 8            System.out.println(x.toString());
 9        }
10    }
11
12    static class ExceptionT implements Runnable {
13        @Override
14        public void run() {
15            throw  new RuntimeException();
16        }
17    }
18}
19/*
20Exception in thread "Thread-0" java.lang.RuntimeException
21	at com.access.concurrency.basic.ExceptionThread$ExceptionT
22    .run(ExceptionThread.java:22)
23	at java.lang.Thread.run(Thread.java:748)
24*///:~

可以看到,线程抛出的异常无法在线程外部被捕获

在Java SE 5之后,为Thread类添加了一个接口Thread.UncaughtExceptionHandler,该接口允许在每个Thread对象上分配一个异常处理器,用来应对线程出现未捕获的异常而濒临死亡的情况。

 1public class ExceptionThread2 {
 2
 3    public static void main(String[] args) {
 4      	// 静态方法,为线程分配默认异常处理器
 5        Thread.setDefaultUncaughtExceptionHandler(
 6            new MyUncaughtExceptionHandler(true));
 7        Thread t = new Thread(new ExceptionT());
 8        // 分配异常处理器
 9        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
10        t.start();
11
12    }
13
14    static class ExceptionT implements Runnable {
15        @Override
16        public void run() {
17            throw new RuntimeException();
18        }
19    }
20
21  	// 自定义异常处理器
22    static class MyUncaughtExceptionHandler
23        implements Thread.UncaughtExceptionHandler {
24
25        private boolean isDeafult;
26
27        public MyUncaughtExceptionHandler() {
28        }
29
30        public MyUncaughtExceptionHandler(boolean isDeafult) {
31            this.isDeafult = isDeafult;
32        }
33
34        @Override
35        public void uncaughtException(Thread t, Throwable e) {
36            System.out.println("default ?(" + isDeafult+ ") " + "caught " + e);
37        }
38    }
39}
40/* output:
41default ?(true) caught java.lang.RuntimeException
42*///:~

给线程分配异常处理器的的方法有2个:

1- static setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh)
2- setUncaughtExceptionHandler(UncaughtExceptionHandler eh)

其中给线程分配默认异常处理器是静态方法,调用之后,在此线程内创建的所有线程都使用此异常处理器



  1. 上文已多处使用此方法。 ↩︎

  2. 阿里编码规范不推荐显式创建线程。 ↩︎