当前位置: 首页 > article >正文

【线程】基于阻塞队列的生产者消费者模型

文章目录

  • 1 生产者消费者模型
  • 2 阻塞队列
    • 2.1 成员变量
    • 2.2 消费者操作
    • 2.3 生产者生产
  • 3 总结

1 生产者消费者模型

在多线程环境中,生产者消费者模型是一种经典的线程同步模型,用于处理生产者线程与消费者线程之间的工作调度和资源共享问题。在这个模型中,生产者和消费者共享一个缓冲区,生产者往缓冲区中放入商品(或者数据),而消费者则从缓冲区中取出商品(或者数据)。为了确保线程安全,避免资源竞争,通常需要使用同步机制如互斥锁(mutex) 和 条件变量(condition variable)。

2 阻塞队列

阻塞队列在生产者消费者模型中是非常常见的一种设计,通过互斥锁条件变量来确保线程同步,避免数据竞争。生产者和消费者分别在合适的时机阻塞和唤醒彼此,使得生产者和消费者能平稳地进行数据的生产和消费。

2.1 成员变量

class BlockQueue
{
	static const int defaultnum = 20;
public:
	BlockQueue(int maxcap = defaultnum)
    :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }
private:
	std::queue<T> _q;  // 队列,存储生产者生产的数据
	int _maxcap;  // 队列的最大容量
	pthread_mutex_t _mutex;  // 互斥锁,用于保护队列的访问
	pthread_cond_t _c_cond;  // 消费者条件变量,用于阻塞消费	者
	pthread_cond_t _p_cond;  // 生产者条件变量,用于阻塞生产者

};
  • _q 是用于存储数据的队列。
  • _maxcap 是队列的最大容量。
  • _mutex 是互斥锁,用来保证生产者和消费者在访问队列时的互斥性。
  • _c_cond 是消费者的条件变量,当队列为空时,消费者会被阻塞,直到队列有数据。
  • _p_cond 是生产者的条件变量,当队列满时,生产者会被阻塞,直到队列有空间。

2.2 消费者操作

T pop()
    {
        //1.上锁  --> 消费的时候,需要给消费者上锁
        pthread_mutex_lock(&_mutex);

        while(_q.size() == 0)
        {
            //当商品为空的时候,就阻塞消费者
            pthread_cond_wait(&_c_cond, &_mutex);
        }

        //3.走到这里,两种情况 : 1.队列满了  2.被唤醒
        T out = _q.front();
        _q.pop();

        //4.当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
        pthread_cond_signal(&_p_cond);

        pthread_mutex_unlock(&_mutex);

        return out;
    }
pthread_mutex_lock(&_mutex);

1. 先上锁,保证数据的安全

 while(_q.size() == 0)
        {
            //当商品为空的时候,就阻塞消费者
            pthread_cond_wait(&_c_cond, &_mutex);
        }

        T out = _q.front();
        _q.pop();

2. 上锁之后,可以进行消费,有两种情况:
case 1 : 队列为空,没有数据,则阻塞消费者
case 2: 队列不为空,进行消费

注意:这里的pthread_cond_wait(&_c_cond, &_mutex);在阻塞消费者的同时会释放mutex互斥锁,避免死锁的产生

 //当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
pthread_cond_signal(&_p_cond);

3. 消费之后,队列一定不满(至少都有一个空位,因为刚刚消费了)。所以可以唤醒生产者进行生产

 pthread_mutex_unlock(&_mutex);

4. 所有操作结束之后,释放锁,避免死锁

2.3 生产者生产

void push(const T& in)
    {
        //1.上锁  --> 生产的时候,需要给生产者上锁
        pthread_mutex_lock(&_mutex);
        
        //2.当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态
        //伪唤醒的情况
        while(_q.size() == _maxcap)
        {
            //自动唤醒  --> 释放_p_cond持有的锁,进入阻塞状态
            pthread_cond_wait(&_p_cond, &_mutex);
        }

        //3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒
        _q.push(in);

        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }
pthread_mutex_lock(&_mutex);

1. 先上锁,保证数据的安全

 //当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态
        //伪唤醒的情况
        while(_q.size() == _maxcap)
        {
            //自动唤醒  --> 释放_p_cond持有的锁,进入阻塞状态
            pthread_cond_wait(&_p_cond, &_mutex);
        }

        //3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒
        _q.push(in);

2. 上锁之后,可以进行生产,有两种情况:
case 1 : 队列为满,不能继续生产,则阻塞生产者
case 2: 队列不为满,继续生产

为什么要使用while?

防止伪唤醒。

注意:这里的pthread_cond_wait(&_p_cond, &_mutex);在阻塞生产者的同时会释放mutex互斥锁,避免死锁的产生

pthread_cond_signal(&_c_cond);

3. 生产之后,队列一定不为空(至少有一个商品,因此可以继续消费)所以可以唤醒消费者进行消费

 pthread_mutex_unlock(&_mutex);

4. 所有操作结束之后,释放锁,避免死锁

3 总结

main.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>

void* Consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
    
    while(true)
    {
        Task t = bq->pop();

        std::cout << "消费了一个任务 : " << t.GetTask() << " 运算结果是 : " 
        << t.GetResult() << "thread id : " << pthread_self() << std::endl;

        sleep(1);
    }

}

void* Productor(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);

    int x = 10, y = 20;
    while(true)
    {
        int data1 = rand() % 10 + 1;  //控制data1为[1,10]之间
        usleep(10);
        int data2 = rand() % 10 + 1;  //控制data2为[1,10]之间
        char op = opers[rand() % opers.size()];  //随机选取一个运算符

        //构建任务
        Task t(data1, data2, op);
        bq->push(t);
        std::cout << "生产了一个任务 : "  << t.GetTask() << "thread id : " << pthread_self() << std::endl;
        sleep(1);
    }
}

int main()
{
    srand(time(nullptr));
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t c[3], p[5];

    for (int i = 0; i < 3; ++ i)
    {
        pthread_create(c + i, nullptr, Consumer, bq);
    }

    for (int i = 0; i < 5; ++ i)
    {
        pthread_create(p + i, nullptr, Productor, bq);
    }

    for (int i = 0; i < 3; ++ i)
    {
        pthread_join(c[i], nullptr);
    }

    for (int i = 0; i < 5; ++ i)
    {
        pthread_join(p[i], nullptr);
    }

    delete bq;
    return 0;
}

BlockQueue.hpp

#include <iostream>
#include <queue>
#include <pthread.h>

template<class T>
class BlockQueue
{
    static const int defaultnum = 20;
public:
    BlockQueue(int maxcap = defaultnum)
    :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }

    T pop()
    {
        //1.上锁  --> 消费的时候,需要给消费者上锁
        pthread_mutex_lock(&_mutex);

        while(_q.size() == 0)
        {
            //当商品为空的时候,就阻塞消费者
            pthread_cond_wait(&_c_cond, &_mutex);
        }

        //3.走到这里,两种情况 : 1.队列满了  2.被唤醒
        T out = _q.front();
        _q.pop();

        //4.当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
        pthread_cond_signal(&_p_cond);

        pthread_mutex_unlock(&_mutex);

        return out;
    }

    void push(const T& in)
    {
        //1.上锁  --> 生产的时候,需要给生产者上锁
        pthread_mutex_lock(&_mutex);
        
        //2.当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态
        //伪唤醒的情况
        while(_q.size() == _maxcap)
        {
            //自动唤醒  --> 释放_p_cond持有的锁,进入阻塞状态
            pthread_cond_wait(&_p_cond, &_mutex);
        }

        //3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒
        _q.push(in);

        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }
private:
    std::queue<T> _q;  //共享资源,只有一个,但是可以被当成很多个
    int _maxcap;  //最大值

    pthread_mutex_t _mutex;   //锁
    pthread_cond_t _c_cond;    //consumer cond 消费者条件变量
    pthread_cond_t _p_cond;    //productor cond 生产者条件变量
};

Task.hpp

#include <iostream>
#include <string>

std::string opers = "+-*%";

enum
{
    DivZero = 1,
    ModZero,
    Unkown
};

class Task
{
public:
    Task(int x1, int x2, char oper)
    :_data1(x1)
    ,_data2(x2)
    ,_oper(oper)
    ,_result(0)
    ,_exitcode(0)
    {}

    void run()
    {
        switch(_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        
        case '-':
            _result = _data1 - _data2;
            break;
        
        case '*':
            _result = _data1 * _data2;
            break;
        
        case '/':
            if (_data2 == 0)
                _exitcode = DivZero;
            else
                _result = _data1 / _data2;
            break;
        case '%':
            if (_data2 == 0)
                _exitcode = ModZero;
            else
                _result = _data1 % _data2;
            break;
        default:
            _exitcode = Unkown;
            break;
        }
    }
    

    //重载operator()
    void operator()()
    {
        run();
    }

    std::string GetTask()
    {
        std::string r = std::to_string(_data1);
        r += _oper;
        r += std::to_string(_data2);
        r += "= ?";

        return r;
    }

    std::string GetResult()
    {
        std::string r = std::to_string(_data1);
        r += _oper;
        r += std::to_string(_data2);
        r += "= ";
        r += std::to_string(_result);
        r += "[code: ";
        r += std::to_string(_exitcode);
        r += "]";

        return r;
    }

    ~Task()
    {}

private:
    int _data1;
    int _data2;
    char _oper;

    int _result;
    int _exitcode;
};

在这里插入图片描述


http://www.kler.cn/a/531166.html

相关文章:

  • 基于WiFi的智能照明控制系统的设计与实现(论文+源码)
  • GWO优化LSBooST回归预测matlab
  • 13 尺寸结构模块(size.rs)
  • AI学习指南HuggingFace篇-Tokenizers 与文本处理
  • 2025年1月22日(网络编程 udp)
  • G. XOUR
  • 【C语言篇】“三子棋”
  • kubernetes(二)
  • 对比JSON和Hessian2的序列化格式
  • 前端 | JavaScript中的reduce方法
  • 【14】WLC3504 HA配置实例
  • 【股票数据API接口49】如何获取股票实时交易数据之Python、Java等多种主流语言实例代码演示通过股票数据接口获取数据
  • 自动化构建-make/Makefile 【Linux基础开发工具】
  • 本地快速部署DeepSeek-R1模型——2025新年贺岁
  • relational DB与NoSQL DB有什么区别?该如何选型?
  • C++ Primer 迭代器
  • Unity特效插件GodFX
  • 力扣经典题目之14. 最长公共前缀
  • Alibaba开发规范_异常日志之日志规约:最佳实践与常见陷阱
  • 最新功能发布!AllData数据中台核心菜单汇总
  • Win11使用VMware提示:平台不支持虚拟化的 Intel VT-x/EPT
  • 【BUUCTF逆向题】[WUSTCTF2020]level1、[GUET-CTF2019]re
  • linux通过lvm调整分区大小
  • 【Leetcode 每日一题】81. 搜索旋转排序数组 II
  • 【ChatGPT:开启人工智能新纪元】
  • 嵌入式硬件篇---HAL库内外部时钟主频锁相环分频器