当前位置: 首页 > article >正文

Linux实现生产者消费者模型

目录

概念及优势

代码实现


概念及优势

生产者消费者模型是一种用于线程同步的模型,在这个模型中有两种角色,生产者生产数据,消费者消费数据。有三种关系,生产者与生产者,消费者与消费者,生产者与消费者。还有一个交易场所。

超市就是生活中最常见的生产者消费者模型,工厂生产商品,超市充当缓冲区,消费者去超市消费同时取走超市中的商品。

超市作为缓冲区,起到了很重要的作用,试想如果没有超市,那消费者想购物只能去找工厂,还要等待工厂将商品生产出来,同时工厂也不能持续生产商品,因为没有足够的空间存放,而超市作为缓冲区解决了这些问题,消费者不用去工厂阻塞等待商品生产,工厂也可以一直生产,只要超市还能放得下。

从上面的例子可以看出生产者消费者模型的优点,将生产者和消费者解耦,支持并发,支持忙闲不均。

再来讨论一下这三者间的关系

消费者与消费者不能同时向交易场所写数据,生产者与生产者也不能同时向交易场所拿数据,这两者都是互斥且竞争的关系。而生产者不能与消费者同时拿或写数据,这两者属于互斥且同步的关系。

交易场所的问题,交易场所为空时消费者不能消费,交易场所满时生产者不能生产。

代码实现

基于阻塞队列来实现生产者消费者模型,阻塞队列与普通队列的区别在于,阻塞队列在队列为空是pop操作会阻塞,队列为满时,push操作会阻塞。

//block_queue.hpp
#include <queue>
#include <pthread.h>


namespace my
{
    template<class T>
    class block_queue
    {
    public:
        block_queue(size_t max_size = 1000)
            :_max_size(max_size)
        {
            pthread_mutex_init(&_mutex, nullptr);
            pthread_cond_init(&_consumer_cond, nullptr);
            pthread_cond_init(&_producer_cond, nullptr);
        }  


        bool empty() const
        {
            return _b_queue.empty();
        }


        bool full() const
        {
            return _b_queue.size() == _max_size;
        }


        //生产者
        void push(const T& data)
        {
            pthread_mutex_lock(&_mutex);    //加锁
            //循环判断以防止虚假唤醒
            //如果用if判断,试想这种场景
            //线程a在等待,然后被唤醒了,既然被唤醒说明此时应该是不为满
            //但是被唤醒后还需要竞争锁(因为等待会释放锁),只有线程a再次持有锁
            //才会醒来执行后面的代码
            //但是线程a有可能并没有立马拿到锁,被另一个生产者线程b拿到锁
            //并且b此时判断也是不为满
            //那么b会向后执行,会向队列里push数据
            //这就有问题了,在这之后线程a拿到了锁,向后执行,但现在已经是满队列了
        
            while(full())
            {
                //队列满了,生产者等待
                //等待必须解锁,否则其他线程都拿不到锁
                //条件变量等待的条件一定属于临界资源,
                //所以pthread_cond_wait一定属于临界区代码
                //所以pthread_cond_wait这个函数在内部一定会解锁来防止死锁
                //被唤醒后会竞争锁
                pthread_cond_wait(&_producer_cond, &_mutex);    
            }

            _b_queue.push(data);                   //入队
            std::cout << "生产数据" << data << std::endl;
            pthread_mutex_unlock(&_mutex);         //解锁
            pthread_cond_signal(&_consumer_cond);  //唤醒消费者
            std::cout << "唤醒消费者" << std::endl;
        }


        const T& pop()
        {
            pthread_mutex_lock(&_mutex);    //加锁
            //循环判断以防止虚假唤醒
            while(empty())
            {
                pthread_cond_wait(&_consumer_cond, &_mutex);    

            }

            T& ret = _b_queue.front();
            std::cout << "消费数据" << ret << std::endl;
            _b_queue.pop();                        //出队                  
            pthread_mutex_unlock(&_mutex);         //解锁
            pthread_cond_signal(&_producer_cond);  //唤醒生产者
            std::cout << "唤醒生产者" << std::endl;
            return ret;
        }


        ~block_queue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_consumer_cond);
            pthread_cond_destroy(&_producer_cond);
        }  

        size_t size() const
        {
            return _b_queue.size();
        }


    private:
        std::queue<T> _b_queue;            //底层容器
        size_t _max_size;                  //队列最大容量
        pthread_mutex_t _mutex;            //给队列加锁
        pthread_cond_t _consumer_cond;     //消费者条件变量
        pthread_cond_t _producer_cond;     //生产者条件变量

    };
    
}

main函数

#include <iostream>
#include <memory>
#include <unistd.h>
#include "block_queue.hpp"


using data = int;
void* producer(void* _bq)
{
    my::block_queue<data>& bq = *(my::block_queue<data>*)_bq;
    data val = 0;
    while (true)
    {
        //使得生产者总是慢于消费者,这样队列一直都只会有一个元素
        //sleep(2);
        bq.push(val);
        val++;
    }
}


void* consumer(void* _bq)
{
    my::block_queue<data>& bq = *(my::block_queue<data>*)_bq;
    data val = 0;
    while (true)
    {
        val = bq.pop();
    }
}


void test()
{
    pthread_t tid_consumer1 = 0, tid_producer1 = 0;
    pthread_t tid_consumer2 = 1, tid_producer2 = 0;
    pthread_t tid_consumer3 = 2, tid_producer3 = 0;
    std::shared_ptr<my::block_queue<int>> bq(std::make_shared<my::block_queue<int>>(5));
    pthread_create(&tid_producer1, nullptr, producer, &(*bq));
    pthread_create(&tid_producer2, nullptr, producer, &(*bq));
    pthread_create(&tid_producer3, nullptr, producer, &(*bq));
    pthread_create(&tid_consumer1, nullptr, consumer, &(*bq));
    pthread_create(&tid_consumer2, nullptr, consumer, &(*bq));
    pthread_create(&tid_consumer3, nullptr, consumer, &(*bq));
    pthread_join(tid_producer1, nullptr);
    pthread_join(tid_producer2, nullptr);
    pthread_join(tid_producer3, nullptr);
    pthread_join(tid_consumer1, nullptr);
    pthread_join(tid_consumer2, nullptr);
    pthread_join(tid_consumer3, nullptr);
    
}


int main()
{
    test();
    return 0;
}


http://www.kler.cn/a/610703.html

相关文章:

  • css重点知识汇总(二)
  • 网络华为HCIA+HCIP 广域网技术
  • ffmpeg-将多个视频切片成一个新的视频
  • Azure SDK 使用指南
  • 破解PDF转Word难题:如何选择高效、安全的转换工具?
  • 多版本PHP开发环境配置教程:WAMPServer下MySQL/Apache/MariaDB版本安装与切换
  • Photoshop怎样保存为ico格式
  • 【QT】QT中的中文显示乱码解决
  • 机器人的手眼标定——机器人抓取系统基础系列(五)
  • Docker 命令分类整理
  • AXIOM —— 安装
  • 无人机数据链技术详解,无人机图传数传技术,无人机数据传输技术原理
  • 网络华为HCIA+HCIP 路由
  • Day16 -实例:Web利用邮箱被动绕过CDN拿真实ip
  • Git基础理论知识学习笔记
  • Qt信号槽函数
  • uv - Getting Started 开始使用 [官方文档翻译]
  • Linux笔记---动静态库(使用篇)
  • 使用 Go 和 Gin 实现高可用负载均衡代理服务器
  • 视频网站服务器网络连接不稳定该如何解决?