并发

Semaphore

Semaphore #

无论是显式锁还是通过synchronized关键字获取的隐式锁,其在任一时刻都只能让一个任务访问资源,而Semaphore(计数信号量)允许多个任务同时访问资源。可以把Semaphore看作是持有对象访问许可(permits)的“security”。访问对象时,须先通过acquire()获取许可,若此时没有许可可用,那么acquire()将阻塞,否则获取许可,可用许可数-1;使用完资源后,通过release()方法返还许可。事实上,并没有实际上的许可证对象,Semaphore通过协同各个线程工作,来达到目的。

Semaphore的构造器接受一个“公平性参数”。不传入此参数或传入false时,线程获取许可的顺序无法保证,即使线程阻塞了很久,其仍然可能被刚调用acquire()方法的线程“抢走”许可,这可能会导致线程“饿死”。当传入true时,Semaphore保证线程获取许可的顺序和其调用acquire()方法之后被执行的顺序一致1,也就是先执行的任务先获取许可(FIFO)。需要说明的是,tryAcquire()方法不遵循公平性原则,如果有许可可用,它直接获取之。在使用Semaphore时,一般将其设置为公平

Semaphore通常用于限制访问资源的线程数量,典型的例子就是控制“池”的并发访问量。下例中使用Semaphore控制池中的对象方法,当需要使用时,可以将它们“签出”(checkout),使用完毕之后再将其“签入”(checkin),使用泛型类封装功能2

 1class Pool<T> {
 2    private final int size;
 3    final List<T> items = new ArrayList<>();
 4    private final boolean[] checkedOut;
 5    private final Semaphore available;
 6
 7    public Pool(Class<T> classObject, int size) {
 8        this.size = size;
 9        checkedOut = new boolean[size];
10        available = new Semaphore(size, true);
11        // Load pool with objects that can be checked out:
12        for (int i = 0; i < size; ++i) {
13            try {
14                // Assumes a default constructor:
15                items.add(classObject.newInstance());
16            } catch (Exception e) {
17                throw new RuntimeException(e);
18            }
19        }
20    }
21
22    T checkOut() throws InterruptedException {
23        available.acquire();
24        return getItem();
25    }
26
27    void checkIn(T x) {
28        if (releaseItem(x)) {
29            available.release();
30            System.out.println("release " + x);
31        }
32    }
33
34    void checkAllIn() {
35        available.release(releaseAll());
36    }
37
38    private synchronized T getItem() {
39        for (int i = 0; i < size; ++i) {
40            if (!checkedOut[i]) {
41                checkedOut[i] = true;
42                return items.get(i);
43            }
44        }
45        // Semaphore prevents reaching here
46        return null;
47    }
48
49    private synchronized boolean releaseItem(T item) {
50        int index = items.indexOf(item);
51        if (index == -1) {
52            return false; // Not in the list
53        }
54        if (checkedOut[index]) {
55            checkedOut[index] = false;
56
57            return true;
58        }
59        // Wasn't checked out
60        return false;
61    }
62
63    private synchronized int releaseAll() {
64        int r = 0;
65        for (int i = 0; i < items.size(); i++) {
66            if (checkedOut[i]) {
67                checkedOut[i] = false;
68                ++r;
69            }
70        }
71        return r;
72    }
73}

这个池使用checkoutcheckIn方法来签出和签入对象,在签出对象之前调用acquire(),如果没有可用对象,那么checkOut将阻塞。由于Semaphore的机制,checkOut方法并不需要使用同步,但是getItem方法则需要同步了,Semaphore协同多线程对资源的访问,但是并不能保证多线程对资源修改的并发安全,这是两回事3checkIn方法则判断给定对象是否被使用,是则签入之,否则不做任何操作,同样的,releaseItem方法也需要使用同步。

...

Exchanger

Exchanger #

Exchanger是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时,各自拥有一个对象,离开时交换它们拥有的对象。栅栏可以用来设计缓存对象,2个任务分别来使用和清空缓存,当缓存空间满时,则在Exchanger上交换缓存,缓存得以重复使用1

...

PriorityBlockingQueue and DelayQueue

PriorityBlockingQueue #

PriorityBlockingQueue就是一个基础的可阻塞的 优先级队列,当队列为空时,从队列中获取元素时被阻塞。其余特性和优先级队列是一致的。

下例展示了如何构建一个可以放入优先级队列的任务:

 1public class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
 2    protected static List<PrioritizedTask> sequence = new ArrayList<>();
 3    private Random rand = new Random(47);
 4    private static int counter = 0;
 5    private final int id = counter++;
 6    private final int priority;
 7
 8
 9    public PrioritizedTask(int priority) {
10        this.priority = priority;
11        sequence.add(this);
12    }
13
14    @Override
15    public int compareTo(PrioritizedTask arg) {
16        return priority < arg.priority ? 1 :
17            (priority > arg.priority ? -1 : 0);
18    }
19
20    @Override
21    public void run() {
22        try {
23            TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
24        } catch (InterruptedException e) {
25            // Acceptable way to exit
26        }
27        System.out.println(this);
28    }
29
30    @Override
31    public String toString() {
32        return String.format("[%1$-3d]", priority) +
33            " Task " + id;
34    }
35
36    public String summary() {
37        return "(" + id + ":" + priority + ")";
38    }
39
40    public static class EndSentinel extends PrioritizedTask {
41        private ExecutorService exec;
42
43        public EndSentinel(ExecutorService e) {
44            super(-1); // Lowest priority in this program
45            exec = e;
46        }
47
48        @Override
49        public void run() {
50            int count = 0;
51            for (PrioritizedTask pt : sequence) {
52                System.out.print(pt.summary());
53                if (++count % 5 == 0)
54                    System.out.println();
55            }
56            System.out.println();
57            System.out.println(this + " Calling shutdownNow()");
58            exec.shutdownNow();
59        }
60    }
61}

PrioritizedTask实现了Runnable和Comparable接口,有一个int型priority域,用来表示任务的优先级,在compareTo方法中的逻辑表示,优先级高的将会优先出队。其还有一个静态域,用来记录所有任务被置入队列的顺序。PrioritizedTask有一个静态内部类,也是其子类,它被称作“结束哨兵”,它的优先级为-1,代表它会最后出队,当执行这个任务时,代表任务所有的任务执行完毕,可以关闭线程池资源。

...