当前位置: 首页 > article >正文

【Linux】生产者消费者模型

需要云服务器等云产品来学习Linux的同学可以移步/–>腾讯云<–/官网,轻量型云服务器低至112元/年,优惠多多。(联系我有折扣哦)

文章目录

  • 1. 生产者消费者模型概念和理解
    • 1.1什么是生产者消费者模型
    • 1.2 生产者消费者模型的分析
  • 2. 基于BlockQueue的生产者消费者模型
    • 2.1 阻塞队列(BlockQueue)
    • 2.2 代码实现与测试
    • 2.3 生产者消费者模型的性能分析
  • 3. 基于BlockQueue的生产消费模型的扩展应用

1. 生产者消费者模型概念和理解

1.1什么是生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

举个例子:生活中的超市,大家买东西会去超市里面买,供货商会把货物供给超市。此时我们就是消费者,供货商就是生产者,超市就是容器,供货商只管生产商品然后放进超市,大家只管去超市购买商品,不需要管另一方的生产(消费)情况。就完成了解耦。

image-20240204150852254

1.2 生产者消费者模型的分析

1. 生产者消费者模型的优点/特点

  • 使生产者和消费者进行解耦,减少互相影响
  • 支持并发操作
  • 支持忙闲不均

2. 生产者消费者模型的“321”原则

  • 3种关系:生产者和生产者(互斥);消费者和消费者(互斥);生产者和消费者(互斥和同步)
  • 2种角色:生产者和消费者角色
  • 1个交易场所:一段特定结构的缓冲区

2. 基于BlockQueue的生产者消费者模型

2.1 阻塞队列(BlockQueue)

在多线程编程中,阻塞队列是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

image-20240204152402730

2.2 代码实现与测试

使用C++中queue模拟实现阻塞队列的生产者消费者模型

这里为了方便理解,采用单生产者单消费者来实现

/*BlockQueue.hpp*/
#pragma once
#include <queue>
#include <pthread.h>

#define N 5

template <class T>
class BlockQueue
{
public:
    BlockQueue(int maxCap = N) : _maxCap(maxCap) // 初始化阻塞队列最大容量
    {
        pthread_mutex_init(&_mutex, nullptr); // 初始化互斥量
        pthread_cond_init(&_pcond, nullptr);  // 初始化条件变量
        pthread_cond_init(&_ccond, nullptr);
    }
    // 备注:一般来说,我们设计函数的参数时,如果是出入型参数就使用const &,输出型参数使用*,既输入又输出的参数使用&
    void push(const T &in) // 输入型参数,要放进队列里面的值(生产)
    {
        pthread_mutex_lock(&_mutex); // 生产的内容放进阻塞队列之前首先进行加锁,保证线程安全
        // 1. 判断队列是否已满,已满就要进行阻塞等待
        // 细节:这里使用while做循环判断不能用if,因为如果同一时间多个生产/消费线程接收到信息,就会出现唤醒多个线程的情况,多个生产线程生产内容不能被全部放进队列中
        while (is_full())
        {
            // 在pthread_cond_wait函数中需要传入我们正在使用的互斥锁。
            // ?想一想:如果当前使用的mutex被当前线程获取,那么消费线程也没有办法获取到互斥锁,就没有办法从队列中消费内容,也就不可能让生产线程继续生产,也就产生了死锁现象
            // 所以,这里pthread_cond_wait线程传入mutex之后,会将mutex解锁,然后将自己挂起,直到该线程被唤醒之后,pthread_cond_wait会和其他线程一起竞争mutex资源
            // 竞争到这个mutex资源之后函数调用结束,返回。
            pthread_cond_wait(&_pcond, &_mutex);
        }
        // 2. 未满就入队列
        _q.push(in);
        // 3. 走到这里就保证了当前队列里面一定有内容可以被消费,所以唤醒消费线程
        pthread_cond_signal(&_ccond); // 这里的唤醒是可以有一定的策略的,为了方便理解,我们忽略这些策略问题
        pthread_mutex_unlock(&_mutex); // 操作完成后解锁
    }
    void pop(T *out) // 输出型参数,要从队列里面拿走的值(消费)
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty())
        {
            pthread_cond_wait(&_ccond, &_mutex);
        }
        *out = _q.front();
        _q.pop();

        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        // 回收资源:包括两个条件变量和一个互斥量
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_ccond);
        pthread_cond_destroy(&_pcond);
    }

private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size() == _maxCap;
    }

private:
    std::queue<T> _q;            // 队列
    int _maxCap;            // 最大容量
    pthread_mutex_t _mutex; // 互斥量
    pthread_cond_t _ccond;  // 消费者条件变量
    pthread_cond_t _pcond;  // 生产者条件变量
};

测试代码1:

/*mainPC.cc*/
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include "BlockQueue.hpp"

void *productor(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);

    while (true)
    {
        int data = rand() % 10 + 1;
        bq->push(data);
        std::cout << "生产数据" << data << std::endl;
        sleep(1); // 这里生产者的速度慢一点,消费的速度很快,最后的现象就是生产了一个就消费一个
    }

    return nullptr;
}
void *consumer(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);

    while (true)
    {
        int data;
        bq->pop(&data);
        std::cout << "消费数据" << data << std::endl;
    }

    return nullptr;
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid());
    BlockQueue<int> *bq = new BlockQueue<int>();

    pthread_t p, c;
    pthread_create(&p, nullptr, productor, bq);
    pthread_create(&c, nullptr, consumer, bq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    delete bq;
    return 0;
}

image-20240205191426822

测试代码2:

#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include "BlockQueue.hpp"

void *productor(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);

    while (true)
    {
        int data = rand() % 10 + 1;
        bq->push(data);
        std::cout << "生产数据" << data << std::endl;
        // sleep(1); // 这里生产者的速度慢一点,消费的速度很快,最后的现象就是生产了一个就消费一个
    }

}
void *consumer(void *bq_)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);

    while (true)
    {
        int data;
        bq->pop(&data);
        std::cout << "消费数据" << data << std::endl;
        sleep(1); // 这里消费速度慢一点,生产速度很快,结果就是开始运行的一瞬间,生产线程把队列填满,然后消费一个生产一个
    }

    return nullptr;
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid());
    BlockQueue<int> *bq = new BlockQueue<int>();

    pthread_t p, c;
    pthread_create(&p, nullptr, productor, bq);
    pthread_create(&c, nullptr, consumer, bq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    delete bq;
    return 0;
}

image-20240205191800806

2.3 生产者消费者模型的性能分析

在前面的内容中我们说生产者消费者模型具有高效率的特性。但是会有很多疑问

为什么生产者消费者模型具有高效性?通过线程同步和互斥已经保证了临界资源单执行流访问,还有什么提高效率的空间呢?

生产者消费者模型的关注点不应该只在阻塞队列这里,而是要考虑到在生产之前的事情和消费之后的事情

  • 对于生产者而言,要向BlockQueue里面放置任务,对于消费者而言,要从BlockQueue里面拿去任务
  • 对于生产者,任务从哪里来?获取任务和构建任务是要花时间的
  • 对于消费者,难道它把任务从任务队列中拿出来就完了吗?消费者拿到任务之后,后续还有没有任务?

所以,高效体现在一个线程拿出来任务可能正在做计算,它在做计算的同时,其他线程可以继续从队列中拿,继续做运算,高效并不是体现在从队列中拿数据高效!而是**我们可以让一个、多个线程并发的同时计算多个任务!**在计算多个任务的同时,并不影响其他线程,继续从队列里拿任务的过程。也就是说,生产者消费者模型的高效:可以在生产之前与消费之后让线程并行执行,不要认为生产者消费模式仅仅只是把任务生产到队列的过程就是生产过程,生产过程:1.拿任务、需要费点劲2.拿到后再放到队列里面整个一体,整个生产的过程;整个消费的过程:不是把任务拿到线程的上下文中就完了,拿到之后还要进行计算或存储这些工作才是消费的过程在生产前和和消费后我们多个线程是可以并发的。

3. 基于BlockQueue的生产消费模型的扩展应用

上面我们对BlockQueue的测试使用的是简单的生产一个整数和消费一个整数,但是实际上我们在实现BlockQueue的时候使用的是类模版,我们实际上是可以在BlockQueue中存放指定任务的,接下来我们在对这个测试进行一些扩展

一共有三个任务需要被完成:1、生产任务;2、完成任务;3、保存结果,我们这里使用一个计算作为要生产的任务,接下来改写测试代码

/*封装任务Task.hpp*/
#pragma once

#include <string>
#include <functional>

static std::string oper = "+-*/%";

class CalTask
{
    using func_t = std::function<int(int, int, char)>;

public:
    CalTask() {}
    CalTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _callback(func)
    {
    }
    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[64];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }
    std::string toTaskString()
    {
        char buffer[64];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
class SaveTask
{
    typedef std::function<void(const std::string)> func_t;

public:
    SaveTask() {}
    SaveTask(const std::string &msg, func_t func)
        : _msg(msg), _func(func)
    {
    }
    void operator()()
    {
        _func(_msg);
    }

private:
    std::string _msg;
    func_t _func;
};
int mymath(int a, int b, char op)
{
    int ans = 0;
    switch (op)
    {
    case '+':
        ans = a + b;
        break;
    case '-':
        ans = a - b;
        break;
    case '*':
        ans = a * b;
        break;
    case '/':
    {
        if (b == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            ans = -1;
        }
        else
            ans = a / b;
    }
    break;
    case '%':
    {
        if (b == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            ans = -1;
        }
        else
            ans = a % b;
    }
    break;
    default:
        break;
    }
    return ans;
}
void Save(const std::string &msg)
{
    FILE *fp = fopen("./log.txt", "a+");
    if (fp == NULL)
    {
        std::cerr << "open file error" << std::endl;
        return;
    }
    fputs(msg.c_str(), fp);
    fputs("\n", fp);

    fclose(fp);
}
/*main.cc*/
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include "BlockQueue.hpp"
#include "Task.hpp"

struct BlockQueues // 作为参数传递给其他线程
{
public:
    BlockQueue<CalTask> *c_bq;
    BlockQueue<SaveTask> *s_bq;

public:
    BlockQueues()
    {
        c_bq = new BlockQueue<CalTask>();
        s_bq = new BlockQueue<SaveTask>();
    }
    ~BlockQueues()
    {
        delete c_bq;
        delete s_bq;
    }
};

void *productor(void *bqs_) // 生产线程执行的任务
{
    BlockQueue<CalTask> *bq = static_cast<BlockQueues *>(bqs_)->c_bq;

    while (true)
    {
        int x = rand() % 10 + 1;
        int y = rand() % 5 + 1;
        int opNum = rand() % oper.size();
        CalTask tmp(x, y, oper[opNum], mymath);
        bq->push(tmp);
        std::cout << "predictor thread 生产任务成功" << tmp.toTaskString() << std::endl;
        sleep(1); // 这里生产者的速度慢一点,消费的速度很快,最后的现象就是生产了一个就消费一个
    }
}
void *consumer(void *bqs_) // 计算线程执行的任务
{
    BlockQueue<CalTask> *bq1 = static_cast<BlockQueues *>(bqs_)->c_bq;
    BlockQueue<SaveTask> *bq2 = static_cast<BlockQueues *>(bqs_)->s_bq;

    while (true)
    {
        // 完成计算任务
        CalTask tmp;
        bq1->pop(&tmp);
        std::string result = tmp();
        std::cout << "cal thread 计算任务完成" << result << std::endl;

        // 推送存储任务
        SaveTask save(result, Save);
        bq2->push(save);
        std::cout << "cal thread 推送存储任务完成" << std::endl;
        //  sleep(1); // 这里消费速度慢一点,生产速度很快,结果就是开始运行的一瞬间,生产线程把队列填满,然后消费一个生产一个
    }

    return nullptr;
    return nullptr;
}
void *saver(void *bqs_) // 保存线程执行的任务
{
    BlockQueue<SaveTask> *bq = static_cast<BlockQueues *>(bqs_)->s_bq;

    while (true)
    {
        SaveTask t;
        bq->pop(&t);
        t();
        std::cout << "save thread 存储任务执行完成" << std::endl;
    }

    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid());
    BlockQueues bqs; // 创建bqs对象

    pthread_t p, c, s; // p表示生产 c表示计算 s表示保存
    pthread_create(&p, nullptr, productor, &bqs);
    pthread_create(&c, nullptr, consumer, &bqs);
    pthread_create(&s, nullptr, saver, &bqs);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    pthread_join(s, nullptr);

    return 0;
}

这里的设计采用的是两个阻塞队列,在三个执行流之间,生产和计算之间有一个阻塞队列用于保存计算任务,在计算和存储之间有一个阻塞队列用于保存存储任务,所以控制了生产的速度之后,最后会出现的情况就是生产一个计算任务放入阻塞队列中,计算线程从其中拿出计算任务计算,完成计算任务之后产生存储任务放到另一个阻塞队列中,等待存储线程从其中拿。所以最终显示的结果就是产生一个任务,计算一个任务,存储一个任务,周而复始。同时log.txt中不断被写入数据

# 监控脚本
 while :; do echo "+++++++++++++++++++++++"; cat log.txt;  sleep 1; done

image-20240205214502711


本节完……


http://www.kler.cn/a/228832.html

相关文章:

  • tui-editor报错
  • ORB-SLAM2源码学习:ORBmatcher.cc⑥: int ORBmatcher::Fuse将地图点投影到关键帧中进行匹配和融合
  • ASP.NET Core - 配置系统之自定义配置提供程序
  • leetcode707-设计链表
  • SimpleHelp远程管理软件存在任意文件读取漏洞(CVE-2024-57727)
  • 21天学通C++——11多态(引入多态的目的)
  • 静态库和动态库
  • vue全屏,退出全屏、监听ESC退出全屏
  • 01背包问题 动态规划
  • CAN通信----(创芯科技)CAN分析仪----转CANTest使用
  • 2024年2月CCF-全国精英算法大赛题目
  • 前端面试题——Vue的双向绑定
  • <网络安全>《16 网络安全隔离与信息单向导入系统》
  • 计算机视觉实战项目3(图像分类+目标检测+目标跟踪+姿态识别+车道线识别+车牌识别+无人机检测+A*路径规划+单目测距与测速+行人车辆计数等)
  • 【HarmonyOS应用开发】Web组件的使用(十三)
  • 壹[1],Xamarin开发环境配置
  • linux的nginx安装
  • 复旦大学NLP团队发布86页大模型Agent综述
  • Git私服搭建
  • UML---用例图,类图
  • 前端如何预防CSRF
  • python的进程,线程、协程
  • 群晖NAS开启FTP服务结合内网穿透实现公网远程访问本地服务
  • Unity3D开发之鼠标单双击判断
  • 如何在PS5上使用金手指修改游戏
  • docker 离线安装镜像