阻塞队列的使用2例

阻塞队列的使用2例

上一篇文章介绍了juc的几种主要阻塞队列。

本文使用2个例子,演示了阻塞队列在Java中的应用。

查找关键字 #

下面的示例从目录及其子目录中查找指定关键字的文件并列出关键字所在的行的信息。我们使用阻塞队列存放目录及其子目录中所有文件,并且使用2个任务分别添加文件和查找文件。

  1public class SearchKeyword {
  2
  3    private static final int FILE_QUEUE_SIZE = 10;
  4    private static final int SEARCH_THREADS = 100;
  5    private static final File DUMMY = new File("");
  6    /**有界阻塞队列*/
  7    private final BlockingQueue<File> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);
  8    private final static String DIR = "src";
  9    private String keyword;
 10    private volatile boolean done = false;
 11
 12    public static void main(String[] args) {
 13        SearchKeyword sk = new SearchKeyword();
 14        sk.test();
 15    }
 16
 17    void test() {
 18        // 带资源的try块
 19        try (Scanner in = new Scanner(System.in)) {
 20            System.out.print("Enter keyword (e.g. volatile): ");
 21            keyword = in.nextLine();
 22
 23            Producer p = new Producer();
 24            Consumer c = new Consumer();
 25
 26            ExecutorService pool = Executors.newCachedThreadPool();
 27
 28            pool.execute(p);
 29
 30            for (int i = 1; i <= SEARCH_THREADS; i++) {
 31                // run consumer
 32                pool.execute(c);
 33            }
 34            pool.shutdown();
 35        }
 36    }
 37
 38    class Producer implements Runnable {
 39
 40        @Override
 41        public void run() {
 42            try {
 43                enumerate(new File(DIR));
 44                // 空文件作为结束符
 45                queue.put(DUMMY);
 46            } catch (InterruptedException e) {
 47                // ignore
 48            }
 49        }
 50    }
 51
 52    class Consumer implements Runnable {
 53
 54        @Override
 55        public void run() {
 56            try {
 57                while (!done) {
 58                    File file = queue.take();
 59                    if (file == DUMMY) {
 60                        done = true;
 61                    } else {
 62                        search(file, keyword);
 63                    }
 64//                  Thread.yield();
 65                }
 66            } catch (Exception e) {
 67                // ignore
 68            }
 69        }
 70    }
 71
 72    /**
 73     * Recursively enumerates all files in a given directory and its subdirectories.
 74     *
 75     * @param directory the directory in which to start
 76     */
 77    public void enumerate(File directory) throws InterruptedException {
 78        File[] files = directory.listFiles();
 79        for (File file : files) {
 80            if (file.isDirectory()) {
 81                enumerate(file);
 82            } else {
 83                queue.put(file);
 84            }
 85        }
 86    }
 87
 88    /**
 89     * Searches a file for a given keyword and prints all matching lines.
 90     *
 91     * @param file    the file to search
 92     * @param keyword the keyword to search for
 93     */
 94    public void search(File file, String keyword) throws IOException {
 95        try (Scanner in = new Scanner(file, "UTF-8")) {
 96            int lineNumber = 0;
 97            while (in.hasNextLine()) {
 98                lineNumber++;
 99                String line = in.nextLine();
100                if (line.contains(keyword)) {
101                    System.out.printf("[%s] %s:%d:%s%n", Thread.currentThread().getName(), file.getPath(), lineNumber, line);
102                }
103            }
104        }
105    }
106}

上例中用于存放文件的是有界的阻塞队列实现,并且代码没有任何的显式同步控制,程序是线程安全的,这就是阻塞队列在处理生产——消费模型时的优势。

事实上,我们无需关注队列中元素的插入/移除、以及put/take方法的阻塞情况,阻塞队列会处理好一切。不过,我们可以简单分析程序可能的运行过程:

  • 若p任务一直占用cpu时间,那么队列很快将到达容量上限,put方法阻塞
  • 此时c任务获得cpu时间及锁,并且能够顺利的移除元素,此时take方法唤醒put方法
  • 但是put方法并没有获取锁,c任务继续执行,由于c任务有很多线程,队列中的元素很快被消耗完,所有执行c任务的线程take方法阻塞
  • 此时p任务重新获得锁,put方法插入元素后唤醒take方法,c任务得以继续执行
  • ...
  • 插入dummy之后p任务完成
  • c任务的任一线程读取到dummy之后修改修改标记变量并在下一次循环退出
  • 其他执行c任务的线程读取到标记量并相继退出

实际上程序运行的过程比上面的阐述要复杂的多,不过需要理解的就是阻塞队列在队列满或空的情况下的阻塞是被相互唤醒的。

面包工厂的阻塞链 #

⚠️此节的内容关于阻塞链的描述部分可能有部分错误。

假设一个面包工厂有两个加工线,分别加工黄油面包和果酱面包,现在将面包工厂作为生产者,另外我们需要一个消费者,来看看每次都会吃到什么口味的面包

  1public class ToastFactory {
  2    private volatile int count;
  3
  4    static class Toast {
  5        enum Status {DRY, BUTTERED, JAMMED}
  6
  7        private Status status = Status.DRY;
  8        private final int id;
  9
 10        public Toast(int idn) {
 11            id = idn;
 12        }
 13
 14        public void butter() {
 15            status = Status.BUTTERED;
 16        }
 17
 18        public void jam() {
 19            status = Status.JAMMED;
 20        }
 21
 22        public Status getStatus() {
 23            return status;
 24        }
 25
 26        public int getId() {
 27            return id;
 28        }
 29
 30        @Override
 31        public String toString() {
 32            return "Toast " + id + ": " + status;
 33        }
 34    }
 35
 36    class ToastQueue extends LinkedBlockingQueue<Toast> {
 37    }
 38
 39    class Toaster implements Runnable {
 40        private ToastQueue rawQueue;
 41
 42
 43        public Toaster(ToastQueue tq) {
 44            rawQueue = tq;
 45        }
 46
 47        @Override
 48        public void run() {
 49            try {
 50                while (!Thread.interrupted()) {
 51                    TimeUnit.MILLISECONDS.sleep(100);
 52                    // Make toast
 53                    Toast t = new Toast(count++);
 54                    System.out.println(t);
 55                    // Insert into queue
 56                    rawQueue.put(t);
 57                }
 58                System.out.println("Toaster off");
 59            } catch (InterruptedException e) {
 60                System.out.println("Toaster interrupted");
 61            }
 62        }
 63    }
 64
 65    /** Apply butter to toast: */
 66    class Butterer implements Runnable {
 67        private ToastQueue dryQueue, finishQueue;
 68
 69        public Butterer(ToastQueue dry, ToastQueue buttered) {
 70            dryQueue = dry;
 71            finishQueue = buttered;
 72        }
 73
 74        @Override
 75        public void run() {
 76            try {
 77                while (!Thread.interrupted()) {
 78                    // Blocks until next piece of toast is available:
 79                    Toast t = dryQueue.take();
 80                    t.butter();
 81                    System.out.println(t);
 82                    finishQueue.put(t);
 83                }
 84                System.out.println("Butterer off");
 85            } catch (InterruptedException e) {
 86                System.out.println("Butterer interrupted");
 87            }
 88        }
 89    }
 90
 91    /** Apply jam to buttered toast: */
 92    class Jammer implements Runnable {
 93        private ToastQueue dryQueue, finishQueue;
 94
 95        public Jammer(ToastQueue raw, ToastQueue jam) {
 96            dryQueue = raw;
 97            finishQueue = jam;
 98        }
 99
100        @Override
101        public void run() {
102            try {
103                while (!Thread.interrupted()) {
104                    // Blocks until next piece of toast is available:
105                    Toast t = dryQueue.take();
106                    t.jam();
107                    System.out.println(t);
108                    finishQueue.put(t);
109                }
110                System.out.println("Jammer off");
111            } catch (InterruptedException e) {
112                System.out.println("Jammer interrupted");
113            }
114        }
115    }
116
117    /** Consume the toast: */
118    class Eater implements Runnable {
119        private ToastQueue finishQueue;
120        private int counter = 0;
121
122        public Eater(ToastQueue finishQueue) {
123            this.finishQueue = finishQueue;
124        }
125
126        @Override
127        public void run() {
128            try {
129                while (!Thread.interrupted()) {
130                    // Blocks until next piece of toast is available:
131                    Toast toast = finishQueue.take();
132                    // Verify that the toast is coming in order,
133                    // and that all pieces are getting jammed:
134                    if (toast.getId() != counter++ || toast.getStatus() == Toast.Status.DRY) {
135                        System.out.println(">>>> Error: " + toast);
136                        System.exit(1);
137                    } else {
138                        System.out.println("Chomp! " + toast);
139                    }
140                }
141                System.out.println("Eater off");
142            } catch (InterruptedException e) {
143                System.out.println("Eater interrupted");
144            }
145        }
146    }
147
148
149    public void test() throws InterruptedException {
150        ToastQueue dryQueue = this.new ToastQueue(),
151            finishQueue = this.new ToastQueue();
152        ExecutorService exec = Executors.newCachedThreadPool();
153        exec.execute(this.new Toaster(dryQueue));
154        exec.execute(this.new Butterer(dryQueue, finishQueue));
155        exec.execute(this.new Jammer(dryQueue, finishQueue));
156        exec.execute(this.new Eater(finishQueue));
157
158        while (true) {
159            if (count > 4) {
160                break;
161            }
162        }
163        exec.shutdownNow();
164        System.out.println("toast count: " + count);
165    }
166
167    public static void main(String[] args) throws Exception {
168        ToastFactory tf = new ToastFactory();
169        tf.test();
170    }
171}
172/*
173Toast 0: DRY
174Toast 0: BUTTERED
175Chomp! Toast 0: BUTTERED
176Toast 1: DRY
177Toast 1: JAMMED
178Chomp! Toast 1: JAMMED
179Toast 2: DRY
180Toast 2: BUTTERED
181Chomp! Toast 2: BUTTERED
182Toast 3: DRY
183Toast 3: JAMMED
184Chomp! Toast 3: JAMMED
185Toast 4: DRY
186Toast 4: BUTTERED
187Chomp! Toast 4: BUTTERED
188toast count: 5
189Eater interrupted
190Jammer interrupted
191Butterer interrupted
192Toaster interrupted
193*///:~

上例有4个任务,分别为生产干面包(记为T1),生产黄油面包(记为T2),生产果酱面包(记为T3),消费面包(记为T4)。黄油/果酱面包只能由干面包加工而成,而T4只能消费加工好的面包

graph LR
A[开始] --> B(干面包T1)
    B-- 黄油T2 -->D[生产完成]
    B-- 果酱T3 -->D
    D-- 消费T4 -->E[结束]

程序执行流程

从执行流程上来看,T1会阻塞T2和T3,而T2和T3会阻塞T4,而T4会阻塞T1,这样形成了一个阻塞链,从输出来看也正是如此,面包的生产和消费是有序的:被涂上黄油的面包0被消费,接着是被涂上果酱的面包1被消费...等等如此有规律的输出。

仔细想想,这种规律是怎么得到保证的呢?

从代码来看, 程序使用了2个阻塞队列:rawQueue和finishQueue分别表示干面包和加工完成的面包(黄油/果酱),程序运行时,T1, T2,T3,T4全部是RUNNABLE状态。由于采用的实现是LinkedBlockingQueue,所以rowQueue的put(e)方法无法被阻塞,单从这一点看,就不能保证得到代码示例中的规律输出,此外,T2/T3会随机争用rowQueue的take锁,所以面包被涂黄油还是果酱是无法确定的,完全由cpu随机调度,因此也不可能出现上述示例的规律输出,至于T4就更不用说了,由于T2/T3的随机争用,那么T4的if判断必然会出现错误,从而退出程序,符合逻辑的输出应该是向下面这样的(当然,把主线程的count判断值改大以观察效果):

 1/*
 2...
 3Chomp! Toast 51: BUTTERED
 4Toast 54: BUTTERED
 5Toast 59: DRY
 6Toast 56: BUTTERED
 7>>>> Error: Toast 53: BUTTERED
 8Toast 55: JAMMED
 9Toast 57: BUTTERED
10Toast 59: BUTTERED
11...
12*/

既然是rowQueue的put(e)方法无法被阻塞导致的问题,那么使用指定容量为1的ArrayBlockingQueue是否可以满足规律输出呢?

遗憾的是,也不行1

  1class ToastQueue extends ArrayBlockingQueue<Toast>{
  2
  3   public ToastQueue(int capacity) {
  4       super(capacity);
  5   }
  6}
  7
  8class Toaster implements Runnable {
  9   private ToastQueue rawQueue;
 10
 11
 12   public Toaster(ToastQueue tq) {
 13       rawQueue = tq;
 14   }
 15
 16   @Override
 17   public void run() {
 18       try {
 19           while (!Thread.interrupted()) {
 20               // 这句休眠是保证阻塞链的根本
 21//                TimeUnit.MILLISECONDS.sleep(100);
 22               // Make toast
 23               Toast t = new Toast(count++);
 24               rawQueue.put(t);
 25           }
 26           System.out.println("Toaster off");
 27       } catch (InterruptedException e) {
 28           System.out.println("Toaster interrupted");
 29       }
 30   }
 31}
 32
 33/** Apply butter to toast: */
 34class Butterer implements Runnable {
 35   private ToastQueue dryQueue, finishQueue;
 36
 37   public Butterer(ToastQueue dry, ToastQueue buttered) {
 38       dryQueue = dry;
 39       finishQueue = buttered;
 40   }
 41
 42   @Override
 43   public void run() {
 44       try {
 45           while (!Thread.interrupted()) {
 46               // Blocks until next piece of toast is available:
 47               Toast t = dryQueue.take();
 48               t.butter();
 49               finishQueue.put(t);
 50           }
 51           System.out.println("Butterer off");
 52       } catch (InterruptedException e) {
 53           System.out.println("Butterer interrupted");
 54       }
 55   }
 56}
 57
 58/** Apply jam to buttered toast: */
 59class Jammer implements Runnable {
 60   private ToastQueue dryQueue, finishQueue;
 61
 62   public Jammer(ToastQueue raw, ToastQueue jam) {
 63       dryQueue = raw;
 64       finishQueue = jam;
 65   }
 66
 67   @Override
 68   public void run() {
 69       try {
 70           while (!Thread.interrupted()) {
 71               // Blocks until next piece of toast is available:
 72               Toast t = dryQueue.take();
 73               t.jam();
 74               finishQueue.put(t);
 75           }
 76           System.out.println("Jammer off");
 77       } catch (InterruptedException e) {
 78           System.out.println("Jammer interrupted");
 79       }
 80   }
 81}
 82
 83/** Consume the toast: */
 84class Eater implements Runnable {
 85   private ToastQueue finishQueue;
 86   private int counter = 0;
 87
 88   public Eater(ToastQueue finishQueue) {
 89       this.finishQueue = finishQueue;
 90   }
 91
 92   @Override
 93   public void run() {
 94       try {
 95           while (!Thread.interrupted()) {
 96               // Blocks until next piece of toast is available:
 97               Toast toast = finishQueue.take();
 98               System.out.println("Chomp! " + toast);
 99           }
100           System.out.println("Eater off");
101       } catch (InterruptedException e) {
102           System.out.println("Eater interrupted");
103       }
104   }
105}
106
107
108public void test() throws InterruptedException {
109   ToastQueue dryQueue = this.new ToastQueue(1),
110       finishQueue = this.new ToastQueue(1);
111   ExecutorService exec = Executors.newCachedThreadPool();
112   exec.execute(this.new Toaster(dryQueue));
113   exec.execute(this.new Butterer(dryQueue, finishQueue));
114   exec.execute(this.new Jammer(dryQueue, finishQueue));
115   exec.execute(this.new Eater(finishQueue));
116
117   while (true) {
118       if (count > 14) {
119           break;
120       }
121   }
122   exec.shutdownNow();
123   System.out.println("toast count: " + count);
124}
125
126public static void main(String[] args) throws Exception {
127   ToastFactory tf = new ToastFactory();
128   tf.test();
129}
130/* output (partial sample)
131...
132Chomp! Toast 18: JAMMED
133Chomp! Toast 20: JAMMED
134Chomp! Toast 19: BUTTERED
135Chomp! Toast 22: BUTTERED
136Chomp! Toast 21: JAMMED
137Chomp! Toast 24: JAMMED
138Eater off
139Butterer interrupted
140toast count: 28
141Toaster interrupted
142Jammer interrupted
143*///:~

可以看到,还是T2/T3的争用问题没有解决,T1的阻塞之后,T2/T3获得运行权之后将面包放入finishQueue时又存在争用情况,尽管大多数情况下都是有序的,但是也存在少数情况下的乱序问题。

同时,上述代码还暴露了一个问题: volatile变量的局限性,程序计划生产14块面包后结束,而最后的面包数却到了28!主线程和T1对共享变量count进行修改,应该使用同步2

实际上,在T1任务开始时使用 休眠来降低面包生产的速度,这样当程序运行时,T1处于休眠状态,/T2/T3/T4都是处于阻塞状态,这和前面讨论的无规律输出是完全不同的局面;当T1休眠超时之后,生产第一片面包并唤醒一个在rawQueue上等待的任务(可能是T2或T3)后又进入休眠(100ms),此时(假如)T2被唤醒,那么T2加工面包之后唤醒T4并随即进入等待(T1任务100ms的休眠足够长时间让rawQueue为空),T4完成之后随即进入等待(同理,100ms足够长),这样就完成了一轮规律输出3

1/*
2Toast 0: DRY
3Toast 0: BUTTERED
4Chomp! Toast 0: BUTTERED
5*/

值得一提的是,关于上面提到的共享变量,并没有使用同步,但是却 意外地 没有出现问题2。这确实令人意外,明明是不满足happens-before原则的,但是却没有出现讹误(或许是测试少,讹误没有发生)。原因就出现在T1的休眠上,由于T1的休眠,T1有足够的时间来接收主线程“滞后”的中断命令,因此看起来就像是主线程的判断没有逻辑上的缺陷一样。

这是我见过的最强休眠。



  1. 这个代码还存在共享资源的访问讹误问题。 ↩︎

  2. 这是由于在前文中提到的,在使用ArrayBlockingQueue测试时,volatile关键字的局限性显现时意识到的。将count设置为volatile,并且只有线程T1在对其进行修改,主线程读取count的值作为任务中断的依据,看起来似乎不需要额外的同步,即可不出现讹误,但是却出现了。实际上,虽然保证了可见性,但是没有保证有序性,即对count的判断和对count的修改不满足happens-before原则,只有当对count值的读取总是发生在对count值的修改之前时,主线程中对count值的判断逻辑才是可行的,事实上主线程中对count值的判断总是滞后于修改的。 ↩︎ ↩︎

  3. 看起来100ms的休眠好像是一个不太安全的机制,因为不能保证100ms的时间T4一定在T1休眠的时间内完成任务并进入等待。但是在测试过程中将休眠时间设置为1ns(Java能够设置的最小休眠时间),仍然得到了规律输出,这一点让人费解。 ↩︎