《多线程基础之条件变量》
【条件变量导读】条件变量是多线程中比较灵活而且容易出错的线程同步手段,比如:虚假唤醒、为啥条件变量要和互斥锁结合使用?windows和linux双平台下,初始化、等待条件变量的api一样吗?
本文将分别为您介绍条件变量在windows和linux平台下的用法和注意事项,好!直接进入主题。
条件变量的使用场景可以用如下流程图进行阐述。
我们需反复判断一个多线程共享条件是否满足,一直到该条件满足为止(由于该条件被多个线程操作)。因此每次判断前进行加锁操作,判断完毕后解锁。但上述逻辑存在严重的效率问题,假设我们解锁离开临界区后,其他线程修改了条件,导致条件满足了;此时程序仍然需要睡眠 n 秒后才能得到反馈。因此我们需要这样一种机制:
某个线程 A 在条件不满足的情况下,主动让出互斥锁,
让其他线程去争夺这把锁,当前线程A在此处等待,等待条件的满足;
一旦条件满足,其他线程释放锁,并通知条件满足,
线程A就可以被立刻唤醒并能获取到互斥锁对象。
1、Windows下条件变量的用法
具体条件变量的定义和api,我就不介绍了,大家参考如下示例程序,就能很轻松地掌握条件变量地初始化,本文地重点是介绍条件变量地用法及注意事项。
#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
class ThreadTask
{
public:
ThreadTask(int taskId)
{
m_taskId = taskId;
}
void doTask()
{
std::cout << " threadId: " << std::this_thread::get_id() << " do Task, taskId: " << m_taskId << std::endl;
}
private:
int m_taskId;
};
/定义全局互斥锁对象
std::mutex myMutex;
//定义全局的windows条件变量
std::condition_variable myCv;
/全局任务队列
std::list<ThreadTask*> taskList;
void* consumeThread()
{
while (true)
{
/判全局条件(公共队列taskList是否为空)前,先加锁
std::unique_lock<std::mutex> lk(myMutex);
while (taskList.empty())
{
/*
如果条件不满足,那继续等待条件变量满足条件
同时立刻让出刚占有的互斥锁对象,让其他线程去争抢
*/
myCv.wait(lk);
}
//假设条件满足了,当前线程将从myCv.wait(lk)返回,
//并立刻获取互斥锁对象操作公共的全局队列
ThreadTask* pTask = taskList.front();
//头部弹任务
taskList.pop_front();
if (!pTask)
continue;
pTask->doTask();
delete pTask;
pTask = nullptr;
}
return nullptr;
}
void* produceThread()
{
int taskId = 0;
while (true)
{
ThreadTask* pTask = nullptr;
{
std::lock_guard<std::mutex> lk(myMutex);
taskId++;
pTask = new ThreadTask(taskId);
taskList.push_back(pTask);
std::cout << "thread: " << std::this_thread::get_id() << " produce a Task, taskId: " << taskId << std::endl;
}
/*
生产完任务,通知消费线程consumeThread条件满足
释放锁资源myMutex
*/
myCv.notify_one();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return nullptr;
}
int main()
{
std::thread consumeThread1(consumeThread);
std::thread consumeThread2(consumeThread);
std::thread consumeThread3(consumeThread);
std::thread produceThread(produceThread);
if (produceThread.joinable())
produceThread.join();
if (consumeThread1.joinable())
consumeThread1.join();
if (consumeThread2.joinable())
consumeThread2.join();
if (consumeThread3.joinable())
consumeThread3.join();
return 0;
}
程序运行的结果:
可以看出生产线程生产完任务塞到公共队列中去,通知消费线程去公共队列中取任务,一共四个线程在操作公共队列taskList,并没有出现资源冲突的情况。这便是条件变量使用的妙处!
从上述代码中可以看到,条件变量竟然在等待一把互斥锁。
std::unique_lock<std::mutex> lk(myMutex);
while (taskList.empty())
myCv.wait(lk);
为啥条件变量要和互斥锁配合一起使用?我们可以假设下面这段伪码,互斥锁和条件变量分开使用。
lock(myMutex)
while (taskList.empty())
{
//释放锁
unlock(myMutex);
/再等待条件cv
cond_wait(&cv);
//再加锁
lock(myMutex)
}
假设线程当前线程(线程A)执行到第5行代码,释放了锁,此时操作系统把CPU时间片分配给另外一个等待myMutex的线程B,随后线程B释放信号,表明条件cv已经满足,等到线程A争抢到CPU时间片之后,就已经错过了线程B释放的信号了,那么线程B将永远阻塞在cond_wait()接口上。
解锁和等待条件变量必须是原子性的操作,要么都成功,要么都不成功,否则就很难保证线程的同步。
还有虚假唤醒的问题,何为虚假唤醒,就是 myCv.wait(lk)接口突然返回了,但它并不是被其它线程的信号唤醒的,可能是被操作系统某个中断信号给唤醒的,此时并没有相应的任务需要处理,如果继续让线程走下去,就可能会有问题,所以为了防止这种虚假唤醒的现象,我们外部循环去判断公共队列是否为空,如果为空,那就继续等待。这是Linux服务端面试必问的考点,请同学们慎重。
好,介绍完条件变量在windows下的用法,那么接着看下条件变量在linux下的用法。
2、Linux下条件变量的用法
条件变量的用法流程和windows的差不多,主要差异就是创建线程、初始化条件变量、等待条件变量的api接口不一样。
那,直接上代码!
#include <iostream>
#include <pthread.h>
#include <error.h>
#include <list>
#include <unistd.h>
#include <semaphore.h>
using namespace std;
class ThreadTask
{
public:
ThreadTask(int taskId)
{
m_taskId = taskId;
}
void doTask()
{
cout << " doTask taskId : " << m_taskId << " thread Id: " << pthread_self() << endl;
}
private:
int m_taskId;
};
pthread_mutex_t myMutex;
pthread_cond_t myCond;
list<ThreadTask*> taskList;
void* consumeThread(void* param)
{
while(true)
{
pthread_mutex_lock(&myMutex);
while(taskList.empty())
{
pthread_cond_wait(&myCond, &myMutex);
}
ThreadTask* pTask = taskList.front();
taskList.pop_front();
pthread_mutex_unlock(&myMutex);
if (pTask == nullptr)
continue;
pTask->doTask();
delete pTask;
pTask = nullptr;
}
return NULL;
}
void* produceThread(void* param)
{
int taskID = 0;
ThreadTask* pTask = NULL;
while (true)
{
pTask = new ThreadTask(taskID);
pthread_mutex_lock(&myMutex);
taskList.push_back(pTask);
std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
pthread_mutex_unlock(&myMutex);
//释放信号量,通知消费者线程
pthread_cond_signal(&myCond);
taskID++;
sleep(1);
}
return NULL;
}
int main()
{
pthread_mutex_init(&myMutex, NULL);
pthread_cond_init(&myCond, NULL);
//创建3个消费者线程
pthread_t consumerThreadID[5];
for (int i = 0; i < 3; ++i)
{
pthread_create(&consumerThreadID[i], NULL, consumeThread, NULL);
}
//创建一个生产者线程
pthread_t producerThreadID;
pthread_create(&producerThreadID, NULL, produceThread, NULL);
pthread_join(producerThreadID, NULL);
for (int i = 0; i < 3; ++i)
{
pthread_join(consumerThreadID[i], NULL);
}
pthread_cond_destroy(&myCond);
pthread_mutex_destroy(&myMutex);
return 0;
}
Linux平台下运行的结果: