【Linux】多线程:线程互斥、互斥锁
目录
一、多线程访问公共资源时所产生的问题
二、互斥相关背景概念
互斥量mutex(锁)的引入
三、互斥量
1、初始化互斥量(mutex)
2、互斥量加锁
3、互斥量解锁
4、 销毁互斥量
四、互斥量的使用
1、使用静态互斥量
2、使用动态互斥量
五、互斥锁的原理
一、多线程访问公共资源时所产生的问题
我们知道一个进程中的多个线程共享进程的地址空间,因此进行线程间的通信是极为容易的,这也就意味着进程中的多个线程可以随意访问进程中的“公共资源”。但多线程的并发访问会不会对这些公共资源造成不可预测的结果呢?
在此之前,为了方便使用,我们先对pthread库中的线程进行一下封装。
#include <pthread.h>
#include <string>
#include <functional>
using FuncType = std::function<void(const std::string&)>;//包装器
//线程类的封装
class Thread
{
private:
pthread_t _tid;//线程ID
std::string _thread_name;//线程名
FuncType _func;//线程的执行函数
bool _is_running;//线程的状态
//...
private:
void Excute()
{
_is_running = true;
_func(_thread_name);
_is_running = false;
}
//类中的函数参数包含this指针,使用static修饰
static void* ThreadRoute(void* arg)
{
Thread* self = static_cast<Thread*>(arg);
self->Excute();
return (void*)0;
}
public:
Thread(std::string thread_name, FuncType func)
:_thread_name(thread_name), _func(func)
{
_is_running = false;
}
//线程启动
bool Start(){
int ret = pthread_create(&_tid, NULL, ThreadRoute, (void*)this);
if (ret != 0){
return false;
}
std::cout << _thread_name << " has Started" << std::endl;
return true;
}
//线程取消
bool Stop()
{
if(_is_running){
int ret = pthread_cancel(_tid);
if (ret != 0){
return false;
}
std::cout << _thread_name << " has Stoped" << std::endl;
_is_running = false;
}
return true;
}
//回收线程
bool Join()
{
if(!_is_running){
int ret = pthread_join(_tid, NULL);//不关心线程返回值,设置为NULL
if (ret != 0){
return false;
}
}
std::cout << _thread_name << " has Joined" << std::endl;
return true;
}
};
我们写一个程序来验证一下:我们在全局区创建一个变量表示票数,让多个线程同时去“抢票”,直到票数为0时停止抢票。
#include <pthread.h>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Thread.hpp"
#define BUFF_SIZE 128
int ticket_num = 1000;
// 线程函数
void grab_tickets(const std::string& name)
{
while(true){
if(ticket_num > 0){
usleep(1000);//模拟每次的业务处理时长
ticket_num--;
std::cout << name << " has get a ticket! " << "Remaining Quantity is : " << ticket_num << std::endl;
}
else{
break;
}
}
}
int main()
{
// 存储每个线程的tid
std::vector<Thread> threads;
// 假设创建5个线程,此时加上主线程,该进程中共有6个线程
for (int i = 0; i < 5; i++)
{
std::string name = "Thread - " + std::to_string(i + 1);
threads.emplace_back(name, grab_tickets);
}
for (auto& t : threads)
{
if(!t.Start()){
perror("Start false!!!");
exit(-1);
}
}
sleep(1);//1秒后未退出就强制退出
for (auto& t : threads)
{
if(!t.Stop()){
perror("Stop false!!!");
exit(-1);
}
}
for (auto& t : threads)
{
if(!t.Join()){
perror("Join false!!!");
exit(-1);
}
}
return 0;
}
我们运行程序,看一下结果:
可以观察到,我们在线程的执行函数中使用了条件判断,当票的数量小于等于0时就停止抢票。但是我们发现程序运行后的票数竟然变为了负数。这是为什么呢?由此可以看出,抢票的操作并不是“原子的”。
• if 语句判断条件为真以后, 代码可以并发的切换到其他线程。
• usleep 这个模拟漫长业务的过程, 在这个漫长的业务过程中, 可能有很多个。线程会进入该代码段。
• --ticket 操作本身就不是一个原子操作。 而是对应三条汇编指令:
1、 将共享变量 ticket 从内存加载到寄存器中。
2、更新寄存器里面的值, 执行 -1 操作。
3、 将新值从寄存器写回共享变量 ticket 的内存地址。
要解决以上问题, 需要做到三点:
• 代码必须要有互斥行为: 当代码进入临界区执行时, 不允许其他线程进入该临界区。
• 如果多个线程同时要求执行临界区的代码, 并且临界区没有线程在执行, 那么只能允许一个线程进入该临界区。
• 如果线程不在临界区中执行, 那么该线程不能阻止其他线程进入临界区。
要做到这三点, 本质上就是需要一把锁。 Linux 上提供的这把锁叫互斥量。
二、互斥相关背景概念
1、临界资源:多线程执行流共享的资源就叫做临界资源。
2、临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
3、互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
4、原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
互斥量mutex(锁)的引入
1、大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个 线程,其他线程无法获得这种变量。
2、但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量(如全局区变量),可以通过数据的共享,完成线程之间的交互。
3、多个线程并发的操作共享变量,会带来一些问题,如上述例子中的“抢票”。
因此,在多线程编程中需要保证对共享资源操作的原子性。而对共享资源操作的保护本质上就是保护操作共享资源的代码片段,也就是临界区。因此,我们只需要保证对临界区操作的原子性就能够保证多线程正确地操作临界资源。
上述操作的实现我们可以通过互斥量,也就是为临界区的起始位置和结束位置加一把“锁”来控制。当多个线程并发进入临界区前,都会去竞争互斥锁的归属。只有拿到互斥锁的那个线程才会去执行临界区的代码,而其他线程阻塞在申请锁的函数当中。在退出临界区时,该线程会释放它所拥有的互斥锁。如果该线程需要再次执行该任务,此时该线程会与其他线程一起重新参与锁的竞争。
三、互斥量
1、初始化互斥量(mutex)
方法 1:静态分配
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
- 解释:这是静态初始化互斥量的方式。
PTHREAD_MUTEX_INITIALIZER
是一个宏,用于初始化互斥量mutex
,该宏定义在 POSIX 线程库中。静态初始化适用于那些在程序编译时已知的全局或静态互斥量。- 优点:不需要额外的初始化函数调用,不需要手动释放互斥量,生命周期随程序。
- 限制:仅适用于静态或全局互斥量的初始化,不能用于动态创建的互斥量。
方法 2:动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
- 参数:
mutex
:指向要初始化的互斥量的指针。这个互斥量必须在使用之前被初始化。attr
:用于指定互斥量的属性。可以为NULL
,表示使用默认的属性;也可以指定一个pthread_mutexattr_t
结构体来设置互斥量的自定义属性。- 返回值:如果初始化成功,返回
0
;否则,返回一个错误代码。- 优点:适用于动态分配的互斥量,也可以通过
attr
参数设置互斥量的特定属性,例如递归锁、进程间锁等。- 使用示例:
在使用动态分配方法时,需要在不再需要互斥量时调用pthread_mutex_t mutex; pthread_mutex_init(&mutex, NULL);
pthread_mutex_destroy
来销毁它,以释放相关资源。
2、互斥量加锁
函数原型
int pthread_mutex_lock(pthread_mutex_t *mutex);
参数
mutex
:指向要加锁的互斥量的指针。返回值
- 如果加锁成功,返回
0
。- 如果加锁失败,返回一个错误代码。例如,如果调用
pthread_mutex_lock
的线程已经持有该锁,且互斥量的属性设置为递归锁,则可能返回EDEADLK
(死锁错误)。
调用 pthread_ lock 时, 可能会遇到以下情况:
• 互斥量处于未锁状态, 该函数会将互斥量锁定, 同时返回成功
• 发起函数调用时, 其他线程已经锁定互斥量, 或者存在其他线程同时申请互斥量,但没有竞争到互斥量, 那么 pthread_ lock 调用会陷入阻塞(执行流被挂起), 等待互斥量解锁。
3、互斥量解锁
函数原型
int pthread_mutex_unlock(pthread_mutex_t *mutex);
参数
mutex
:指向要解锁的互斥量的指针。返回值
- 如果解锁成功,返回
0
。- 如果解锁失败,返回一个错误代码。例如,如果调用
pthread_mutex_unlock
的线程没有持有该锁,可能会返回EPERM
(操作不允许)错误。
4、 销毁互斥量
【注意】:
• 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁。
• 不要销毁一个已经加锁的互斥量。• 已经销毁的互斥量, 要确保后面不会有线程再尝试加锁。
函数原型
int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数
mutex
:指向要销毁的互斥量的指针。该互斥量必须已经被初始化,并且在调用pthread_mutex_destroy
之前,所有对该互斥量的操作(如pthread_mutex_lock
和pthread_mutex_unlock
)都必须已经完成。返回值
- 如果成功,返回
0
。- 如果销毁失败,返回一个错误代码。例如,如果有其他线程正在使用该互斥量,可能会返回
EBUSY
。
四、互斥量的使用
1、使用静态互斥量
我们可以在全局区创建一个互斥量,使用宏进行初始化。在使用互斥锁之前,我们需要让不同的线程看到同一把锁,进而去并发竞争这把锁,而所有线程都恰好能够看到全局区的变量。
#include <pthread.h>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Thread.hpp"
#define BUFF_SIZE 128
int ticket_num = 1000;
//全局的互斥量,使用宏进行初始化
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
// 线程函数
void grab_tickets(const std::string& name)
{
while(true){
pthread_mutex_lock(&g_mutex);//加锁
//----------------临界区起始位置------------------------
if(ticket_num > 0){
usleep(1000);//模拟每次的业务处理时长
ticket_num--;
std::cout << name << " has get a ticket! " << "Remaining Quantity is : " << ticket_num << std::endl;
//----------------临界区结束位置1------------------------
pthread_mutex_unlock(&g_mutex);//解锁
}
else{
//----------------临界区结束位置2------------------------
pthread_mutex_unlock(&g_mutex);//解锁
break;
}
}
}
int main()
{
// 存储每个线程的tid
std::vector<Thread> threads;
// 假设创建5个线程,此时加上主线程,该进程中共有6个线程
for (int i = 0; i < 5; i++)
{
std::string name = "Thread - " + std::to_string(i + 1);
threads.emplace_back(name, grab_tickets);
}
for (auto& t : threads)
{
if(!t.Start()){
perror("Start false!!!");
exit(-1);
}
}
sleep(10);
for (auto& t : threads)
{
if(!t.Join()){
perror("Join false!!!");
exit(-1);
}
}
return 0;
}
2、使用动态互斥量
实际上,我们也可以将互斥锁进行进一步的封装。在上述代码中,有两个退出路径,分别在if和else中。我们可以在进入临界区前创建一个局部对象,在创建锁对象时在构造函数中实现加锁。当退出临界区后,因为该对象只是一个局部变量,会自动进行销毁。所以我们在析构函数中进行解锁即可。具体实现如下:
#pragma once
#include <pthread.h>
class LockGuard
{
private:
pthread_mutex_t* _mutex;
public:
LockGuard(pthread_mutex_t* mutex)
:_mutex(mutex)
{
pthread_mutex_lock(_mutex);//加锁
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);//解锁
}
};
同样的,当在局部使用pthread_mutex_init函数创建局部互斥锁的时候,我们也需要让所有线程看到同一把锁。因此,所有的线程都需要拿到这把局部锁的地址,进而去竞争和使用。那如何做到呢?我们可以对封装好的线程对象稍作修改——为线程类添加一个私有变量:pthread_mutex_t* mutex用来存储互斥锁的地址。如此,当线程需要使用互斥锁时,直接从线程对象中获取到局部锁的地址即可。
Thread.hpp:
#include <pthread.h>
#include <string>
#include <functional>
class Thread;//声明类
using FuncType = std::function<void(Thread* thread)>;//包装器
//线程类的封装
class Thread
{
private:
pthread_t _tid;//线程ID
std::string _thread_name;//线程名
FuncType _func;//线程的执行函数
bool _is_running;//线程的状态
pthread_mutex_t* _mutex;//局部互斥锁的地址
private:
void Excute()
{
_is_running = true;
_func(this);
_is_running = false;
}
//类中的函数参数包含this指针,使用static修饰
static void* ThreadRoute(void* arg)
{
Thread* self = static_cast<Thread*>(arg);
self->Excute();
return (void*)0;
}
public:
Thread(std::string thread_name, FuncType func, pthread_mutex_t* mutex)
:_thread_name(thread_name), _func(func), _mutex(mutex)
{
_is_running = false;
}
//线程启动
bool Start(){
int ret = pthread_create(&_tid, NULL, ThreadRoute, (void*)this);
if (ret != 0){
return false;
}
std::cout << _thread_name << " has Started" << std::endl;
return true;
}
//线程取消
bool Stop()
{
if(_is_running){
int ret = pthread_cancel(_tid);
if (ret != 0){
return false;
}
std::cout << _thread_name << " has Stoped" << std::endl;
_is_running = false;
}
return true;
}
//回收线程
bool Join()
{
if(!_is_running){
int ret = pthread_join(_tid, NULL);//不关心线程返回值,设置为NULL
if (ret != 0){
return false;
}
}
std::cout << _thread_name << " has Joined" << std::endl;
return true;
}
//获取互斥锁的地址
pthread_mutex_t* get_mutex_address(){
return _mutex;
}
std::string get_name(){
return _thread_name;
}
};
#include <pthread.h>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Thread.hpp"
#include "LockGuard.hpp"
int ticket_num = 1000;
//全局的互斥量,使用宏进行初始化
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
// 线程函数
void grab_tickets(Thread* thread)
{
while(true){
LockGuard lock_guard(thread->get_mutex_address());//加锁
//----------------临界区起始位置------------------------
if(ticket_num > 0){
usleep(1000);//模拟每次的业务处理时长
ticket_num--;
std::cout << thread->get_name() << " has get a ticket! " << "Remaining Quantity is : " << ticket_num << std::endl;
//----------------临界区结束位置1------------------------
}
else{
//----------------临界区结束位置2------------------------
//退出时自动解锁
break;
}
}
}
int main()
{
// 存储每个线程的tid
std::vector<Thread> threads;
//创建锁
pthread_mutex_t local_mutex;
int ret = pthread_mutex_init(&local_mutex, NULL);
if(ret != 0){
perror("Mutex init false!!!");
exit(-1);
}
// 假设创建5个线程,此时加上主线程,该进程中共有6个线程
for (int i = 0; i < 5; i++)
{
std::string name = "Thread - " + std::to_string(i + 1);
threads.emplace_back(name, grab_tickets, &local_mutex);
}
for (auto& t : threads)
{
if(!t.Start()){
perror("Start false!!!");
exit(-1);
}
}
sleep(10);//10秒后未退出就强制退出
for (auto& t : threads)
{
if(!t.Stop()){
perror("Stop false!!!");
exit(-1);
}
}
for (auto& t : threads)
{
if(!t.Join()){
perror("Join false!!!");
exit(-1);
}
}
//销毁锁
pthread_mutex_destroy(&local_mutex);
return 0;
}
如此便解决了多线程并发所带来的对公共资源操作的问题。但新的问题又出现了:运行上述程序的同学会发现,上述程序的执行结果出现了一个线程长时间占用锁的情况,这显然是不符合“抢票”这一动作的规则的。我们想要的是多个线程依次抢票,而不是“一家独大”。要解决这种情况,需要引入条件变量的使用,在下节内容中会详细讲解。
五、互斥锁的原理
为了实现互斥锁操作,大多数体系结构都提供了 swap 或 exchange 指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把 lock 和 unlock 的伪代码改一下:
在cpu中,寄存器只有一套。每个线程在执行时都有自己的线程上下文(thread context),包括寄存器的状态、程序计数器(PC)、堆栈指针等。上下文切换发生在线程切换时,需要保存当前线程的上下文,并恢复下一个线程的上下文。
上下文切换:
- 当操作系统在多线程环境中切换线程时,会将当前线程的寄存器状态(即上下文)保存到内存中,然后加载下一个线程的寄存器状态。这样,每个线程在切换时都能从它上次运行的状态继续执行。
- 实际上,处理器寄存器的物理数量是有限的,通常不会为每个线程提供独立的寄存器。相反,操作系统和处理器通过保存和恢复寄存器状态来模拟每个线程拥有自己的寄存器集。
首先我们先在内存中创建一个互斥量,让多线程都去竞争这一个互斥量。当一个线程竞争到互斥量时,首先会将自己al寄存器的内容清零,接着会将互斥量与自己的al寄存器的内容进行交换。
交换操作完成后,al 寄存器中会存储互斥量原来的值。通过检查这个值,线程可以判断是否成功获得互斥量。如果交换后的al寄存器中的值大于0,则说明该线程成功竞争到互斥量,此时该互斥量变为线程私有;否则,则说明其他线程已经持有锁,当前线程需要挂起等待。
由于cpu硬件电路在实现交换指令时是原子的,所以此时不会被其他线程介入,这也就保证了对互斥量操作的原子性,也就是说加锁这一操作本身就是原子的。
而在竞争到互斥量之后,线程仍可能会被cpu调度切换,但此时内存中唯一的互斥量已经被该线程带走,成为了线程自身的数据。此时,即使其他线程被调度,也无法执行临界区的代码,只能挂起等待抢走互斥量的线程执行完临界区代码后将互斥量交换回内存中。此后该线程再与其他线程一起重新竞争这把锁。