线程同步(互斥锁条件变量)
线程同步
- 互斥锁(互斥量)
- 条件变量
- 生产/消费者模型
一、互斥锁
C++11提供了四种互斥锁:
- mutex:互斥锁。
- timed_mutex:带超时机制的互斥锁。
- recursive_mutex:递归互斥锁。
- recursive_timed_mutex:带超时机制的递归互斥锁。
包含头文件:#include <mutex>
1、mutex类
1)加锁lock()
互斥锁有锁定和未锁定两种状态。
如果互斥锁是未锁定状态,调用lock()成员函数的线程会得到互斥锁的所有权,并将其上锁。
如果互斥锁是锁定状态,调用lock()成员函数的线程就会阻塞等待,直到互斥锁变成未锁定状态。
2)解锁unlock()
只有持有锁的线程才能解锁。
lock和unlock至少满足95%的应用场景!
3)尝试加锁try_lock()
如果互斥锁是未锁定状态,则加锁成功,函数返回true。
如果互斥锁是锁定状态,则加锁失败,函数立即返回false。(线程不会阻塞等待)
2、timed_mutex类
增加了两个成员函数:
bool try_lock_for(时间长度);
bool try_lock_until(时间点);
3、recursive_mutex类
递归互斥锁允许同一线程多次获得互斥锁,可以解决同一线程多次加锁造成的死锁问题。
4、lock_guard类
lock_guard是模板类,可以简化互斥锁的使用,也更安全。
lock_guard的定义如下:
template<class Mutex>
class lock_guard
{
explicit lock_guard(Mutex& mtx);
}
lock_guard在构造函数中加锁,在析构函数中解锁。
lock_guard采用了RAII思想(在类构造函数中分配资源,在析构函数中释放资源,保证资源在离开作用域时自动释放)。
二、条件变量-生产消费者模型
条件变量
- 当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。
- 为了保护共享资源,条件变量需要和互斥锁结合一起使用
- 生产/消费者模型(高速缓存队列)
条件变量是一种线程同步机制。当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。
C++11的条件变量提供了两个类:
condition_variable:只支持与普通mutex搭配,效率更高。
condition_variable_any:是一种通用的条件变量,可以与任意mutex搭配(包括用户自定义的锁类型)。
包含头文件:<condition_variable>
1、condition_variable类
主要成员函数:
1)condition_variable() 默认构造函数。
2)condition_variable(const condition_variable &)=delete 禁止拷贝。
3)condition_variable& condition_variable::operator=(const condition_variable &)=delete 禁止赋值。
4)notify_one() 通知一个等待的线程。
5)notify_all() 通知全部等待的线程。
6)wait(unique_lock<mutex> lock) 阻塞当前线程,直到通知到达。
7)wait(unique_lock<mutex> lock,Pred pred) 循环的阻塞当前线程,直到通知到达且谓词满足。
8)wait_for(unique_lock<mutex> lock,时间长度)
9)wait_for(unique_lock<mutex> lock,时间长度,Pred pred)
10)wait_until(unique_lock<mutex> lock,时间点)
11)wait_until(unique_lock<mutex> lock,时间点,Pred pred)
重点(wait(mutex)函数):
wait(mutex)做了三件事:
-
把互斥锁解锁。
-
阻塞,等待被唤醒。
-
被唤醒后,给互斥锁加锁。
2、unique_lock类
template <class Mutex> class unique_lock是模板类,模板参数为互斥锁类型。
unique_lock和lock_guard都是管理锁的辅助类,都是RAII风格(在构造时获得锁,在析构时释放锁)。它们的区别在于:为了配合condition_variable,unique_lock还有lock()和unlock()成员函数。
生产者消费者模型类示例:
#include <iostream>
#include <string>
#include <thread> // 线程类头文件。
#include <mutex> // 互斥锁类的头文件。
#include <deque> // deque容器的头文件。
#include <queue> // queue容器的头文件。
#include <condition_variable> // 条件变量的头文件。
using namespace std;
class AA
{
mutex m_mutex; // 互斥锁。
condition_variable m_cond; // 条件变量。
queue<string, deque<string>> m_q; // 缓存队列,底层容器用deque。
public:
void incache(int num) // 生产数据,num指定数据的个数。
{
lock_guard<mutex> lock(m_mutex); // 申请加锁。
for (int ii=0 ; ii<num ; ii++)
{
static int bh = 1; // 超女编号。
string message = to_string(bh++) + "号超女"; // 拼接出一个数据。
m_q.push(message); // 把生产出来的数据入队。
}
//m_cond.notify_one(); // 唤醒一个被当前条件变量阻塞的线程。
m_cond.notify_all(); // 唤醒全部被当前条件变量阻塞的线程。
}
void outcache() { // 消费者线程任务函数。
while (true) {
// 把互斥锁转换成unique_lock<mutex>,并申请加锁。
unique_lock<mutex> lock(m_mutex);
// 条件变量虚假唤醒:消费者线程被唤醒后,缓存队列中没有数据。
//while (m_q.empty()) // 如果队列空,进入循环,否则直接处理数据。必须用循环,不能用if
// m_cond.wait(lock); // 1)把互斥锁解开;2)阻塞,等待被唤醒;3)给互斥锁加锁。
m_cond.wait(lock, [this] { return !m_q.empty(); });
// 数据元素出队。
string message = m_q.front(); m_q.pop();
cout << "线程:" << this_thread::get_id() << "," << message << endl;
lock.unlock(); // 手工解锁。
// 处理出队的数据(把数据消费掉)。
this_thread::sleep_for(chrono::milliseconds(1)); // 假设处理数据需要1毫秒。
}
}
};
int main()
{
AA aa;
thread t1(&AA::outcache, &aa); // 创建消费者线程t1。
thread t2(&AA::outcache, &aa); // 创建消费者线程t2。
thread t3(&AA::outcache, &aa); // 创建消费者线程t3。
this_thread::sleep_for(chrono::seconds(2)); // 休眠2秒。
aa.incache(2); // 生产2个数据。
this_thread::sleep_for(chrono::seconds(3)); // 休眠3秒。
aa.incache(5); // 生产5个数据。
t1.join(); // 回收子线程的资源。
t2.join();
t3.join();
}
流程:
- 程序运行,因为wait把互斥锁解开了,所以三个消费者都能加锁成功,现在wait到了第二步,三个线程都被阻塞在条件变量的wait()函数中,此时互斥锁没有被任何线程占有;
- 生产者往队列中放完数据后,会发出条件信号;
- wait()函数接收到i先弄好之后,不一定立即返回,他还要申请加锁,加锁成功后才会返回;
- 如果wait()返回了,一定申请到了锁,接下来可以让队列中的数据出队,出对后再解锁。
消费者线程中的while(m_q.empty())循环:
条件变量存在虚假唤醒的情况:消费者线程被唤醒后,缓存队列中没有数据(三个消费者线程,一次生产两个数据,然后notifyall,全部唤醒,肯定有一个线程拿不到数据,被虚假唤醒了)。如果被虚假唤醒了应该继续等待下一次通知,所以用if肯定不行,必须用while。
也可以用wait(unique_lock<mutex> lock,Pred pred) 版本,添加一个谓词:
m_cond.wait(lock, [this] { return !m_q.empty(); });
效果是一样的,本质上这个重载的wait函数中也有个while循环。