当前位置: 首页 > article >正文

[Linux]生产消费者模型

目录

一、生产消费者模型概念

1.概念

2.模块构成

3.协作机制

二、基于BlockingQueue的多CP问题

1.BlockQueue模块

2.Task任务模块

3.创建多线程以及整合模块

三、基于信号量的环形队列CP模型

1.POISX信号量接口

初始化信号量

PV操作

信号量销毁 

2.模型简述 

3.代码实现


一、生产消费者模型概念

1.概念

        生产者消费者模型按名字来听就一定有生产者和消费者成员,生产者将自己生产的物品交给消费者,这就是最简单的生产消费协同。但是这样的话,生产者生产之后就一定要当面交给消费者,然后消费者不接收生产者就无法生产,这样就会有弊端了。如果说破生产者生产的快,但是消费者消费速度慢,那么生产者就需要等待消费者将东西买走后,才能继续生产,就会大大降低产能和效率。

        所以接下来引入了超市的这一中间概念,生产者可以将生产的东西放到超市里面,不需要直接给消费者,这样的话,生产者的产能就不会受到消费者的影响了,而且消费者也可能随时去超时获取自己想要的东西,而不是说生产者生产出来,消费者就必须接收了。这个就是生产消费者模型的大概框架。

        生产消费者模式是一种经典的多线程或多进程并发协作的一种模式。他主要用于解决数据的产生和消费速度不一致的问题,实现了功能解耦、时间解耦以及处理速度的解耦,同时保证了数据的完整性和正确性。功能解耦:生产者专注生产,不关心数据如何被消费者处理;消费者专注处理,不关心数据如何生产的。时间解耦:生产者和消费者(CP)之间不需要同步运行,在满足各自条件下进行工作就可以。处理速度解耦:CP之间不需要互相等待对方,所以处理速度快的一方那么就会一直以高速的方式工作,而慢速的一方则慢慢 执行即可,有中间的共享资源区作为数据的缓冲地带。

2.模块构成

        分为生产者、消费者以及缓冲区三部分,生产者和消费者可以是一个线程、进程或者软件模块,生产者将生产的数据放入缓冲区,消费者从缓冲区中取出数据后做相应的处理工作。

3.协作机制

        该模型一定涉及到执行流的互斥与同步问题了,生产者与消费者之间存在互斥关系,生产者访问缓冲区的时候,消费者是不可以访问的,消费者访问的时候,同样的生产者也是不可以访问的。因为生产和消费二者访问缓冲区的时候,会对缓冲区的数据做修改,那么就会有线程安全问题,所以要实现互斥机制。当然二者之间也有同步关系,缓冲区中没有数据时,消费者不能进行访问缓冲区,当生产者生产数据之后,会通知消费者访问的。

        对于多生产多消费的情况下,也就是生产者和消费者都有很多线程。生产和消费都会修改缓冲区。所以生产者与生产者之间,消费者与消费者之间,也属于多线程访问共享资源,也是要实现互斥的。

        上述概括下来,就是生产者之间、消费者之间的互斥以及生产者消费者之间的互斥与同步。

        

二、基于BlockingQueue的多CP问题

1.BlockQueue模块

        该模块定义了一个任务队列,也就是CP问题的缓冲区部分、以及生产线程和消费线程对于该缓冲区的操作方法。当生产者要放入任务的时候,要先判断缓冲区中是否有空间,如果有空间的话,就放入任务,并且通知消费进程取出任务,当没有空间的时候,就进行等待条件变量就绪,这个条件变量的唤醒是由消费进程取出数据后,唤醒的,唤醒生产进程放入数据。

        对于消费进程要取出任务的时候,首先判断缓冲区有没有任务,如果有的话,就取出任务,并通知生产进程放入数据,也就是唤醒当时因为缓冲区满了,而阻塞在条件变量中的等待线程。当没有任务的时候,就阻塞在条件变量中等待生产进程唤醒。

        生产线程和消费线程之间通过锁实现的互斥操作,通过条件变量实现了同步的顺序操作。

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>

// 缓冲区默认容量
static const int default_capacity = 5;

//生产消费者模型类
template <class T>
class BlockQueue
{
private:
    std::queue<T> _task_queue; // 任务队列
    int _capacity;             // 队列容量
    pthread_mutex_t _mutex;    // 锁
    pthread_cond_t _c_cond;    // customer条件变量
    pthread_cond_t _p_cond;    // productor条件变量

public:
    // 构造函数
    BlockQueue(int capacity = default_capacity)
        :_capacity(capacity)
    {
        //初始化锁和条件变量
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }
    //生产者--将任务放入任务队列
    void Push(const T& task)
    {
        //加锁
        pthread_mutex_lock(&_mutex);
        //判断缓冲区是否满了,如果满了的话,就一直通知消费者取出数据
        //使用while是防止伪唤醒。
        while(_capacity == _task_queue.size())
        {
            //释放锁,并等待消费者通知可以生产
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        //有空间了,放入任务
        _task_queue.push(task);
        //通知消费者可以取数据了
        pthread_cond_signal(&_c_cond);
        //解锁
        pthread_mutex_unlock(&_mutex);
    }
    //消费者--将任务从任务队列取出
    void PoP(T* task)
    {
        //加锁
        pthread_mutex_lock(&_mutex);
        //判断是否有数据,没有就进行等待
        while(_task_queue.empty())
        {
            //释放锁,并等待生产者通知可以消费
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        //有数据了,取出数据
        *task = _task_queue.front();
        _task_queue.pop();
        //通知生产者可以生产了
        pthread_cond_signal(&_p_cond);
        //解锁
        pthread_mutex_unlock(&_mutex);
    }
    //析构函数
    ~BlockQueue()
    {
        //锁和条件变量的销毁
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }
};

2.Task任务模块

        定义的是计算两个元素的加减法类

#pragma once

#include <iostream>
#include <unistd.h>
#include <string>

static const int defaultValue = 0;
static const std::string str = "+-";

// 计算加减法的任务类
class Task
{
private:
    int _x;
    int _y;
    char _op;
    int _result;

public:
    //构造与析构函数
    Task() {}
    Task(int x, int y, char op)
        : _x(x), _y(y), _op(op), _result(defaultValue) {}
    ~Task() {}

    void Run()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        default:
            std::cout << "op is error" << std::endl;
            break;
        }
    }
    // 运行任务重载函数
    void operator()()
    {
        Run();
        std::cout << _x << _op << _y << "=" << _result << std::endl;
        sleep(2);
    }
};

3.创建多线程以及整合模块

        该模块则是定义了生产线程和消费线程具体线程函数。以及创建线程、定义任务队列类等操作。

#include <cstdlib>
#include <vector>
#include <string>
#include <cstring>
#include <pthread.h>
#include "BlockQueue.hpp"
#include "Task.hpp"

// 线程传递的参数
struct ThreadData
{
    BlockQueue<Task> *_cp;    // 生产者消费者模型对象
    std::string _thread_name; // 线程名称

    // 构造函数
    ThreadData(BlockQueue<Task> *cp, std::string name)
        : _cp(cp), _thread_name(name) {}
};
// 消费者线程函数
void *Costomer(void *arg)
{
    // 接收参数
    ThreadData *data = (ThreadData *)arg;

    while (true)
    {
        // 获取任务
        Task task;
        data->_cp->PoP(&task);
        std::cout << data->_thread_name << " run task : ";
        // 执行任务
        task();
    }
    return nullptr;
}
// 生产者线程参数
void *Productor(void *arg)
{
    // 接收参数
    ThreadData *data = (ThreadData *)arg;

    while (true)
    {
        // 生产任务数据
        int data1 = rand() % 10;
        int data2 = rand() % 10;
        char op = str[rand() % str.size()];
        Task task(data1, data2, op);
        // 将任务放入队列
        data->_cp->Push(task);
        std::cout << data->_thread_name << " push task into queue : " << data1 << op << data2 << std::endl;
    }
    return nullptr;
}

int main()
{
    // 生产随机数种子
    srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
    // 创建生产者消费者模型对象
    BlockQueue<Task> *cp = new BlockQueue<Task>();

    // 创建生产者消费者线程
    std::vector<pthread_t> custumer_thread(5);
    std::vector<pthread_t> productor_thread(5);
    // 消费者
    for (int i = 0; i < 5; i++)
    {
        // 线程id
        pthread_t tid;
        // 构造线程参数
        std::string thread_name = "customer-" + std::to_string(i);
        ThreadData data(cp, thread_name);
        // 创建线程
        pthread_create(&tid, nullptr, Costomer, &data);
        // 放入数组中
        custumer_thread.push_back(tid);
    }
    // 生产者
    for (int i = 0; i < 5; i++)
    {
        // 线程id
        pthread_t tid;
        // 构造线程参数
        std::string thread_name = "productor-" + std::to_string(i);
        ThreadData data(cp, thread_name);
        // 创建线程
        pthread_create(&tid, nullptr, Productor, &data);
        // 放入数组中
        productor_thread.push_back(tid);
    }
    
    // 线程等待
    for (size_t i = 0; i < custumer_thread.size(); ++i)
    {
        if (pthread_join(custumer_thread[i], nullptr) != 0)
        {
            std::cerr << "等待消费者线程结束失败" << std::endl;
        }
    }

    for (size_t i = 0; i < productor_thread.size(); ++i)
    {
        if (pthread_join(productor_thread[i], nullptr) != 0)
        {
            std::cerr << "等待生产者线程结束失败" << std::endl;
        }
    }

    // 释放资源
    delete cp;

    return 0;
}

三、基于信号量的环形队列CP模型

1.POISX信号量接口

        其实和System V信号量的作用基本上是一样的,只是接口上有区别。

初始化信号量

int sem_init(sem_t* sem, int pshared, unsigned int value);

        第一个参数是传递要初始化的信号量;第二个参数表示信号量是用于进程间(非零值)还是线程间(零值)共享;第三个参数则是信号量的初始值。

PV操作

int sem_wait(sem_t* sem);   //申请操作

int sem_post(sem_t* sem);  //释放操作

信号量销毁 

int sem_destory(sem_T* sem);

2.模型简述 

        该模型使用了锁实现了线程之间的互斥,信号量的方式实现了同步机制。但是这里的互斥只有生产者和生产者、消费者和消费者之间的互斥,没有生产者与消费者之间的互斥。因为使用了信号量记录了该资源区域的使用情况,用信号量表示了剩余空间的个数和存在的资源的个数,相当于将一个大的资源区域分成了若干个小区域。同时配合指针记录生产者下一次放数据的位置,消费者下一次取数据的位置。所以生产者线程和消费者线程两者访问不到同一个位置,所以也就不存在两个的互斥问题了。

        信号量实现的同步是,当space剩余空间信号量为0的时候,生产线程无法放入数据,会等待消费线程取出数据,当消费线程取出数据后,会释放一个space信号量,那么生产线程就可以申请到信号量了,就可以放入数据了。放入数据之后,会将data信号量释放一个,表示着队列中的数据量多了一个。

        在申请信号量和锁的顺序上,应该是先申请信号量,信号量代表的是该线程对于想要获取的资源是否存在,如果存在的话,再去为了访问资源而申请锁,如果说没有资源的话,申请到锁也没用,所以说应该先去申请信号量,之后再去申请锁资源。而且先申请锁的话,只会有一个线程到达申请信号量的步骤,而先申请信号量的话,可以有多个线程到达申请锁的位置等待锁资源。

        经过上述阐述,因为生产者和消费者没有互斥的关系,所以要给生产者和消费者各设定一把锁去保证线程安全。同时还有设定两个信号量,一个是剩余空间信号量,另一个是剩余资源的信号量。

3.代码实现

#pragma once

#include <iostream>
#include <pthread.h>
#include <vector>
#include <semaphore.h>

// 环形队列默认大小
static const int default_capacity = 5;

template <class T>
class RingQueue
{
private:
    std::vector<T> _ringQueue; // 环形队列
    int _capacity;             // 最大容量
    int _p_step;               // 生产者放入任务的位置
    int _c_step;               // 消费者取出任务的位置
    sem_t _space_sem;          // 剩余空间信号量
    sem_t _data_sem;           // 剩余任务信号量
    pthread_mutex_t _p_mutex;  // 生产锁
    pthread_mutex_t _c_mutex;  // 消费锁

private:
    //申请信号量操作
    void P(sem_t &sem) { sem_wait(&sem); }
    //释放信号量操作
    void V(sem_t &sem) { sem_post(&sem); }

public:
    //构造函数
    RingQueue(int capacity = default_capacity)
        :_ringQueue(capacity), _capacity(capacity), _p_step(0), _c_step(0)
    {
        //初始化信号量,剩余空间为capacity,剩余任务数据为0
        sem_init(&_space_sem, 0, _capacity);
        sem_init(&_data_sem, 0, 0);
        //初始化锁
        pthread_mutex_init(&_p_mutex, nullptr);
        pthread_mutex_init(&_c_mutex, nullptr);
    }
    //生产者放入任务操作
    void Push(const T& task)
    {
        //申请信号量并加锁
        P(_space_sem);
        pthread_mutex_lock(&_p_mutex);

        //放入任务
        _ringQueue[_p_step] = task;
        //更新生产者指针位置
        _p_step++;
        _p_step %= _capacity;

        //解锁并释放信号量
        pthread_mutex_unlock(&_p_mutex);
        V(_data_sem);
    }
    //消费者取出任务操作
    void Pop(T* task)
    {
        //申请信号量并加锁
        P(_data_sem);
        pthread_mutex_lock(&_c_mutex);

        //取出任务
        *task = _ringQueue[_c_step];
        //更新消费者指针位置
        _c_step++;
        _c_step %= _capacity;

        //解锁并释放信号量
        pthread_mutex_unlock(&_c_mutex);
        V(_space_sem);
    }
    //析构函数
    ~RingQueue()
    {
        //销毁信号量和锁
        sem_destroy(&_space_sem);
        sem_destroy(&_data_sem);
        
        pthread_mutex_destroy(&_p_mutex);
        pthread_mutex_destroy(&_c_mutex);
    }
};


http://www.kler.cn/a/473179.html

相关文章:

  • 基于单片机的数字气压计设计
  • STM32烧写失败之Contents mismatch at: 0800005CH (Flash=FFH Required=29H) !
  • iOS - AutoreleasePool
  • 用OpenCV实现UVC视频分屏
  • LabVIEW四旋翼飞行器姿态监测系统
  • 如何打开/处理大型dat文件?二进制格式.dat文件如何打开?Python读取.dat文件
  • 概率论常用的分布公式
  • Monaco Editor 系列报错修复:webpack-cli已经下载了但是还报错
  • 用JavaScript和python实现简单四则运算出题机
  • 如何在Jupyter中快速切换Anaconda里不同的虚拟环境
  • HQChart使用教程30-K线图如何对接第3方数据44-DRAWPIE数据结构
  • maven依赖的配置和排除依赖
  • 【Vim Masterclass 笔记08】第 6 章:Vim 中的文本变换及替换操作 + S06L20:文本的插入、变更、替换,以及合并操作
  • Ruby语言的并发编程
  • 2025-1-7-sklearn学习(33)数据集转换-特征提取 我不去想未来是平坦还是泥泞,只要热爱生命,一切 都在意料之中。
  • 【STM32+QT项目】基于STM32与QT的智慧粮仓环境监测与管理系统设计(完整工程资料源码)
  • 使用wav2vec 2.0进行音位分类任务的研究总结
  • HunyuanVideo: A Systematic Framework For LargeVideo Generative Models 论文解读
  • 网络基础1 http1.0 1.1 http/2的演进史
  • 【Uniapp-Vue3】创建自定义页面模板
  • C++语言的计算机基础
  • 【LeetCode】307. 区域和检索 - 数组可修改
  • GPT解释联合训练中的颜色映射概念
  • 设计模式学习笔记——结构型模式
  • C#通过外部进程调用Python
  • 计算机网络之---数据链路层的功能与作用