当前位置: 首页 > article >正文

【Linux】基于阻塞队列的生产者消费者模型

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
在这里插入图片描述

目录

    • 👉为何要使用生产者消费者模型👈
    • 👉生产者消费者模型的优点👈
    • 👉基于阻塞队列的生产者和消费者模型👈
    • 👉总结👈

👉为何要使用生产者消费者模型👈

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型在生活中是相当常见的,比如客户去超市里买商品就是一个很好的例子。在这个例子中,供货商就是生产者,顾客就是消费者,超市就是一个交易场所,本质是一个商品的缓冲区。超市存在的意义就是让生产者和消费者解耦,以提高效率。


在生产者消费者模型中,有一个交易场所超市,生产者和消费者两种角色,生产者和生产者、消费者和消费者、生产者和消费者三种关系。其中生产者和生产者之间的关系是竞争互斥关系,消费者和消费者之间的关系是竞争互斥关系,生产者和消费者之间的关系是互斥和同步关系。当生产者进行生产时,消费者不能进行消费(保证安全),所以生产者和消费者是互斥关系。当商品很少时,生产者需要进行生产,消费者需要进行等待;而当商品很多时,生产者不能进行生产,消费者进行消费,所以生产行为和消费行为具有一定的顺序性,也就是说生产者和消费者是同步关系。

在计算机世界里,生产者和消费者都是通过线程模拟出来的。当生产者线程生产完商品后,就可以通知消费者线程来进行消费;而当消费者线程消费完商品后,就可以通知生产者线程来进行生产,这个过程就是通过条件变量来实现的。

👉生产者消费者模型的优点👈

想要真正理解生产者消费者模型优点,我们需要知道生产者生产数据和消费者消费数据都是想要消耗时间的。那么当消费者线程进行消费数据时,该线程是不会访问阻塞队列的。所以当消费者线程进行消费时,生产者线程从其他地方获取数据并将该数据放入到阻塞队列中,这样就可以提高生产者线程和消费者线程的并发度了。所以生产者消费者模型具有一下优点:解耦、支持并发。

👉基于阻塞队列的生产者和消费者模型👈

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。注:管道本身就是一个阻塞队列,有数据就消费,没数据就等待。管道内部已经实现了互斥和同步的功能。

在这里插入图片描述

单生产者单消费者模型

// BlockQueue.hpp
#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>

const int DefaultCap = 5;

template <class T>
class BlockQueue
{
private:
    bool isEmpty()
    {
        return _bq.empty();
    }

    bool isFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = DefaultCap)
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_empty, nullptr);
        pthread_cond_init(&_full, nullptr);
    }

    // 生产者
    void push(const T& in)
    {
        // 访问临界资源需要进行加锁保护
        pthread_mutex_lock(&_mtx);
        // 1.先检查当前的临界资源是否满足访问条件
        // 检查临界资源是否满足访问条件,也是访问临界资源
        // 所以它也需要在加锁和解锁之间
        // pthread_cond_wait函数是让线程在特定的条件变量下阻塞等待
        // 当进行等待时,线程所持有的锁会被自动释放掉.当条件变量满足
        // 时,线程会被在阻塞挂起的地方被唤醒.线程被唤醒的时候,是在临
        // 界区中的,pthread_cond_wait会自动帮助线程获取锁
        while(isFull())
            pthread_cond_wait(&_full, &_mtx);
        // pthread_cond_wait是一个函数,它就可能会调用失败,从而出现
        // 伪唤醒(临界资源不满足访问条件,却往下进行临界资源的访问)的
        // 情况,所以需要通过while来保证满足访问临界资源的条件,而不能
        // 通过if来判断临界资源是否能被访问
        
        // 2.访问临界资源(来到这里,临界资源100%是就绪的!)
        _bq.push(in);
        // 可以指定特定的唤醒线程的策略,如数据的个数大于容量的一半
        // if(2 * _bq.size() >= _capacity) pthread_cond_signal(&_empty);
        // 唤醒线程可以在释放锁之前,也可以在释放锁之后
        pthread_cond_signal(&_empty);

        pthread_mutex_unlock(&_mtx);
    }

    // 消费者
    void pop(T* out)
    {
        pthread_mutex_lock(&_mtx);
        while(isEmpty())
            pthread_cond_wait(&_empty, &_mtx);

        *out = _bq.front();
        _bq.pop();
        pthread_cond_signal(&_full);

        pthread_mutex_unlock(&_mtx);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_empty);
        pthread_cond_destroy(&_full);
    }

private:
    std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)
    int _capacity;         // 容量上线
    pthread_mutex_t _mtx;  // 互斥锁保护队列的安全
    pthread_cond_t _empty; // 该条件变量表示bq是否为空
    pthread_cond_t _full;  // 该条件变量表示bq是否为满
};

// ConProd.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

void* consume(void* args)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)args;
    while(1)
    {
        int a;
        bq->pop(&a);
        std::cout << "消费了一个数据:" << a << std::endl;
        sleep(1);
    }
    return nullptr;
}

void* produce(void* args)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)args;
    int a = 1;
    while(1)
    {
        bq->push(a);
        std::cout << "生产了一个数据:" << a << std::endl;
        ++a;
    }
    return nullptr;
}

int main()
{
    BlockQueue<int>* bq = new BlockQueue<int>();
    pthread_t c, p;

    pthread_create(&c, nullptr, consume, (void*)bq);
    pthread_create(&p, nullptr, produce, (void*)bq);
	
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;
    
    return 0;
}

在这里插入图片描述

增加任务类

// Task.hpp
#pragma once

#include <iostream>
#include <functional>

typedef std::function<int(int, int)> func_t;

class Task
{
public:
    Task() = default;

    Task(int x, int y, func_t func)
        : _x(x)
        , _y(y)
        , _func(func)
    {}

    int operator()()
    {
        return _func(_x, _y);
    }

public:
    int _x;
    int _y;
    func_t _func;
};

// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

int Add(int x, int y)
{
    return x + y;
}

void* consume(void* args)
{
    BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
    while(1)
    {
        // 获取任务
        Task t;
        bq->pop(&t);
        // 完成任务
        std::cout << "consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;
    }
    return nullptr;
}

void* produce(void* args)
{
    BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
    while(1)
    {
        // 制作任务 -- 不一定从生产者来的,可能是从网络来的
        int x = rand() % 10 + 1;
        int y = rand() % 20 + 1;
        // 生产任务
        Task t(x, y, Add);
        bq->push(t);
        std::cout << "productor: " << x << " + " << y << " = ?" << std::endl; 
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    pthread_t c, p;

    pthread_create(&c, nullptr, consume, (void*)bq);
    pthread_create(&p, nullptr, produce, (void*)bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;

    return 0;
}

在这里插入图片描述

多生产者多消费者

// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

int Add(int x, int y)
{
    return x + y;
}

void* consume(void* args)
{
    BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
    while(1)
    {
        // 获取任务
        Task t;
        bq->pop(&t);
        // 完成任务
        std::cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;
    }
    return nullptr;
}

void* produce(void* args)
{
    BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
    while(1)
    {
        // 制作任务 -- 不一定从生产者来的,可能是从网络来的
        int x = rand() % 10 + 1;
        int y = rand() % 20 + 1;
        // int x, y;
        // std::cout << "Please Enter x: ";
        // std::cin >> x;
        // std::cout << "Please Enter y: ";
        // std::cin >> y;
        // 生产任务
        Task t(x, y, Add);
        bq->push(t);
        std::cout << pthread_self() << " productor: " << x << " + " << y << " = ?" << std::endl; 
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    // pthread_t c, p;

    pthread_t c[2], p[2];
    pthread_create(c, nullptr, consume, (void*)bq);
    pthread_create(c + 1, nullptr, consume, (void*)bq);
    pthread_create(p, nullptr, produce, (void*)bq);
    pthread_create(p + 1, nullptr, produce, (void*)bq);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    delete bq;

    return 0;
}

多生产者多消费者模型的意义就是让生产者并发地获取和制作任务,让消费者并发地完成消费任务。多生产者多消费者模型主要用于处理消费任务或者获取和制作任务比较耗时的场景。

锁的封装

// LockGuard.hpp
#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t* pmtx)
        : _pmtx(pmtx)
    {}

    void lock()
    {
        std::cout << "进行加锁" << std::endl;
        pthread_mutex_lock(_pmtx);
    }

    void unlock()
    {
        std::cout << "进行解锁" << std::endl;
        pthread_mutex_unlock(_pmtx);
    }

    ~Mutex()
    {}

private:
    pthread_mutex_t* _pmtx;
};

// RAII的加锁方式
class LockGuard
{
public:
    LockGuard(pthread_mutex_t* pmtx)
        : _mtx(pmtx)
    {
        _mtx.lock();
    }

    ~LockGuard()
    {
        _mtx.unlock();
    }

private:
    Mutex _mtx;
};

// BlockQueue.hpp
#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"

const int DefaultCap = 5;

template <class T>
class BlockQueue
{
private:
    bool isEmpty()
    {
        return _bq.empty();
    }

    bool isFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = DefaultCap)
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_empty, nullptr);
        pthread_cond_init(&_full, nullptr);
    }

    // 生产者
    void push(const T &in)
    {
        // 出了函数的作用域会自动解锁
        LockGuard lockGuard(&_mtx);
        while (isFull())
            pthread_cond_wait(&_full, &_mtx);

        _bq.push(in);
        pthread_cond_signal(&_empty);
    }

    // 消费者
    void pop(T *out)
    {
        // 出了函数的作用域会自动解锁
        LockGuard lockGuard(&_mtx);
        while (isEmpty())
            pthread_cond_wait(&_empty, &_mtx);

        *out = _bq.front();
        _bq.pop();
        pthread_cond_signal(&_full);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_empty);
        pthread_cond_destroy(&_full);
    }

private:
    std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)
    int _capacity;         // 容量上线
    pthread_mutex_t _mtx;  // 互斥锁保护队列的安全
    pthread_cond_t _empty; // 该条件变量表示bq是否为空
    pthread_cond_t _full;  // 该条件变量表示bq是否为满
};

👉总结👈

本篇博客主要讲解了为什么要使用生产者消费者模型、基于阻塞队列的生产者和消费者模型以及 RAII 的加锁方式等等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️


http://www.kler.cn/a/2942.html

相关文章:

  • React 19 新特性探索:提升性能与开发者体验
  • python-leetcode-旋转链表
  • C++中常用的十大排序方法之1——冒泡排序
  • Java中的注解与反射:深入理解getAnnotation(Class<T> annotationClass)方法
  • 基于STM32的智能温控花盆设计
  • 使用Ollama本地部署DeepSeek R1
  • Qt音视频开发22-音频播放QAudioOutput
  • 【亲测搭建成功】模拟无网络情况下安装K8S集群和相关组件
  • 导航雷达回波信号格式和目标检测算法
  • 走进Vue【三】vue-router详解
  • java多线程之线程安全(重点,难点)
  • 什么是黄金现货市场
  • 【算法】前缀和
  • 解决win10任何程序打开链接仍然为老旧IE的顽固问题[修改默认浏览器]
  • 重构类关系-Pull Up Constructor Body构造函数本体上移三
  • 2023年通过CDGA的朋友可以考CDGP数据治理专家认证啦!
  • 传感器之相机介绍和使用
  • 游戏开发中常用的算法(持续更新)
  • 206. 反转链表
  • [数据结构]排序算法
  • Swift入门
  • 类和对象 - 下
  • 第二十一天 数据库开发-MySQL
  • 大文件上传
  • forward函数——浅学深度学习框架中的forward
  • CVPR 2023 | 旷视研究院入选论文亮点解读