Semaphore

Semaphore

Semaphore #

无论是显式锁还是通过synchronized关键字获取的隐式锁,其在任一时刻都只能让一个任务访问资源,而Semaphore(计数信号量)允许多个任务同时访问资源。可以把Semaphore看作是持有对象访问许可(permits)的“security”。访问对象时,须先通过acquire()获取许可,若此时没有许可可用,那么acquire()将阻塞,否则获取许可,可用许可数-1;使用完资源后,通过release()方法返还许可。事实上,并没有实际上的许可证对象,Semaphore通过协同各个线程工作,来达到目的。

Semaphore的构造器接受一个“公平性参数”。不传入此参数或传入false时,线程获取许可的顺序无法保证,即使线程阻塞了很久,其仍然可能被刚调用acquire()方法的线程“抢走”许可,这可能会导致线程“饿死”。当传入true时,Semaphore保证线程获取许可的顺序和其调用acquire()方法之后被执行的顺序一致1,也就是先执行的任务先获取许可(FIFO)。需要说明的是,tryAcquire()方法不遵循公平性原则,如果有许可可用,它直接获取之。在使用Semaphore时,一般将其设置为公平

Semaphore通常用于限制访问资源的线程数量,典型的例子就是控制“池”的并发访问量。下例中使用Semaphore控制池中的对象方法,当需要使用时,可以将它们“签出”(checkout),使用完毕之后再将其“签入”(checkin),使用泛型类封装功能2

 1class Pool<T> {
 2    private final int size;
 3    final List<T> items = new ArrayList<>();
 4    private final boolean[] checkedOut;
 5    private final Semaphore available;
 6
 7    public Pool(Class<T> classObject, int size) {
 8        this.size = size;
 9        checkedOut = new boolean[size];
10        available = new Semaphore(size, true);
11        // Load pool with objects that can be checked out:
12        for (int i = 0; i < size; ++i) {
13            try {
14                // Assumes a default constructor:
15                items.add(classObject.newInstance());
16            } catch (Exception e) {
17                throw new RuntimeException(e);
18            }
19        }
20    }
21
22    T checkOut() throws InterruptedException {
23        available.acquire();
24        return getItem();
25    }
26
27    void checkIn(T x) {
28        if (releaseItem(x)) {
29            available.release();
30            System.out.println("release " + x);
31        }
32    }
33
34    void checkAllIn() {
35        available.release(releaseAll());
36    }
37
38    private synchronized T getItem() {
39        for (int i = 0; i < size; ++i) {
40            if (!checkedOut[i]) {
41                checkedOut[i] = true;
42                return items.get(i);
43            }
44        }
45        // Semaphore prevents reaching here
46        return null;
47    }
48
49    private synchronized boolean releaseItem(T item) {
50        int index = items.indexOf(item);
51        if (index == -1) {
52            return false; // Not in the list
53        }
54        if (checkedOut[index]) {
55            checkedOut[index] = false;
56
57            return true;
58        }
59        // Wasn't checked out
60        return false;
61    }
62
63    private synchronized int releaseAll() {
64        int r = 0;
65        for (int i = 0; i < items.size(); i++) {
66            if (checkedOut[i]) {
67                checkedOut[i] = false;
68                ++r;
69            }
70        }
71        return r;
72    }
73}

这个池使用checkoutcheckIn方法来签出和签入对象,在签出对象之前调用acquire(),如果没有可用对象,那么checkOut将阻塞。由于Semaphore的机制,checkOut方法并不需要使用同步,但是getItem方法则需要同步了,Semaphore协同多线程对资源的访问,但是并不能保证多线程对资源修改的并发安全,这是两回事3checkIn方法则判断给定对象是否被使用,是则签入之,否则不做任何操作,同样的,releaseItem方法也需要使用同步。

The semaphore encapsulates the synchronization needed to restrict access to the pool, separately from any synchronization needed to maintain the consistency of the pool itself.

接下来我们可以测试这个池能否正常工作了:

  1public class SemaphoreDemo {
  2
  3    private static class AcquireTask<T> implements Runnable {
  4        private static int counter = 0;
  5        private final int id = counter++;
  6        private final Pool<T> pool;
  7
  8        public AcquireTask(Pool<T> pool) {
  9            this.pool = pool;
 10        }
 11
 12        @Override
 13        public void run() {
 14            try {
 15                T item = pool.checkOut();
 16                System.out.println(this + " acquire " + item);
 17            } catch (InterruptedException e) {
 18                // Acceptable way to terminate
 19            }
 20        }
 21
 22        @Override
 23        public String toString() {
 24            return "CheckoutTask-" + id;
 25        }
 26    }
 27
 28    private static class ReleaseTask<T> implements Runnable {
 29        private static int counter = 0;
 30        private final int id = counter++;
 31        private final Pool<T> pool;
 32
 33        public ReleaseTask(Pool<T> pool) {
 34            this.pool = pool;
 35        }
 36
 37        @Override
 38        public void run() {
 39            try {
 40                List<T> items = pool.items;
 41                for (T item : items) {
 42                    pool.checkIn(item);
 43                }
 44            } catch (Exception e) {
 45                // Acceptable way to terminate
 46            }
 47        }
 48
 49        @Override
 50        public String toString() {
 51            return "AcquireTask-" + id + " ";
 52        }
 53    }
 54
 55    private static class Fat {
 56        private volatile double d; // Prevent optimization
 57        private static int counter = 0;
 58        private final int id = counter++;
 59
 60        public Fat() {
 61            // Expensive, interruptible operation:
 62            for (int i = 1; i < 10000; i++) {
 63                d += (Math.PI + Math.E) / (double) i;
 64            }
 65        }
 66
 67        @Override
 68        public String toString() {
 69            return "Fat-" + id;
 70        }
 71    }
 72
 73    final static int SIZE = 5;
 74
 75    private void test() throws InterruptedException {
 76        final Pool<Fat> pool = new Pool<>(Fat.class, SIZE);
 77        ExecutorService exec = Executors.newCachedThreadPool();
 78        for (int i = 0; i < SIZE; i++) {
 79            exec.execute(new AcquireTask<>(pool));
 80        }
 81        exec.execute(new ReleaseTask<>(pool));
 82        List<Fat> list = new ArrayList<>();
 83        for (int i = 0; i < SIZE; i++) {
 84            Fat f = pool.checkOut();
 85            System.out.println(i + ": main() acquire " + f);
 86            list.add(f);
 87        }
 88        Future<?> blocked = exec.submit(() -> {
 89            try {
 90                // Semaphore prevents additional checkout,
 91                // so call is blocked:
 92                pool.checkOut();
 93            } catch (InterruptedException e) {
 94                System.out.println("checkOut() Interrupted");
 95            }
 96        });
 97        TimeUnit.SECONDS.sleep(2);
 98        blocked.cancel(true); // Break out of blocked call
 99        // release all items
100        pool.checkAllIn();
101        for (Fat f : list) {
102            pool.checkIn(f); // Second checkIn ignored
103        }
104        exec.shutdown();
105    }
106
107    public static void main(String[] args) throws Exception {
108        SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
109        semaphoreDemo.test();
110    }
111}
112/* output(sample)
113AcquireTask-0 acquire Fat-0
114AcquireTask-4 acquire Fat-4
115AcquireTask-3 acquire Fat-3
116AcquireTask-2 acquire Fat-2
117AcquireTask-1 acquire Fat-1
118release Fat-0
1190: main() acquire Fat-0
120release Fat-1
1211: main() acquire Fat-1
122release Fat-2
1232: main() acquire Fat-2
124release Fat-3
1253: main() acquire Fat-3
126release Fat-4
1274: main() acquire Fat-4
128checkOut() Interrupted
129*///:~

上例SemaphoreDemo有两个任务,分别用于签入签出对象,程序首先使用AcquireTask签出所有对象,接着使用ReleaseTask签入对象。主线程接着依次签出所有对象,可以看到,主线程的签出过程是被阻塞的,只有对象签入之后,才能被签出。主线程签出所有对象之后,由于没有签入任务,接着的签出任务一定是被阻塞的,主线程休眠2s后中断了阻塞的任务。



  1. 并不能保证先调用acquire()方法的线程就能先获得许可,而是先调用方法的线程先执行内部逻辑的线程优先获取许可。所以有可能线程a先于线程b调用acquire()方法,但是却晚于线程b到达“等待点”。 ↩︎

  2. 这个示例演化自Semaphore的javaDoc: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Semaphore.html,来自TIJ。 ↩︎

  3. 如果使用是个“许可证数”为1的Semaphore,其作用相当于一个独占锁,任意时刻只有一个任务能够获取许可并且对资源进行修改,此时,getItem方法可以不使用同步。 ↩︎