多线程+Condition 对象模拟生产者/消费者问题
实验目的与要求
(1)了解生产者/消费者问题。
(2)熟练使用 Python 标准库 threading 创建线程。
(3)熟练使用 Python 标准库 threading 中的 Condition 对象实现线程同步。
(4)理解使用列表模拟缓冲区的方法。
(5)理解缓冲区的重要性。
实验原理与内容
1、利用安装部署好的开发环境模拟多线程+Condition 对象模拟生产者/消费者问题。
2、编写程序,创建生产者线程和消费者线程以及大小为 5 的缓冲区。生产者每隔 1 至 3 秒钟就生产一个数字并放入缓冲区,如果缓冲区已满则等待;消费者每隔 1 至 3 秒就 从缓冲区里取出生产日期较早的数字进行消费,如果缓冲区已空就等待。
3、运行程序,观察并理解缓冲区内数字变化以及生产者和消费者线程之间的同步。
实验设备与软件环境
1、 Windows win7 / win8 / win10 操作系统。
2、 Visual Studio / Pycharm 及以上
实验过程与结果
1.定义 Producer 类、Consumer 类;
Producer类:
class Producer:
def __init__(self, name):
self.name = name
def produce(self, item):
print(f"{self.name} is producing {item}")
# 示例用法
producer1 = Producer("Producer 1")
producer1.produce("Product A")
Consumer类:
class Consumer:
def __init__(self, name, age):
self.name = name
self.age = age
def greet(self):
print(f"Hello, my name is {self.name} and I'm {self.age} years old.")
# 示例用法
if __name__ == "__main__":
# 创建一个 Consumer 实例
consumer1 = Consumer("Alice", 30)
# 调用 greet 方法
consumer1.greet()
2.构造 run 函数,使用 sleep()、randint()函数来控制生产/消费的速度。
Producer类
import threading
import time
from random import randint
class Producer(threading.Thread):
def __init__(self, buffer, max_items):
threading.Thread.__init__(self)
self.buffer = buffer
self.max_items = max_items
def run(self):
while True:
# 生成一个随机数作为生产的物品
item = randint(1, 100)
# 模拟生产者生产物品的速度
time.sleep(randint(1, 3))
# 尝试向缓冲区中添加物品
self.buffer.add_item(item)
print(f"Produced item {item}. Buffer: {self.buffer}")
# 如果达到了最大物品数量,则结束生产
if self.buffer.size() >= self.max_items:
print("Producer finished producing.")
break
class Buffer:
def __init__(self):
self. Items = []
def add_item(self, item):
self.items.append(item)
def size(self):
return len(self.items)
if __name__ == "__main__":
buffer = Buffer()
max_items = 10
producer = Producer(buffer, max_items)
producer.start()
Consumer类
import time
from random import randint
class Consumer:
def __init__(self, name):
self.name = name
def run(self):
while True:
# 模拟消费过程
print(f"{self.name} is consuming...")
time.sleep(randint(1, 3)) # 随机休眠时间,模拟不同速度的消费
print(f"{self.name} has finished consuming.")
3、在代码的后面还设置了停止线程。
import threading
import time
import random
class Buffer:
def __init__(self, capacity):
self.capacity = capacity
self.items = []
self.lock = threading.Lock()
def produce(self):
while True:
with self.lock:
if len(self.items) < self.capacity:
item = random.randint(1, 100)
self.items.append(item)
print(f"Produced: {item}")
time.sleep(random.randint(1, 3))
def consume(self):
while True:
with self.lock:
if self.items:
item = self.items.pop(0)
print(f"Consumed: {item}")
time.sleep(random.randint(1, 3))
def stop_threads(self):
self.running = False
buffer = Buffer(capacity=5)
producer_thread = threading.Thread(target=buffer.produce)
consumer_thread = threading.Thread(target=buffer.consume)
producer_thread.start()
consumer_thread.start()
time.sleep(30) # 让线程运行一段时间
buffer.stop_threads()
producer_thread.join()
consumer_thread.join()
4、调试及运行已设计编写好的代码,具体如调试运行所示。
import threading
import time
import random
class Buffer:
def __init__(self, capacity):
self.capacity = capacity
self. Items = []
self.lock = threading.Lock()
def produce(self):
while True:
with self.lock:
if len(self.items) < self.capacity:
item = random.randint(1, 100)
self.items.append(item)
print(f"Produced: {item}")
time.sleep(random.randint(1, 3))
def consume(self):
while True:
with self.lock:
if self.items:
item = self.items.pop(0)
print(f"Consumed: {item}")
time.sleep(random.randint(1, 3))
def stop_threads(self):
self.running = False
buffer = Buffer(capacity=5)
producer_thread = threading.Thread(target=buffer.produce)
consumer_thread = threading.Thread(target=buffer.consume)
producer_thread.start()
consumer_thread.start()
time.sleep(30) # 让线程运行一段时间
buffer.stop_threads()
producer_thread.join()
consumer_thread.join()