Exchanger
Exchanger #
Exchanger
是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时,各自拥有一个对象,离开时交换它们拥有的对象。栅栏可以用来设计缓存对象,2个任务分别来使用和清空缓存,当缓存空间满时,则在Exchanger上交换缓存,缓存得以重复使用1。
1public class DataBuffer<T> {
2
3 private Queue<T> buffer;
4 /** 利用size构造一个有界队列 */
5 private final int size;
6
7 public DataBuffer(Class<? extends Queue<T>> cls, int size) throws Exception {
8 this(cls, size, null);
9 }
10
11 public DataBuffer(Class<? extends Queue<T>> cls, int size, Generator<T> gen) throws Exception {
12 if (cls == null) throw new NullPointerException();
13 // 检查cls的类型,如果不是队列,则抛出异常
14 if (!Queue.class.isAssignableFrom(cls)) throw new ClassCastException();
15 if (size < 0) throw new IllegalArgumentException();
16 this.size = size;
17 try {
18 Constructor<? extends Queue<T>> c = cls.getConstructor(int.class);
19 c.setAccessible(true);
20 this.buffer = c.newInstance(size);
21 } catch (NoSuchMethodException | SecurityException | InvocationTargetException e) {
22 this.buffer = cls.newInstance();
23 }
24
25 if (gen != null) {
26 for (int i = 0; i < size; i++)
27 buffer.offer(gen.next());
28 }
29 }
30
31 synchronized boolean isFull() {
32 return buffer.size() >= size;
33 }
34
35 synchronized boolean isEmpty() {
36 return buffer.isEmpty();
37 }
38
39 synchronized int bufferSize() {
40 return buffer.size();
41 }
42
43 synchronized public Queue<T> getBuffer() {
44 return buffer;
45 }
46
47 synchronized boolean addToBuffer(T t) {
48 if (!isFull()) {
49 return buffer.offer(t);
50 }
51 return false;
52 }
53
54 synchronized T takeFromBuffer() {
55 if (!isEmpty()) {
56 buffer.remove();
57 }
58 return null;
59 }
60}
DataBuffer
接受一个Queue<T>
类型参数,用来初始化缓存队列,并且利用size指定了缓存队列的容量,作为是“达到栅栏”的前置条件。
1public class BufferSwap {
2
3 private class FillTask<T> implements Runnable {
4 private DataBuffer<T> db;
5 private final Exchanger<DataBuffer<T>> ex;
6 private final Generator<T> gen;
7
8 public FillTask(DataBuffer<T> db, Generator<T> gen, Exchanger<DataBuffer<T>> ex) {
9 this.db = db;
10 this.gen = gen;
11 this.ex = ex;
12 }
13
14 @Override
15 public void run() {
16 try {
17 while (db != null) {
18 if (db.isFull()) {
19 db = ex.exchange(db);
20 } else {
21 db.addToBuffer(gen.next());
22 }
23 }
24 } catch (InterruptedException e) {
25 // right to exit here
26 }
27 }
28 }
29
30 private class EmptyTask<T> implements Runnable {
31
32 private DataBuffer<T> db;
33 private final Exchanger<DataBuffer<T>> ex;
34 private final int ecLimit;
35
36 public EmptyTask(DataBuffer<T> db, Exchanger<DataBuffer<T>> ex, int limit) {
37 this.db = db;
38 this.ex = ex;
39 this.ecLimit = limit;
40 }
41
42 @Override
43 public void run() {
44 try {
45 while (ec.intValue() < ecLimit) {
46 if (db.isEmpty()) {
47 db = ex.exchange(db);
48 ec.incrementAndGet();
49 } else {
50 db.takeFromBuffer();
51 }
52 }
53 } catch (InterruptedException e) {
54 // exit by interrupted
55 }
56 }
57 }
58
59 /** 交换缓存的次数,用来限制程序的运行 */
60 private final AtomicInteger ec = new AtomicInteger();
61
62 /**
63 * @param size the buffer size
64 * @param limit the exchange time limit
65 */
66 void test(int size, int limit) {
67 Exchanger<DataBuffer<Fat>> xh = new Exchanger<>();
68 Generator<Fat> generator = BasicGenerator.create(Fat.class);
69 // ignore class check
70 // can not solve the issue actually...
71 DataBuffer<Fat> fullBuffer, emptyBuffer;
72 try {
73 fullBuffer = new DataBuffer(ArrayBlockingQueue.class, size, generator);
74 emptyBuffer = new DataBuffer(ArrayBlockingQueue.class, size);
75 } catch (Exception e) {
76 System.out.println("initialization failure");
77 return;
78 }
79 ExecutorService pool = Executors.newCachedThreadPool();
80 Future<?> t1 = pool.submit(this.new FillTask(fullBuffer, generator, xh));
81 Future<?> done = pool.submit(this.new EmptyTask<>(emptyBuffer, xh, limit));
82 for (; ; ) {
83 if (done.isDone()) {
84 t1.cancel(true);
85 break;
86 }
87 }
88 pool.shutdown();
89 Queue<Fat> full = fullBuffer.getBuffer();
90 System.out.print("fullTask's buffer: ");
91 for (Fat fat : full) {
92 System.out.printf("%s\t", fat);
93 }
94 System.out.println();
95 System.ocvnut.println("++++++++++++++++++++++++++++++++");
96 Queue<Fat> empty = emptyBuffer.getBuffer();
97 System.out.print("emptyTask's buffer:");
98 for (Fat fat : empty) {
99 System.out.printf("%s\t", fat);
100 }
101 }
102
103 public static void main(String[] args) {
104 BufferSwap bs = new BufferSwap();
105 bs.test(10, 100);
106 }
107}
108/* output
109fillTask's buffer: Fat-1000 Fat-1001 Fat-1002 Fat-1003 Fat-1004 Fat-1005 Fat-1006 Fat-1007 Fat-1008 Fat-1009
110++++++++++++++++++++++++++++++++
111emptyTask's buffer: Fat-990 Fat-991 Fat-992 Fat-993 Fat-994 Fat-995 Fat-996 Fat-997 Fat-998 Fat-999
112*///:~
BufferSwap
中有2个任务,FillTask
用来使用缓存,当缓存队列未满时,一直向缓存中添加对象,一旦缓存已满,则进入“栅栏”;而EmptyTask
用来清空已满的缓存队列,知道缓存队列为空进入”栅栏”,同时为了限制缓存交换的次数,我们在缓存交换达到限制时停止EmptyTask
。在test()
方法中,我们初始化了2个缓存对象fullBuffer
和emptyBuffer
,前者会初始化一个满的缓存,后者则会初始化一个空的缓存。本例中传入的类型参数是ArrayBlockingQueue.class
,并且忽略了类型检查2。
之后提交这2个缓存任务,使用Future<?>
来检查EmptyTask
的状态并适时取消FillTask
。这样做时可行的,因为FillTask
一定会在最后一次交换之后继续使用而占满缓存空间进入“栅栏”处阻塞,使用Future.cancel()
可以中断其阻塞并抛出中断异常,从而结束运行。随后重看2个任务阻塞队列中的对象,输出符合期望3。
这个示例演化自
Exchanger
的javaDoc:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Exchanger.html。 ↩︎忽略类型检查的原因是因为尚不能处理泛型编程的所有问题。理论上这里传入任意Queue
实现类都是可以的,但是由于示例中所用的实例Fat并没有实现Comparable接口,所以当传入优先级队列时,构造器会抛出初始化异常。 ↩︎ 这里还存在一个潜在问题:
EmptyTask
完成时取消FillTask
,FillTask
的状态会影响程序的结果,若后者是在Exchanger处被阻塞时取消,那将抛出中断异常,程序输出如示例中说的那样;若后者在向缓存中添加对象时被中断,shutdown()
方法无法立刻中止FillTask
的运行,它将继续运行至进入栅栏而抛出异常,但是,主线程中的遍历(在使用普通队列时)就可能会抛出ConcorrentModificationException。解决此问题的方法是在FillTask
中分别处理2种取消的情况,或者在主线程中使用awaitTermination
等待FillTask
抛出异常而终结。 ↩︎