当前位置: 首页 > article >正文

[多线程]基于阻塞队列(Blocking Queue)的生产消费者模型的实现

标题:[多线程]基于阻塞队列(Blocking Queue)的生产消费者模型的实现
@水墨不写bug

在这里插入图片描述


文章目录

  • 一、生产者消费者模型特点:
  • 二、实现
  • 2.1详细解释
    • 1. 成员变量
    • 2. 构造函数
    • 3. `Isfull` 和 `Isempty`
    • 4. `Push` 函数
    • 5. `Pop` 函数
    • 6. 析构函数
    • 7. `GetSize` 函数
  • 三、总结与多线程分析
  • 四、生产消费模型的优势与分析


一下的代码实现了一个阻塞队列(BlockQueue),用于生产者-消费者模型的实践。该模型允许生产者和消费者并发地进行数据生产和消费操作。

一、生产者消费者模型特点:

1.一个交易场所(特定数据结构形式存在的一段内存空间)
2.两种角色(生产者、消费者—生产线程、消费线程)
3.三种关系(生产者之间、消费者之间、生产者和消费者之间的关系)

与阻塞队列结合后形成的特点:

  1. 同步生产和消费操作:在多线程环境下,生产者和消费者可以同时操作队列,但需要保证线程安全。
  2. 避免资源浪费:当队列满时,生产者需要等待;当队列空时,消费者需要等待,以此避免资源浪费和无效操作。
  3. 提高效率:通过设置高水位线和低水位线,适时唤醒多个生产者或消费者,提高生产和消费的效率。

二、实现

#ifndef __BLOCK_QUEUE__
#define __BLOCK_QUEUE__

#include <cstdlib>
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>

using std::cout;
using std::endl;

// 阻塞队列的实现
// 生产消费模型的实践
// 生产和消费并发进行,3:三种关系(
//       当前实现是单生产,单消费,于是不需要考虑生产者与生产者之间、消费者与消费者之间的关系
//       仅仅考虑单个生产者和消费者之间的关系
// ——————当队列满,停止生产;队列为空,停止消费(线程同步)

// ) 2:两个角色(consumer,creator) 1:一个交易场所(queue)

template <class T>
class BlockQueue
{
private:
    // 判断队列是否已满
    bool Isfull()
    {
        return _queue.size() == _maxcap;
    }

    // 判断队列是否为空
    bool Isempty()
    {
        return _queue.empty();
    }

public:
    // 构造函数,初始化队列的最大容量和相关参数
    BlockQueue(int maxcap = 10)
        : _maxcap(maxcap), _queue(), _l_water(0), _h_water(maxcap), _call_num(maxcap / 2)
    {
        if (maxcap <= 4) // 数据总体个数太少,不在处理这些细节
        {
            _l_water = 0x3f3f3f3f; // 无穷大
            _h_water = -0x3f3f3f3f; // 无穷小
            _call_num = 1;
        }
        else
        {
            _l_water = maxcap / 4;
            _h_water = maxcap * 3 / 4;
        }

        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
    }

    // 生产者向队列中生产数据
    void Push(T &in)
    {
        pthread_mutex_lock(&_mutex);
        while (Isfull())
        // 队列满了,停止生产,在pcond条件变量下等待
        {
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        // 走到这里,要么等待之后被唤醒,要么队列没有满,可以加入数据
        _queue.push(in);

        pthread_cond_signal(&_c_cond);

        // 数据较多,叫醒消费者来消费
        if (_queue.size() >= _h_water)
        {
            cout << "call lots comsumer" << endl;
            int t = _call_num;
            while (t--)
                pthread_cond_signal(&_c_cond);
        }

        pthread_mutex_unlock(&_mutex);
    }

    // 消费者拿队列内的数据
    void Pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        while (Isempty())
        // 队列空了,停止消费,在ccond条件变量下等待
        {
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        // 走到这里,要么等待之后被唤醒,要么队列没有空,可以拿出数据
        *out = _queue.front();
        _queue.pop();

        pthread_cond_signal(&_p_cond);

        // 数据较少,叫醒生产者来生产
        if (_queue.size() <= _l_water)
        {
            cout << "call lots creator" << endl;

            int t = _call_num;
            while (t--)
                pthread_cond_signal(&_p_cond);
        }

        pthread_mutex_unlock(&_mutex);
    }

    // 析构函数,销毁互斥锁和条件变量
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }

    // 获取队列当前大小
    size_t GetSize()
    {
        return _queue.size();
    }

private:
    std::queue<int> _queue; // 交易场所

    int _maxcap;            // 队列最大数据个数
    pthread_mutex_t _mutex; // 维护生产和消费的互斥关系

    // 生产者和消费者在不同的条件变量下等待
    pthread_cond_t _p_cond;
    pthread_cond_t _c_cond;

    int _l_water; // 低水平线,低于这条线,叫醒更多的生产者生产
    int _h_water; // 高水平线,高于这条线,叫醒更多的消费者消费

    int _call_num; // 一次叫醒大量的合适值
};

#endif

2.1详细解释

1. 成员变量

  • std::queue<int> _queue:标准库中的队列,用于存储数据。
  • int _maxcap:队列的最大容量。
  • pthread_mutex_t _mutex:互斥锁,用于保证生产和消费操作的互斥,避免数据竞争。
  • pthread_cond_t _p_condpthread_cond_t _c_cond:条件变量,分别用于生产者和消费者的等待和唤醒。
  • int _l_waterint _h_water:低水位线和高水位线,用于优化生产和消费的效率。当队列中的数据量低于低水位线时,唤醒生产者;当数据量高于高水位线时,唤醒消费者。
  • int _call_num:一次性唤醒的生产者或消费者数量。

2. 构造函数

BlockQueue(int maxcap = 10)
  • 初始化队列的最大容量和相关参数。
  • 根据最大容量设置高水位线和低水位线,确保在数据量较多或较少时唤醒适当数量的生产者或消费者。
  • 初始化互斥锁和条件变量。

3. IsfullIsempty

  • bool Isfull():判断队列是否已满。通过比较队列当前大小和最大容量来实现。
  • bool Isempty():判断队列是否为空。通过检查队列是否为空来实现。

4. Push 函数

void Push(T &in)
  • 生产者向队列中添加数据。
  • 先获取互斥锁,进入临界区。
  • 如果队列已满,生产者在 _p_cond 条件变量上等待。
  • 向队列中添加数据后,唤醒在 _c_cond 条件变量上等待的消费者。
  • 如果队列中的数据量达到或超过高水位线,唤醒多个消费者。
  • 释放互斥锁,退出临界区。

5. Pop 函数

void Pop(T *out)
  • 消费者从队列中取数据。
  • 先获取互斥锁,进入临界区。
  • 如果队列为空,消费者在 _c_cond 条件变量上等待。
  • 从队列中取出数据后,唤醒在 _p_cond 条件变量上等待的生产者。
  • 如果队列中的数据量低于或等于低水位线,唤醒多个生产者。
  • 释放互斥锁,退出临界区。

6. 析构函数

~BlockQueue()
  • 销毁互斥锁和条件变量,释放资源。

7. GetSize 函数

size_t GetSize()
  • 返回队列的当前大小。

三、总结与多线程分析

总结:
这段代码通过互斥锁和条件变量实现了一个线程安全的阻塞队列,能够有效地处理生产者和消费者之间的同步问题。通过设置高水位线和低水位线,可以在数据量较多或较少时适时唤醒多个生产者或消费者,提高队列的使用效率。

多线程分析:
上面的代码实现是对应一个生产者一个消费者的情况。多生产多消费起始只需要创建多个生产者线程,多个消费者线程即可
因为临界区一次只允许一个线程访问,而这个线程是从哪来的?

对于生产者而言,生产者内部首先需要竞争出来一个生产者;
消费者也一样,需要竞争出来一个消费者;
然后,优胜的生产者和优胜的消费者之间还要进行一次竞争。
这一过程具体形象的解释了为什么这段代码可以不用改变就可以实现多生产多消费的情况。

四、生产消费模型的优势与分析

优势:

1.协调忙闲不均;
2.效率高;
3.实现生产者和消费者之间的解耦和;

整个系统共用一把锁,意味着一次只能有一个线程访问临界区。多生产多消费相对于单生产单消费而言,高效体现在哪里?
对于临界区的访问,多生产多消费和单生产单消费是没有区别的串行访问
但是产生任务,解决任务也需要耗费时间。高效体现在生产者之间和消费者之间的并发一个生产者访问队列的时候,其他生产者也在生产数据(构建请求)。消费者访问队列的时候,其他消费者也在消耗数据(解决请求)。


完~
转载请注明出处

在这里插入图片描述


http://www.kler.cn/a/582923.html

相关文章:

  • FPGA学习(三)——LED流水灯
  • 大数据实时分析:ClickHouse、Doris、TiDB 对比分析
  • 交通工具驱动电机技术解析:电瓶车、汽车、地铁与高铁的电机对比
  • 达梦数据库-学习-10-SQL 注入 HINT 规则(固定执行计划)
  • Redis Sentinel (哨兵模式)深度解析:构建高可用分布式缓存系统的核心机制
  • AI+Mermaid 制作流程图
  • 聚类中的相似矩阵和拉普拉斯矩阵
  • 计算机操作系统
  • Redis-缓存穿透击穿雪崩
  • 常见的交换机端口类型
  • k8s面经
  • 如何将错误边界与React的Suspense结合使用?
  • 随机快速排序
  • 我与DeepSeek读《大型网站技术架构》(12)-网购秒杀系统架构设计案例分析
  • JVM学习-类文件结构 类加载
  • FX-std::vector
  • Postgresql中null值和空字符串举例详解例子解析
  • SpringBoot 实现接口数据脱敏
  • 办公常用自动化工具
  • 【C++】STL全面简介与string类的使用(万字解析)