线程-条件

线程-条件


  1# author: wangy
  2# date: 2024/8/8
  3# description: 条件 使用"守护线程"结束线程的运行
  4
  5
  6"""
  7再讨论下汽车打蜡-抛光的问题
  8"""
import datetime
import random
import sys
import time
from collections import deque
from threading import Thread, current_thread, Condition
 10
 11buffered = False
 12waxed = True
 13condition = Condition()
 14
 15
 16def wax_on():
 17    """
 18    打蜡
 19    """
 20    global waxed, buffered
 21    with condition:
 22        while True:
 23            if not buffered:
 24                # print(f"{current_thread().name}: waiting for buffering")
 25                condition.wait()
 26            buffered = False
 27            waxed = True
 28            time.sleep(1)
 29            print(f"{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}"
 30                  f": {current_thread().name}")
 31            condition.notify()
 32
 33
 34def buffer_on():
 35    """
 36    抛光
 37    """
 38    global buffered, waxed
 39    with condition:
 40        while True:
 41            if not waxed:
 42                # print(f"{current_thread().name}: waiting for waxing")
 43                condition.wait()
 44            buffered = True
 45            waxed = False
 46            time.sleep(1)
 47            print(f"{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}"
 48                  f": {current_thread().name}")
 49            condition.notify()
 50
 51
 52def car_polish():
 53    wax_t = Thread(target=wax_on, args=(), daemon=True)
 54    wax_t.start()
 55    buffer_t = Thread(target=buffer_on, args=(), daemon=True)
 56    buffer_t.start()
 57    # 这里不要join了, 线程是死循环, join会让线程一直运行
 58    # wax_t.join()
 59    # buffer_t.join()
 60
 61
 62# ########################## #
 63#   利用锁和条件实现一个阻塞队列  #
 64# ########################## #
 65
 66class BlockQueue:
 67    """
 68    A FIFO blocking queue implemented by list
 69    """
 70
 71    def __init__(self, size):
 72        self.__threshold = size
 73        self.__condition = Condition()
 74        self.__queue = []
 75        self.size = len(self.__queue)
 76
 77    def put(self, item):
 78        with self.__condition:
 79            while self.is_full():
 80                print(f"{current_thread().name}: waiting for buffering")
 81                self.__condition.wait()
 82                continue
 83            self.__queue.append(item)
 84            self.size += 1
 85            print(f"{current_thread().name}: >>>> {item}, queue size: {self.size}")
 86            self.__condition.notify_all()
 87
 88    def get(self):
 89        with self.__condition:
 90            while self.is_empty():
 91                print(f"{current_thread().name}: waiting for item in queue")
 92                self.__condition.wait()
 93                continue
 94            pop = self.__queue.pop(0)
 95            self.size -= 1
 96            print(f"{current_thread().name}: <<<< {pop}, queue size: {self.size}")
 97            self.__condition.notify_all()
 98
 99    def is_full(self):
100        return self.size == self.__threshold
101
102    def is_empty(self):
103        return self.size == 0
104
105    def print_queue(self):
106        print(self.__queue)
107
108    def size(self):
109        return self.size
110
111
# Shared blocking queue with a capacity of one item.
queue = BlockQueue(size=1)
113
114
def in_queue(q):
    """
    Feed *q* forever with random numbers.

    Each round draws a float between 1 and 10, renders it as a string with
    two decimal places, hands it to ``q.put`` and then sleeps one second.

    :param q: any object exposing a ``put(item)`` method
    """
    while True:
        # The format spec renders the float with exactly two decimals.
        q.put(f"{random.uniform(1, 10):.2f}")
        time.sleep(1)
121
122
def out_queue(q):
    """
    Drain *q* forever: call ``q.get()``, then sleep two seconds, and repeat.

    :param q: any object exposing a ``get()`` method
    """
    pause_seconds = 2
    while True:
        q.get()
        time.sleep(pause_seconds)
127
128
def in_out_queue_thread(func1, func2):
    """
    Start one thread running *func1* and one running *func2*, each receiving
    the module-level ``queue`` as its only argument.

    The threads are daemons: a blunt but convenient way to have them end
    together with the main thread.

    :param func1: callable for the "in" thread (named ``in_t0``)
    :param func2: callable for the "out" thread (named ``ou_t0``)
    """
    in_threads = [
        Thread(target=func1, name=f"in_t{idx}", args=(queue,), daemon=True)
        for idx in range(1)
    ]
    out_threads = [
        Thread(target=func2, name=f"ou_t{idx}", args=(queue,), daemon=True)
        for idx in range(1)
    ]
    # Start every "in" thread before any "out" thread.
    for worker in in_threads + out_threads:
        worker.start()
140
141
if __name__ == '__main__':
    # Run the wax/buff demo; swap the comments on the next two lines to run
    # the blocking-queue demo instead.
    car_polish()
    # in_out_queue_thread(in_queue, out_queue)
    # Give the daemon workers ten seconds before the main thread exits.
    time.sleep(10)
    # Exit with status 0: the demo finished normally, and a non-zero status
    # such as -1 is reported to the shell as a failure.
    sys.exit(0)