资源访问受限-锁和条件

资源访问受限-锁和条件

可重入锁 #

Java SE 5之后提供了位于java.util.concurrent.locks包下的显式互斥机制——Lock对象(显式锁),Lock对象必须被显式的创建,锁定和释放

一般情况下 ,ReentrantLock保护代码块的基本结构是:

1myLock.lock(); // 可重入锁
2try{
3  // 临界区代码
4}finally{
5  myLock.unlock();
6}

这个结构可以确保同一时间只有一个线程进入临界区( critical section ),其他线程调用lock()时会被阻塞,直到第一个线程释放锁。

我们利用锁机制来修改之前的转账逻辑,看看会发生什么:

 1static class Bank {
 2  private final double[] accounts;
 3  // lock
 4  private Lock lock;
 5
 6  public Bank(int accountCount, double money) {
 7    // initialize bank account
 8    accounts = new double[accountCount];
 9    Arrays.fill(accounts, money);
10    // 使用JDK提供的可重入锁
11    lock = new ReentrantLock();
12  }
13
14  public void transfer(int from, int to, double amount) 
15  throws InterruptedException {
16    lock.lock();
17    try {
18      if (accounts[from] < amount) return;
19      if (from == to) return;
20      // transfer
21      accounts[from] -= amount;
22      System.out.println(Thread.currentThread() + " move away");
23      accounts[to] += amount;
24      System.out.printf("%s: %10.2f from %d to %d, Total Balance: %10.2f%n",
25                        Thread.currentThread(),
26                        amount,
27                        from,
28                        to,
29                        totalBalance());
30    } finally {
31      // 确保锁被释放
32      lock.unlock();
33    }
34  }
35
36  private double totalBalance() {
37      double sum = 0;
38      for (double a : accounts) {
39          sum += a;
40        }
41    	return sum;
42  }
43
44  int size() {
45    return accounts.length;
46  }
47}
48/* output: (partial)
49Thread[Thread-0,5,main] move away
50Thread[Thread-0,5,main]:     948.12 from 22 to 50, Total Balance:  100000.00
51Thread[Thread-2,5,main] move away
52Thread[Thread-2,5,main]:     722.25 from 36 to 84, Total Balance:  100000.00
53Thread[Thread-4,5,main] move away
54Thread[Thread-4,5,main]:     621.82 from 62 to 45, Total Balance:  100000.00
55Thread[Thread-6,5,main] move away
56Thread[Thread-6,5,main]:     628.81 from 18 to 51, Total Balance:  100000.00
57Thread[Thread-8,5,main] move away
58...
59*///:~

上例中,我们对transfer()方法里的核心代码块加锁,执行完成之后释放锁。每个线程在执行任务时都会获取锁,此时其他尝试进入方法的线程将被阻塞。从控制台输出来看,也是这样的:线程是有序执行的,下一个线程总是等待上一个线程执行完才开始执行,这样,无论多少次转账,总金额也不会变。

思考一个问题:totalBalance()方法是否需要加锁?


上面的示例使用了可重入锁1(ReentrantLock),可重入的意思是同一个线程可以重复获取锁,由一个计数器来记录锁获取的次数2,它实现了Lock接口的所有方法:

  • public void lock() {...}

    若锁未被其他线程获取,获取锁,并将锁的计数器置为1,立即返回

    若当前线程已经获取锁,锁的计数器+1,立即返回

    若锁被其他线程占有,那么此线程休眠3

  • public void lockInterruptibly() throws InterruptedException {...}

    同lock(),不过此法可以被中断(interrupted)

  • public boolean tryLock() {...}

    尝试获取锁并立即返回,成功获取同lock()并返回true,失败则返回false

  • public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {...}

    带有超时机制的尝试获取锁,此法可被中断

  • public void unlock() {...}

    若计数器>1,则计数器-1,不释放锁,否则计数器置为0并释放锁

  • public Condition newCondition() {...}

    获取锁的条件对象

下例展示了尝试获取锁的情况:

 1public class AttemptLocking {
 2    private Lock lock = new ReentrantLock();
 3
 4    public static void main(String[] args)
 5     throws InterruptedException {
 6        AttemptLocking al = new AttemptLocking();
 7        al.untimed();
 8        al.timed();
 9        new Thread(() -> {
10            al.lock.lock();
11            System.out.println("fetched");
12        }).start();
13        // let thread-0 finish
14        Thread.sleep(100);
15        al.untimed();
16        al.timed();
17
18    }
19
20    void untimed() {
21        boolean b = lock.tryLock();
22        try {
23            System.out.println("tryLock(): " + b);
24        } finally {
25            if (b) lock.unlock();
26        }
27    }
28
29    void timed() {
30        boolean b = false;
31        try {
32            b = lock.tryLock(2, TimeUnit.SECONDS);
33            System.out.println("tryLock(2, TimeUnit.SECONDS): " + b);
34        } catch (InterruptedException e) {
35            // e.printStackTrace();
36        } finally {
37            if (b) lock.unlock();
38        }
39    }
40}
41/* output:
42tryLock(): true
43tryLock(2, TimeUnit.SECONDS): true
44fetched
45tryLock(): false
46tryLock(2, TimeUnit.SECONDS): false
47*///:~

可以看到,main()方法中使用新线程获取了锁而不释放,此时再使用方法获取锁时失败,注意timed()方法在2s等待之后才返回失败

可重入锁可以构建公平锁或非公平锁,默认使用非公平锁(上下文切换少,吞吐量高)。

条件 #

思考转账的逻辑,当从from转帐amount到to账户时,若from余额不足,任务会直接返回。

若想在from账户余额足够时再执行任务而不是直接退出,应该怎样做呢?

java.util.concurrent.locks包下还提供了Condition对象,这个对象用来管理那些获得锁但是不能执行任务(条件不满足)的线程,条件可以这样使用:

 1public void transfer(int from, int to, double amount) 
 2throws InterruptedException {
 3  lock.lock();
 4  try {
 5    if (accounts[from] < amount) {
 6      // could be interrupted
 7      suficient.await();
 8    };
 9    if (from == to) return;
10    // transfer
11    accounts[from] -= amount;
12    System.out.println(Thread.currentThread() + " move away");
13    accounts[to] += amount;
14    System.out.printf("%s: %10.2f from %d to %d, Total Balance: %10.2f%n",
15                      Thread.currentThread(),
16                      amount,
17                      from,
18                      to,
19                      totalBalance());
20    // invoke all waited condition
21    suficient.signalAll();
22  } finally {
23    lock.unlock();
24  }
25}

此时,当余额不足时,线程不再退出,而时等待其他转账线程唤醒之,知道满足条件继续执行任务。

  • void await() throws InterruptedException;

    使当前线程等待,和条件相关的锁被释放。等待的线程可以被singal()singalAll()唤醒;若线程被中断也会解除等待状态;解除状态的线程重新排队获取锁

  • void signalAll();

    唤醒所有在此条件上等待的线程,被唤醒的线程需要重新获取锁

  • void signal();

    唤醒在此条件上等待的任一线程,此方法具有随机性

此外,Condition还有一些带有超时参数阻止中断的方法,请参照 Java SE API

到此为止,我们可以利用锁和条件将转账任务改进为线程安全,功能更丰富类:

  1public class SynchronizedTransfer {
  2    static double INITIAL_MONEY = 1000;
  3
  4    public static void main(String[] args) {
  5
  6        int ACCOUNTS = 100;
  7        Bank bank = new Bank(ACCOUNTS, INITIAL_MONEY);
  8
  9        for (int i = 0; i < ACCOUNTS; i++) {
 10            Thread t = new Thread(new TransferTask(bank));
 11            t.start();
 12						// test thread
 13            /* new Thread(new Runnable() {
 14                @Override
 15                public void run() {
 16                    double v = bank.totalBalance();
 17                    BigDecimal bigDecimal  
 18                    = new BigDecimal(v)
 19                            .setScale(2,BigDecimal.ROUND_HALF_UP);
 20                    if (bigDecimal.intValue()  != 100000){
 21                        System.out.println(bigDecimal +  "  is not even!");
 22                    }
 23                }
 24            }).start();*/
 25        }
 26
 27    }
 28
 29    static class TransferTask implements Runnable {
 30        private Bank bank;
 31        private int size;
 32        private double maxAmount = INITIAL_MONEY;
 33
 34        public TransferTask(Bank bank) {
 35            this.bank = bank;
 36            this.size = bank.size();
 37        }
 38
 39        @Override
 40        public void run() {
 41            try {
 42                int from = (int) (size * Math.random());
 43                int to = (int) (size * Math.random());
 44//                int to = (from + 1 >= size) ? 0 : from + 1;
 45                double amount = maxAmount * Math.random();
 46                bank.transfer(from, to, amount);
 47                Thread.sleep((long) (size * Math.random()));
 48            } catch (InterruptedException e) {
 49                // e.printStackTrace();
 50            }
 51        }
 52    }
 53
 54    static class Bank {
 55        private final double[] accounts;
 56        // lock
 57        private Lock lock;
 58      	// condition
 59        private Condition suficient;
 60
 61        public Bank(int accountCount, double money) {
 62            // initialize bank account
 63            accounts = new double[accountCount];
 64            Arrays.fill(accounts, money);
 65            lock = new ReentrantLock();
 66            suficient  = lock.newCondition();
 67        }
 68
 69        public void transfer(int from, int to, double amount) 
 70        throws InterruptedException {
 71            lock.lock();
 72            try {
 73                if (accounts[from] < amount) {
 74                    // could be interrupted
 75                    suficient.await();
 76                };
 77                if (from == to) return;
 78                // transfer
 79                accounts[from] -= amount;
 80                System.out.println(Thread.currentThread() + " move away");
 81                accounts[to] += amount;
 82                System.out.printf("%s: %10.2f from %d to %d,
 83                Total Balance: %10.2f%n",
 84                    Thread.currentThread(),
 85                    amount,
 86                    from,
 87                    to,
 88                    totalBalance());
 89                // invoke all waited condition
 90                suficient.signalAll();
 91            } finally {
 92                lock.unlock();
 93            }
 94        }
 95
 96        private double totalBalance() {
 97                double sum = 0;
 98                for (double a : accounts) {
 99                    sum += a;
100                }
101                return sum;
102        }
103
104        int size() {
105            return accounts.length;
106        }
107    }
108}

实际上,上例在totalBalance()方法不加锁的情况下,转账任务也是安全的

回答之前提出的问题:totalBalance()方法究竟是否需要加锁?

请注意main()方法中被注释的部分,它创建一个线程(记为T)去读取所有账户的余额,判断余额是否和初始化时相等。使用BigDecimal是为了处理Double数据类型的精度丢失。在totalBalance()不加锁的情况下,我们很容易看到这样的输出:

1/*
2Thread[Thread-0,5,main] move away
3Thread[Thread-0,5,main]:     793.81 from 86 to 37, Total Balance:  100000.00
499206.19  is not even!
5Thread[Thread-2,5,main] move away
6Thread[Thread-2,5,main]:     814.24 from 30 to 49, Total Balance:  100000.00
7...
8*///:~

这给出一个暗示:在有其他的线程访问totalBalance()方法时,totalBalance()不是线程安全的。尽管transfer()方法加锁了,任意时刻只有一个线程访问totalBalance()方法,但是T和转账线程不相关,它可被CPU调度与转账线程竞争对totalBalance()方法中的accounts资源的访问,正如上述输出所显示的那样。

所以,是否加锁应该以资源是否共享为参照

当没有被注释的部分时,由于transfer()方法加锁了,线程在transfer()方法中调用totalBalance()不会受到其他线程的影响;当被注释的线程运行时,这时totalBalance资源可能被共享访问了,为保证安全就必须加锁。


  1. 可重入锁是典型的独占锁。 ↩︎

  2. 计数器最大231-1。 ↩︎

  3. 实际上线程进入同步队列中排队,并自旋尝试获取锁,获取失败则线程的中断状态置位。 ↩︎