23.生产者消费者模型
一、生产者消费者模型介绍
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给容器,消费者不找生产者要数据,而是直接从容器里取,容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个容器就是用来给生产者和消费者解耦的。
优点:
1.该模式从一定角度上提高了效率
2.正是有着仓库的存在,使得生产和消费进行了解耦
3.支持忙闲不均
二、生产者消费者模型的特点
我们想要研究明白生产者消费者模型,就需要研究清楚多个生产者和多个消费者之间的同步互斥关系。
• 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)
• 两种角色: 生产者和消费者(通常由进程或线程承担)
• 一个交易场所: 通常指的是某种结构或组织的一段内存区域,且这段区域是共享或临界资源
生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?
介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此需要将该临界资源用互斥锁保护起来。所以所有生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。
生产者和消费者之间为什么会存在同步关系?
若一直让生产者生产,那么当生产者生产的数据装满容器后,生产者再生产数据就会生产失败。反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。
注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来
三、基于BlockQueue的生产者消费者模型
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于:当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。
//BlockQueue.hpp
#pragma once
#include <queue>
#include "mutex.hpp"
#include "cond.hpp"
namespace BlockQueueModual
{
using namespace std;
using namespace MutexModel;
using namespace CondModul;
template <typename T>
class BlockQueue
{
bool IsFull(){return q.size() == _cap;}
bool IsEmpty(){return q.empty();}
public:
BlockQueue(int cap = 10) : _cap(cap)
{
}
void Push(T& data)
{
LockGuard lock(_mutex);
while (IsFull())
{
cout << "生产者进入等待……" << endl;
_pwait_num++;
_productor_cond.Wait(_mutex);
_pwait_num--;
cout << "生产者被唤醒……" << endl;
}
q.push(data);
if (_cwait_num)
{
cout << "叫醒消费者……" << endl;
_consumer_cond.Notify();
}
}
void Pop(T *out)
{
LockGuard lock(_mutex);
while (IsEmpty())
{
cout << "消费者进入等待……" << endl;
_cwait_num++;
_consumer_cond.Wait(_mutex);
_cwait_num--;
cout << "消费者被唤醒……" << endl;
}
*out = q.front();
q.pop();
if (_pwait_num)
{
cout << "叫醒生产者……" << endl;
_productor_cond.Notify();
}
}
~BlockQueue()
{
}
private:
queue<T> q;
int _cap;
Mutex _mutex;
Cond _productor_cond;
Cond _consumer_cond;
int _cwait_num=0;
int _pwait_num=0;
};
}
//BlockQueue.cc
#include <iostream>
#include <unistd.h>
#include "BlockQueue.hpp"
using namespace std;
using namespace BlockQueueModual;
void *Customer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
int data = 0;
bq->Pop(&data);
printf("Custormer get a data %d\n",data);
sleep(1);
}
return nullptr;
}
void *Producer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
int data = 10;
while (true)
{
bq->Push(data);
data++;
printf("Producer produce a data %d\n",data);
}
return nullptr;
}
int main()
{
BlockQueue<int> *bq = new BlockQueue<int>(10);
pthread_t c1,c2,c3, p1,p2;
pthread_create(&c1, nullptr, Customer, bq);
pthread_create(&c2, nullptr, Customer, bq);
pthread_create(&c3, nullptr, Customer, bq);
pthread_create(&p1, nullptr, Producer, bq);
pthread_create(&p2, nullptr, Producer, bq);
pthread_join(c1, nullptr);
pthread_join(p1, nullptr);
delete bq;
return 0;
}
四、基于RingQueue的生产者消费者模型
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。而我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
//RingQueue.hpp
#pragma once
#include <vector>
#include "mutex.hpp"
#include "sem.hpp"
namespace RingQueueModual
{
using namespace std;
using namespace MutexModel;
using namespace SemModual;
template <typename T>
class RingQueue
{
public:
RingQueue(int cap = 10) : _v(cap), _cap(cap), _data_sem(0), _block_sem(cap)
{
}
void Push(T &data)
{
_data_sem.P();
{
LockGuard lock(_p_mutex);
_v[_p_step++] = data;
_p_step %= _cap;
_block_sem.V();
}
}
void Pop(T *out)
{
_block_sem.P();
{
LockGuard lock(_c_mutex);
*out = _v[_c_step++];
_c_step %= _cap;
_data_sem.V();
}
}
~RingQueue()
{
}
private:
vector<T> _v;
int _cap;
int _c_step;
int _p_step;
Sem _data_sem;
Sem _block_sem;
Mutex _p_mutex;
Mutex _c_mutex;
};
}
//RingQueue.cc
#include <iostream>
#include <unistd.h>
#include "RingQueue.hpp"
using namespace std;
using namespace RingQueueModual;
void *Customer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while (true)
{
int data = 0;
rq->Pop(&data);
printf("Custormer get a data %d\n",data);
}
return nullptr;
}
void *Producer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
int data = 10;
while (true)
{
rq->Push(data);
data++;
printf("Producer produce a data %d\n",data);
sleep(1);
}
return nullptr;
}
int main()
{
RingQueue<int> *rq = new RingQueue<int>(10);
pthread_t c1,p1;
pthread_create(&c1, nullptr, Customer, rq);
pthread_create(&p1, nullptr, Producer, rq);
pthread_join(c1, nullptr);
pthread_join(p1, nullptr);
delete rq;
return 0;
}