Exchanger

Exchanger

Exchanger #

Exchanger是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时,各自拥有一个对象,离开时交换它们拥有的对象。栅栏可以用来设计缓存对象,2个任务分别来使用和清空缓存,当缓存空间满时,则在Exchanger上交换缓存,缓存得以重复使用1

 1public class DataBuffer<T> {
 2
 3    private Queue<T> buffer;
 4    /** 利用size构造一个有界队列 */
 5    private final int size;
 6
 7    public DataBuffer(Class<? extends Queue<T>> cls, int size) throws Exception {
 8        this(cls, size, null);
 9    }
10
11    public DataBuffer(Class<? extends Queue<T>> cls, int size, Generator<T> gen) throws Exception {
12        if (cls == null) throw new NullPointerException();
13        // 检查cls的类型,如果不是队列,则抛出异常
14        if (!Queue.class.isAssignableFrom(cls)) throw new ClassCastException();
15        if (size < 0) throw new IllegalArgumentException();
16        this.size = size;
17        try {
18            Constructor<? extends Queue<T>> c = cls.getConstructor(int.class);
19            c.setAccessible(true);
20            this.buffer = c.newInstance(size);
21        } catch (NoSuchMethodException | SecurityException | InvocationTargetException e) {
22            this.buffer = cls.newInstance();
23        }
24
25        if (gen != null) {
26            for (int i = 0; i < size; i++)
27                buffer.offer(gen.next());
28        }
29    }
30
31    synchronized boolean isFull() {
32        return buffer.size() >= size;
33    }
34
35    synchronized boolean isEmpty() {
36        return buffer.isEmpty();
37    }
38
39    synchronized int bufferSize() {
40        return buffer.size();
41    }
42
43    synchronized public Queue<T> getBuffer() {
44        return buffer;
45    }
46
47    synchronized boolean addToBuffer(T t) {
48        if (!isFull()) {
49            return buffer.offer(t);
50        }
51        return false;
52    }
53
54    synchronized T takeFromBuffer() {
55        if (!isEmpty()) {
56            buffer.remove();
57        }
58        return null;
59    }
60}

DataBuffer接受一个Queue<T>类型参数,用来初始化缓存队列,并且利用size指定了缓存队列的容量,作为是“达到栅栏”的前置条件。

  1public class BufferSwap {
  2
  3    private class FillTask<T> implements Runnable {
  4        private DataBuffer<T> db;
  5        private final Exchanger<DataBuffer<T>> ex;
  6        private final Generator<T> gen;
  7
  8        public FillTask(DataBuffer<T> db, Generator<T> gen, Exchanger<DataBuffer<T>> ex) {
  9            this.db = db;
 10            this.gen = gen;
 11            this.ex = ex;
 12        }
 13
 14        @Override
 15        public void run() {
 16            try {
 17                while (db != null) {
 18                    if (db.isFull()) {
 19                        db = ex.exchange(db);
 20                    } else {
 21                        db.addToBuffer(gen.next());
 22                    }
 23                }
 24            } catch (InterruptedException e) {
 25                // right to exit here
 26            }
 27        }
 28    }
 29
 30    private class EmptyTask<T> implements Runnable {
 31
 32        private DataBuffer<T> db;
 33        private final Exchanger<DataBuffer<T>> ex;
 34        private final int ecLimit;
 35
 36        public EmptyTask(DataBuffer<T> db, Exchanger<DataBuffer<T>> ex, int limit) {
 37            this.db = db;
 38            this.ex = ex;
 39            this.ecLimit = limit;
 40        }
 41
 42        @Override
 43        public void run() {
 44            try {
 45                while (ec.intValue() < ecLimit) {
 46                    if (db.isEmpty()) {
 47                        db = ex.exchange(db);
 48                        ec.incrementAndGet();
 49                    } else {
 50                        db.takeFromBuffer();
 51                    }
 52                }
 53            } catch (InterruptedException e) {
 54                // exit by interrupted
 55            }
 56        }
 57    }
 58
 59    /** 交换缓存的次数,用来限制程序的运行 */
 60    private final AtomicInteger ec = new AtomicInteger();
 61
 62    /**
 63     * @param size  the buffer size
 64     * @param limit the exchange time limit
 65     */
 66    void test(int size, int limit) {
 67        Exchanger<DataBuffer<Fat>> xh = new Exchanger<>();
 68        Generator<Fat> generator = BasicGenerator.create(Fat.class);
 69        // ignore class check
 70        // can not solve the issue actually...
 71        DataBuffer<Fat> fullBuffer, emptyBuffer;
 72        try {
 73            fullBuffer = new DataBuffer(ArrayBlockingQueue.class, size, generator);
 74            emptyBuffer = new DataBuffer(ArrayBlockingQueue.class, size);
 75        } catch (Exception e) {
 76            System.out.println("initialization failure");
 77            return;
 78        }
 79        ExecutorService pool = Executors.newCachedThreadPool();
 80        Future<?> t1 = pool.submit(this.new FillTask(fullBuffer, generator, xh));
 81        Future<?> done = pool.submit(this.new EmptyTask<>(emptyBuffer, xh, limit));
 82        for (; ; ) {
 83            if (done.isDone()) {
 84                t1.cancel(true);
 85                break;
 86            }
 87        }
 88        pool.shutdown();
 89        Queue<Fat> full = fullBuffer.getBuffer();
 90        System.out.print("fullTask's buffer: ");
 91        for (Fat fat : full) {
 92            System.out.printf("%s\t", fat);
 93        }
 94        System.out.println();
 95        System.ocvnut.println("++++++++++++++++++++++++++++++++");
 96        Queue<Fat> empty = emptyBuffer.getBuffer();
 97        System.out.print("emptyTask's buffer:");
 98        for (Fat fat : empty) {
 99            System.out.printf("%s\t", fat);
100        }
101    }
102
103    public static void main(String[] args) {
104        BufferSwap bs = new BufferSwap();
105        bs.test(10, 100);
106    }
107}
108/* output
109fillTask's buffer: Fat-1000	Fat-1001	Fat-1002	Fat-1003	Fat-1004	Fat-1005	Fat-1006	Fat-1007	Fat-1008	Fat-1009
110++++++++++++++++++++++++++++++++
111emptyTask's buffer: Fat-990	Fat-991	Fat-992	Fat-993	Fat-994	Fat-995	Fat-996	Fat-997	Fat-998	Fat-999
112*///:~

BufferSwap中有2个任务,FillTask用来使用缓存,当缓存队列未满时,一直向缓存中添加对象,一旦缓存已满,则进入“栅栏”;而EmptyTask用来清空已满的缓存队列,知道缓存队列为空进入”栅栏”,同时为了限制缓存交换的次数,我们在缓存交换达到限制时停止EmptyTask。在test()方法中,我们初始化了2个缓存对象fullBufferemptyBuffer,前者会初始化一个满的缓存,后者则会初始化一个空的缓存。本例中传入的类型参数是ArrayBlockingQueue.class,并且忽略了类型检查2

之后提交这2个缓存任务,使用Future<?>来检查EmptyTask的状态并适时取消FillTask。这样做时可行的,因为FillTask一定会在最后一次交换之后继续使用而占满缓存空间进入“栅栏”处阻塞,使用Future.cancel()可以中断其阻塞并抛出中断异常,从而结束运行。随后重看2个任务阻塞队列中的对象,输出符合期望3


  1. 这个示例演化自Exchanger的javaDoc:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Exchanger.html。 ↩︎

  2. 忽略类型检查的原因是因为尚不能处理泛型编程的所有问题。理论上这里传入任意Queue实现类都是可以的,但是由于示例中所用的实例Fat并没有实现Comparable接口,所以当传入优先级队列时,构造器会抛出初始化异常。 ↩︎

  3. 这里还存在一个潜在问题:EmptyTask完成时取消FillTaskFillTask的状态会影响程序的结果,若后者是在Exchanger处被阻塞时取消,那将抛出中断异常,程序输出如示例中说的那样;若后者在向缓存中添加对象时被中断,shutdown()方法无法立刻中止FillTask的运行,它将继续运行至进入栅栏而抛出异常,但是,主线程中的遍历(在使用普通队列时)就可能会抛出ConcorrentModificationException。解决此问题的方法是在FillTask中分别处理2种取消的情况,或者在主线程中使用awaitTermination等待FillTask抛出异常而终结。 ↩︎