【Linux】基于阻塞队列的生产者消费者模型
🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
目录
- 👉为何要使用生产者消费者模型👈
- 👉生产者消费者模型的优点👈
- 👉基于阻塞队列的生产者和消费者模型👈
- 👉总结👈
👉为何要使用生产者消费者模型👈
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型在生活中是相当常见的,比如客户去超市里买商品就是一个很好的例子。在这个例子中,供货商就是生产者,顾客就是消费者,超市就是一个交易场所,本质是一个商品的缓冲区。超市存在的意义就是让生产者和消费者解耦,以提高效率。
在生产者消费者模型中,有一个交易场所超市,生产者和消费者两种角色,生产者和生产者、消费者和消费者、生产者和消费者三种关系。其中生产者和生产者之间的关系是竞争互斥关系,消费者和消费者之间的关系是竞争互斥关系,生产者和消费者之间的关系是互斥和同步关系。当生产者进行生产时,消费者不能进行消费(保证安全),所以生产者和消费者是互斥关系。当商品很少时,生产者需要进行生产,消费者需要进行等待;而当商品很多时,生产者不能进行生产,消费者进行消费,所以生产行为和消费行为具有一定的顺序性,也就是说生产者和消费者是同步关系。
在计算机世界里,生产者和消费者都是通过线程模拟出来的。当生产者线程生产完商品后,就可以通知消费者线程来进行消费;而当消费者线程消费完商品后,就可以通知生产者线程来进行生产,这个过程就是通过条件变量来实现的。
👉生产者消费者模型的优点👈
想要真正理解生产者消费者模型优点,我们需要知道生产者生产数据和消费者消费数据都是想要消耗时间的。那么当消费者线程进行消费数据时,该线程是不会访问阻塞队列的。所以当消费者线程进行消费时,生产者线程从其他地方获取数据并将该数据放入到阻塞队列中,这样就可以提高生产者线程和消费者线程的并发度了。所以生产者消费者模型具有一下优点:解耦、支持并发。
👉基于阻塞队列的生产者和消费者模型👈
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。注:管道本身就是一个阻塞队列,有数据就消费,没数据就等待。管道内部已经实现了互斥和同步的功能。
单生产者单消费者模型
// BlockQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
const int DefaultCap = 5;
template <class T>
class BlockQueue
{
private:
bool isEmpty()
{
return _bq.empty();
}
bool isFull()
{
return _bq.size() == _capacity;
}
public:
BlockQueue(int capacity = DefaultCap)
: _capacity(capacity)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_empty, nullptr);
pthread_cond_init(&_full, nullptr);
}
// 生产者
void push(const T& in)
{
// 访问临界资源需要进行加锁保护
pthread_mutex_lock(&_mtx);
// 1.先检查当前的临界资源是否满足访问条件
// 检查临界资源是否满足访问条件,也是访问临界资源
// 所以它也需要在加锁和解锁之间
// pthread_cond_wait函数是让线程在特定的条件变量下阻塞等待
// 当进行等待时,线程所持有的锁会被自动释放掉.当条件变量满足
// 时,线程会被在阻塞挂起的地方被唤醒.线程被唤醒的时候,是在临
// 界区中的,pthread_cond_wait会自动帮助线程获取锁
while(isFull())
pthread_cond_wait(&_full, &_mtx);
// pthread_cond_wait是一个函数,它就可能会调用失败,从而出现
// 伪唤醒(临界资源不满足访问条件,却往下进行临界资源的访问)的
// 情况,所以需要通过while来保证满足访问临界资源的条件,而不能
// 通过if来判断临界资源是否能被访问
// 2.访问临界资源(来到这里,临界资源100%是就绪的!)
_bq.push(in);
// 可以指定特定的唤醒线程的策略,如数据的个数大于容量的一半
// if(2 * _bq.size() >= _capacity) pthread_cond_signal(&_empty);
// 唤醒线程可以在释放锁之前,也可以在释放锁之后
pthread_cond_signal(&_empty);
pthread_mutex_unlock(&_mtx);
}
// 消费者
void pop(T* out)
{
pthread_mutex_lock(&_mtx);
while(isEmpty())
pthread_cond_wait(&_empty, &_mtx);
*out = _bq.front();
_bq.pop();
pthread_cond_signal(&_full);
pthread_mutex_unlock(&_mtx);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_empty);
pthread_cond_destroy(&_full);
}
private:
std::queue<T> _bq; // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)
int _capacity; // 容量上线
pthread_mutex_t _mtx; // 互斥锁保护队列的安全
pthread_cond_t _empty; // 该条件变量表示bq是否为空
pthread_cond_t _full; // 该条件变量表示bq是否为满
};
// ConProd.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>
void* consume(void* args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
while(1)
{
int a;
bq->pop(&a);
std::cout << "消费了一个数据:" << a << std::endl;
sleep(1);
}
return nullptr;
}
void* produce(void* args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
int a = 1;
while(1)
{
bq->push(a);
std::cout << "生产了一个数据:" << a << std::endl;
++a;
}
return nullptr;
}
int main()
{
BlockQueue<int>* bq = new BlockQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, consume, (void*)bq);
pthread_create(&p, nullptr, produce, (void*)bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
增加任务类
// Task.hpp
#pragma once
#include <iostream>
#include <functional>
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task() = default;
Task(int x, int y, func_t func)
: _x(x)
, _y(y)
, _func(func)
{}
int operator()()
{
return _func(_x, _y);
}
public:
int _x;
int _y;
func_t _func;
};
// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
int Add(int x, int y)
{
return x + y;
}
void* consume(void* args)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
while(1)
{
// 获取任务
Task t;
bq->pop(&t);
// 完成任务
std::cout << "consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;
}
return nullptr;
}
void* produce(void* args)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
while(1)
{
// 制作任务 -- 不一定从生产者来的,可能是从网络来的
int x = rand() % 10 + 1;
int y = rand() % 20 + 1;
// 生产任务
Task t(x, y, Add);
bq->push(t);
std::cout << "productor: " << x << " + " << y << " = ?" << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);
BlockQueue<Task>* bq = new BlockQueue<Task>();
pthread_t c, p;
pthread_create(&c, nullptr, consume, (void*)bq);
pthread_create(&p, nullptr, produce, (void*)bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
多生产者多消费者
// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
int Add(int x, int y)
{
return x + y;
}
void* consume(void* args)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
while(1)
{
// 获取任务
Task t;
bq->pop(&t);
// 完成任务
std::cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;
}
return nullptr;
}
void* produce(void* args)
{
BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
while(1)
{
// 制作任务 -- 不一定从生产者来的,可能是从网络来的
int x = rand() % 10 + 1;
int y = rand() % 20 + 1;
// int x, y;
// std::cout << "Please Enter x: ";
// std::cin >> x;
// std::cout << "Please Enter y: ";
// std::cin >> y;
// 生产任务
Task t(x, y, Add);
bq->push(t);
std::cout << pthread_self() << " productor: " << x << " + " << y << " = ?" << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);
BlockQueue<Task>* bq = new BlockQueue<Task>();
// pthread_t c, p;
pthread_t c[2], p[2];
pthread_create(c, nullptr, consume, (void*)bq);
pthread_create(c + 1, nullptr, consume, (void*)bq);
pthread_create(p, nullptr, produce, (void*)bq);
pthread_create(p + 1, nullptr, produce, (void*)bq);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
delete bq;
return 0;
}
多生产者多消费者模型的意义就是让生产者并发地获取和制作任务,让消费者并发地完成消费任务。多生产者多消费者模型主要用于处理消费任务或者获取和制作任务比较耗时的场景。
锁的封装
// LockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t* pmtx)
: _pmtx(pmtx)
{}
void lock()
{
std::cout << "进行加锁" << std::endl;
pthread_mutex_lock(_pmtx);
}
void unlock()
{
std::cout << "进行解锁" << std::endl;
pthread_mutex_unlock(_pmtx);
}
~Mutex()
{}
private:
pthread_mutex_t* _pmtx;
};
// RAII的加锁方式
class LockGuard
{
public:
LockGuard(pthread_mutex_t* pmtx)
: _mtx(pmtx)
{
_mtx.lock();
}
~LockGuard()
{
_mtx.unlock();
}
private:
Mutex _mtx;
};
// BlockQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"
const int DefaultCap = 5;
template <class T>
class BlockQueue
{
private:
bool isEmpty()
{
return _bq.empty();
}
bool isFull()
{
return _bq.size() == _capacity;
}
public:
BlockQueue(int capacity = DefaultCap)
: _capacity(capacity)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_empty, nullptr);
pthread_cond_init(&_full, nullptr);
}
// 生产者
void push(const T &in)
{
// 出了函数的作用域会自动解锁
LockGuard lockGuard(&_mtx);
while (isFull())
pthread_cond_wait(&_full, &_mtx);
_bq.push(in);
pthread_cond_signal(&_empty);
}
// 消费者
void pop(T *out)
{
// 出了函数的作用域会自动解锁
LockGuard lockGuard(&_mtx);
while (isEmpty())
pthread_cond_wait(&_empty, &_mtx);
*out = _bq.front();
_bq.pop();
pthread_cond_signal(&_full);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_empty);
pthread_cond_destroy(&_full);
}
private:
std::queue<T> _bq; // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)
int _capacity; // 容量上线
pthread_mutex_t _mtx; // 互斥锁保护队列的安全
pthread_cond_t _empty; // 该条件变量表示bq是否为空
pthread_cond_t _full; // 该条件变量表示bq是否为满
};
👉总结👈
本篇博客主要讲解了为什么要使用生产者消费者模型、基于阻塞队列的生产者和消费者模型以及 RAII 的加锁方式等等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️