Linux:生产者消费者模型
一、普通生产者消费者模型
1.1 什么是生产者消费者模型
现实生活中,我们也会有像生物世界的生产者和消费者的概念,但是我们的消费者在大多数情况下并不和生产者直接联系,就比如说食物,不能说我今天去找供货商要十个面包,然后我还得在那等他把十个面包生产完了再走,虽然这对于生产者来说有多少需求就供应多少节约了成本,但是对于消费者来说却浪费了很多时间,我们作为消费者肯定希望我们去买的时候就能够买到,因此这个时候我们需要一个中间场所——超市,供应商可以一次性先生产一部分面包,然后把他摆到超市的货架上,这样消费者来拿的时候就可以直接拿了! 而当货架上快空了的时候,超市可以告知消费者先等一段时间再过来,然后通知生产者尽快生产,而货架满的时候,超市可以通知生产者先不要生产,而是让消费者尽量来消费。所以我们会发现这个中间场所让我们的交互变得更加理性化,我们生产者和消费者并不关心对方,而又超市这个中间场所来平衡两边的能力,这就是生产者消费者模型!!
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合(互相干扰)问题。生产者和消费者彼此之间不直接通讯,而 通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给(1)生产者和消费者进行一定程度解耦的 (2)支持忙闲不均
基于这段共享内存,他就会存在并发问题,因此可能会有以下三种关系
生产者vs生产者:因为空间有限,所以生产者和生产者是竞争关系,需要互斥!
消费者vs消费者:因为资源有限(比如世界末日),所以消费者和消费者是竞争关系,需要互斥!
生产者vs消费者: 因为有可能会出现生产者边生产,消费者边拿的情况,所以拿到货物必须要保证货物先被生产完毕,所以需要互斥!而超市可以被生产者和消费者都访问到,因此当我没有资源的时候,我并不希望消费者一直来访问我,当我资源特别多的时候,我希望生产者不要一直来访问我,因此我们希望消费者和生产者按照一定的顺序去访问资源,所以需要有同步!(要在互斥保证线程安全的前提下)!
“321”原则:3个关系,2个角色,1个特定结构的内存空间
计算机场景1:cpu/内存/外设
计算机场景2:main/参数池/add
比如以前我们在main函数里调用add函数,我们把参数传给他之后当add函数在执行的时候,main函数会在那里等,这是单执行流。
可是这次如果我们将main函数和add分为两个线程,add要执行的时候必然要保证main给他传递参数,所以此时他们就相当于一个生产者消费者模型,我们可以将参数统一放在一个空间里,让add去取,然后通过互斥和同步来实现解耦,使他具有并发性。
问题:可是我生产者生产资源后放在空间里,再让消费者来拿,解耦是做到了,可是他还需要加锁和解锁,所以他的高效性究竟体现在哪里呢?
——>
生产者的数据从哪里来??——>用户/网络等!!
所以生产者生产所需要的数据是需要花时间获取的!!!
所以第一步是获取数据,第二步才是生产数据到队列中!!
消费者对数据做加工也要花时间!!
所以第一步是消费数据,第二步是加工处理数据!!
因此高效性体现在一方访问临界资源,另一方访问非临界资源,此时两个线程并发访问!!
1.2 快速实现cp
假设我们当前想要实现一方通过传递两个参数,另一方接受参数并进行计算。
类的设计:
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{
}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
问题1:为什么要用类对象传参数,而不是直接定义几个变量去传参数呢??
——> 传类对象的好处(1)内部可以直接放很多我们所需要的内置类型,还可以把处理结果也放进去 (2)可以顺便把我们的执行任务写到类方法里 (3)可扩展性高,未来想加参数或者方法可以直接去类里面加!!
阻塞队列实现:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template <class T>
class BlockQueue
{
static const int defalutnum = 20;
public:
BlockQueue(int maxcap = defalutnum):maxcap_(maxcap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&c_cond_, nullptr);
pthread_cond_init(&p_cond_, nullptr);
// low_water_ = maxcap_/3;
// high_water_ = (maxcap_*2)/3;
}
// 谁来唤醒呢?
T pop()
{
pthread_mutex_lock(&mutex_);
while(q_.size() == 0) // 因为判断临界资源调试是否满足,也是在访问临界资源!判断资源是否就绪,是通过再临界资源内部判断的。
{
// 如果线程wait时,被误唤醒了呢??
pthread_cond_wait(&c_cond_, &mutex_); // 你是持有锁的!!1. 调用的时候,自动释放锁,因为唤醒而返回的时候,重新持有锁
}
T out = q_.front(); // 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足
q_.pop();
// if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);
pthread_cond_signal(&p_cond_); // pthread_cond_broadcast
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T &in)
{
pthread_mutex_lock(&mutex_);
while(q_.size() == maxcap_){ // 做到防止线程被伪唤醒的情况
// 伪唤醒情况
pthread_cond_wait(&p_cond_, &mutex_); //1. 调用的时候,自动释放锁 2.?
}
// 1. 队列没满 2.被唤醒
q_.push(in); // 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足
// if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
pthread_cond_signal(&c_cond_);
pthread_mutex_unlock(&mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 共享资源, q被当做整体使用的,q只有一份,加锁。但是共享资源也可以被看做多份!
//int mincap_;
int maxcap_; // 极值
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;
pthread_cond_t p_cond_;
// int low_water_;
// int high_water_;
};
需要有两个条件变量,一个条件变量用来提醒生产者放数据,另一个用来体系消费者拿数据
问题2:为什么加锁之后要加个判断呢??
——>为了判断临界资源是否就绪!!所以要访问一下临界资源的数量
问题3:可是我们难道一有数据就唤醒消费者,一没数据就唤醒生产者吗??我可不可以就是当生产达到一定数量的时候在唤醒消费者,或者是消费者消费到一定数量的时候再唤醒生产者,这样是不是就减少了唤醒的次数??
——>设置一个low_water和high_water 低于low的时候才唤醒生产者,当高于high的时候才唤醒消费者,而在中间的时候就正常去并发,不去刻意唤醒!
执行:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>
void *Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 消费
Task t = bq->pop();
// 计算
// t.run();
t();
std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl;
// t.run();
// sleep(1);
}
}
void *Productor(void *args)
{
int len = opers.size();
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
int x = 10;
int y = 20;
while (true)
{
// 模拟生产者生产数据
int data1 = rand() % 10 + 1; // [1,10]
usleep(10);
int data2 = rand() % 10;
char op = opers[rand() % len];
Task t(data1, data2, op);
// 生产
bq->push(t);
std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << pthread_self() << std::endl;
sleep(1);
}
}
int main()
{
srand(time(nullptr));
// 因为 321 原则
// BlockQueue 内部可不可以传递其他数据,比如对象?比如任务???
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[3], p[5];
for (int i = 0; i < 3; i++)
{
pthread_create(c + i, nullptr, Consumer, bq);
}
for (int i = 0; i < 5; i++)
{
pthread_create(p + i, nullptr, Productor, bq);
}
for (int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 5; i++)
{
pthread_join(p[i], nullptr);
}
delete bq;
return 0;
}
问题4:多生产者消费者的时候,为什么我们只要加个for循环就可以做到了?
——>因为321原则,而无论有多少个线程,大家用的都是同一把锁(互斥),同样的两个条件变量(同步)
1.3 伪唤醒问题
比如说当前有3个生产者在阻塞状态,但是你同时把3个都唤醒了,唤醒之后必须持有锁,所以3个开始竞争,其中一个竞争成功并产生了资源,但是此时恰好队列已经满了,在他释放锁在的时候,另外两个被唤醒的资源并不在阻塞队列中,所以他们并不知道队列已经满了,于是产生了问题!!
——>所以我们的解决方法就是对于临界资源的访问要循环去访问!!这样的话那俩线程如果发现条件不满足他会继续进入休眠状态!
二、环形队列的生产消费模型
2.1 信号量铺垫
信号量的本质就是一把保证PV操作的计数器,而这个计数器就是用来描述临界资源的资源数目的
所以我们需要将资源是否就绪放在临界区之外,所以申请信号量的时候,其实就间接在做判断了!
信号量保证不让多余的人进来,而让进来的人到自己的位置上就是我们编码要解决的!
2.2 基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来 判断满或者空。另外也可以预留一个空的位置,作为满的状态
但是我们有信号量,我们就可以用信号量来充当这个计数器!而P和C关注的资源不一样,因此我们需要定义两把信号量spacesem(N)表示剩余空间,datasem(0)表示剩余数据!
而他们的操作如下图 ,当P占了一个空间,也以为着C多了一个资源 而C消费了一个资源 也意味着P多了一个空间 ,此时是并发访问的
而当空间没的时候,P占不了了就会被挂起,资源没的时候,C消费不了也会被挂起!
问题1:我们两个什么时候才会指向同一个位置呢??
——>空或者满,而不空或者不满的时候,我们一定只想不同的位置,我们可以同时访问!!
规则总结:
(1)指向同一个位置的时候只能有一个人访问
空:生产者
满:消费者
(2)消费者不能超过生产者
(3)生产者不能把消费者套一个圈
——>所以他们申请自己的资源,释放对方的资源,就友好地满足了一个追逐游戏必须具备的三个条件!!
问题2 :环形队列有什么特点?
——> 用两个信号量来分别充当资源的计数器,申请信号量如果成功就说明资源必然就绪了!(对资源预定成功)但是具体哪个线程去访问哪块资源,是由我们编码决定的 (就相当于我们去电影院能买到票,那么我们就可以进到电影院,但是具体要坐哪个位置,是需要我们去控制的)
(1)当为空或为满时,生产者和消费者只能有一个在执行,体现了局部的互斥性
(2)当为空时必须生产者先执行,为满时必须消费者先执行,体现了局部的同步性
(3)当不为空或者不为满时,生产者和消费者可以同时并发访问临界资源,体现了并发的高效性
(4)生产者和生产者之间以及消费者与消费者之间会竞争下标资源,需要互斥加锁,且锁要加在信号量的后面,因为信号量是原子的,不需要保护。
2.3 POSIX信号量接口介绍
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步(更加简易)。
1,初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
2,销毁信号量
int sem_destroy(sem_t *sem);
3,等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
4,发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
2.4 快速实现
环形队列的设计
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
const static int defaultcap = 5;
template<class T>
class RingQueue{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
sem_init(&cdata_sem_, 0, 0);
sem_init(&pspace_sem_, 0, cap);
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
void Push(const T &in) // 生产
{
P(pspace_sem_);
Lock(p_mutex_); // ?
ringqueue_[p_step_] = in;
// 位置后移,维持环形特性
p_step_++;
p_step_ %= cap_;
Unlock(p_mutex_);
V(cdata_sem_);
}
void Pop(T *out) // 消费
{
P(cdata_sem_);
Lock(c_mutex_); // ?
*out = ringqueue_[c_step_];
// 位置后移,维持环形特性
c_step_++;
c_step_ %= cap_;
Unlock(c_mutex_);
V(pspace_sem_);
}
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
std::vector<T> ringqueue_;
int cap_;
int c_step_; // 消费者下标
int p_step_; // 生产者下标
sem_t cdata_sem_; // 消费者关注的数据资源
sem_t pspace_sem_; // 生产者关注的空间资源
pthread_mutex_t c_mutex_;
pthread_mutex_t p_mutex_;
};
问题1:加锁应该在申请信号量的前面还是申请信号量的后面??
——>应该在申请信号量的后面(2方案),从技术角度,信号量的操作是原子的所以并不需要被保护。从效率角度,2相比1来说,当一个线程抢到锁的时候,其他线程与其在这里干等,还不如先去把信号量资源给申请了!!
任务文件:
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{
}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
主函数:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"
using namespace std;
struct ThreadData
{
RingQueue<Task> *rq;
std::string threadname;
};
void *Productor(void *args)
{
// sleep(3);
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
int len = opers.size();
while (true)
{
// 1. 获取数据
int data1 = rand() % 10 + 1;
usleep(10);
int data2 = rand() % 10;
char op = opers[rand() % len];
Task t(data1, data2, op);
// 2. 生产数据
rq->Push(t);
cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;
sleep(1);
}
return nullptr;
}
void *Consumer(void *args)
{
ThreadData *td = static_cast<ThreadData*>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
while (true)
{
// 1. 消费数据
Task t;
rq->Pop(&t);
// 2. 处理数据
t();
cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
// sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>(50);
pthread_t c[5], p[3];
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Productor-" + std::to_string(i);
pthread_create(p + i, nullptr, Productor, td);
}
for (int i = 0; i < 1; i++)
{
ThreadData *td = new ThreadData();
td->rq = rq;
td->threadname = "Consumer-" + std::to_string(i);
pthread_create(c + i, nullptr, Consumer, td);
}
for (int i = 0; i < 1; i++)
{
pthread_join(p[i], nullptr);
}
for (int i = 0; i < 1; i++)
{
pthread_join(c[i], nullptr);
}
return 0;
}