[Linux]生产消费者模型
目录
一、生产消费者模型概念
1.概念
2.模块构成
3.协作机制
二、基于BlockingQueue的多CP问题
1.BlockQueue模块
2.Task任务模块
3.创建多线程以及整合模块
三、基于信号量的环形队列CP模型
1.POISX信号量接口
初始化信号量
PV操作
信号量销毁
2.模型简述
3.代码实现
一、生产消费者模型概念
1.概念
生产者消费者模型按名字来听就一定有生产者和消费者成员,生产者将自己生产的物品交给消费者,这就是最简单的生产消费协同。但是这样的话,生产者生产之后就一定要当面交给消费者,然后消费者不接收生产者就无法生产,这样就会有弊端了。如果说破生产者生产的快,但是消费者消费速度慢,那么生产者就需要等待消费者将东西买走后,才能继续生产,就会大大降低产能和效率。
所以接下来引入了超市的这一中间概念,生产者可以将生产的东西放到超市里面,不需要直接给消费者,这样的话,生产者的产能就不会受到消费者的影响了,而且消费者也可能随时去超时获取自己想要的东西,而不是说生产者生产出来,消费者就必须接收了。这个就是生产消费者模型的大概框架。
生产消费者模式是一种经典的多线程或多进程并发协作的一种模式。他主要用于解决数据的产生和消费速度不一致的问题,实现了功能解耦、时间解耦以及处理速度的解耦,同时保证了数据的完整性和正确性。功能解耦:生产者专注生产,不关心数据如何被消费者处理;消费者专注处理,不关心数据如何生产的。时间解耦:生产者和消费者(CP)之间不需要同步运行,在满足各自条件下进行工作就可以。处理速度解耦:CP之间不需要互相等待对方,所以处理速度快的一方那么就会一直以高速的方式工作,而慢速的一方则慢慢 执行即可,有中间的共享资源区作为数据的缓冲地带。
2.模块构成
分为生产者、消费者以及缓冲区三部分,生产者和消费者可以是一个线程、进程或者软件模块,生产者将生产的数据放入缓冲区,消费者从缓冲区中取出数据后做相应的处理工作。
3.协作机制
该模型一定涉及到执行流的互斥与同步问题了,生产者与消费者之间存在互斥关系,生产者访问缓冲区的时候,消费者是不可以访问的,消费者访问的时候,同样的生产者也是不可以访问的。因为生产和消费二者访问缓冲区的时候,会对缓冲区的数据做修改,那么就会有线程安全问题,所以要实现互斥机制。当然二者之间也有同步关系,缓冲区中没有数据时,消费者不能进行访问缓冲区,当生产者生产数据之后,会通知消费者访问的。
对于多生产多消费的情况下,也就是生产者和消费者都有很多线程。生产和消费都会修改缓冲区。所以生产者与生产者之间,消费者与消费者之间,也属于多线程访问共享资源,也是要实现互斥的。
上述概括下来,就是生产者之间、消费者之间的互斥以及生产者消费者之间的互斥与同步。
二、基于BlockingQueue的多CP问题
1.BlockQueue模块
该模块定义了一个任务队列,也就是CP问题的缓冲区部分、以及生产线程和消费线程对于该缓冲区的操作方法。当生产者要放入任务的时候,要先判断缓冲区中是否有空间,如果有空间的话,就放入任务,并且通知消费进程取出任务,当没有空间的时候,就进行等待条件变量就绪,这个条件变量的唤醒是由消费进程取出数据后,唤醒的,唤醒生产进程放入数据。
对于消费进程要取出任务的时候,首先判断缓冲区有没有任务,如果有的话,就取出任务,并通知生产进程放入数据,也就是唤醒当时因为缓冲区满了,而阻塞在条件变量中的等待线程。当没有任务的时候,就阻塞在条件变量中等待生产进程唤醒。
生产线程和消费线程之间通过锁实现的互斥操作,通过条件变量实现了同步的顺序操作。
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
// 缓冲区默认容量
static const int default_capacity = 5;
//生产消费者模型类
template <class T>
class BlockQueue
{
private:
std::queue<T> _task_queue; // 任务队列
int _capacity; // 队列容量
pthread_mutex_t _mutex; // 锁
pthread_cond_t _c_cond; // customer条件变量
pthread_cond_t _p_cond; // productor条件变量
public:
// 构造函数
BlockQueue(int capacity = default_capacity)
:_capacity(capacity)
{
//初始化锁和条件变量
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_c_cond, nullptr);
pthread_cond_init(&_p_cond, nullptr);
}
//生产者--将任务放入任务队列
void Push(const T& task)
{
//加锁
pthread_mutex_lock(&_mutex);
//判断缓冲区是否满了,如果满了的话,就一直通知消费者取出数据
//使用while是防止伪唤醒。
while(_capacity == _task_queue.size())
{
//释放锁,并等待消费者通知可以生产
pthread_cond_wait(&_p_cond, &_mutex);
}
//有空间了,放入任务
_task_queue.push(task);
//通知消费者可以取数据了
pthread_cond_signal(&_c_cond);
//解锁
pthread_mutex_unlock(&_mutex);
}
//消费者--将任务从任务队列取出
void PoP(T* task)
{
//加锁
pthread_mutex_lock(&_mutex);
//判断是否有数据,没有就进行等待
while(_task_queue.empty())
{
//释放锁,并等待生产者通知可以消费
pthread_cond_wait(&_c_cond, &_mutex);
}
//有数据了,取出数据
*task = _task_queue.front();
_task_queue.pop();
//通知生产者可以生产了
pthread_cond_signal(&_p_cond);
//解锁
pthread_mutex_unlock(&_mutex);
}
//析构函数
~BlockQueue()
{
//锁和条件变量的销毁
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_c_cond);
pthread_cond_destroy(&_p_cond);
}
};
2.Task任务模块
定义的是计算两个元素的加减法类
#pragma once
#include <iostream>
#include <unistd.h>
#include <string>
static const int defaultValue = 0;
static const std::string str = "+-";
// 计算加减法的任务类
class Task
{
private:
int _x;
int _y;
char _op;
int _result;
public:
//构造与析构函数
Task() {}
Task(int x, int y, char op)
: _x(x), _y(y), _op(op), _result(defaultValue) {}
~Task() {}
void Run()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
default:
std::cout << "op is error" << std::endl;
break;
}
}
// 运行任务重载函数
void operator()()
{
Run();
std::cout << _x << _op << _y << "=" << _result << std::endl;
sleep(2);
}
};
3.创建多线程以及整合模块
该模块则是定义了生产线程和消费线程具体线程函数。以及创建线程、定义任务队列类等操作。
#include <cstdlib>
#include <vector>
#include <string>
#include <cstring>
#include <pthread.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
// 线程传递的参数
struct ThreadData
{
BlockQueue<Task> *_cp; // 生产者消费者模型对象
std::string _thread_name; // 线程名称
// 构造函数
ThreadData(BlockQueue<Task> *cp, std::string name)
: _cp(cp), _thread_name(name) {}
};
// 消费者线程函数
void *Costomer(void *arg)
{
// 接收参数
ThreadData *data = (ThreadData *)arg;
while (true)
{
// 获取任务
Task task;
data->_cp->PoP(&task);
std::cout << data->_thread_name << " run task : ";
// 执行任务
task();
}
return nullptr;
}
// 生产者线程参数
void *Productor(void *arg)
{
// 接收参数
ThreadData *data = (ThreadData *)arg;
while (true)
{
// 生产任务数据
int data1 = rand() % 10;
int data2 = rand() % 10;
char op = str[rand() % str.size()];
Task task(data1, data2, op);
// 将任务放入队列
data->_cp->Push(task);
std::cout << data->_thread_name << " push task into queue : " << data1 << op << data2 << std::endl;
}
return nullptr;
}
int main()
{
// 生产随机数种子
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());
// 创建生产者消费者模型对象
BlockQueue<Task> *cp = new BlockQueue<Task>();
// 创建生产者消费者线程
std::vector<pthread_t> custumer_thread(5);
std::vector<pthread_t> productor_thread(5);
// 消费者
for (int i = 0; i < 5; i++)
{
// 线程id
pthread_t tid;
// 构造线程参数
std::string thread_name = "customer-" + std::to_string(i);
ThreadData data(cp, thread_name);
// 创建线程
pthread_create(&tid, nullptr, Costomer, &data);
// 放入数组中
custumer_thread.push_back(tid);
}
// 生产者
for (int i = 0; i < 5; i++)
{
// 线程id
pthread_t tid;
// 构造线程参数
std::string thread_name = "productor-" + std::to_string(i);
ThreadData data(cp, thread_name);
// 创建线程
pthread_create(&tid, nullptr, Productor, &data);
// 放入数组中
productor_thread.push_back(tid);
}
// 线程等待
for (size_t i = 0; i < custumer_thread.size(); ++i)
{
if (pthread_join(custumer_thread[i], nullptr) != 0)
{
std::cerr << "等待消费者线程结束失败" << std::endl;
}
}
for (size_t i = 0; i < productor_thread.size(); ++i)
{
if (pthread_join(productor_thread[i], nullptr) != 0)
{
std::cerr << "等待生产者线程结束失败" << std::endl;
}
}
// 释放资源
delete cp;
return 0;
}
三、基于信号量的环形队列CP模型
1.POISX信号量接口
其实和System V信号量的作用基本上是一样的,只是接口上有区别。
初始化信号量
int sem_init(sem_t* sem, int pshared, unsigned int value);
第一个参数是传递要初始化的信号量;第二个参数表示信号量是用于进程间(非零值)还是线程间(零值)共享;第三个参数则是信号量的初始值。
PV操作
int sem_wait(sem_t* sem); //申请操作
int sem_post(sem_t* sem); //释放操作
信号量销毁
int sem_destory(sem_T* sem);
2.模型简述
该模型使用了锁实现了线程之间的互斥,信号量的方式实现了同步机制。但是这里的互斥只有生产者和生产者、消费者和消费者之间的互斥,没有生产者与消费者之间的互斥。因为使用了信号量记录了该资源区域的使用情况,用信号量表示了剩余空间的个数和存在的资源的个数,相当于将一个大的资源区域分成了若干个小区域。同时配合指针记录生产者下一次放数据的位置,消费者下一次取数据的位置。所以生产者线程和消费者线程两者访问不到同一个位置,所以也就不存在两个的互斥问题了。
信号量实现的同步是,当space剩余空间信号量为0的时候,生产线程无法放入数据,会等待消费线程取出数据,当消费线程取出数据后,会释放一个space信号量,那么生产线程就可以申请到信号量了,就可以放入数据了。放入数据之后,会将data信号量释放一个,表示着队列中的数据量多了一个。
在申请信号量和锁的顺序上,应该是先申请信号量,信号量代表的是该线程对于想要获取的资源是否存在,如果存在的话,再去为了访问资源而申请锁,如果说没有资源的话,申请到锁也没用,所以说应该先去申请信号量,之后再去申请锁资源。而且先申请锁的话,只会有一个线程到达申请信号量的步骤,而先申请信号量的话,可以有多个线程到达申请锁的位置等待锁资源。
经过上述阐述,因为生产者和消费者没有互斥的关系,所以要给生产者和消费者各设定一把锁去保证线程安全。同时还有设定两个信号量,一个是剩余空间信号量,另一个是剩余资源的信号量。
3.代码实现
#pragma once
#include <iostream>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
// 环形队列默认大小
static const int default_capacity = 5;
template <class T>
class RingQueue
{
private:
std::vector<T> _ringQueue; // 环形队列
int _capacity; // 最大容量
int _p_step; // 生产者放入任务的位置
int _c_step; // 消费者取出任务的位置
sem_t _space_sem; // 剩余空间信号量
sem_t _data_sem; // 剩余任务信号量
pthread_mutex_t _p_mutex; // 生产锁
pthread_mutex_t _c_mutex; // 消费锁
private:
//申请信号量操作
void P(sem_t &sem) { sem_wait(&sem); }
//释放信号量操作
void V(sem_t &sem) { sem_post(&sem); }
public:
//构造函数
RingQueue(int capacity = default_capacity)
:_ringQueue(capacity), _capacity(capacity), _p_step(0), _c_step(0)
{
//初始化信号量,剩余空间为capacity,剩余任务数据为0
sem_init(&_space_sem, 0, _capacity);
sem_init(&_data_sem, 0, 0);
//初始化锁
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
//生产者放入任务操作
void Push(const T& task)
{
//申请信号量并加锁
P(_space_sem);
pthread_mutex_lock(&_p_mutex);
//放入任务
_ringQueue[_p_step] = task;
//更新生产者指针位置
_p_step++;
_p_step %= _capacity;
//解锁并释放信号量
pthread_mutex_unlock(&_p_mutex);
V(_data_sem);
}
//消费者取出任务操作
void Pop(T* task)
{
//申请信号量并加锁
P(_data_sem);
pthread_mutex_lock(&_c_mutex);
//取出任务
*task = _ringQueue[_c_step];
//更新消费者指针位置
_c_step++;
_c_step %= _capacity;
//解锁并释放信号量
pthread_mutex_unlock(&_c_mutex);
V(_space_sem);
}
//析构函数
~RingQueue()
{
//销毁信号量和锁
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
};