当前位置: 首页 > article >正文

基于阻塞队列的生产者消费者模型

目录

生产者消费者模型

生产者消费者模型是什么?

生产者消费者模型优点

基于阻塞队列的生产者消费者模型


生产者消费者模型

前面我们学习了生产者多线程,以及多线程的控制,下面我们看一下多线程中最常见的一个应用——生产者消费者模型。

生产者消费者模型是什么?

生产者消费者模式是一个在编码中常用的设计,这样做不仅可以提高效率,还可以解耦!

那么我们介绍一下生产者消费者模型,我们使用生活中的例子:

现在有有一批顾客,他们想要买火腿肠,还有一批供应商是专门生产火腿肠的。

那么顾客一般去买火腿肠去哪里买呢?当然是去超市!

而供应商也因为生产的东西太多,不可能去专门零售,所以一般都是之间给超市供货。

而此时顾客就是消费者,供应商就是生产者,但是这里还出现了一个超市,而超市就是交易场所。

那么此时这三者之间有什么关系呢?

首先我们需要分析清楚里面有几种关系:

  1. 生产者——生产者

  2. 消费者——消费者

  3. 生产者——消费者

以上就有着三种关系,那么着三种关系之间有什么联系呢?

如果是两个生产者,那么我想要给这一家超市供货,你也想要着一家超市供货,那么此时这两家供货商就是竞争关系,那么在多线程这里叫什么呢?互斥!因为一次只能由一家供货商供货。

那么如果是两个消费者呢?假设现在超市就剩下一根火腿肠了,那么你也想要,我也想要,此时这根火腿只能由一个人买走,那么此时这两个消费者之间的关系就是竞争。

那么如果是生产者和消费者呢?如果供货商现在正在往超市送货,那么此时你可以进来吗?不可以,所以此时消费者和生产者就有竞争关系(互斥),那么假设现在超市没有火腿了,那么要怎么办呢?是不是需要生产者赶快生产火腿,是的!那么此时需要怎么办呢?需要消费者通知生产者!那么如果此时超市的火腿已经放满了呢?超市已经有火腿了,但是现在起顾客还不知道超市已经上货了,那么此时是不是需要生产者通知消费者超市的货物更新了。所以此时生产者和消费者是什么关系呢——同步!

所i总结一下他们之间的关系:

  1. 生产者——生产者 关系:互斥

  2. 消费者——消费者 关系:互斥

  3. 生产者——消费者 关系:同步与互斥

生产者消费者模型优点

上面说了什么是生产者消费者模型,那么下面看一下为什么要有生产者消费者模型以及他们的有优点。

生产者消费者模型的优点:

  1. 提高效率

  2. 解耦

为什么说生产者消费者模型可以提高效率呢?

还是以上面为例,当我们在生产商品的时候,假设现在顾客不需要了,但是生产者还可以生产吗?可以!因为可以生产出来放到超市,所以也就类似于有缓冲区,可以多存储一些火腿,那么假设现在顾客忽然要很多火腿那么如果没有超市之前的缓存的话,那么就可能满足不了顾客,所以这是提高效率的一个点!

还有就是,如果现在顾客要很多火腿,那么现在一家供应商已经满足不了了,那么是可以超市可以联系多家供应商一起供应火腿呢?是的所以此时生产火腿就是并发的生产,并不是只有一家在生产火腿,那么如果现在火腿超市已经块放满了,所以需要顾客消费火腿,那么此时一个顾客是消费不了这么多火腿的,那么此时是不是可以有很多顾客同时消费火腿,而消费火腿也是并发的,所以这是提高效率的一个打点。

上面还提到了解耦,那么如何做到解耦呢?

上面生产者生产的火腿首先放到了超市,所以生产者并不关心消费者,生产者只需要生产就好了,此时生产者是不知道消费者的存在的,那么此时消费者也是在超市购买火腿,他并不知道是生产者供应的,所以此时消费者也并不关心生产者,此时就实现了生产者和消费者之间的解耦。

基于阻塞队列的生产者消费者模型

下面我们就编写一个基于阻塞队列的生产者消费者模型!

说一下我们想要怎么样编写:

  1. 我想要一批生产者生产数据,然后放到阻塞队列里面。

  2. 然后又一批消费者消费数据,从阻塞队列里面拿出来,然后进行消费。

  3. 所以我们需要一个阻塞队列。

  4. 这个阻塞队列是有一定的大小,如果当数据放满了阻塞队列,那么就不能让生产者继续生产了。

  5. 如果当阻塞队列里面的没有数据了,那么就停止消费者对数据的消费,此时让生产者来生产。

  6. 当生产满了后,就需要通知消费者来消费。

下面先想一下我们的阻塞队列里面需要有那些数据:

  1. 首先一定需要有一个队列,用来存放数据,此时这个队列需要可以存放任意类型的数据,所以需要是模板。

  2. 由于生产者消费者都会访问阻塞队列里面的各种数据,所以我们需要一把锁,来控制各种关系之间的互斥。

  3. 由于生产者需要有空来存放数据,所以需要一个条件变量,来表示是否有该资源,如果没有就阻塞生产者。

  4. 消费者又要有数据来消费,所以消费者需要在一个有数据的条件下来消费,如果没有数据,就让消费者阻塞。

  5. 所以此时还需要两个条件变量,一个 empty 是为了给生产者是否有空的条件变量。

  6. 还有一个条件变量 full 表示给消费者的一个是否有数据的条件变量。

  7. 当然我说了,阻塞队列是有大小的,所以还需要一个值表示最大的容量。

template<class T>
class blockQueue
{
public:
    blockQueue(size_t capacity = 5)
        :_capacity(capacity)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_full, nullptr);
        pthread_cond_init(&_empty, nullptr);
    }
    
     ~blockQueue()
    {
        pthread_cond_destroy(&_empty);
        pthread_cond_destroy(&_full);
        pthread_mutex_destroy(&_mutex);// 释放锁
    }
private:
    queue<T> _bourse;         // 交易场所
    size_t _capacity;         // 最大容量
    pthread_mutex_t _mutex;   // 互斥锁
    pthread_cond_t _full;     // 是否有数据
    pthread_cond_t _empty;    // 是否为空
};

上面就是我们的阻塞队列的成员变量以及其构造和析构函数。

这里还是在稍微提一下构造和析构里面干了什么,这里不明白的可以看一下之前的线程的互斥与同步就知道了。

因为阻塞队列里面的锁以及条件变量不是全局的,所以需要调用 init 函数来初始化,并使用后需要 destroy 销毁,所以在构造的时候就需要调用 init 为 mutex 和 cond,析构函数里面需要调用 destroy 为 mutex 和 cond。

那么还阻塞队列还需要哪些函数呢?

由于生产者需要将数据放入到阻塞队列中,而消费者需要将数据消费,也就是拿出来,那么就当然需要一个 push 函数,和pop 函数。

下面先用一个函数说明,其实这两个函数的细节是一样的,所以了解了一个函数,那么另一个也就知道了。

下面我们想一下这两个函数需要如何设计:

  1. 当生产者想要将一个数据 push 到阻塞队列中,那么第一步并不是将数据放入到阻塞队列中,前面我们说了想访问资源,那么需要先检测资源是否存在,而对于生产者来说,资源就是空位置,许哦一需要先检测资源。

  2. 但是检测资源是否存在也是访问数据,那么访问临界资源就是要加锁的,所以首先应该加锁。

  3. 加锁后然后访问资源,那么如果资源满足的话,那么就可以继续向后执行,然后将数据 push 到阻塞队列中。

  4. 那么如果是资源不满足呢?如果不满足就什么都不做的话,那么由于生产者不知道什么时候有数据,所以需要一直循环检测,也就是一直申请锁,然后释放锁,所以如果资源不满足的话,就让生产者进行等待,然后等资源满足后唤醒需生产者,那么什么时候资源满足呢?当有空位置的时候,那么就是资源满足,所以当消费者消费数据后,那么就资源满足了,所以当消费者消费数据后,那么就可以唤醒生产者。

  5. 那么如果当生产者生产数据就,那么就可以释放锁了,那么释放锁后呢?还要做什么吗?当然还需要,假设之前没有数据,然后消费者就会进行消费,但是由于没有数据,所以消费者会进程阻塞,等待资源到来,对于消费者而言,数据就是生产者生产的数据,所以此时生产者生产数据后还需要做的一件事就是唤醒消费者。

  6. 那么如果是对于消费者而言呢?消费者就需要消费数据,如果没有数据就进行阻塞,等待数据到来,所以消费者和生产者也是相同的。

下面看一下代码就理解了:

// 将生产的数据放入阻塞队列
    void push(const T &goods)
    {
        // 1. 检查是否可以放入数据,但是需要访问临界资源,需要先加锁
        pthread_mutex_lock(&_mutex);
        // 2. 访问队列是否为满,如果为满的话,那么便生产不了数据,需要到空的条件下等待,只要有空了,那么便可以继续
        while (_bourse.size() >= _capacity)
            pthread_cond_wait(&_empty, &_mutex);
        // 3. 将商品放入阻塞队列,等待消费者消费
        _bourse.push(goods);
        // cout << "生产者[" << pthread_self() << "]生产了一个数据: " << goods << endl;
        // 4. 放入后,说明现在有商品了,那么便可以唤醒在 full 条件下等待的线程
        pthread_cond_signal(&_full);
        // 5. 唤醒后解锁
        pthread_mutex_unlock(&_mutex);
    }
​
    void pop(T &goods)
    {
        // 1. 检查是否可以消费数据(阻塞队列里面有没有商品),但是需要先加锁
        pthread_mutex_lock(&_mutex);
        // 2. 判断是否可以消费
        while (_bourse.empty())
            pthread_cond_wait(&_full, &_mutex);
        // 3. 消费商品
        goods = _bourse.front();
        _bourse.pop();
        // cout << "消费者[" << pthread_self() << "]消费了一个数据: " << goods << endl;
        // 4. 消费商品后,有空余位置,就可以生成商品,那么唤醒生产者
        pthread_cond_signal(&_empty);
        // 5. 解锁
        pthread_mutex_unlock(&_mutex);
    }

这就是阻塞队列,那么我们是需要基于阻塞队列实现的生产者消费者模型,所以我们还需要有生产者和消费者,所以下面我们看一下 main 函数怎么写:

下面我们需要创建线程,创建线程后,我们就让主线程进行 join 就可以了:

#define THREAD_NUM 1
​
// 交易场所
blockQueue<task> market(10);
const char* oper = "+-*/%";
​
// 一直生产数据
void *product(void *args)
{
    cout << "一个生产者被创建[" << pthread_self() << "]" << endl;
    int s = strlen(oper);
    // cout << "s: " << s << endl;
    while (true)
    {
        int x = rand() % 100;
        int y = rand() % 3;
        char op = oper[rand() % s];
        market.push({x, y, op});
        // cout << "生产者[" << pthread_self() << "]生产了一个数据: " << goods << endl;
        cout << "生产一个数据:" << x << op << y << "=?" << endl;
        sleep(1);
    }
}
​
// 一直消费数据
void *consum(void *consum)
{
    cout << "一个消费者被创建[" << pthread_self() << "]" << endl;
    while (true)
    {
        task date;
        market.pop(date);
        // cout << "消费者[" << pthread_self() << "]消费了一个数据: " << goods << endl;
        if(date.get_code()) cout << date.get_x() << date.get_op() << date.get_y() << " 计算出错" << endl;
        else cout << date.get_x() << date.get_op() << date.get_y() << "=" << date() << " code: " << date.get_code() << endl;
        // sleep(1);
    }
}
​
void pthreadJoin(vector<pthread_t> &threads)
{
    for (pthread_t tid : threads)
    {
        pthread_join(tid, nullptr); // 忽略返回值
        cout << "tid: " << tid << " 等待成功" << endl;
    }
}
​
void creatCPThread(vector<pthread_t> &threads, size_t c_num, size_t p_num)
{
    threads.resize(c_num + p_num);
    // 创建生产者
    for (int i = 0; i < p_num; ++i)
        pthread_create(&(threads[i]), nullptr, product, nullptr);
    // 创建消费者
    for (int i = 0; i < c_num; ++i)
        pthread_create(&(threads[i + p_num]), nullptr, consum, nullptr);
}
​
int main()
{
    // 数据来源
    srand(time(0));
​
    // 创建线程
    vector<pthread_t> threads;
    creatCPThread(threads, THREAD_NUM, THREAD_NUM);
    // join
    pthreadJoin(threads);
    
    return 0;
}

这里我们使用 vector 来保存线程的 id ,方便和后面 join 线程。

这里创建 THTREAD_NUM 个生产者和消费者线程,然后分别让生产者执行 product 函数,让消费者执行 consum 函数。

然后让主线程就 join 其他线程即可。

解释 pthread_cond_wait 为什么需要 mutex!

通过这一次的代码,以及代码中的注释。

当生产者发现没有空位置的时候,是需要阻塞的,但是此时阻塞的位置是在哪里?

在临界区里面,那么此时他是抱着锁阻塞的,那么如果他就直接阻塞的话,那么此时他抱着锁,那么会发生什么?死锁,但是我们发现阻塞后米有发生死锁,那么说明了什么,在阻塞前,需要先解锁。所以 wait 函数里面一定帮我们进行了解锁,所以没有锁,那么既然需要解锁,那么需要什么变量,一定需要锁变量,所以 wait 是需要锁的。

那么在唤醒的时候是在哪里呢?在哪里阻塞就在那里唤醒,所以在 wait 唤醒,那么此时刚被唤醒是没有锁的,那么此时唤醒的位置是什么地方?还是临界区,那么此时如果唤醒在临界区里面,如果没有锁可以吗?不可以,所以在唤醒后,一定需要竞争锁,所以唤醒后还是需要竞争锁的。

还有一个问题,为什么在检测资源是否满足的时候使用 while 循环判断呢?

因为如果资源不满足的话,是需要wait的,wait是一个函数,既然是函数,那么就有可能调用失败,那么如果调用失败的话,如果底层使用的是数组迷你队列,那么此时就可能发生越界等等问题,如果使用的是 stl 那么最大容量可能就超了,也可能会直接奔溃,所以此时是有危险的,所以有可能唤醒是伪唤醒,所以这里需要循环判断,然唤醒的位置,继续在判断一次,如果满足条件的话,那么就去执行后面的代码!


http://www.kler.cn/a/147288.html

相关文章:

  • 【动手学深度学习Pytorch】1. 线性回归代码
  • AWTK-WIDGET-WEB-VIEW 实现笔记 (1) - 难点
  • 传奇996_24——变量lua
  • Python 使用Django进行单元测试unittest
  • git本地分支推送到远程和远程pull到本地
  • Amazon Web Services (AWS)
  • C语言:选择法对十个整数排序
  • 爬取极简壁纸
  • css实现鼠标移入背景图片变灰并浮现文字的效果
  • linux 命令 sudo、su 命令
  • 小H喜欢睡觉(C语言实现)
  • 多传感器融合SLAM调研
  • 前端项目部署自动检测更新后通知用户刷新页面(前端实现,技术框架vue、js、webpack)——方案一:编译项目时动态生成一个记录版本号的文件
  • C#——多线程之异步调用容易出现的问题
  • Go语言初始化已有环境,跟踪已有依赖环境
  • Android设计模式--桥接模式
  • 数据可视化:在Jupyter中使用Matplotlib绘制常用图表
  • Echarts大屏可视化_02 球体模块制作
  • kafka的详细安装部署
  • Vue路由跳转页面刷新
  • 87基于matlab的双卡尔曼滤波算法
  • java游戏制作-王者荣耀游戏
  • linux环境下samba服务器的配置
  • MYSQL 排序和分组怎么做?
  • ChatGLM2-6B微调过程说明文档
  • C语言基础篇5:指针(二)