Linux -- 从抢票逻辑理解线程互斥
目录
抢票逻辑代码:
thread.hpp
thread.cc
运行结果:
为什么票会抢为负数?
概念前言
临界资源
临界区
原子性
数据不一致
为什么数据不一致?
互斥
概念
pthread_mutex_init(初始化互斥锁)
pthread_mutex_lock(申请互斥锁)
pthread_mutex_unlock(释放互斥锁)
pthread_mutex_destory(销毁互斥锁)
全局的互斥锁
thread.cc 代码
局部的互斥锁
thread.cc 代码
抢票逻辑代码:
thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include<vector>
#include<iostream>
#include<string>
#include<functional>
#include<pthread.h>
#include<unistd.h>
namespace ThreadModule
{
//给 函数参数为T(T为任意类型)的引用,返回值为 void 的函数 重命名为func_t
template<typename T>
using func_t=std::function<void(T&)>;
template<typename T>
class Thread
{
public:
//线程的任务
void Excute()
{
_func(_data);
}
public:
//构造函数
Thread(func_t<T> func, T &data,const std::string &name="none-name")
:_func(func),_data(data),_threadname(name),_stop(true)
{ }
//如果没有static,由于 this 指针,函数的参数有2个,而pthread_create要求函数参数只能有void*
//加上static,则要求函数不能访问类内的非静态成员变量,也就避免了this指针作为函数参数
static void* threadroute(void* args)
{
//参数从void* 类型转为Thread<T> *类型,static_cast是一种相对安全的类型转换方式
Thread<T> *self=static_cast<Thread<T> *>(args);
//由于没有了this指针,所以需要封装Excute函数来传递 _data参数,从而执行任务
self->Excute();
return nullptr;
}
//开始执行任务
bool Start()
{
//创建线程
int n=pthread_create(&_tid,nullptr,threadroute,this);
if(!n)
{
_stop=false;//修改状态
return true;
}
else
{
return false;
}
}
void Detach()
{
//有线程启动了才分离线程
if(!_stop)
pthread_detach(_tid);
}
void Join()
{
if(!_stop)
pthread_join(_tid,nullptr);
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop=true;
}
//析构函数
~Thread()
{ }
private:
std::string _threadname;//线程名
bool _stop;//该线程是否启动,true表示未启动,false表示已启动
pthread_t _tid;
T &_data;
func_t<T> _func;//线程调用的函数
};
}
#endif
thread.cc
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<vector>
using namespace std;
#include"thread.hpp"
using namespace ThreadModule;
int g_tickets=10000;//共享资源,未保护
const int num=4;
void route(int &tickets)
{
//票没抢完就一直抢
while(true)
{
if(tickets>0)//还有票,可以继续抢
{
usleep(1000);
//目前抢到的票
printf("get tickets:%d\n",tickets);
tickets--;
}
else//已经没票了,不能抢了,退出
{
break;
}
}
}
int main()
{
std::vector<Thread<int>> threads;
//创建线程
for(int i=0;i<num;i++)
{
std::string name="thread-"+std::to_string(i+1);
threads.emplace_back(route,g_tickets,name);
}
//启动线程
for(auto &threads:threads)
{
threads.Start();
}
//等待线程
for(auto &threads:threads)
{
threads.Join();
std::cout<<"wait thread done, thread is: "<<threads.name()<<std::endl;
}
//
return 0;
}
运行结果:
发现票数被减为了负数,且有的票被重复抢了,每次运行的结果都不一样。
为什么票会抢为负数?
概念前言
临界资源
多线程执行流共享的资源称为临界资源。
临界区
每个线程内部,访问了临界资源的代码称为临界区。
原子性
不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
数据不一致
在多线程或分布式系统中,由于并发操作或其他因素导致的数据状态不符合预期的情况。当多个线程或进程同时访问和修改共享资源时,如果没有适当的同步机制,可能会出现数据不一致的问题。这可能导致系统的不稳定、错误的结果或难以调试的行为。例如,上述的运行结果中出现了剩余的票数为负数的情况,而剩余的票数不应该出现负数,数据状态不符合预期,即数据不一致。
为什么数据不一致?
我们可以来模拟一下代码的整个运行过程。
在代码中,我们创建了4个线程,每个线程都要执行以下代码,函数内中一共有 3 个地方访问了临界资源,标为1、2、3:
void route(int &tickets)
{
//票没抢完就一直抢
while(true)
{
//还有票,可以继续抢
if(tickets>0)//1
{
usleep(1000);
//目前剩下的票数
printf("get tickets:%d\n",tickets);//2
tickets--;//3
}
else//已经没票了,不能抢了,退出
{
break;
}
}
}
假设现在只剩一张票了,即 g_tickets = 1:
假设现在执行的是线程 1,线程 1 进行 tickets>0 的判断,这个判断过程是由 CPU 来完成的。
系统把 g_tickets 的值从内存读到 CPU 的寄存器 ebx 中,判断结果为真,线程 1 开始执行 if 的代码块,还没执行到打印操作,线程 1 被挂起并切走了,切走时线程 1 带走了寄存器的上下文数据,g_tickets 还是 1,还没有写回到内存中!
此时轮到线程 2 执行函数了,线程 2 也进行了 tickets>0 的判断,由于 g_tickets 的值依旧为 1,和线程 1 的过程一样,所以线程 2 也执行 if 的代码块, 线程 2 也还没有执行到打印操作,就被挂起并切走了。线程 3 同理。
再次轮到线程 1 时,由于已经进行过 if 判断了,线程 1 直接执行打印操作,对 tickets -- 并把 tickets 的值写回到内存中,g_tickets 的值变为 0。 这里我们需要了解到,tickets-- 看似只有一句代码,其实要分为三个过程来执行:
- 把 tickets 从内存中读到 CPU 中;
- CPU 进行 -- 操作;
- 把 tickets 的值写回内存中。
再次轮到线程 2,因为线程 2 已经进行过 if 判断了,线程 2 以为 tickets 还是1,也直接执行打印操作,把 tickets-- 并写回到内存中,但线程 2 在进行 tickets -- 操作时,读到的 tickets 已经是 0 了,-- 操作后,tickets 变为 -1,内存中的 g_tickets 的值变为 -1.
线程 3 也是同理,-- 后 tickets 变为 -2,写回到内存后,内存中的 g_tickets 的值变为 -2.
就这样 g_tickets 的值被减到了负数!
也就是说,多线程访问共享资源 g_tickets 时,由于共享资源 g_tickets 未被保护,且 -- 操作不是原子的,在执行任何一个步骤时线程都可能被切换,导致产生了计算过程的中间状态!
互斥
由于多个执行流访问全局数据的代码,所以会发生上面的问题,多个线程共享的全局数据就是临界资源,访问了全局数据的代码其实就是临界区,换句话说,保护临界区,就可以保护临界资源,就可以解决上面数据不一致的问题。
概念
任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,对临界资源起保护作用。
pthread_mutex_init(初始化互斥锁)
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
初始化一个互斥锁。
mutex
:指向要初始化的互斥锁对象的指针。
attr
:指向互斥锁属性对象的指针,可以为NULL
以使用默认属性。
pthread_mutex_lock(申请互斥锁)
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
尝试获取互斥锁。如果锁已经被其他线程持有,则当前线程将被阻塞,直到锁可用。
mutex
:指向要锁定的互斥锁对象的指针。
pthread_mutex_unlock(释放互斥锁)
#include <pthread.h>
int pthread_mutex_unlock(pthread_mutex_t *mutex);
释放一个互斥锁,允许其他等待的线程获取该锁。
mutex
:指向要解锁的互斥锁对象的指针。
pthread_mutex_destory(销毁互斥锁)
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
销毁一个互斥锁,释放相关资源。
mutex
:指向要销毁的互斥锁对象的指针。
加锁
注意加锁应该精细,只需要在临界区加锁,非临界区不需要加锁! 加锁成功后,只有申请到互斥锁的线程才可以访问临界区,其他线程阻塞等待,直到锁释放后,其他线程成功竞争到互斥锁,才可以访问临界区!
全局的互斥锁
如果互斥锁是全局的,或者静态的,则不需要 init 和 destory。
thread.cc 代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<vector>
using namespace std;
#include"thread.hpp"
using namespace ThreadModule;
int g_tickets=10000;//共享资源,未保护
const int num=4;
//一把全局的锁
pthread_mutex_t gmutex=PTHREAD_MUTEX_INITIALIZER;
void route(int &tickets)
{
//票没抢完就一直抢
while(true)
{
pthread_mutex_lock(&gmutex);//申请锁
//还有票,可以继续抢
if(tickets>0)//1
{
usleep(1);
//目前剩下的票数
printf("get tickets:%d\n",tickets);//2
tickets--;//3
pthread_mutex_unlock(&gmutex);//释放锁
}
else//已经没票了,不能抢了,退出
{
pthread_mutex_unlock(&gmutex);//释放锁
break;
}
}
}
int main()
{
std::vector<Thread<int>> threads;
//创建线程
for(int i=0;i<num;i++)
{
std::string name="thread-"+std::to_string(i+1);
threads.emplace_back(route,g_tickets,name);
}
//启动线程
for(auto &threads:threads)
{
threads.Start();
}
//等待线程
for(auto &threads:threads)
{
threads.Join();
std::cout<<"wait thread done, thread is: "<<threads.name()<<std::endl;
}
//
return 0;
}
不再出现抢到负数的票和抢到重复的票的情况了,且每次运行结果都一样:
局部的互斥锁
thread.cc 代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<vector>
#include<mutex>
using namespace std;
#include"thread.hpp"
using namespace ThreadModule;
int g_tickets=10000;//共享资源,未保护
const int num=4;
//因为锁变成局部的,为了让route可以访问互斥锁,且统计每个线程抢到了多少张票,定义一个类
class ThreadData
{
public:
ThreadData(int &tickets,std::string name,pthread_mutex_t &mutex)
:_tickets(tickets),_name(name),_mutex(mutex),_total(0)
{ }
~ThreadData()
{ }
public:
int &_tickets;//所有线程最终都会引用同一个全局变量g_tickets
std::string _name;
int _total;
pthread_mutex_t &_mutex;
};
void route(ThreadData *td)
{
//票没抢完就一直抢
while(true)
{
pthread_mutex_lock(&td->_mutex);//申请锁
//还有票,可以继续抢
if(td->_tickets>0)//1
{
usleep(1);
//目前剩下的票数
printf("%s running, get tickets:%d\n",td->_name.c_str(),td->_tickets);//2
td->_tickets--;//3
td->_total++;
pthread_mutex_unlock(&td->_mutex);//释放锁
}
else//已经没票了,不能抢了,退出
{
pthread_mutex_unlock(&td->_mutex);//释放锁
break;
}
}
}
int main()
{
//一把局部的锁
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,nullptr);//初始化互斥锁
std::vector<Thread<ThreadData*>> threads;
std::vector<ThreadData*> datas;
//创建线程
for(int i=0;i<num;i++)
{
std::string name="thread-"+std::to_string(i+1);
ThreadData* td = new ThreadData(g_tickets,name,mutex);
threads.emplace_back(route,td,name);
datas.emplace_back(td);
}
//启动线程
for(auto &threads:threads)
{
threads.Start();
}
//等待线程
for(auto &threads:threads)
{
threads.Join();
//std::cout<<"wait thread done, thread is: "<<threads.name()<<std::endl;
}
for(auto data:datas)
{
std::cout<<data->_name<<" : "<<data->_total<<std::endl;
delete data;
}
pthread_mutex_unlock(&mutex);
//
return 0;
}
封装成类
LockGuard.hpp 代码
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include<pthread.h>
#include<iostream>
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex)
:_mutex(mutex)
{
pthread_mutex_lock(_mutex);//加锁
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);//解锁
}
private:
pthread_mutex_t *_mutex;
};
#endif
thread.cc 代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<vector>
#include<mutex>
using namespace std;
#include"thread.hpp"
#include"LockGuard.hpp"
using namespace ThreadModule;
int g_tickets=10000;//共享资源,未保护
const int num=4;
//因为锁变成局部的,为了让route可以访问互斥锁,且统计每个线程抢到了多少张票,定义一个类
class ThreadData
{
public:
ThreadData(int &tickets,std::string name,pthread_mutex_t &mutex)
:_tickets(tickets),_name(name),_mutex(mutex),_total(0)
{ }
~ThreadData()
{ }
public:
int &_tickets;//所有线程最终都会引用同一个全局变量g_tickets
std::string _name;
int _total;
pthread_mutex_t &_mutex;
};
void route(ThreadData *td)
{
//票没抢完就一直抢
while(true)
{
LockGuard guard(&td->_mutex);//临时对象
//还有票,可以继续抢
if(td->_tickets>0)//1
{
usleep(1);
//目前剩下的票数
printf("%s running, get tickets:%d\n",td->_name.c_str(),td->_tickets);//2
td->_tickets--;//3
td->_total++;
}
else//已经没票了,不能抢了,退出
{
break;
}
}
}
int main()
{
//一把局部的锁
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,nullptr);//初始化互斥锁
std::vector<Thread<ThreadData*>> threads;
std::vector<ThreadData*> datas;
//创建线程
for(int i=0;i<num;i++)
{
std::string name="thread-"+std::to_string(i+1);
ThreadData* td = new ThreadData(g_tickets,name,mutex);
threads.emplace_back(route,td,name);
datas.emplace_back(td);
}
//启动线程
for(auto &threads:threads)
{
threads.Start();
}
//等待线程
for(auto &threads:threads)
{
threads.Join();
//std::cout<<"wait thread done, thread is: "<<threads.name()<<std::endl;
}
for(auto data:datas)
{
std::cout<<data->_name<<" : "<<data->_total<<std::endl;
delete data;
}
pthread_mutex_unlock(&mutex);
//
return 0;
}