【Linux】线程锁同步互斥生产消费模型
目录
认识锁及其接口
从原理角度理解锁
从实现角度理解锁
Linux线程同步
条件变量
写一个测试代码
生产者消费者模型
基于 BlockingQueue 的生产者消费者模型
POSIX信号量
基于环形队列的生产消费模型
实际上,线程的大部分资源都是共享的,把多个线程能看到的资源称为共享资源,我们需要对这部分资源进行保护!类似去银行ATM机取钱,在这个小空间内,在取钱期间没有任何人能打扰我。
我们先来见一见多线程访问的问题,以抢票代码为例,
#include <iostream>
#include <vector>
#include <cstdio>
#include <unistd.h>
#include "Thread.hpp"
using namespace ThreadModule;
int tickets = 10000;
void route(const std::string &name)
{
while (true)
{
if (tickets > 0)
{
// 抢票过程
usleep(1000); // 抢票花费的时间
printf("who:%s,get a ticket:%d\n", name.c_str(), tickets);
tickets--;
}
else
{
break;
}
}
}
int main()
{
Thread t1("thread-1", route);
Thread t2("thread-2", route);
Thread t3("thread-3", route);
Thread t4("thread-4", route);
t1.Start();
t2.Start();
t3.Start();
t4.Start();
t1.Join();
t2.Join();
t3.Join();
t4.Join();
return 0;
}
程序运行到最后,我们发现,有两个进程抢到了-1票,这就很离谱了,原因分析如下:
if (tickets > 0)这条语句判断的过程其实是逻辑计算,而逻辑计算由CPU来做,tickets是在内存中存放的变量,比如CPU中有eax寄存器,还有一个存放计算结果的寄存器,第一步要把数据tickets从内存移动到eax寄存器中,然后把0也放到CPU中寄存器,然后CPU就可以对寄存器中的两个值做逻辑判断,如果成立,结果为1,否则为0,然后就得到了结果,CPU就会控制执行流执行if还是else。而每一个线程都会执行这个逻辑,但是CPU内寄存器只有一套,但是寄存器里面的数据可以有多套。假如CPU正在执行线程代码,正处于逻辑判断,此时这个线程被切换了,意味着线程必须把寄存器里的数值带走,当这个线程重新被切换回来的时候,要重新恢复寄存器中的值(寄存器中的数据属于线程私有,看起来放在了一套公共的寄存器中,但是属于线程私有,当它被切换时,它要带走自己的数据,回来的时候,会恢复!)。现在4个线程抢票,抢到最后tickets=1,此时线程1把1放到寄存器中,然后进行判断结果也为真,进去之后正执行抢票的时候,它被切换了,被切换时,要保存自己的寄存器中的数据(eax-1,res-1),然后切换到了线程2,也要读取tickets到寄存器,也是1,然后判断结果也为真, 然后在执行抢票的时候,线程2也被切换了,然后线程3和4都是这样的情况,都进了if语句内部,然而票数只剩1张,却放了4个线程进来,然后线程1又被切换回来,tickets--,减到0了,线程b再tickets--(tickets--时,1.重读数据2.--数据3.写回数据,所以,线程2tickets--之前,tickets已被线程1减为0),tickets变为-1,然后接下来变为-2、-3,由于tickets是共享资源,这就造成了数据不一致的问题。
解决数据不一致的问题的方法就是加锁!
认识锁及其接口
pthread_mutex_t就是互斥锁类型,所谓互斥,就是只允许一个线程进行资源访问。也就是我们先要定义pthread_mutex_t类型的变量。在定义好锁之后,要初始化锁有两种方法:
1)如果锁是全局或静态的,可以用pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER进程初始化,不需要destory,因为这是在全局数据区或静态数据区创建,会随着进程运行一直存在,进程运行结束后就会自动释放;
2)但是如果锁是new或malloc出来的,或者在栈上开辟的,此时需要使用pthread_mutex_init函数来进行初始化,其第一个参数为定义的这把锁,第二个参数直接设为nullptr。当用完这把锁后,要销毁这把锁,就需要调用pthread_mutex_destory函数。
在我们创建锁并初始化后,未来想对一个区域加锁,就可以调用pthread_mutex_lock来进行加锁,参数就是之前初始化好的这把锁,未来不想用这把锁时,可以调用pthread_mutex_unlock解锁,参数也是那把锁。在某个区域加锁后,多个线程会竞争这把锁,但只有一个进程竞争成功,竞争失败的线程会自动进行阻塞,直到有线程解锁。
就像之前tickets一样,我们需要对其进行保护,被保护的叫做临界资源,把访问临界资源的代码叫做临界区,不访问临界资源的代码叫做非临界区,因此,所谓对临界资源进行保护,本质是对临界区代码进行保护,我们对所有资源进行访问,本质都是通过代码进行访问的,所以,保护资源,本质就是把访问资源的代码保护起来。在非临界区所有线程并行执行,而到了临界区所有线程串行执行。
我们对上面抢票的过程进行加锁和解锁:
抢票结果是正常的。
在加锁时,我们有几个原则:
1)加锁的范围、粒度一定要尽量小。
2)任何线程,要进行抢票,都得先申请锁,原则上不应该有例外。
3)所有线程申请锁,前提是所有线程看到这把锁,锁本身也是共享资源。这就要求加锁的过程,必须是原子的!
4)原子性:要么不做,要做就做完,没有中间状态,就是原子性。
5)如果线程申请锁失败了,我的线程要被阻塞,直到锁被释放,再去竞争这把锁。
6)如果线程申请锁成功了,继续向后运行!
7)如果线程申请锁成功了,就可以执行临界区的代码了。那在执行临界区代码期间,可以被切换吗?
可以切换!但是其他线程不能进入临界区,因为我虽然被切换了,但是我没有释放锁!也就是说,我可以放心执行完临界区的代码,没有人能打扰我!
结论:对于其他线程,要么我没有申请锁,要么我释放了锁,对于其他线程才有意义!也就是我访问临界区,对其他线程是原子的!
如果我们想定义局部的锁,可以将锁作为参数传入,为此,我们可以创建ThreadData类,
class ThreadData
{
public:
ThreadData(const std::string& name,pthread_mutex_t* lock):_name(name),_lock(lock)
{
}
public:
std::string _name;
pthread_mutex_t *_lock;
};
将ThreadData类对象传入,这样所有线程就可以看到同一把锁。
这样我们就看到,不会出现票抢到负数的情况。
我们还可以对加解锁进行封装:
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t* _mutex;
};
定义lockguard临时对象时,调用构造函数,自动进行加锁,while循环结束时,临时变量被释放,析构函数被调用,进行解锁。这种锁的形式称为RAII风格的锁。
从原理角度理解锁
pthread_mutex_lock(&mutex);
如何理解申请锁成功,允许你进入临界区?
答:申请锁成功,pthread_mutex_lock()函数会返回,所以继续向下执行,就执行到了临界区。
如何理解申请锁失败,不允许你进入临界区?
答:申请锁失败,pthread_mutex_t()函数不返回,线程就是阻塞了。在锁被unlock后,线程在pthread_mutex_lock内部被重新唤醒,重新申请锁。
从实现角度理解锁
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据交换,由于只有一条指令,保证了原子性。
CPU在设计时,就已经在芯片级别设计了CPU能够识别的指令集,比如给CPU寄存器写入0011这样的二进制组合,CPU中的硬件电路就能判定这是让CPU做什么操作(+-*/),也就是说,CPU内部本身是有自己的指令集的。所以最开始时,人们采用二进制进行编程。后来人们觉得二进制编程效率太低,就有了汇编语句(然后就有了编译器),最后又有了C/C++这样的语言。
大多数体系结构都提供了swap或exchange指令,这表示CPU硬件电路设计时就直接支持swap或exchange指令。
我们需要知道两个前提:
1.CPU的寄存器只有一套,被所有的线程所共享,但是寄存器里面的数据,属于执行流的上下文,属于执行流私有的数据!
2.CPU在执行代码的时候,一定要有对应的执行载体---线程&&进程。
比如,CPU在执行lock函数时,前置是CPU在调度某个线程,只不过这个线程某段代码恰好在执行lock函数。
3.数据在内存中,被所有线程共享。
结论:把数据从内存移动到CPU寄存器中,本质是把数据从共享变成线程私有!!
锁其实我们可以当成内存上的一个变量lock,其初始值为1,表示还未被申请,如果为0,表示已经被申请了。上面汇编语句的%al是某个寄存器,第1条语句先把0放到%al中,第2条语句是用一条语句把%al的值和mutex做交换。假设在执行到第3行时,线程A被切换走,同时其寄存器中的值也要被带走:%al:1,执行第3行。然后线程B被调度,B也要申请锁,也要首先执行第1行代码,先把%al清空,再执行第二条语句,但是交换的本质不是拷贝,也就是永远只有一个1,很不幸,这个1之前已经被线程A交换走了,于是%al就得到了0,执行第3条语句,不成立,转而到了第6句,线程B被挂起等待,同样要把自己对应的信息带走:%al:0,执行第7行;类似地,线程C、D、E等发现都进不去。然后又调度到了线程A,恢复自己的信息,%al=1,执行第3行,继续执行之前未完成时事情,执行第4行,返回0,申请锁成功,加锁,进入了临界区,线程A就称为申请锁成功。因此,只有第2行交换指令才能称为加锁,一条汇编,交换不是拷贝,只有一个“1”,持有1的,就表示持有锁!
在解锁时,只需要把“1”归还给mutex即可。
Linux线程同步
在上面的运行结果中,我们发现会出现同一个线程连续抢到多次的现象,这样是不太合理的。所以,为了更高效合理:
1)当线程解锁后,不能立马申请锁
2)要二次申请,需要和所有进程排队
我们要求线程申请锁具有一定的顺序性,也就是同步,可以是严格的顺序性,也可以是宏观上具有相对的顺序性。
条件变量
为了保证多线程在访问临界资源具有顺序性(同步),pthread库给我们提供了条件变量,
条件变量其实是一个pthread_cond_t的数据类型,当定义了一个全局的或者静态的条件变量时,使用PTHREAD_COND_INITIALIZER来初始化,如果是定义了局部的条件变量,使用pthread_cond_init函数来初始化,
线程需要在执行的条件变量cond下去等待,
然后还需要通知在条件变量下等待的进程,进行唤醒,pthread_cond_signal表示在该条件变量下唤醒一个线程,pthread_cond_broadcast表示唤醒所有线程。
为了更好地认识条件变量,我们举一个例子:
一个人a眼睛被蒙上,前面放了一张盘子,另一个人b向盘子中放苹果,当把苹果放到盘子上时,a并不知道盘子上放上了苹果。由于a要去盘子上拿苹果,b要去盘子上放苹果,所以盘子是共享资源。假设b正在放苹果,a来盘子里拿苹果,但是并没有拿到苹果,a有没有放苹果是不确定的,所以a拿没拿到苹果也是不确定的,所以计算机里a读取数据就存在二义性。a和b就相当于两个线程,苹果就是数据,为了保证共享资源的安全,a和b在进入和出来时要进行加锁和解锁。当a进入临界区时,首先要判断是否有苹果,如果没有苹果,a解锁后退出临界区,然后a怕马上就有了苹果,所以马上又去申请锁,访问临界区,然后再判断,再退出,所以a一旦在非临界区,就不清楚盘子上是否有苹果,为了有苹果后,a尽快知道,a只能不断申请锁,判断,释放锁...,在a非常频繁检测盘子有没有苹果的时候,也就不断在申请和释放锁,b竞争锁的能力又没有a强,导致b长时间把苹果放不进去。但是,虽然造成了饥饿问题,但是,a和b都没有犯错,这是不合理的。所以,为了解决这种尴尬局面,引入了铃铛, 当b把苹果放到盘里后,就敲一下铃铛,a听到铃铛响了再来。这个铃铛就是条件变量。条件变量:1.需要一个线程队列 2.需要有通知机制。这个线程队列里可以有多个线程在排队,当b放进去苹果后,可以把在排队的线程叫醒一个/全部叫醒。
写一个测试代码
我们先来写一个基础版的多线程代码:
#include<iostream>
#include<string>
#include<pthread.h>
#include<unistd.h>
const int num = 5;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
void* Wait(void* args)
{
std::string name = static_cast<const char*>(args);
while(true)
{
pthread_mutex_lock(&gmutex);
std::cout << "I am " << name << std::endl;
pthread_mutex_unlock(&gmutex);
sleep(1);
}
}
int main()
{
pthread_t threads[num];
for(int i = 0 ; i < num ; i++)
{
char* name = new char[1024];
//char name[1024];
snprintf(name,1024,"thread-%d",i+1);
pthread_create(threads+i,nullptr,Wait,(void*)name);
}
for(int i = 0 ; i < num ; i++)
{
pthread_join(threads[i],nullptr);
}
return 0;
}
发现打印过程比较凌乱,
加上上面的一行语句,我们让所有线程进行等待,
编译运行程序,我们发现,所有的线程都在进行等待,都在“铃铛”上进行排队,所以我们的主线程需要的就是唤醒新线程,所以在主线程中加入唤醒代码:
除了使用pthread_cond_signal一个一个唤醒,还可以使用pthread_cond_broadcast把所有线程全部唤醒,
好,写到这里,我们简单认识并使用了这些接口。接下来我们要说一下生产消费模型:
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。 生产者和消费者彼此之间不直接通讯, 而通过阻塞队列来进行通讯, 所以生产者生产完数据之后不用等待消费者处理, 直接扔给阻塞队列, 消费者不找生产者要数据, 而是直接从阻塞队列里取, 阻塞队列就相当于一个缓冲区, 平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型有如下优点:
1.解耦
2.支持并发,效率高
3.支持忙闲不均
为了便于记忆,生产消费模型要遵循“321”原则:
1:1个交易场所(以特定数据结构形式存在的一段内存空间)
2:2种角色(生产线程、消费线程)
3:3种关系(生产和生产 消费和消费 生产和消费)
其中,生产和生产之间是互斥关系,消费和消费之间是互斥关系,生产和消费之间是互斥和同步关系。
未来我们实现生产消费模型,本质就是通过代码实现321原则,用锁和条件变量(或其他方式)来实现三种关系!
基于 BlockingQueue 的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
首先,我们要写一个单生产单消费的模型,然后扩展到多生产多消费的模型,此时只需要满足生产和消费之间的同步与互斥关系即可,不需要考虑生产与生产、消费与消费之间的关系。
阻塞队列直接使用C++中queue的数据结构。
在生产者生产数据入队列的函数中,在队列满了之后,必须等待,此时生产者不能生产。但由于等待时是处于临界区,生产者线程还拿着锁呢,所以pthread_mutex_wait的第二个参数就是除了让自己继续排队等待,还会自己释放传入的锁,在生产者释放了锁之后,消费者就可以申请锁去消费了!但是,在pthread_mutex_wait函数返回时,不还在临界区吗?可是生产线程现在没有拿到锁呀?!这不就有问题了嘛!因此,当pthread_mutex_wait函数返回时,必须先参与锁的竞争,重新加上锁,该函数才会返回!
void Pop(T* out)
{
pthread_mutex_lock(&_mutex);
if(IsEmpty())
{
pthread_cond_wait(&_c_cond,&_mutex);
}
//1.没空 || 2.被唤醒了
*out = _block_quene.front();
_block_quene.pop();
pthread_mutex_unlock(&_mutex);
}
void Equeue(const T& in)
{
pthread_mutex_lock(&_mutex);
if(IsFull())
{
//满了,必须等待,生产者不能生产
pthread_cond_wait(&_p_cond,&_mutex);
}
//1.没满 || 2.被唤醒了
_block_quene.push(in); // 生产到阻塞队列
pthread_mutex_unlock(&_mutex);
// 让消费者消费
}
在写完生产者和消费者入数据和出数据的接口后,我们发现,如果队列为满,导致生产者去等待,但是没有人去唤醒生产者;如果队列为空,导致消费者去等待,但是没有人去唤醒消费者。但是,队列为不为空,生产者最清楚,因为生产者在临界区会生产数据,当出临界区时,保证队列至少有1个数据;队列为不为满,消费者最清楚,因为消费者在临界区会消费数据,当出临界区时,保证队列一定不为满。因此,生产者和消费者可以互相唤醒对方。
还有一个问题,如果是有2个消费者,1个生产者,队列为空,那2个消费者都阻塞在wait那里,当生产者生产了1个数据,并且用了pthread_cond_broadcast去同时唤醒两个生产者,其中1个消费时申请到了锁,然后去消费数据,但是此时另一个消费者在干什么呢?在等待,不是在条件变量那里等待,而是在锁那里等,等那个消费者把锁释放,如果这个消费者恰巧申请到了锁,但是队列里已经没有数据可拿,我们把这种条件尚未满足,但是线程被异常唤醒的情况,叫做伪唤醒,所以为了避免这样的情况,判断为空判断为满不能用if,只能用while:
这样就保证了代码的鲁棒性!
在写好生产消费模型,就可以调用:
void *Consumer(void *args)
{
BlockQuene<int> *bq = static_cast<BlockQuene<int> *>(args);
while(true)
{
//1.获取数据
int data = 0;
bq->Pop(&data);
//2.处理数据
std::cout << "Consumer -> " << data << std::endl;
}
}
void *Productor(void *args)
{
srand(time(nullptr)^getpid());
BlockQuene<int> *bq = static_cast<BlockQuene<int> *>(args);
while(true)
{
sleep(2);
//1.构建数据
int data = rand() % 10 + 1;
//2.生产数据
bq->Equeue(data);
std::cout << "Productor -> " << data << std::endl;
}
}
int main()
{
BlockQuene<int>* bq = new BlockQuene<int>();
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, (void *)bq);
pthread_create(&p, nullptr, Productor, (void *)bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
}
我们发现,生产者生产一个,消费者消费一个。
上面我们是传递的int,更常见的是生产者向缓冲区投递“任务”,比如class类对象、函数等。
using task_t = std::function<void()>;
void Download()
{
std::cout << "我是一个下载任务" << std::endl;
}
void *Consumer(void *args)
{
BlockQuene<task_t> *bq = static_cast<BlockQuene<task_t>*>(args);
while(true)
{
//1.获取数据
task_t t;
bq->Pop(&t);
// t.Excute();
t();
//2.处理数据
//std::cout << "Consumer -> " << t.result() << std::endl;
}
}
void *Productor(void *args)
{
srand(time(nullptr));
BlockQuene<task_t> *bq = static_cast<BlockQuene<task_t> *>(args);
while(true)
{
sleep(2);
//1.构建数据
// int x = rand() % 10 + 1;
// usleep(x*1000);
// int y = rand() % 10 + 1;
// task_t t(x,y);
//2.生产数据
bq->Equeue(Download);
std::cout << "Productor -> Download " << std::endl;
}
}
int main()
{
BlockQuene<task_t>* bq = new BlockQuene<task_t>();
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, (void *)bq);
pthread_create(&p, nullptr, Productor, (void *)bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
}
以上是单生产单消费模型,但是也存在多生产多消费的情况,实际上,在多生产多消费的情况下,上面的代码我们不需要改变,直接支持多生产多消费,因为只有一把锁。
上面我们说到生产者消费者模型的优点之一是效率高,具体体现是:由于生产者产生任务要花时间,消费者处理任务也要花时间,一个生产者在进行生产的时候,其他生产者可能在进行构建任务,一个消费者在进行获取任务的时候,其他消费者可能早已经获取了任务,正在处理任务,所以对于消费者侧,获取任务和处理任务就是并发了,对于生产者侧,往阻塞队列里放任务和产生任务就是并发了,这才是高效的主要。
所以,单生产单消费、单生产多消费、多生产单消费、多生产多消费什么时候使用呢?如果生产任务很快处理任务很慢就选单生产多消费,如果生产任务很慢处理任务很快就选多生产单消费,如果生产任务很慢处理任务很慢就选多生产多消费。
因为无论生产者还是消费者,都要检测资源是否满足,而只有真正查一次才知道,查本身就是访问临界区,所以查以前必定要加锁,由于检测结果是在临界区才知道,所以wait一定是在临界区。
POSIX信号量
sem_init是用来初始化信号量,其中信号量是sem_t类型,第一个参数是sem_t变量的地址,第二个参数设置为0,表示进程内共享信号量,第三个参数表示要信号量要初始的值,返回值表示初始化是否成功。
sem_init是用于销毁信号量。
sem_wait就是信号量的P操作,使信号量--,如果申请信号量失败,进程就会阻塞在这里。
sem_post就是用完资源之后归还信号量,使信号量++。
基于环形队列的生产消费模型
我们可以使用数据来模拟环形队列的特性,逻辑上是环状的,物理上是可能是线性的,通过取模运算模拟环形队列。
从头部位置取元素,从尾部放元素。然而,当head==tail时,环形队列可能为空,也可能为满,为了解决这个问题,可以采用计数器或者牺牲一个空位置(if(head == (tail+1)%N))。但是,现在不用为判断循环队列满空的问题发愁,因为我们有信号量。我们要解决的问题是多线程如何在环形队列中进行生产和消费,包括单生产单消费和多生产多消费。
1.如果队列为空,让谁先运行?生产者先生产。
2.如果队列满了,让谁再访问?消费者来消费。
这就是生产消费在局部会有顺序性,也体现出互斥特点。
3.如果队列不为空&&队列不为满,此时head!=end,这就允许了生产和消费同时进行。
基于以上分析,我们可以得出以下结论:
a.不能让生产者把消费者套一个圈。
b.不能让消费者超过生产者。
因此,环形队列只有3种情况,为空,为满,不为空不为满,只有当环形队列为空或为满时,生产和消费才会指向同一个位置,才会存在对应的并发问题。在不为空不为满时,生产和消费才会指向不同的位置,可以并发访问环形队列,这样就体现了把环形队列不整体使用的特点。
那么,我们如何保证以上原则呢?通过使用信号量!信号量是用来做互斥与同步的。站在消费者角度,最关心的是数据资源;站在生产者角度,最关心的是空间资源。数据资源+空间资源=N,N为环形队列长度。刚开始时,数据资源sem_t data_sem=0,空间资源sem_t space_sem=N。
对于生产者,要申请空间资源(P操作),返回数据资源(V操作),对于消费者,要申请数据资源(P操作),返回空间资源(V操作)。
生产者
P(space_sem)
//生产
V(data_sem)
消费者
P(data_sem)
//消费
V(space_sem)
刚开始时,data_sem并没有资源,即便消费者线程先来执行,信号量会拦住消费者,只有生产者能申请到信号量来进行生产,这不就是互斥吗。如果现在数据资源是满的,即便生产者先来执行,信号量也会拦住生产者,无法生产,而生产者就可能申请到信号量消费,这不就是互斥吗。生产者不可能一直生产,因为有信号量的限制,这就注定了生产者不能把消费者套一个圈。消费者也不能一直消费,不会读到垃圾数据或超过消费者。所以,通过上面两段伪代码,就能把1.2.a.b.这4个原则全部维持住。如果队列不为空且不为满,那生产者和消费者可以同时进来,只要保证它们访问的不是同一个位置,就可以保证并发,不为空且不为满时head!=end,这样天然两个下标不会指向同一个位置。
关于环形队列的基本类定义如下:
#pragma once
#include <string>
#include <vector>
#include <iostream>
#include <semaphore.h>
template <class T>
class RingQuene
{
public:
RingQuene(int max_cap)
: _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0)
{
sem_init(&_data_sem,0,0);
sem_init(&_space_sem,0,max_cap);
}
~RingQuene()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
}
private:
std::vector<T> _ringqueue;
int _max_cap;
int _c_step; //消费者的位置
int _p_step; //生产者的位置
sem_t _data_sem; //消费者关心的信号量
sem_t _space_sem; //生产者关心的信号量
};
void P(sem_t& s)
{
sem_wait(&s);
}
void V(sem_t& s)
{
sem_post(&s);
}
void Push(const T& in) // 生产者
{
P(_space_sem);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _max_cap;
V(_data_sem);
}
void Pop(T* out) //消费者
{
P(_data_sem);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _max_cap;
V(_space_sem);
}
无论是Push还是Pop,都要先申请信号量,再释放信号量。
然后,包含一下之前写过的Task.hpp,让线程执行Task任务,
template <class T>
class RingQuene
{
private:
void P(sem_t& s)
{
sem_wait(&s);
}
void V(sem_t& s)
{
sem_post(&s);
}
public:
RingQuene(int max_cap)
: _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0)
{
sem_init(&_data_sem,0,0);
sem_init(&_space_sem,0,max_cap);
}
void Push(const T& in) // 生产者
{
P(_space_sem);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _max_cap;
V(_data_sem);
}
void Pop(T* out) //消费者
{
P(_data_sem);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _max_cap;
V(_space_sem);
}
~RingQuene()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
}
private:
std::vector<T> _ringqueue;
int _max_cap;
int _c_step; //消费者的位置
int _p_step; //生产者的位置
sem_t _data_sem; //消费者关心的信号量
sem_t _space_sem; //生产者关心的信号量
};
以上是单生产单消费模型,那如果是多生产多消费模型,该如何改呢?如果直接用上面的代码,肯定会出问题,原因在于,虽然多个生产者和消费者都可以申请信号量,但是,生产者和消费者的下标位置(_c_step和_p_step)只有1个,这样下标位置就成了临界资源,多个生产者就有可能访问同一个下标,所以必须要加锁。
如上,对临界资源进行加解锁,确实维护了生产者和生产者之间、消费者和消费者之间的互斥关系,但是,随着而来的问题是,先加锁好还是先申请信号量好?如果先加锁,那必然是一个生产者进入临界区后再由这个生产者去申请信号量,这看似没问题,但是这时其它生产者线程只能在锁那里等待,后来当生产者线程释放了信号量和锁之后,其他生产者才能先竞争锁,然后才能申请信号量,这就注定了申请锁和申请信号量是串行的。但如果先让所有生产者去申请信号量,每一个线程先把信号量预定着,然后它们之间再去竞争锁,这样,在某一个线程申请锁成功后,其他线程早已经各自预定了一个信号量,当这个线程释放锁之后,其他线程就可以立即竞争锁而不用去申请信号量了。所以,要先申请信号量,再加锁,申请信号量是原子的,这样会使效率高一些。
但是,这么说的话,同一个时刻只能有一个生产者线程和消费者线程在跑,那多生产多消费的意义又在哪里呢?实际上,和基于阻塞队列的生产消费模型类似,我们更需要关注构造数据和处理数据,这些可以由多线程并发完成,
不知道有没有发现一个细节,信号量这里,对资源进行使用、申请,为什么不判断一下条件是否满足?因为信号量本身就是判断条件!锁本身和资源无关,只是表明让线程串行起来,而信号量是判断内部资源数量多少的,所以,信号量的本质就是一个计数器,是资源的预定机制,采用原子性将内部资源的使用状态在外部就可以判断出来。