Semaphore
Semaphore #
无论是显式锁还是通过synchronized
关键字获取的隐式锁,其在任一时刻都只能让一个任务访问资源,而Semaphore
(计数信号量)允许多个任务同时访问资源。可以把Semaphore
看作是持有对象访问许可(permits)的“security”。访问对象时,须先通过acquire()
获取许可,若此时没有许可可用,那么acquire()
将阻塞,否则获取许可,可用许可数-1;使用完资源后,通过release()
方法返还许可。事实上,并没有实际上的许可证对象,Semaphore
通过协同各个线程工作,来达到目的。
Semaphore
的构造器接受一个“公平性参数”。不传入此参数或传入false时,线程获取许可的顺序无法保证,即使线程阻塞了很久,其仍然可能被刚调用acquire()
方法的线程“抢走”许可,这可能会导致线程“饿死”。当传入true时,Semaphore
保证线程获取许可的顺序和其调用acquire()
方法之后被执行的顺序一致1,也就是先执行的任务先获取许可(FIFO)。需要说明的是,tryAcquire()
方法不遵循公平性原则,如果有许可可用,它直接获取之。在使用Semaphore
时,一般将其设置为公平的
Semaphore
通常用于限制访问资源的线程数量,典型的例子就是控制“池”的并发访问量。下例中使用Semaphore
控制池中的对象方法,当需要使用时,可以将它们“签出”(checkout),使用完毕之后再将其“签入”(checkin),使用泛型类封装功能2。
1class Pool<T> {
2 private final int size;
3 final List<T> items = new ArrayList<>();
4 private final boolean[] checkedOut;
5 private final Semaphore available;
6
7 public Pool(Class<T> classObject, int size) {
8 this.size = size;
9 checkedOut = new boolean[size];
10 available = new Semaphore(size, true);
11 // Load pool with objects that can be checked out:
12 for (int i = 0; i < size; ++i) {
13 try {
14 // Assumes a default constructor:
15 items.add(classObject.newInstance());
16 } catch (Exception e) {
17 throw new RuntimeException(e);
18 }
19 }
20 }
21
22 T checkOut() throws InterruptedException {
23 available.acquire();
24 return getItem();
25 }
26
27 void checkIn(T x) {
28 if (releaseItem(x)) {
29 available.release();
30 System.out.println("release " + x);
31 }
32 }
33
34 void checkAllIn() {
35 available.release(releaseAll());
36 }
37
38 private synchronized T getItem() {
39 for (int i = 0; i < size; ++i) {
40 if (!checkedOut[i]) {
41 checkedOut[i] = true;
42 return items.get(i);
43 }
44 }
45 // Semaphore prevents reaching here
46 return null;
47 }
48
49 private synchronized boolean releaseItem(T item) {
50 int index = items.indexOf(item);
51 if (index == -1) {
52 return false; // Not in the list
53 }
54 if (checkedOut[index]) {
55 checkedOut[index] = false;
56
57 return true;
58 }
59 // Wasn't checked out
60 return false;
61 }
62
63 private synchronized int releaseAll() {
64 int r = 0;
65 for (int i = 0; i < items.size(); i++) {
66 if (checkedOut[i]) {
67 checkedOut[i] = false;
68 ++r;
69 }
70 }
71 return r;
72 }
73}
这个池使用checkout
和checkIn
方法来签出和签入对象,在签出对象之前调用acquire()
,如果没有可用对象,那么checkOut
将阻塞。由于Semaphore的机制,checkOut
方法并不需要使用同步,但是getItem
方法则需要同步了,Semaphore协同多线程对资源的访问,但是并不能保证多线程对资源修改的并发安全,这是两回事3。checkIn
方法则判断给定对象是否被使用,是则签入之,否则不做任何操作,同样的,releaseItem
方法也需要使用同步。
The semaphore encapsulates the synchronization needed to restrict access to the pool, separately from any synchronization needed to maintain the consistency of the pool itself.
接下来我们可以测试这个池能否正常工作了:
1public class SemaphoreDemo {
2
3 private static class AcquireTask<T> implements Runnable {
4 private static int counter = 0;
5 private final int id = counter++;
6 private final Pool<T> pool;
7
8 public AcquireTask(Pool<T> pool) {
9 this.pool = pool;
10 }
11
12 @Override
13 public void run() {
14 try {
15 T item = pool.checkOut();
16 System.out.println(this + " acquire " + item);
17 } catch (InterruptedException e) {
18 // Acceptable way to terminate
19 }
20 }
21
22 @Override
23 public String toString() {
24 return "CheckoutTask-" + id;
25 }
26 }
27
28 private static class ReleaseTask<T> implements Runnable {
29 private static int counter = 0;
30 private final int id = counter++;
31 private final Pool<T> pool;
32
33 public ReleaseTask(Pool<T> pool) {
34 this.pool = pool;
35 }
36
37 @Override
38 public void run() {
39 try {
40 List<T> items = pool.items;
41 for (T item : items) {
42 pool.checkIn(item);
43 }
44 } catch (Exception e) {
45 // Acceptable way to terminate
46 }
47 }
48
49 @Override
50 public String toString() {
51 return "AcquireTask-" + id + " ";
52 }
53 }
54
55 private static class Fat {
56 private volatile double d; // Prevent optimization
57 private static int counter = 0;
58 private final int id = counter++;
59
60 public Fat() {
61 // Expensive, interruptible operation:
62 for (int i = 1; i < 10000; i++) {
63 d += (Math.PI + Math.E) / (double) i;
64 }
65 }
66
67 @Override
68 public String toString() {
69 return "Fat-" + id;
70 }
71 }
72
73 final static int SIZE = 5;
74
75 private void test() throws InterruptedException {
76 final Pool<Fat> pool = new Pool<>(Fat.class, SIZE);
77 ExecutorService exec = Executors.newCachedThreadPool();
78 for (int i = 0; i < SIZE; i++) {
79 exec.execute(new AcquireTask<>(pool));
80 }
81 exec.execute(new ReleaseTask<>(pool));
82 List<Fat> list = new ArrayList<>();
83 for (int i = 0; i < SIZE; i++) {
84 Fat f = pool.checkOut();
85 System.out.println(i + ": main() acquire " + f);
86 list.add(f);
87 }
88 Future<?> blocked = exec.submit(() -> {
89 try {
90 // Semaphore prevents additional checkout,
91 // so call is blocked:
92 pool.checkOut();
93 } catch (InterruptedException e) {
94 System.out.println("checkOut() Interrupted");
95 }
96 });
97 TimeUnit.SECONDS.sleep(2);
98 blocked.cancel(true); // Break out of blocked call
99 // release all items
100 pool.checkAllIn();
101 for (Fat f : list) {
102 pool.checkIn(f); // Second checkIn ignored
103 }
104 exec.shutdown();
105 }
106
107 public static void main(String[] args) throws Exception {
108 SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
109 semaphoreDemo.test();
110 }
111}
112/* output(sample)
113AcquireTask-0 acquire Fat-0
114AcquireTask-4 acquire Fat-4
115AcquireTask-3 acquire Fat-3
116AcquireTask-2 acquire Fat-2
117AcquireTask-1 acquire Fat-1
118release Fat-0
1190: main() acquire Fat-0
120release Fat-1
1211: main() acquire Fat-1
122release Fat-2
1232: main() acquire Fat-2
124release Fat-3
1253: main() acquire Fat-3
126release Fat-4
1274: main() acquire Fat-4
128checkOut() Interrupted
129*///:~
上例SemaphoreDemo
有两个任务,分别用于签入签出对象,程序首先使用AcquireTask
签出所有对象,接着使用ReleaseTask
签入对象。主线程接着依次签出所有对象,可以看到,主线程的签出过程是被阻塞的,只有对象签入之后,才能被签出。主线程签出所有对象之后,由于没有签入任务,接着的签出任务一定是被阻塞的,主线程休眠2s后中断了阻塞的任务。
并不能保证先调用
acquire()
方法的线程就能先获得许可,而是先调用方法的线程先执行内部逻辑的线程优先获取许可。所以有可能线程a先于线程b调用acquire()
方法,但是却晚于线程b到达“等待点”。 ↩︎这个示例演化自
Semaphore
的javaDoc: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Semaphore.html,来自TIJ。 ↩︎如果使用是个“许可证数”为1的
Semaphore
,其作用相当于一个独占锁,任意时刻只有一个任务能够获取许可并且对资源进行修改,此时,getItem
方法可以不使用同步。 ↩︎