Linux实现生产者消费者模型
目录
概念及优势
代码实现
概念及优势
生产者消费者模型是一种用于线程同步的模型,在这个模型中有两种角色,生产者生产数据,消费者消费数据。有三种关系,生产者与生产者,消费者与消费者,生产者与消费者。还有一个交易场所。
超市就是生活中最常见的生产者消费者模型,工厂生产商品,超市充当缓冲区,消费者去超市消费同时取走超市中的商品。
超市作为缓冲区,起到了很重要的作用,试想如果没有超市,那消费者想购物只能去找工厂,还要等待工厂将商品生产出来,同时工厂也不能持续生产商品,因为没有足够的空间存放,而超市作为缓冲区解决了这些问题,消费者不用去工厂阻塞等待商品生产,工厂也可以一直生产,只要超市还能放得下。
从上面的例子可以看出生产者消费者模型的优点,将生产者和消费者解耦,支持并发,支持忙闲不均。
再来讨论一下这三者间的关系
消费者与消费者不能同时向交易场所写数据,生产者与生产者也不能同时向交易场所拿数据,这两者都是互斥且竞争的关系。而生产者不能与消费者同时拿或写数据,这两者属于互斥且同步的关系。
交易场所的问题,交易场所为空时消费者不能消费,交易场所满时生产者不能生产。
代码实现
基于阻塞队列来实现生产者消费者模型,阻塞队列与普通队列的区别在于,阻塞队列在队列为空是pop操作会阻塞,队列为满时,push操作会阻塞。
//block_queue.hpp
#include <queue>
#include <pthread.h>
namespace my
{
template<class T>
class block_queue
{
public:
block_queue(size_t max_size = 1000)
:_max_size(max_size)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_consumer_cond, nullptr);
pthread_cond_init(&_producer_cond, nullptr);
}
bool empty() const
{
return _b_queue.empty();
}
bool full() const
{
return _b_queue.size() == _max_size;
}
//生产者
void push(const T& data)
{
pthread_mutex_lock(&_mutex); //加锁
//循环判断以防止虚假唤醒
//如果用if判断,试想这种场景
//线程a在等待,然后被唤醒了,既然被唤醒说明此时应该是不为满
//但是被唤醒后还需要竞争锁(因为等待会释放锁),只有线程a再次持有锁
//才会醒来执行后面的代码
//但是线程a有可能并没有立马拿到锁,被另一个生产者线程b拿到锁
//并且b此时判断也是不为满
//那么b会向后执行,会向队列里push数据
//这就有问题了,在这之后线程a拿到了锁,向后执行,但现在已经是满队列了
while(full())
{
//队列满了,生产者等待
//等待必须解锁,否则其他线程都拿不到锁
//条件变量等待的条件一定属于临界资源,
//所以pthread_cond_wait一定属于临界区代码
//所以pthread_cond_wait这个函数在内部一定会解锁来防止死锁
//被唤醒后会竞争锁
pthread_cond_wait(&_producer_cond, &_mutex);
}
_b_queue.push(data); //入队
std::cout << "生产数据" << data << std::endl;
pthread_mutex_unlock(&_mutex); //解锁
pthread_cond_signal(&_consumer_cond); //唤醒消费者
std::cout << "唤醒消费者" << std::endl;
}
const T& pop()
{
pthread_mutex_lock(&_mutex); //加锁
//循环判断以防止虚假唤醒
while(empty())
{
pthread_cond_wait(&_consumer_cond, &_mutex);
}
T& ret = _b_queue.front();
std::cout << "消费数据" << ret << std::endl;
_b_queue.pop(); //出队
pthread_mutex_unlock(&_mutex); //解锁
pthread_cond_signal(&_producer_cond); //唤醒生产者
std::cout << "唤醒生产者" << std::endl;
return ret;
}
~block_queue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumer_cond);
pthread_cond_destroy(&_producer_cond);
}
size_t size() const
{
return _b_queue.size();
}
private:
std::queue<T> _b_queue; //底层容器
size_t _max_size; //队列最大容量
pthread_mutex_t _mutex; //给队列加锁
pthread_cond_t _consumer_cond; //消费者条件变量
pthread_cond_t _producer_cond; //生产者条件变量
};
}
main函数
#include <iostream>
#include <memory>
#include <unistd.h>
#include "block_queue.hpp"
using data = int;
void* producer(void* _bq)
{
my::block_queue<data>& bq = *(my::block_queue<data>*)_bq;
data val = 0;
while (true)
{
//使得生产者总是慢于消费者,这样队列一直都只会有一个元素
//sleep(2);
bq.push(val);
val++;
}
}
void* consumer(void* _bq)
{
my::block_queue<data>& bq = *(my::block_queue<data>*)_bq;
data val = 0;
while (true)
{
val = bq.pop();
}
}
void test()
{
pthread_t tid_consumer1 = 0, tid_producer1 = 0;
pthread_t tid_consumer2 = 1, tid_producer2 = 0;
pthread_t tid_consumer3 = 2, tid_producer3 = 0;
std::shared_ptr<my::block_queue<int>> bq(std::make_shared<my::block_queue<int>>(5));
pthread_create(&tid_producer1, nullptr, producer, &(*bq));
pthread_create(&tid_producer2, nullptr, producer, &(*bq));
pthread_create(&tid_producer3, nullptr, producer, &(*bq));
pthread_create(&tid_consumer1, nullptr, consumer, &(*bq));
pthread_create(&tid_consumer2, nullptr, consumer, &(*bq));
pthread_create(&tid_consumer3, nullptr, consumer, &(*bq));
pthread_join(tid_producer1, nullptr);
pthread_join(tid_producer2, nullptr);
pthread_join(tid_producer3, nullptr);
pthread_join(tid_consumer1, nullptr);
pthread_join(tid_consumer2, nullptr);
pthread_join(tid_consumer3, nullptr);
}
int main()
{
test();
return 0;
}