【线程】线程的同步
本文重点:理解条件变量和生产者消费者模型
同步是在保证数据安全的情况下,让我们的线程访问资源具有一定的顺序性
条件变量cond
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了,就是申请锁失败了,它就要添加到一个队列中等待,这个队列叫做等待队列,把这个等待队列可以叫做条件变量,也就是说申请锁失败,进行等待是在条件变量下等的,这样它们排好队,就有顺序性。
条件变量依赖于锁的实现
那我怎么知道几时要在条件变量下等呢?
一定是临界资源不就绪,没错,临界资源也是有状态的!!
你怎么知道临界资源是就绪还是不就绪的?
你判断出来的!判断是访问临界资源吗?必须是的,也就是判断必须在加锁之后!!
条件变量接口
初始化接口和锁是一样的
pthread_cond_t: 系统提供的类型
cond:要初始化的条件变量
attr:条件变量的属性,设为nullptrpthread_cond_t cond = PTHREAD_COND_INITIALIZER;
全局的条件变量,自动初始化和销毁
等待条件满足
cond:要在这个条件变量上等待
mutex:锁,后面详细解释
唤醒等待
signal是唤醒一个线程,broadcast是唤醒条件变量下的全部线程
下面通过代码来理解上面的问题
注意函数接口的位置,想想为什么函数是在这?
#include<iostream>
#include<unistd.h>
#include<pthread.h>
using namespace std;
int cnt=0;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
void* Count(void* args)
{
pthread_detach(pthread_self());//分离线程
uint64_t number=(uint64_t)args;
cout<<"pthread: "<<number<<",create success"<<endl;
while(1)
{
pthread_mutex_lock(&mutex);
// 我们怎么知道我们要让一个线程去休眠了那?一定是临界资源不就绪,没错,临界资源也是有状态的!!
// 你怎么知道临界资源是就绪还是不就绪的?你判断出来的!判断是访问临界资源吗?必须是的,也就是判断必须在加锁之后!!!
pthread_cond_wait(&cond,&mutex);
//为什么这个函数在这里?因为临界资源不就绪,就要等待
//为什么还要一个锁的参数?因为这个线程在前面的时候申请了锁,此时线程持有锁
//1.pthread_cond_wait让线程等待的时候,会自动释放锁!2.唤醒而返回的时候,重新持有锁
cout<<"pthread: "<<number<<", cnt: "<<cnt++<<endl;
pthread_mutex_unlock(&mutex);
sleep(1);
}
return nullptr;
}
int main()
{
for(uint64_t i=0;i<5;i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,Count,(void*)i);
usleep(1000);
}
//让主线程把等待的线程唤醒
while(1)
{
sleep(1);
pthread_cond_signal(&cond);//唤醒条件变量下的一个线程,默认第一个开始
cout<<"signal one thread..."<<endl;
}
return 0;
}
从结果看出,线程访问临界资源时具有一定的顺序性
要点:
pthread_cond_wait函数的位置
我们怎么知道我们要让一个线程去休眠了那?
一定是临界资源不就绪,没错,临界资源也是有状态的!!
你怎么知道临界资源是就绪还是不就绪的?你判断出来的!判断是访问临界资源吗?必须是的,也就是判断必须在加锁之后!!!
为什么这个函数在这里?因为临界资源不就绪,就要等待
为什么还要一个锁的参数?因为这个线程在前面的时候申请了锁,此时线程持有锁
1.pthread_cond_wait让线程等待的时候,会自动释放锁!2.唤醒而返回的时候,重新持有锁
总的来说:条件变量依赖于锁的实现
可能大家还没有很理解,那就在看看下面的生产者消费者模型,再次理解线程怎么等待和唤醒的
生产者消费者模型的原理
producter consumer
生产者生产产品供货给超市,消费者通过从超市消费,得到产品,超市就相当于一个大的缓存,生产者生产的产品都可以放在超市里,供消费者使用,这样可以提高效率,支持忙闲不均,生产和消费的行为存在一定的解耦(解耦的意思也就是生产者生产是一个执行流,消费者消费是另一个执行流,两个执行流可以同时访问超市资源,就有一定的解耦,不像只有一个执行流时,就不能同时执行)
在生产者生产产品的过程中,消费者是不能访问产品的,但是消费者可以处理产品,相反,在消费者访问产品时,生产者无法生产,但是生产者可以获取数据,这样结合,效率就会很高
记住生产者消费者模型就记住“321”原则
3种关系
2种角色--生产者,消费者
1个场所--超市
生产者消费者模型的实现
思路:创建两个线程,单生产单消费的,一个充当消费者,一个充当生产者,消费者消费任务,生产者生产任务。写一个等待队列的类,方法写生产任务和消费任务,这个是共享资源,需要加锁和解锁,看资源状态来添加条件变量,在写一个任务的类,写清楚是什么任务
大家可以把下面的三个文件的代码拷贝到自己的Visual Studio Code下看,更好看一些,代码是不难得,认真看是能看懂的
一定要特别关注条件变量的接口的使用和他的位置
main.cc
#include "blockqueue.hpp"
#include "task.hpp"
#include<time.h>
using namespace std;
void *Producter(void *p_args)
{
int len=opers.size();
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(p_args);
while (1)
{
// 模拟生产者获取数据的过程
int data1=rand()%10+1;//[1,10]
usleep(100);
int data2=rand()%10;
char op=opers[rand()%len];
Task t(data1,data2,op);
bq->push(t);//生产任务
// cout << "生产了一个任务:" << t.GetTask() <<endl;
printf("生产了一个任务: %s\n",t.GetTask().c_str());
sleep(1);
}
return nullptr;
}
void *Consumer(void *c_args)
{
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(c_args);
while (1)
{
//消费任务
Task t= bq->pop();
//计算
t.run();//模拟消费者消费消费数据的过程
// cout << "处理任务: " <<t.GetTask()<<" 运算结果:"<<t.GetResult()<<endl;
printf("处理任务:%s,运算结果:%s\n",t.GetTask().c_str(),t.GetResult().c_str());
}
return nullptr;
}
int main()
{
srand(time(nullptr));
pthread_t p, c;
BlockQueue<Task>* bq=new BlockQueue<Task>;
pthread_create(&p, nullptr, Producter, (void *)bq);
pthread_create(&c, nullptr, Consumer, (void *)bq);
pthread_join(p, nullptr);
pthread_join(c, nullptr);
delete bq;
return 0;
}
blockqueue.hpp
#pragma once
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<queue>
using namespace std;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
//生产者和消费者的条件变量要分开的,它们是不同的
pthread_cond_t c_cond=PTHREAD_COND_INITIALIZER;//消费者的条件变量
pthread_cond_t p_cond=PTHREAD_COND_INITIALIZER;//生产者的条件变量
template<class T>
class BlockQueue
{
public:
BlockQueue(int maxcap=4):_maxcap(maxcap)
{}
T pop()
{
//加锁
pthread_mutex_lock(&mutex);
// 我们怎么知道我们要让一个线程去休眠了那?一定是临界资源不就绪,没错,临界资源也是有状态的!!
// 你怎么知道临界资源是就绪还是不就绪的?你判断出来的!判断是访问临界资源吗?必须是的,也就是判断必须在加锁之后!!!
if(_wait_q.size()==0)//队列为空,临界资源不就绪,就不能消费了,在消费者条件变量下等待
{
pthread_cond_wait(&c_cond,&mutex);//1.pthread_cond_wait让线程等待的时候,会自动释放锁!2.唤醒而返回的时候,重新持有锁
}
T out=_wait_q.front();
_wait_q.pop();
pthread_cond_signal(&p_cond);//消费了一个,生产者肯定可以生产了,唤醒生产者的条件变量下的一个线程
pthread_mutex_unlock(&mutex);
return out;
}
void push(const T& in)
{
pthread_mutex_lock(&mutex);
if(_wait_q.size()==_maxcap)//队列满了,临界资源不就绪,就不能生产了,在生产者条件变量下等待
{
pthread_cond_wait(&p_cond,&mutex);
}
_wait_q.push(in);
pthread_cond_signal(&c_cond);//生产了一个,消费者肯定可以消费了,唤醒消费者的条件变量下的一个线程
pthread_mutex_unlock(&mutex);
}
~BlockQueue()
{}
private:
queue<T> _wait_q;//等待队列
int _maxcap;//队列的最大容量
};
task.hpp
#pragma once
#include <iostream>
#include <string>
using namespace std;
string opers="+-*/%";
enum
{
Divzero = 1,
Modzero,
Unknown
};
class Task
{
public:
Task(int data1, int data2, char op) : _data1(data1), _data2(data2), _op(op), _result(0), _exitcode(0)
{}
void run()
{
switch (_op)
{
case '+':
{
_result = _data1+_data2;
break;
}
case '-':
{
_result = _data1-_data2;
break;
}
case '*':
{
_result = _data1*_data2;
break;
}
case '/':
{
if (_data2 == 0) _exitcode = Divzero;
else _result = _data1/_data2;
break;
}
case '%':
{
if (_data2 == 0) _exitcode = Modzero;
else _result = _data1%_data2;
break;
}
default:
{
_exitcode=Unknown;
break;
}
}
}
void operator()()
{
run();
}
string GetResult()
{
string r=to_string(_data1);
r+=_op;
r+=to_string(_data2);
r+='=';
r+=to_string(_result);
r+='[';
r+=to_string(_exitcode);
r+=']';
return r;
}
string GetTask()
{
string r=to_string(_data1);
r+=_op;
r+=to_string(_data2);
r+="=?";
return r;
}
private:
int _data1;
int _data2;
char _op;
int _result;
int _exitcode;
};
main.cc中打印用printf比较好,用cout我的平台下会有点乱序
看懂上面的实现再来看这个问题
什么是伪唤醒?
假如多个生产者生产了四个产品,生产者条件变量下的等待的线程有三个,消费者消费了一个产品,假如用的pthread_cond_broadcast接口,那就唤醒了那三个线程,它们就pthread_cond_wait函数返回,重新持有锁,就生产产品,但是到第二个线程的时候,队列已经满了,第二个线程还会生产 ,那就越界出错了
怎么防止伪唤醒?
就在条件判断时改为循环,不要if,这样函数返回时还要判断(上面的代码还是if,你们自己改吧)
下面的代码是把上面的实现改为多生产者多消费者
只改了main.cc
#include "blockqueue.hpp"
#include "task.hpp"
#include<time.h>
using namespace std;
void *Producter(void *p_args)
{
int len=opers.size();
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(p_args);
while (1)
{
// 模拟生产者获取数据的过程
int data1=rand()%10+1;//[1,10]
usleep(100);
int data2=rand()%10;
char op=opers[rand()%len];
Task t(data1,data2,op);
bq->push(t);//生产任务
// cout << "生产了一个任务:" << t.GetTask() <<endl;
printf("生产了一个任务:%s,thread id: %x\n",t.GetTask().c_str(),pthread_self());
sleep(1);
}
return nullptr;
}
void *Consumer(void *c_args)
{
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(c_args);
while (1)
{
//消费任务
Task t= bq->pop();
//计算
t.run();//模拟消费者消费消费数据的过程
// cout << "处理任务: " <<t.GetTask()<<" 运算结果:"<<t.GetResult()<<endl;
printf("处理任务:%s,运算结果:%s,thread id: %x\n",t.GetTask().c_str(),t.GetResult().c_str(),pthread_self());
}
return nullptr;
}
int main()
{
srand(time(nullptr));
pthread_t p[5], c[3];
BlockQueue<Task>* bq=new BlockQueue<Task>;
for(int i=0;i<5;i++)
pthread_create(p+i, nullptr, Producter, (void *)bq);
for(int i=0;i<3;i++)
pthread_create(c+i, nullptr, Consumer, (void *)bq);
for(int i=0;i<5;i++)
pthread_join(p[i], nullptr);
for(int i=0;i<3;i++)
pthread_join(c[i], nullptr);
delete bq;
return 0;
}