【Linux系统编程】第四十七弹---深入探索:POSIX信号量与基于环形队列的生产消费模型实现
✨个人主页: 熬夜学编程的小林
💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】
目录
1、POSIX信号量
2、基于环形队列的生产消费模型
2.1、代码实现
2.1.1、RingQueue基本结构
2.1.2、PV操作
2.1.3、构造析构函数
2.1.4、生产者入队
2.1.5、消费者出队
2.2、代码测试
2.2.1、内置类型
2.2.2、类类型
2.2.3、多生产多消费
1、POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上一弹生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量):
2、基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
- 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
多线程如何在环形队列中进行生产和消费?1.单生产单消费 2.多生产多消费
1.队列为空,让谁先访问?生产者先生产
2.队列为满,让谁先访问?消费者来消费
3.队列不为空 && 队列不为满 -- 生产和消费同时进行
2.1、代码实现
2.1.1、RingQueue基本结构
RingQueue使用模板类实现,当单生产单消费的时候可以不加锁,因为使用信号量就可以实现同步与互斥,当多生产多消费的时候需要进行加锁,且为两把锁!!!
template<typename T>
class RingQueue
{
private:
// 等待信号量,信号量-1
void P(sem_t& s);
// 发布信号量(归还资源),信号量+1
void V(sem_t& s);
public:
// 构造函数初始化队列大小,信号量以及互斥锁
RingQueue(int max_cap)
:_ringqueue(max_cap)/*构造队列成员个数*/,_max_cap(max_cap),_c_step(0),_p_step(0);
// 生产者生产数据
void Push(const T& in);
// 消费者使用数据
void Pop(T* out);
// 析构函数释放信号量和互斥锁
~RingQueue();
private:
std::vector<T> _ringqueue; // vector实现队列
int _max_cap; // 最大容量
int _c_step; // 消费者位置
int _p_step; // 生产者位置
sem_t _data_sem; // 消费者关心数据资源
sem_t _space_sem; // 生产者关心空间资源
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
2.1.2、PV操作
P操作等待信号量,V操作发布信号量!
// 等待信号量,信号量-1
void P(sem_t& s)
{
sem_wait(&s);
}
// 发布信号量(归还资源),信号量+1
void V(sem_t& s)
{
sem_post(&s);
}
2.1.3、构造析构函数
构造函数申请队列空间,初始化信号量和互斥锁(多生产多消费需要使用互斥锁),析构函数释放信号量和互斥锁!!!
// 构造函数初始化队列大小,信号量以及互斥锁
RingQueue(int max_cap)
:_ringqueue(max_cap)/*构造队列成员个数*/,_max_cap(max_cap),_c_step(0),_p_step(0)
{
sem_init(&_data_sem,0,0); // 第一个0表示线程共享,第二个0表示信号量初始值为0
sem_init(&_space_sem,0,max_cap);
// pthread_mutex_init(&_c_mutex,nullptr);
// pthread_mutex_init(&_p_mutex,nullptr);
}
// 析构函数释放信号量和互斥锁
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
// pthread_mutex_destroy(&_c_mutex);
// pthread_mutex_destroy(&_p_mutex);
}
2.1.4、生产者入队
生产者生产数据(无锁版本)!!!
// 生产者生产数据
void Push(const T& in)
{
// 信号量:是一个计数器,是资源的预订机制。预订:在外部,可以不判断资源是否满足,就可以知道内部资源的情况!
P(_space_sem); // 信号量这里,对资源进行使用,申请,为什么不判断一下条件是否满足???信号量本身就是判断条件!
_ringqueue[_p_step] = in; // 生产数据
_p_step++;
_p_step %= _max_cap; // 循环轮转
V(_data_sem); // 归还数据资源,不为满归还
}
2.1.5、消费者出队
消费者使用数据(无锁版本)!!!
// 消费者使用数据
void Pop(T* out)
{
P(_data_sem);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _max_cap;
V(_space_sem);
}
2.2、代码测试
2.2.1、内置类型
Consumer
void *Consumer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while (true)
{
// 1.消费
int data;
rq->Pop(&data);
// 2.处理数据
std::cout << "Consumer->" << data << std::endl;
}
}
Productor
void *Productor(void *args)
{
srand(time(nullptr) ^ getpid());
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while (true)
{
// 1.构造数据
int x = rand() % 10 + 1; // [1,10]
// 2.生产数据
rq->Push(x);
std::cout << "Productor->" << x << std::endl;
sleep(1);
}
}
主函数
int main()
{
RingQueue<int> *rq = new RingQueue<int>(5);
// 单生产,单消费
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, rq);
pthread_create(&p, nullptr, Productor, rq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
运行结果
2.2.2、类类型
Consumer
void *Consumer(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task>*>(args);
while (true)
{
// 1.消费
Task t;
rq->Pop(&t);
// 2.处理数据
t();
std::cout << "Consumer->" << t.result() << std::endl;
}
}
Productor
void *Productor(void *args)
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
// 1.构造数据
int x = rand() % 10 + 1; // [1,10]
usleep(x * 1000);
int y = rand() % 10 + 1;
Task t(x,y);
// 2.生产数据
rq->Push(t);
std::cout << "Productor->" << t.debug() << std::endl;
sleep(1);
}
}
主函数
int main()
{
RingQueue<Task> *rq = new RingQueue<Task>(5);
// 单生产,单消费
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, rq);
pthread_create(&p, nullptr, Productor, rq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
运行结果
2.2.3、多生产多消费
多生产多消费能够保证生产与消费之间的同步与互斥,但是不能保证生产与生产,消费与消费之间的同步与互斥,因此需要在入队与出队时加锁!!!
生产者入队
// 生产者生产数据
void Push(const T& in)
{
// 信号量:是一个计数器,是资源的预订机制。预订:在外部,可以不判断资源是否满足,就可以知道内部资源的情况!
P(_space_sem); // 信号量这里,对资源进行使用,申请,为什么不判断一下条件是否满足???信号量本身就是判断条件!
pthread_mutex_lock(&_p_mutex); // 给生产者上锁,解决多生产多消费数据不一致问题,放在P后效率更高
_ringqueue[_p_step] = in; // 生产数据
_p_step++;
_p_step %= _max_cap; // 循环轮转
pthread_mutex_unlock(&_p_mutex); // 解锁
V(_data_sem); // 归还数据资源,不为满归还
}
消费者出队
// 消费者使用数据
void Pop(T* out)
{
P(_data_sem);
pthread_mutex_lock(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _max_cap;
pthread_mutex_unlock(&_c_mutex);
V(_space_sem);
}
主函数
int main()
{
RingQueue<Task> *rq = new RingQueue<Task>(5);
// 多生产,多消费
pthread_t c1,c2,p1,p2,p3;
pthread_create(&c1, nullptr, Consumer, rq);
pthread_create(&c2, nullptr, Consumer, rq);
pthread_create(&p1, nullptr, Productor, rq);
pthread_create(&p2, nullptr, Productor, rq);
pthread_create(&p3, nullptr, Productor, rq);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(p1,nullptr);
pthread_join(p2,nullptr);
pthread_join(p3,nullptr);
return 0;
}
无锁
加锁