# Threads - Condition
# author: wangy
# date: 2024/8/8
# description: Condition variables; worker threads are run as daemon threads so that they stop when the main thread exits
4
5
"""
Revisiting the car waxing / buffing problem.
"""
import datetime
import random
import sys
import time
from collections import deque
from threading import Thread, current_thread, Condition
10
# Shared state for the wax/buffer demo. Both workers read and write these
# flags only while holding `condition`.
# The car starts out "waxed but not yet buffed", so the buffing worker gets
# the first turn.
waxed = True
buffered = False
condition = Condition()
14
15
def wax_on():
    """
    Waxing worker: wait until the car has been buffed, then wax it.

    Runs forever while holding the module-level ``condition``. Each round it
    waits for the ``buffered`` flag, clears it, sets ``waxed``, sleeps one
    second, prints a timestamp with the current thread's name and notifies
    one waiter. Never returns, so it is meant to run in a daemon thread.
    """
    global waxed, buffered
    with condition:
        while True:
            # Re-check the predicate in a loop: wait() may return without the
            # flag having been set (spurious wakeup), and a plain `if` would
            # then wax out of turn.
            while not buffered:
                condition.wait()
            buffered = False
            waxed = True
            # The lock stays held during the sleep; the other worker is
            # blocked in wait() until the notify() below anyway.
            time.sleep(1)
            # Format the timestamp separately so the f-string does not reuse
            # double quotes inside a replacement field (a SyntaxError before
            # Python 3.12).
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"{timestamp}: {current_thread().name}")
            condition.notify()
32
33
def buffer_on():
    """
    Buffing worker: wait until the car has been waxed, then buff it.

    Runs forever while holding the module-level ``condition``. Each round it
    waits for the ``waxed`` flag, clears it, sets ``buffered``, sleeps one
    second, prints a timestamp with the current thread's name and notifies
    one waiter. Never returns, so it is meant to run in a daemon thread.
    """
    global buffered, waxed
    with condition:
        while True:
            # Re-check the predicate in a loop: wait() may return without the
            # flag having been set (spurious wakeup), and a plain `if` would
            # then buff out of turn.
            while not waxed:
                condition.wait()
            buffered = True
            waxed = False
            # The lock stays held during the sleep; the other worker is
            # blocked in wait() until the notify() below anyway.
            time.sleep(1)
            # Format the timestamp separately so the f-string does not reuse
            # double quotes inside a replacement field (a SyntaxError before
            # Python 3.12).
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"{timestamp}: {current_thread().name}")
            condition.notify()
50
51
def car_polish():
    """
    Start the waxing and buffing workers as daemon threads and return at once.
    """
    # Deliberately no join(): both workers loop forever, so joining them
    # would block the caller indefinitely. As daemon threads they end when
    # the main thread exits.
    for worker in (wax_on, buffer_on):
        Thread(target=worker, args=(), daemon=True).start()
60
61
# ################################################## #
# A blocking queue built from a lock and a condition #
# ################################################## #
65
class BlockQueue:
    """
    A bounded FIFO blocking queue guarded by a single Condition.

    ``put`` blocks while the queue is full and ``get`` blocks while it is
    empty. Every successful operation calls ``notify_all`` so that waiting
    producers and consumers re-check their own predicate.
    """

    def __init__(self, size):
        """
        :param size: maximum number of items the queue holds before ``put``
            blocks.
        """
        self.__threshold = size
        self.__condition = Condition()
        # deque gives O(1) removal from the head; list.pop(0) is O(n).
        self.__queue = deque()

    @property
    def size(self):
        """
        Current number of queued items, read as ``q.size``.

        Derived from the underlying container instead of a hand-maintained
        counter. It replaces the former ``size()`` method, which was shadowed
        by an instance attribute of the same name and could never be called.
        """
        return len(self.__queue)

    def put(self, item):
        """
        Append ``item`` to the tail, blocking while the queue is full.

        :param item: the object to enqueue.
        """
        with self.__condition:
            # The while loop re-evaluates the predicate after every wakeup,
            # so no explicit `continue` is needed.
            while self.is_full():
                print(f"{current_thread().name}: waiting for buffering")
                self.__condition.wait()
            self.__queue.append(item)
            print(f"{current_thread().name}: >>>> {item}, queue size: {self.size}")
            self.__condition.notify_all()

    def get(self):
        """
        Remove and return the item at the head, blocking while the queue is
        empty.

        :return: the oldest item in the queue.
        """
        with self.__condition:
            while self.is_empty():
                print(f"{current_thread().name}: waiting for item in queue")
                self.__condition.wait()
            item = self.__queue.popleft()
            print(f"{current_thread().name}: <<<< {item}, queue size: {self.size}")
            self.__condition.notify_all()
            # Hand the item back to the caller; previously it was only
            # printed and then lost.
            return item

    def is_full(self):
        """Return True when the queue holds exactly ``size`` items."""
        return self.size == self.__threshold

    def is_empty(self):
        """Return True when the queue holds no items."""
        return self.size == 0

    def print_queue(self):
        """Print the queued items, head first, in list notation."""
        print(list(self.__queue))
110
111
# Module-level blocking queue with a capacity of one item.
queue = BlockQueue(size=1)
113
114
def in_queue(q):
    """
    Producer loop: once per second, put a random number on ``q``.

    The number is drawn with ``random.uniform(1, 10)`` and enqueued as a
    string with two decimal places. Never returns.

    :param q: any object exposing a ``put(item)`` method.
    """
    while True:
        value = random.uniform(1, 10)
        # Enqueue the value formatted to two decimals, as text.
        q.put(f"{value:.2f}")
        time.sleep(1)
121
122
def out_queue(q):
    """
    Consumer loop: take one item from ``q``, then pause for two seconds.

    The value returned by ``q.get()`` is discarded. Never returns.

    :param q: any object exposing a ``get()`` method.
    """
    pause_seconds = 2
    while True:
        q.get()
        time.sleep(pause_seconds)
127
128
def in_out_queue_thread(func1, func2):
    """
    Start one producer and one consumer thread on the module-level ``queue``.

    :param func1: producer callable, run as ``func1(queue)``.
    :param func2: consumer callable, run as ``func2(queue)``.
    """
    # Daemon threads are the blunt way to make sure the workers end
    # together with the main thread.
    producers = []
    for i in range(1):
        producers.append(
            Thread(target=func1, name=f"in_t{i}", args=(queue,), daemon=True)
        )
    consumers = []
    for i in range(1):
        consumers.append(
            Thread(target=func2, name=f"ou_t{i}", args=(queue,), daemon=True)
        )
    # Producers are started before consumers.
    for worker in producers + consumers:
        worker.start()
140
141
if __name__ == "__main__":
    # Run the wax/buffer demo. To run the blocking-queue demo instead, call
    # in_out_queue_thread(in_queue, out_queue) here.
    car_polish()
    # The workers are daemon threads, so they only run for as long as the
    # main thread stays alive.
    run_seconds = 10
    time.sleep(run_seconds)
    sys.exit(-1)
146 sys.exit(-1)