Two Examples of Using Blocking Queues
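The previous article introduced the main blocking queues in java.util.concurrent (JUC).
This article uses two examples to show how blocking queues are applied in Java.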
Keyword search
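The following example searches a directory and its subdirectories for files that contain a given keyword, and prints every line where the keyword appears. A blocking queue holds the files found in the directory tree, and two tasks cooperate: one adds files to the queue and the other searches them.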
```java
import java.io.File;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SearchKeyword {

    private static final int FILE_QUEUE_SIZE = 10;
    private static final int SEARCH_THREADS = 100;
    /** Sentinel ("poison pill") marking the end of the file stream; compared by identity. */
    private static final File DUMMY = new File("");
    /** Bounded blocking queue shared by the producer and all consumers. */
    private final BlockingQueue<File> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);
    private final static String DIR = "src";
    private String keyword;
    private volatile boolean done = false;

    public static void main(String[] args) {
        SearchKeyword sk = new SearchKeyword();
        sk.test();
    }

    void test() {
        // try-with-resources closes the Scanner automatically
        try (Scanner in = new Scanner(System.in)) {
            System.out.print("Enter keyword (e.g. volatile): ");
            keyword = in.nextLine();

            Producer p = new Producer();
            Consumer c = new Consumer();

            ExecutorService pool = Executors.newCachedThreadPool();

            pool.execute(p);

            for (int i = 1; i <= SEARCH_THREADS; i++) {
                // the same Consumer instance runs on SEARCH_THREADS threads
                pool.execute(c);
            }
            // accept no new tasks; running tasks finish once they reach DUMMY
            pool.shutdown();
        }
    }

    class Producer implements Runnable {

        @Override
        public void run() {
            try {
                enumerate(new File(DIR));
                // the empty File serves as the end-of-stream marker
                queue.put(DUMMY);
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            try {
                while (!done) {
                    File file = queue.take();
                    if (file == DUMMY) {
                        // put the marker back so consumers still blocked in take() also receive it
                        queue.put(file);
                        done = true;
                    } else {
                        search(file, keyword);
                    }
                }
            } catch (Exception e) {
                // ignore: this consumer simply stops
            }
        }
    }

    /**
     * Recursively enumerates all files in a given directory and its subdirectories.
     *
     * @param directory the directory in which to start
     */
    public void enumerate(File directory) throws InterruptedException {
        File[] files = directory.listFiles();
        if (files == null) {
            return; // not a directory, or an I/O error occurred
        }
        for (File file : files) {
            if (file.isDirectory()) {
                enumerate(file);
            } else {
                queue.put(file);
            }
        }
    }

    /**
     * Searches a file for a given keyword and prints all matching lines.
     *
     * @param file    the file to search
     * @param keyword the keyword to search for
     */
    public void search(File file, String keyword) throws IOException {
        try (Scanner in = new Scanner(file, "UTF-8")) {
            int lineNumber = 0;
            while (in.hasNextLine()) {
                lineNumber++;
                String line = in.nextLine();
                if (line.contains(keyword)) {
                    System.out.printf("[%s] %s:%d:%s%n", Thread.currentThread().getName(), file.getPath(), lineNumber, line);
                }
            }
        }
    }
}
```
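In this example the files are stored in a bounded blocking queue. The code contains no explicit locking (no locks and no synchronized blocks; the only concurrency aid besides the queue is the volatile flag done), yet the program is thread-safe. This is the advantage of blocking queues in the producer-consumer model.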
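To make the "no explicit locking" point concrete, here is a small sketch of how a capacity-1 ArrayBlockingQueue behaves. The class name BoundedQueueDemo is made up for illustration. offer() fails immediately when the queue is full, the timed offer() waits and then gives up, and put() blocks until a take() on another thread frees a slot. All waiting happens inside the queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    /** Returns a trace of what each operation returned on a capacity-1 queue. */
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder trace = new StringBuilder();

        // offer() never blocks: it returns false at once when the queue is full
        trace.append(queue.offer("a")).append(' ');   // true, the queue is now full
        trace.append(queue.offer("b")).append(' ');   // false, no room

        // the timed offer() waits up to the timeout for space, then gives up
        trace.append(queue.offer("b", 50, TimeUnit.MILLISECONDS)).append(' '); // false

        // put() waits indefinitely, so run it on another thread
        Thread producer = new Thread(() -> {
            try {
                queue.put("c"); // waits until a slot is free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // take() frees the slot; if put() is already waiting, it is signalled
        trace.append(queue.take()).append(' ');       // "a"
        producer.join();                              // put("c") has completed
        trace.append(queue.take());                   // "c"
        return trace.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // true false false a c
    }
}
```

The result is the same whether the producer thread reaches put() before or after the first take(). Either way, put("c") completes before join() returns.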
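In fact, we do not need to care about how elements are inserted or removed, or when put/take block: the blocking queue handles all of that. Still, we can briefly trace one possible run of the program: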
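- If task p keeps the CPU, the queue soon reaches its capacity and put blocks.
- Task c then gets CPU time and the lock, removes an element, and take signals the waiting put.
- The signalled put still has to reacquire the lock, so c keeps running. Because c runs on many threads, the queue is drained quickly and every consumer thread blocks in take.
- Task p then reacquires the lock. put inserts an element and signals a waiting take, and c continues.
- ...
- After inserting the dummy, task p finishes.
- Whichever consumer thread reads the dummy sets the flag and leaves the loop on its next check.
- The other consumer threads see the flag and exit in turn. A thread already blocked in take() cannot see the flag, however. It wakes up only when another element arrives, which is why the thread that finds the dummy should put it back into the queue.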
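The real execution is far more complex than described above. What matters is that when the queue is full or empty, the blocked side is woken by the other side: a take wakes a blocked put, and a put wakes a blocked take.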
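A single end-of-stream marker reaches only one consumer. With many consumer threads, the one that takes it has to put it back, otherwise consumers already parked in take() wait forever. The sketch below shows this pattern with eight consumers. PoisonPillDemo, its queue size and its parameters are hypothetical and not part of SearchKeyword. The pool terminates only because every consumer eventually receives the marker.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillDemo {
    // Sentinel compared by identity, like DUMMY in SearchKeyword
    private static final String POISON = new String("POISON");

    /** Runs one producer and several consumers; returns how many items were consumed. */
    static int run(int items, int consumers) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers + 1);

        pool.execute(() -> {                       // producer
            try {
                for (int i = 0; i < items; i++) {
                    queue.put("item-" + i);
                }
                queue.put(POISON);                 // one end-of-stream marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {                   // consumer
                try {
                    while (true) {
                        String s = queue.take();
                        if (s == POISON) {
                            queue.put(s);          // hand the marker on to the next consumer
                            return;
                        }
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        // every consumer eventually receives the marker, so the pool terminates by itself
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("consumers did not terminate");
        }
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100, 8)); // 100
    }
}
```

Re-putting the marker cannot block. It is the last element the producer adds, so when a consumer takes it the queue holds nothing else.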
The blocking chain of a toast factory
⚠️ The description of the blocking chain in this section may be partly incorrect.
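Suppose a toast factory has two processing lines, one making buttered toast and one making jam toast. The factory is the producer, and we add a consumer to see which flavor of toast it gets each time.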
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ToastFactory {
    private volatile int count;

    static class Toast {
        enum Status {DRY, BUTTERED, JAMMED}

        private Status status = Status.DRY;
        private final int id;

        public Toast(int idn) {
            id = idn;
        }

        public void butter() {
            status = Status.BUTTERED;
        }

        public void jam() {
            status = Status.JAMMED;
        }

        public Status getStatus() {
            return status;
        }

        public int getId() {
            return id;
        }

        @Override
        public String toString() {
            return "Toast " + id + ": " + status;
        }
    }

    class ToastQueue extends LinkedBlockingQueue<Toast> {
    }

    class Toaster implements Runnable {
        private ToastQueue rawQueue;

        public Toaster(ToastQueue tq) {
            rawQueue = tq;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    TimeUnit.MILLISECONDS.sleep(100);
                    // Make toast
                    Toast t = new Toast(count++);
                    System.out.println(t);
                    // Insert into queue
                    rawQueue.put(t);
                }
                System.out.println("Toaster off");
            } catch (InterruptedException e) {
                System.out.println("Toaster interrupted");
            }
        }
    }

    /** Apply butter to toast: */
    class Butterer implements Runnable {
        private ToastQueue dryQueue, finishQueue;

        public Butterer(ToastQueue dry, ToastQueue buttered) {
            dryQueue = dry;
            finishQueue = buttered;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    // Blocks until next piece of toast is available:
                    Toast t = dryQueue.take();
                    t.butter();
                    System.out.println(t);
                    finishQueue.put(t);
                }
                System.out.println("Butterer off");
            } catch (InterruptedException e) {
                System.out.println("Butterer interrupted");
            }
        }
    }

    /** Apply jam to dry toast: */
    class Jammer implements Runnable {
        private ToastQueue dryQueue, finishQueue;

        public Jammer(ToastQueue raw, ToastQueue jam) {
            dryQueue = raw;
            finishQueue = jam;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    // Blocks until next piece of toast is available:
                    Toast t = dryQueue.take();
                    t.jam();
                    System.out.println(t);
                    finishQueue.put(t);
                }
                System.out.println("Jammer off");
            } catch (InterruptedException e) {
                System.out.println("Jammer interrupted");
            }
        }
    }

    /** Consume the toast: */
    class Eater implements Runnable {
        private ToastQueue finishQueue;
        private int counter = 0;

        public Eater(ToastQueue finishQueue) {
            this.finishQueue = finishQueue;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    // Blocks until next piece of toast is available:
                    Toast toast = finishQueue.take();
                    // Verify that the toast is coming in order,
                    // and that every piece has been buttered or jammed:
                    if (toast.getId() != counter++ || toast.getStatus() == Toast.Status.DRY) {
                        System.out.println(">>>> Error: " + toast);
                        System.exit(1);
                    } else {
                        System.out.println("Chomp! " + toast);
                    }
                }
                System.out.println("Eater off");
            } catch (InterruptedException e) {
                System.out.println("Eater interrupted");
            }
        }
    }

    public void test() throws InterruptedException {
        ToastQueue dryQueue = this.new ToastQueue(),
                finishQueue = this.new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(this.new Toaster(dryQueue));
        exec.execute(this.new Butterer(dryQueue, finishQueue));
        exec.execute(this.new Jammer(dryQueue, finishQueue));
        exec.execute(this.new Eater(finishQueue));

        // busy-wait until more than 4 pieces of toast have been made
        while (true) {
            if (count > 4) {
                break;
            }
        }
        exec.shutdownNow();
        System.out.println("toast count: " + count);
    }

    public static void main(String[] args) throws Exception {
        ToastFactory tf = new ToastFactory();
        tf.test();
    }
}
/*
Toast 0: DRY
Toast 0: BUTTERED
Chomp! Toast 0: BUTTERED
Toast 1: DRY
Toast 1: JAMMED
Chomp! Toast 1: JAMMED
Toast 2: DRY
Toast 2: BUTTERED
Chomp! Toast 2: BUTTERED
Toast 3: DRY
Toast 3: JAMMED
Chomp! Toast 3: JAMMED
Toast 4: DRY
Toast 4: BUTTERED
Chomp! Toast 4: BUTTERED
toast count: 5
Eater interrupted
Jammer interrupted
Butterer interrupted
Toaster interrupted
*///:~
```
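The example has four tasks: making dry toast (T1), buttering toast (T2), applying jam (T3) and eating toast (T4). Buttered and jam toast can only be made from dry toast, and T4 can only eat finished toast.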
```mermaid
graph LR
    A[Start] --> B(Dry toast T1)
    B -- Butter T2 --> D[Finished toast]
    B -- Jam T3 --> D
    D -- Eat T4 --> E[End]
```
Program execution flow
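From the execution flow, T1 blocks T2 and T3, T2 and T3 block T4, and T4 blocks T1, forming a blocking chain. The output appears to confirm this: toast is produced and eaten in order. Buttered toast 0 is eaten, then jam toast 1, and so on, in a regular pattern.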
But think about it: what actually guarantees this pattern?
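The code uses two blocking queues, rawQueue for dry toast and finishQueue for finished (buttered or jam) toast. While the program runs, T1, T2, T3 and T4 all run concurrently.

Because the implementation is an unbounded LinkedBlockingQueue, put(e) on rawQueue never blocks. That alone means the ordered output above cannot be guaranteed.

Moreover, T2 and T3 compete for rawQueue's take lock, so whether a given piece of toast gets butter or jam is undetermined and left entirely to the CPU scheduler. This also rules out a guaranteed alternating pattern.

T4 is even worse off. Because of the T2/T3 race, toast can reach finishQueue out of order, so sooner or later T4's if check fails and the program exits. A more realistic output looks like the following (raise the count threshold in the main thread to observe it):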
```
/*
...
Chomp! Toast 51: BUTTERED
Toast 54: BUTTERED
Toast 59: DRY
Toast 56: BUTTERED
>>>> Error: Toast 53: BUTTERED
Toast 55: JAMMED
Toast 57: BUTTERED
Toast 59: BUTTERED
...
*/
```
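Two facts behind this analysis can be checked directly with a small sketch. The class QueueBlockingDemo is a hypothetical name. First, a LinkedBlockingQueue created without a capacity is bounded only by Integer.MAX_VALUE, so put() practically never blocks. Second, a thread parked in take() on an empty queue reports the state WAITING rather than RUNNABLE.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueBlockingDemo {

    /** Starts a thread that calls take() on an empty queue and returns the state it settles in. */
    static Thread.State takerState() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread taker = new Thread(() -> {
            try {
                queue.take();                  // the queue is empty, so the thread parks here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taker.start();

        // poll until the thread has reached take() and parked
        Thread.State state = taker.getState();
        while (state == Thread.State.NEW || state == Thread.State.RUNNABLE) {
            Thread.sleep(1);
            state = taker.getState();
        }
        queue.put("toast");                    // give it an element so it can finish
        taker.join();
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        // the no-arg LinkedBlockingQueue has capacity Integer.MAX_VALUE
        System.out.println(new LinkedBlockingQueue<String>().remainingCapacity() == Integer.MAX_VALUE); // true
        // a capacity-1 ArrayBlockingQueue is full after one element
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(1);
        bounded.add("toast");
        System.out.println(bounded.remainingCapacity()); // 0
        // a thread blocked in take() is WAITING, not RUNNABLE
        System.out.println(takerState()); // WAITING
    }
}
```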
Since the problem comes from put(e) on rawQueue never blocking, would an ArrayBlockingQueue with capacity 1 produce the ordered output?
Unfortunately, it does not.[^1]
```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Variant of ToastFactory: ToastQueue is a bounded ArrayBlockingQueue (capacity 1)
// and the sleep in Toaster is commented out. Toast is unchanged from the version above.
public class ToastFactory {
    private volatile int count;

    static class Toast {
        enum Status {DRY, BUTTERED, JAMMED}

        private Status status = Status.DRY;
        private final int id;

        public Toast(int idn) { id = idn; }

        public void butter() { status = Status.BUTTERED; }

        public void jam() { status = Status.JAMMED; }

        public Status getStatus() { return status; }

        public int getId() { return id; }

        @Override
        public String toString() { return "Toast " + id + ": " + status; }
    }

    class ToastQueue extends ArrayBlockingQueue<Toast> {

        public ToastQueue(int capacity) {
            super(capacity);
        }
    }

    class Toaster implements Runnable {
        private ToastQueue rawQueue;

        public Toaster(ToastQueue tq) {
            rawQueue = tq;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    // This sleep is what the "blocking chain" really depends on
                    // TimeUnit.MILLISECONDS.sleep(100);
                    // Make toast
                    Toast t = new Toast(count++);
                    rawQueue.put(t);
                }
                System.out.println("Toaster off");
            } catch (InterruptedException e) {
                System.out.println("Toaster interrupted");
            }
        }
    }

    /** Apply butter to toast: */
    class Butterer implements Runnable {
        private ToastQueue dryQueue, finishQueue;

        public Butterer(ToastQueue dry, ToastQueue buttered) {
            dryQueue = dry;
            finishQueue = buttered;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    // Blocks until next piece of toast is available:
                    Toast t = dryQueue.take();
                    t.butter();
                    finishQueue.put(t);
                }
                System.out.println("Butterer off");
            } catch (InterruptedException e) {
                System.out.println("Butterer interrupted");
            }
        }
    }

    /** Apply jam to dry toast: */
    class Jammer implements Runnable {
        private ToastQueue dryQueue, finishQueue;

        public Jammer(ToastQueue raw, ToastQueue jam) {
            dryQueue = raw;
            finishQueue = jam;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    // Blocks until next piece of toast is available:
                    Toast t = dryQueue.take();
                    t.jam();
                    finishQueue.put(t);
                }
                System.out.println("Jammer off");
            } catch (InterruptedException e) {
                System.out.println("Jammer interrupted");
            }
        }
    }

    /** Consume the toast: */
    class Eater implements Runnable {
        private ToastQueue finishQueue;

        public Eater(ToastQueue finishQueue) {
            this.finishQueue = finishQueue;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    // Blocks until next piece of toast is available:
                    Toast toast = finishQueue.take();
                    System.out.println("Chomp! " + toast);
                }
                System.out.println("Eater off");
            } catch (InterruptedException e) {
                System.out.println("Eater interrupted");
            }
        }
    }

    public void test() throws InterruptedException {
        ToastQueue dryQueue = this.new ToastQueue(1),
                finishQueue = this.new ToastQueue(1);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(this.new Toaster(dryQueue));
        exec.execute(this.new Butterer(dryQueue, finishQueue));
        exec.execute(this.new Jammer(dryQueue, finishQueue));
        exec.execute(this.new Eater(finishQueue));

        // busy-wait until more than 14 pieces of toast have been made
        while (true) {
            if (count > 14) {
                break;
            }
        }
        exec.shutdownNow();
        System.out.println("toast count: " + count);
    }

    public static void main(String[] args) throws Exception {
        ToastFactory tf = new ToastFactory();
        tf.test();
    }
}
/* output (partial sample)
...
Chomp! Toast 18: JAMMED
Chomp! Toast 20: JAMMED
Chomp! Toast 19: BUTTERED
Chomp! Toast 22: BUTTERED
Chomp! Toast 21: JAMMED
Chomp! Toast 24: JAMMED
Eater off
Butterer interrupted
toast count: 28
Toaster interrupted
Jammer interrupted
*///:~
```
As you can see, the T2/T3 race is still unresolved. Once T1 blocks, T2 and T3 race again when putting toast into finishQueue. Most of the time the output is in order, but occasionally it is not.
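If strict ordering matters more than parallelism, one option (not used in the original code) is to let a single worker thread serve the stage between the two queues, so nothing competes for take() and put(). The sketch below replaces the random T2/T3 split with a hypothetical rule: even ids get butter, odd ids get jam. The class name SingleWorkerStage is also made up. Because one thread moves items from one FIFO queue to another, the eater sees them in production order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SingleWorkerStage {

    /** Pushes n toast ids through a stage served by ONE worker thread; returns them in eating order. */
    static List<String> run(int n) throws InterruptedException {
        BlockingQueue<Integer> raw = new LinkedBlockingQueue<>();
        BlockingQueue<String> finished = new LinkedBlockingQueue<>();

        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    int id = raw.take();
                    // hypothetical rule: even ids get butter, odd ids get jam
                    String flavor = (id % 2 == 0) ? "BUTTERED" : "JAMMED";
                    finished.put(id + ":" + flavor);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        for (int i = 0; i < n; i++) {
            raw.put(i);                     // producer side (T1's role)
        }
        List<String> eaten = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            eaten.add(finished.take());     // consumer side (T4's role)
        }
        worker.join();
        return eaten;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4)); // [0:BUTTERED, 1:JAMMED, 2:BUTTERED, 3:JAMMED]
    }
}
```

The price is that butter and jam are no longer applied in parallel. With two workers between the same queues, the order of their put() calls is up to the scheduler, which is exactly the race described above.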
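The code also exposes a limitation of volatile. The main thread is meant to stop production once count exceeds 14, yet the final count reached 28! T1 modifies the shared variable count while the main thread reads it to decide when to stop, so this needs proper synchronization.[^2]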
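One way to avoid depending on the main thread's timing is to let the thread that writes count also decide when to stop, and have the main thread wait on a CountDownLatch instead of spinning. This is a sketch under those assumptions, not the original program. The class ProducerOwnsLimit and its structure are made up for illustration. AtomicInteger is used only because a lambda cannot modify a local int.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerOwnsLimit {

    /** Produces exactly `limit` items through a capacity-1 queue; returns the final count. */
    static int run(int limit) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        AtomicInteger count = new AtomicInteger();
        CountDownLatch eaten = new CountDownLatch(limit);
        ExecutorService exec = Executors.newCachedThreadPool();

        exec.execute(() -> {                         // producer, in T1's role
            try {
                // only this thread writes count, so checking and incrementing here
                // cannot overshoot: production stops exactly at limit
                while (count.get() < limit) {
                    queue.put(count.getAndIncrement());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        exec.execute(() -> {                         // consumer, in T4's role
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take();
                    eaten.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // shutdownNow() ends the consumer here
            }
        });

        eaten.await();                               // block instead of spinning on count
        exec.shutdownNow();
        exec.awaitTermination(1, TimeUnit.SECONDS);
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(15)); // 15
    }
}
```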
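In fact, the sleep at the start of T1's loop slows production down. While the program runs, T1 is usually sleeping and T2, T3 and T4 are all blocked. This is a completely different situation from the unordered case discussed above.

When T1's sleep times out, it makes one piece of toast, wakes one task waiting on rawQueue (T2 or T3), and goes back to sleep for 100 ms. Suppose T2 is woken. T2 butters the toast, wakes T4, and immediately waits again: T1's 100 ms sleep is long enough that rawQueue stays empty in the meantime. T4 eats the toast and waits again, since 100 ms is likewise long enough. That completes one round of the ordered output[^3]: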
```
/*
Toast 0: DRY
Toast 0: BUTTERED
Chomp! Toast 0: BUTTERED
*/
```
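Note that the shared variable count is still not synchronized in this version, yet, unexpectedly, no problem shows up.[^2] This is genuinely surprising, since the race between the main thread's check and T1's increments is still there (or perhaps the tests were simply too few for it to appear). The reason lies in T1's sleep. T1 spends almost all its time asleep, so the main thread's "late" interrupt from shutdownNow() reaches T1 while it is sleeping, before it can make more toast. As a result, the main thread's check looks as if it had no logical flaw.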
That is the most powerful sleep I have ever seen.
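The explanation above relies on the interruptibility of sleep(): a thread blocked in sleep() reacts to an interrupt by throwing InterruptedException right away. A minimal sketch shows this. The class InterruptSleepingTask is a hypothetical name. shutdownNow() stops a task that is in the middle of a 10-second sleep, well within a one-second awaitTermination().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptSleepingTask {

    /** Returns true if shutdownNow() stopped a long-sleeping task promptly. */
    static boolean run() throws InterruptedException {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        ExecutorService exec = Executors.newSingleThreadExecutor();

        exec.execute(() -> {
            started.countDown();
            try {
                TimeUnit.SECONDS.sleep(10);          // far longer than we are willing to wait
            } catch (InterruptedException e) {
                sawInterrupt.set(true);              // the interrupt from shutdownNow() lands here
            }
        });

        started.await();                             // make sure the task is running
        exec.shutdownNow();                          // interrupts the worker thread
        boolean stopped = exec.awaitTermination(1, TimeUnit.SECONDS);
        return stopped && sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // true
    }
}
```

If the interrupt arrives before the task reaches sleep(), sleep() throws immediately because the interrupt flag is already set, so the result is the same.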
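[^1]: This code also has a problem with unsafe access to a shared resource.

[^2]: I noticed this while testing with ArrayBlockingQueue, as mentioned above, when the limitation of the volatile keyword showed up. count is volatile, only T1 modifies it, and the main thread reads it to decide when to interrupt the tasks. It seems no extra synchronization should be needed to avoid errors, yet an error occurred. volatile guarantees visibility, but it does not make the main thread's "check count, then call shutdownNow()" sequence atomic with respect to T1's increments. The check is only valid if no increment happens between reading count and interrupting T1. In practice the main thread's check always lags behind the modification, and T1 keeps producing during that window.

[^3]: The 100 ms sleep looks like an unsafe mechanism, because nothing guarantees that T4 finishes its work and starts waiting within T1's 100 ms sleep. Yet during testing, with the sleep set to 1 ns (the smallest sleep Java can express), the output was still ordered, which is puzzling. A likely explanation is that on older JDKs such as JDK 8, Thread.sleep(millis, nanos), which TimeUnit.sleep delegates to, rounds a nonzero nanosecond value with millis == 0 up to a full millisecond. Actual sleep precision is further limited by the OS timer and scheduler.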