当前位置: 首页 > article >正文

多线程+Condition 对象模拟生产者/消费者问题

实验目的与要求

(1)了解生产者/消费者问题。

(2)熟练使用 Python 标准库 threading 创建线程。

(3)熟练使用 Python 标准库 threading 中的 Condition 对象实现线程同步。

(4)理解使用列表模拟缓冲区的方法。

(5)理解缓冲区的重要性。

实验原理与内容

      1、利用安装部署好的开发环境模拟多线程+Condition 对象模拟生产者/消费者问题。

2、编写程序,创建生产者线程和消费者线程以及大小为 5 的缓冲区。生产者每隔 1 至 3 秒钟就生产一个数字并放入缓冲区,如果缓冲区已满则等待;消费者每隔 1 至 3 秒就 从缓冲区里取出生产日期较早的数字进行消费,如果缓冲区已空就等待。

3、运行程序,观察并理解缓冲区内数字变化以及生产者和消费者线程之间的同步。

验设备与软件环境

1、 Windows win7 / win8 / win10 操作系统。

2、 Visual Studio / Pycharm 及以上

实验过程与结果

1.定义 Producer 类、Consumer 类;

Producer类:

class Producer:

 def __init__(self, name):

        self.name = name



    def produce(self, item):

        print(f"{self.name} is producing {item}")



# 示例用法

producer1 = Producer("Producer 1")

producer1.produce("Product A")
Consumer类:

class Consumer:

    def __init__(self, name, age):

        self.name = name

        self.age = age



    def greet(self):

        print(f"Hello, my name is {self.name} and I'm {self.age} years old.")



# 示例用法

if __name__ == "__main__":

    # 创建一个 Consumer 实例

    consumer1 = Consumer("Alice", 30)

    # 调用 greet 方法

    consumer1.greet()

2.构造 run 函数,使用 sleep()、randint()函数来控制生产/消费的速度。

Producer类
import threading
import time
from random import randint

class Producer(threading.Thread):
    def __init__(self, buffer, max_items):
        threading.Thread.__init__(self)
        self.buffer = buffer
        self.max_items = max_items

    def run(self):
        while True:
            # 生成一个随机数作为生产的物品
            item = randint(1, 100)
            
            # 模拟生产者生产物品的速度
            time.sleep(randint(1, 3))
            
            # 尝试向缓冲区中添加物品
            self.buffer.add_item(item)
            print(f"Produced item {item}. Buffer: {self.buffer}")
            
            # 如果达到了最大物品数量,则结束生产
            if self.buffer.size() >= self.max_items:
                print("Producer finished producing.")
                break

class Buffer:
    def __init__(self):
        self. Items = []
 def add_item(self, item):
        self.items.append(item)

    def size(self):
        return len(self.items)

if __name__ == "__main__":
    buffer = Buffer()
    max_items = 10

    producer = Producer(buffer, max_items)
    producer.start()

Consumer类
import time
from random import randint

class Consumer:
    def __init__(self, name):
        self.name = name

    def run(self):
        while True:
            # 模拟消费过程
            print(f"{self.name} is consuming...")
            time.sleep(randint(1, 3))  # 随机休眠时间,模拟不同速度的消费
            print(f"{self.name} has finished consuming.")

3、在代码的后面还设置了停止线程。

import threading
import time
import random

class Buffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = []
        self.lock = threading.Lock()

    def produce(self):
        while True:
            with self.lock:
                if len(self.items) < self.capacity:
                    item = random.randint(1, 100)
                    self.items.append(item)
                    print(f"Produced: {item}")
            time.sleep(random.randint(1, 3))

    def consume(self):
while True:
            with self.lock:
                if self.items:
                    item = self.items.pop(0)
                    print(f"Consumed: {item}")
            time.sleep(random.randint(1, 3))

    def stop_threads(self):
        self.running = False

buffer = Buffer(capacity=5)

producer_thread = threading.Thread(target=buffer.produce)
consumer_thread = threading.Thread(target=buffer.consume)

producer_thread.start()
consumer_thread.start()

time.sleep(30)  # 让线程运行一段时间
buffer.stop_threads()
producer_thread.join()
consumer_thread.join()

4、调试及运行已设计编写好的代码,具体如调试运行所示。

import threading
import time
import random

class Buffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self. Items = []
self.lock = threading.Lock()

    def produce(self):
        while True:
            with self.lock:
                if len(self.items) < self.capacity:
                    item = random.randint(1, 100)
                    self.items.append(item)
                    print(f"Produced: {item}")
            time.sleep(random.randint(1, 3))

    def consume(self):
        while True:
            with self.lock:
                if self.items:
                    item = self.items.pop(0)
                    print(f"Consumed: {item}")
            time.sleep(random.randint(1, 3))

    def stop_threads(self):
        self.running = False

buffer = Buffer(capacity=5)

producer_thread = threading.Thread(target=buffer.produce)
consumer_thread = threading.Thread(target=buffer.consume)

producer_thread.start()
consumer_thread.start()

time.sleep(30)  # 让线程运行一段时间
buffer.stop_threads()
producer_thread.join()
consumer_thread.join()

http://www.kler.cn/a/470260.html

相关文章:

  • 大风车excel:怎么把题库导入excel?题库导入excel
  • 设计模式——泛型单例类
  • asammdf python库解析MF4文件(一)cut and filter
  • 【数据可视化】数据可视化看板需求梳理模板(含示例)
  • 20241230 AI智能体-用例学习(LlamaIndex/Ollama)
  • 【踩坑指南2.0 2025最新】Scala中如何在命令行传入参数以运行主函数
  • 【亲测有效】Kafka3.5.0分布式集群安装部署与测试-最新
  • 带内管理和带外管理
  • 【ACM出版 | 高录用 |快检索】2025年第二届机器学习与神经网络国际学术会议(MLNN 2025)
  • 前后端分离架构设计与实现:构建现代Web应用的基石
  • 《机器学习》——逻辑回归(过采样)
  • 机器翻译
  • [ECCV 2018]Receptive Field Block Net for Accurate and Fast Object Detection
  • 【python如何使用随机模块】
  • RabbitMQ端口操作
  • 相机镜头竞品选型的主要参考参数和选型方法
  • 第4章:Go语言面向对象编程
  • 下载b站高清视频
  • 字玩FontPlayer开发笔记8 Tauri2文件系统
  • Opencv查找、绘制轮廓、圆形矩形轮廓和近似轮廓
  • ffmpeg八大开发库
  • 深入理解 pytest_runtest_makereport:如何在 pytest 中自定义测试报告
  • OKHttp调用第三方接口,响应转string报错okhttp3.internal.http.RealResponseBody@4a3d0218
  • 平安产险安徽分公司携手安徽中医药临床研究中心附属医院 共筑儿童安全防护网
  • SQLark:高效数据库连接管理的新篇章
  • 懒人不下床型遥控方案--手机对电脑的简单遥控(无收费方案)