资源访问受限-锁和条件
可重入锁 #
Java SE 5之后提供了位于java.util.concurrent.locks
包下的显式互斥机制——Lock对象(显式锁),Lock对象必须被显式的创建,锁定和释放。
一般情况下 ,ReentrantLock保护代码块的基本结构是:
1myLock.lock(); // 可重入锁
2try{
3 // 临界区代码
4}finally{
5 myLock.unlock();
6}
这个结构可以确保同一时间只有一个线程进入临界区( critical section ),其他线程调用lock()
时会被阻塞,直到第一个线程释放锁。
我们利用锁机制来修改之前的转账逻辑,看看会发生什么:
1static class Bank {
2 private final double[] accounts;
3 // lock
4 private Lock lock;
5
6 public Bank(int accountCount, double money) {
7 // initialize bank account
8 accounts = new double[accountCount];
9 Arrays.fill(accounts, money);
10 // 使用JDK提供的可重入锁
11 lock = new ReentrantLock();
12 }
13
14 public void transfer(int from, int to, double amount)
15 throws InterruptedException {
16 lock.lock();
17 try {
18 if (accounts[from] < amount) return;
19 if (from == to) return;
20 // transfer
21 accounts[from] -= amount;
22 System.out.println(Thread.currentThread() + " move away");
23 accounts[to] += amount;
24 System.out.printf("%s: %10.2f from %d to %d, Total Balance: %10.2f%n",
25 Thread.currentThread(),
26 amount,
27 from,
28 to,
29 totalBalance());
30 } finally {
31 // 确保锁被释放
32 lock.unlock();
33 }
34 }
35
36 private double totalBalance() {
37 double sum = 0;
38 for (double a : accounts) {
39 sum += a;
40 }
41 return sum;
42 }
43
44 int size() {
45 return accounts.length;
46 }
47}
48/* output: (partial)
49Thread[Thread-0,5,main] move away
50Thread[Thread-0,5,main]: 948.12 from 22 to 50, Total Balance: 100000.00
51Thread[Thread-2,5,main] move away
52Thread[Thread-2,5,main]: 722.25 from 36 to 84, Total Balance: 100000.00
53Thread[Thread-4,5,main] move away
54Thread[Thread-4,5,main]: 621.82 from 62 to 45, Total Balance: 100000.00
55Thread[Thread-6,5,main] move away
56Thread[Thread-6,5,main]: 628.81 from 18 to 51, Total Balance: 100000.00
57Thread[Thread-8,5,main] move away
58...
59*///:~
上例中,我们对transfer()
方法里的核心代码块加锁,执行完成之后释放锁。每个线程在执行任务时都会获取锁,此时其他尝试进入方法的线程将被阻塞。从控制台输出来看,也是这样的:线程是有序执行的,下一个线程总是等待上一个线程执行完才开始执行,这样,无论多少次转账,总金额也不会变。
思考一个问题:totalBalance()方法是否需要加锁?
上面的示例使用了可重入锁1(ReentrantLock),可重入的意思是同一个线程可以重复获取锁,由一个计数器来记录锁获取的次数2,它实现了Lock接口的所有方法:
public void lock() {...}
若锁未被其他线程获取,获取锁,并将锁的计数器置为1,立即返回
若当前线程已经获取锁,锁的计数器+1,立即返回
若锁被其他线程占有,那么此线程休眠3
public void lockInterruptibly() throws InterruptedException {...}
同lock(),不过此法可以被中断(interrupted)
public boolean tryLock() {...}
尝试获取锁并立即返回,成功获取同lock()并返回true,失败则返回false
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {...}
带有超时机制的尝试获取锁,此法可被中断
public void unlock() {...}
若计数器>1,则计数器-1,不释放锁,否则计数器置为0并释放锁
public Condition newCondition() {...}
获取锁的条件对象
下例展示了尝试获取锁的情况:
1public class AttemptLocking {
2 private Lock lock = new ReentrantLock();
3
4 public static void main(String[] args)
5 throws InterruptedException {
6 AttemptLocking al = new AttemptLocking();
7 al.untimed();
8 al.timed();
9 new Thread(() -> {
10 al.lock.lock();
11 System.out.println("fetched");
12 }).start();
13 // let thread-0 finish
14 Thread.sleep(100);
15 al.untimed();
16 al.timed();
17
18 }
19
20 void untimed() {
21 boolean b = lock.tryLock();
22 try {
23 System.out.println("tryLock(): " + b);
24 } finally {
25 if (b) lock.unlock();
26 }
27 }
28
29 void timed() {
30 boolean b = false;
31 try {
32 b = lock.tryLock(2, TimeUnit.SECONDS);
33 System.out.println("tryLock(2, TimeUnit.SECONDS): " + b);
34 } catch (InterruptedException e) {
35 // e.printStackTrace();
36 } finally {
37 if (b) lock.unlock();
38 }
39 }
40}
41/* output:
42tryLock(): true
43tryLock(2, TimeUnit.SECONDS): true
44fetched
45tryLock(): false
46tryLock(2, TimeUnit.SECONDS): false
47*///:~
可以看到,main()方法中使用新线程获取了锁而不释放,此时再使用方法获取锁时失败,注意timed()方法在2s等待之后才返回失败。
可重入锁可以构建公平锁或非公平锁,默认使用非公平锁(上下文切换少,吞吐量高)。
条件 #
思考转账的逻辑,当从from转帐amount到to账户时,若from余额不足,任务会直接返回。
若想在from账户余额足够时再执行任务而不是直接退出,应该怎样做呢?
java.util.concurrent.locks
包下还提供了Condition对象,这个对象用来管理那些获得锁但是不能执行任务(条件不满足)的线程,条件可以这样使用:
1public void transfer(int from, int to, double amount)
2throws InterruptedException {
3 lock.lock();
4 try {
5 if (accounts[from] < amount) {
6 // could be interrupted
7 suficient.await();
8 };
9 if (from == to) return;
10 // transfer
11 accounts[from] -= amount;
12 System.out.println(Thread.currentThread() + " move away");
13 accounts[to] += amount;
14 System.out.printf("%s: %10.2f from %d to %d, Total Balance: %10.2f%n",
15 Thread.currentThread(),
16 amount,
17 from,
18 to,
19 totalBalance());
20 // invoke all waited condition
21 suficient.signalAll();
22 } finally {
23 lock.unlock();
24 }
25}
此时,当余额不足时,线程不再退出,而时等待其他转账线程唤醒之,知道满足条件继续执行任务。
void await() throws InterruptedException;
使当前线程等待,和条件相关的锁被释放。等待的线程可以被
singal()
或singalAll()
唤醒;若线程被中断也会解除等待状态;解除状态的线程重新排队获取锁void signalAll();
唤醒所有在此条件上等待的线程,被唤醒的线程需要重新获取锁
void signal();
唤醒在此条件上等待的任一线程,此方法具有随机性
此外,Condition还有一些带有超时参数和阻止中断的方法,请参照 Java SE API。
到此为止,我们可以利用锁和条件将转账任务改进为线程安全,功能更丰富类:
1public class SynchronizedTransfer {
2 static double INITIAL_MONEY = 1000;
3
4 public static void main(String[] args) {
5
6 int ACCOUNTS = 100;
7 Bank bank = new Bank(ACCOUNTS, INITIAL_MONEY);
8
9 for (int i = 0; i < ACCOUNTS; i++) {
10 Thread t = new Thread(new TransferTask(bank));
11 t.start();
12 // test thread
13 /* new Thread(new Runnable() {
14 @Override
15 public void run() {
16 double v = bank.totalBalance();
17 BigDecimal bigDecimal
18 = new BigDecimal(v)
19 .setScale(2,BigDecimal.ROUND_HALF_UP);
20 if (bigDecimal.intValue() != 100000){
21 System.out.println(bigDecimal + " is not even!");
22 }
23 }
24 }).start();*/
25 }
26
27 }
28
29 static class TransferTask implements Runnable {
30 private Bank bank;
31 private int size;
32 private double maxAmount = INITIAL_MONEY;
33
34 public TransferTask(Bank bank) {
35 this.bank = bank;
36 this.size = bank.size();
37 }
38
39 @Override
40 public void run() {
41 try {
42 int from = (int) (size * Math.random());
43 int to = (int) (size * Math.random());
44// int to = (from + 1 >= size) ? 0 : from + 1;
45 double amount = maxAmount * Math.random();
46 bank.transfer(from, to, amount);
47 Thread.sleep((long) (size * Math.random()));
48 } catch (InterruptedException e) {
49 // e.printStackTrace();
50 }
51 }
52 }
53
54 static class Bank {
55 private final double[] accounts;
56 // lock
57 private Lock lock;
58 // condition
59 private Condition suficient;
60
61 public Bank(int accountCount, double money) {
62 // initialize bank account
63 accounts = new double[accountCount];
64 Arrays.fill(accounts, money);
65 lock = new ReentrantLock();
66 suficient = lock.newCondition();
67 }
68
69 public void transfer(int from, int to, double amount)
70 throws InterruptedException {
71 lock.lock();
72 try {
73 if (accounts[from] < amount) {
74 // could be interrupted
75 suficient.await();
76 };
77 if (from == to) return;
78 // transfer
79 accounts[from] -= amount;
80 System.out.println(Thread.currentThread() + " move away");
81 accounts[to] += amount;
82 System.out.printf("%s: %10.2f from %d to %d,
83 Total Balance: %10.2f%n",
84 Thread.currentThread(),
85 amount,
86 from,
87 to,
88 totalBalance());
89 // invoke all waited condition
90 suficient.signalAll();
91 } finally {
92 lock.unlock();
93 }
94 }
95
96 private double totalBalance() {
97 double sum = 0;
98 for (double a : accounts) {
99 sum += a;
100 }
101 return sum;
102 }
103
104 int size() {
105 return accounts.length;
106 }
107 }
108}
实际上,上例在totalBalance()方法不加锁的情况下,转账任务也是安全的。
回答之前提出的问题:totalBalance()方法究竟是否需要加锁?
请注意main()方法中被注释的部分,它创建一个线程(记为T)去读取所有账户的余额,判断余额是否和初始化时相等。使用BigDecimal是为了处理Double数据类型的精度丢失。在totalBalance()不加锁的情况下,我们很容易看到这样的输出:
1/*
2Thread[Thread-0,5,main] move away
3Thread[Thread-0,5,main]: 793.81 from 86 to 37, Total Balance: 100000.00
499206.19 is not even!
5Thread[Thread-2,5,main] move away
6Thread[Thread-2,5,main]: 814.24 from 30 to 49, Total Balance: 100000.00
7...
8*///:~
这给出一个暗示:在有其他的线程访问totalBalance()方法时,totalBalance()不是线程安全的。尽管transfer()方法加锁了,任意时刻只有一个线程访问totalBalance()方法,但是T和转账线程不相关,它可被CPU调度与转账线程竞争对totalBalance()方法中的accounts资源的访问,正如上述输出所显示的那样。
所以,是否加锁应该以资源是否共享为参照
当没有被注释的部分时,由于transfer()方法加锁了,线程在transfer()方法中调用totalBalance()不会受到其他线程的影响;当被注释的线程运行时,这时totalBalance资源可能被共享访问了,为保证安全就必须加锁。