Linux系统:线程
目录
一、页框:管理内存的基本单位
二、页表:虚拟地址是如何转化为物理地址的?
三、线程的概念
1.线程的概念
2.创建线程
3.线程存在的意义
4.线程的缺点
5.线程共享的资源
6.线程私有的资源
四、线程控制
1.线程等待
2.线程的传参
3.线程的返回值
4.多线程创建&多线程等待
5.线程终止
(1)调用pthread_exit函数
(2)调用pthread_cancel函数
6.线程分离:不等待线程
五、线程的tid:地址
六、线程的局部存储
七、线程的封装
八、线程互斥
1.线程互斥的背景
2.互斥锁
3.互斥锁原理
4.互斥锁底层实现原理
(1)线程申请锁(加锁)的底层实现原理
(2)线程解锁的底层实现原理
九、线程同步
1.条件变量接口
2.生产者消费者模型
3.基于BolckingQueue生产者消费者模型
(1)单生产者单消费者模型
(2)多生产者多消费者模型
4.POSIX信号量接口
5.基于环形队列的生产者消费者模型
(1)单生产者单消费者模型
(2)多生产者多消费者模型
十、线程池
1.线程池概念
2.线程池的应用场景
3.线程池示例
4.线程池代码
十一、日志
十二、在线程池中添加日志
十三、线程安全的单例模式
1.懒汉实现单例模式和饿汉实现单例模式介绍
2.懒汉方式实现单例模式线程池源代码
十四、死锁
1.死锁的概念
2.死锁的四个必要条件
3.避免死锁的方法
4.避免死锁的算法
一、页框:管理内存的基本单位
操作系统对于内存的管理是以内存块为单位的,一个内存块的大小为4KB。另外,操作系统对磁盘进行管理时也是以大小为4KB的逻辑区块为单位的。因此内存与磁盘空间进行IO交互时很便捷。
操作系统管理物理内存时,会将内存划分为许多大小为4KB的叶框。当父子进程发生写时拷贝时,即使被修改的数据大小仅有4B,但是操作系统重新申请的空间是一个页框的大小4KB,将需要写时拷贝的数据所在的整个叶框都拷贝过来。
那么操作系统如何管理页框呢?
将叶框存放进数组中,struct page_memory[N],已知物理内存的起始地址为0,第一个叶框的下标也为0,所以接下来每一个叶框的起始地址就是数组下标*4KB
二、页表:虚拟地址是如何转化为物理地址的?
以32位操作系统为例:
虚拟地址的一个地址是4字节,32比特位,操作系统会将其划分为10位、10位和12位:高10位共有2^10=1024种组合作为索引,组成页目录;中10位共有2^10=1024种组合也作为索引,组成页表;低12位共有2^12=4096种组合,每种组合4KB,正好对应页框大小。
页目录中的每个索引指向一个页表,页表中的每个索引根据低12位的地址判断当前虚拟地址在物理内存的哪一个页框中,指向对应页框的起始地址。低12位的地址还作为页内偏移量,根据页框的起始地址,加上页内偏移量就可以得出当前虚拟地址在物理内存中物理地址。
上述页表叫做二级页表。
三、线程的概念
1.线程的概念
函数也有地址,函数中的每一行代码都有地址并且是连续的(一个函数对应一批连续的地址)。如果将多个函数的代码拆分开,本质上是拆分页表。
虚拟地址的本质是一种资源,拥有该资源就可以访问物理内存(页框)
线程是一个进程中的一个执行路线,一个进程中可能会存在多个线程,这些线程会执行进程中不同的代码段。线程是CPU调度的基本单位。
在Window操作系统中,为线程重新设计了一套管理线程的结构tcb,同时还有各种调度算法等。但是线程结构tcb和进程结构pcb的许多属性都相同,所以Linux操作系统复用了进程的结构,用进程的结构task_struct管理线程。操作系统会创建多个task_struct共用同一份进程地址空间,每个task_struct就是一个线程,它们共同组成了一个进程。在Linux操作系统中,对于CPU来说不会再区分线程和进程。
线程又叫做执行流,一个线程一定是小于等于进程的,在Linux系统中,执行流也叫做轻量级进程。
每个线程都会执行整个代码的其中一部分,获得对应代码的虚拟地址
Linux系统中没有线程的概念,只有LWP轻量级进程。为了让用户更容易理解,Linux系统提供了pthread线程库将轻量级进程的系统接口封装成线程的接口提供给用户。
所以Linux系统中的线程也叫做用户级线程
2.创建线程
创建线程函数 pthread_create
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg);
pthread_t *thread:输出型参数,指向 pthread_t
类型的变量的指针,该变量用来存储新创建线程的标识符。
const pthread_attr_t *attr:指向 pthread_attr_t
类型的变量的指针,它指定了线程的属性。如果设置为 NULL
,则使用默认属性。
void *(*start_routine)(void *):一个函数指针,指向新线程的启动函数,该函数返回值类型为void*并且必须接受一个void*类型参数
void *arg:作为参数传递给start_routine函数
返回值:成功返回0,失败返回一个非0错误码
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
//新线程启动函数
void* threadstart(void* args)
{
while(true)
{
cout<<"new thread is running..."<<"pid: "<<getpid()<<endl;
sleep(1);
}
}
int main()
{
pthread_t tid;//新线程的标识符
pthread_create(&tid,nullptr,threadstart,(void*)"thread_new");//创建新线程
//主线程
while(true)
{
sleep(1);
cout<<"main thread is running..."<<"pid: "<<getpid()<<endl;
}
return 0;
}
同一个进程,但是有两个执行流,使用ps -aL命令查看系统中的LWP(轻量级进程)
3.线程存在的意义
(1)线程创建的成本比进程创建低:创建进程消耗的资源很大,但是创建线程只需要创建对应的task_struct,多个线程共享一个进程地址空间和页表
(2)线程的调度成本比进程低:线程的切换不需要切换页表和进程地址空间,更重要的是线程切换不需要重新加载cache
(3)线程删除成本比进程删除低:删除一个线程只需要删除其对应的task_struct即可
关于cache:
CPU中有多个集成式硬件chche缓存,用于提高CPU对物理内存的访问效率,当CPU需要访问物理内存中的某个数据时,CPU会将该数据的周围数据都提前存放到chche缓存中。当CPU需要访问数据时会优先从chche中查找,如果命中了就直接从chche中读取数据。存放到chche中的数据就叫做热数据。
使用 lscpu 命令查看当前系统CPU的chche:
切换线程不需要重新加载chche,而切换进程需要清空chche,并且将需要访问的数据再次提前加载到chche中,这是线程调度成本比进程低最重要原因
4.线程的缺点
(1)线程健壮性低,一个线程崩溃会导致整个进程崩溃
(2)线程缺乏访问控制,所有线程都共享同一份资源,某个线程的数据可能会被其他线程修改
5.线程共享的资源
线程除了共享同一进程地址空间(进程地址空间的代码段、数据段),还共享以下资源:
- 文件描述符表:某个线程打开的文件,会被其他所有线程看到
- 每种信号的处理方式(SIG_IGN、SIG_DFL或者自定义捕捉函数)
- 当前工作目录
- 用户id和组id
6.线程私有的资源
- 线程id
- 一组寄存器
- 栈
- 错误码errno
- 信号屏蔽字(block表)
- 线程调度优先级
一组寄存器:寄存器中存储的是线程执行的上下文数据
栈:线程在运行时会产生各种临时变量,这些临时变量要保存在每个线程各自的栈区
四、线程控制
Linux系统中没有线程的概念,只有轻量级进程和轻量级进程控制的相关接口,但是为了方便用户,Linux系统自带一个原生线程库pthread,pthread库中将轻量级进程控制的相关接口封装,按照线程的接口方式提供给用户。(所以编译多线程代码时要链接动态库pthread)
1.线程等待
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval)
pthread_t thread:要等待的线程标识符
void **retval:如果非 NULL
,pthread_join
会将结束线程的返回值存储在这个指针指向的位置
返回值:成功返回0,失败返回非零错误码
主线程和新线程谁先运行是不确定的,但是期望的是新线程先退出,主线程获取新线程的推出信息。为了保证新线程比主线程先退出,所以使用pthread_join函数使主线程等待新线程,直到新线程退出,pthread_join函数获取到新线程的退出信息主线程才会继续运行,否则主线程将一直阻塞等待。
如果主线程先退出了,说明main函数结束了,所有线程都会退出。但是不建议如此,主线程需要等待新线程,否则会出现类似僵尸进程的问题。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* threadRun(void* args)
{
int cnt=10;
while(cnt--)
{
cout<<"new thread is running... cnt: "<<cnt<<endl;
sleep(1);
}
return nullptr;
}
int main()
{
//创建新线程
pthread_t tid;//新线程标识符
int n=pthread_create(&tid,nullptr,threadRun,(void*)"thread 1");//创建线程
if(n!=0)//线程创建失败
{
cerr<<"create thread error"<<endl;
return 1;
}
//线程等待
n=pthread_join(tid,nullptr);
if(n==0)//线程等待成功
{
cout<<"main thread wait success"<<endl;
}
return 0;
}
2.线程的传参
创建线程时新线程会有一个void*类型的参数,这也就说明新线程可以接受任何类型的参数。我们可以给新线程传递类对象,类中包含多个参数和方法,这样就不用局限于单一参数了:
给新线程传递类对象时,类对象最好开辟在堆空间上,多个线程都需要类对象时就重新申请堆空间,保证每个线程数据的独立性。如果将类对象开辟在主线程的栈空间上(在main函数中直接定义),多个线程就会访问同一个类对象,互相干扰。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
using namespace std;
//作为新线程的参数
class ThreadData{
public:
string name;
string num;
};
void* threadRun(void* args)
{
ThreadData* td=static_cast<ThreadData*>(args);//将args类型转换
int cnt=10;
while(cnt--)
{
cout<<td->name<<" is running... num: "<<td->num<<" "<<cnt<<endl;
sleep(1);
}
delete td;
return nullptr;
}
int main()
{
//新线程的参数
ThreadData* td=new ThreadData();
td->name="thread-1";
td->num=1;
//创建新线程
pthread_t tid;//新线程标识符
int n=pthread_create(&tid,nullptr,threadRun,(void*)td);//创建线程
if(n!=0)//线程创建失败
{
cerr<<"create thread error"<<endl;
return 1;
}
//线程等待
n=pthread_join(tid,nullptr);
if(n==0)//线程等待成功
{
cout<<"main thread wait success"<<endl;
}
return 0;
}
3.线程的返回值
线程只有正常运行结束才能正确返回规定的值,一旦运行异常操作系统就会发送进程信号,整个进程都会终止(所有线程都结束),同时返回错误码。
线程的返回值类型为void**,是一个输出型参数,返回void*类型的地址,即返回任何指针类型的地址(void*可以接收任何类型的地址),因此像传参数一样我们可以返回类对象,包含多个参数和方法:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
using namespace std;
//作为新线程的参数
class ThreadData{
public:
int Excute()
{
return x+y;
}
public:
string name;
int x;
int y;
};
//作为新线程的返回值
class ThreadResult{
public:
string Print()
{
return to_string(x)+"+"+to_string(y)+"="+to_string(result);
}
public:
int x;
int y;
int result;
};
//新线程启动函数
void* threadRun(void* args)
{
ThreadData* td=static_cast<ThreadData*>(args);//将args类型转换
ThreadResult* result =new ThreadResult();
int cnt=10;
while(cnt--)
{
cout<<td->name<<" is running... num: "<<cnt<<endl;
result->x=td->x;
result->y=td->y;
result->result=td->Excute();
sleep(1);
break;
}
delete td;
return result;
}
int main()
{
//新线程的参数
ThreadData* td=new ThreadData();
td->name="thread-1";
td->x=8;
td->y=8;
//创建新线程
pthread_t tid;//新线程标识符
int n=pthread_create(&tid,nullptr,threadRun,(void*)td);//创建线程
if(n!=0)//线程创建失败
{
cerr<<"create thread error"<<endl;
return 1;
}
//线程等待
ThreadResult *result=new ThreadResult();//接收新线程返回值
n=pthread_join(tid,(void**)&result);//线程等待
if(n==0)//线程等待成功
{
cout<<"main thread wait success, new thread exit code:"<<result->Print()<<endl;
}
return 0;
}
4.多线程创建&多线程等待
直接看代码:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
#include <vector>
using namespace std;
const int num=10;
void* threadrun(void* args)
{
string name=static_cast<const char*>(args);
while(true)
{
cout<<name<<" is running"<<endl;
sleep(1);
break;
}
return args;
}
int main()
{
//创建多线程:创建10个线程
vector<pthread_t> tids;
for(int i=0;i<num;++i)
{
pthread_t tid;//线程标识符
char* name=new char[128];//线程的名字
snprintf(name,128,"thread-%d",i+1);
pthread_create(&tid,nullptr,threadrun,name);//创建线程
tids.emplace_back(tid);
}
//多线程等待
for(auto e:tids)
{
void* name=nullptr;
pthread_join(e,&name);//线程等待
cout<<(const char*)name<<"quit..."<<endl;
delete (const char*)name;
}
return 0;
}
5.线程终止
线程终止有两种方式:(1)线程的函数执行完毕return (2)调用pthread_exit函数
(3)调用pthread_cancel函数
exit函数是专门用于终止进程的,如果一个线程调用exit函数会导致整个进程都结束
线程终止后会释放线程占用的任何资源,包括栈空间
(1)调用pthread_exit函数
#include <pthread.h>
void pthread_exit(void *retval)
void *retval:指向线程的返回值,返回值可以任意设置
#include <pthread.h>
#include <stdio.h>
void* thread_function(void* arg) {
printf("Thread is running.\n");
pthread_exit((void*)42); // 线程返回值 42
return NULL; // 这行代码实际上不会执行
}
int main() {
pthread_t thread_id;
if (pthread_create(&thread_id, NULL, thread_function, NULL) != 0) {
perror("pthread_create");
return 1;
}
void* return_value;
if (pthread_join(thread_id, &return_value) != 0) {
perror("pthread_join");
return 1;
}
printf("Thread returned value: %ld\n", (long)return_value);
return 0;
}
(2)调用pthread_cancel函数
#include <pthread.h>
int pthread_cancel(pthread_t thread)
pthread_t thread:要取消的线程标识符
返回值:成功返回0,失败返回非零错误码
使用pthread_cancel函数取消线程后,pthread_join接收到的返回结果是-1
6.线程分离:不等待线程
进程可以通过使用signal函数将SIGCHLD信号置为SIGIGN,使得父进程无需等待子进程。
同样也有方法可以使主线程无需等待新线程:线程分离:pthread_detach
一个线程被创建时默认是需要被主线程等待的,如果一个线程被分离,其还是属于整个进程的,但是不需要被主线程等待(线程分离后,如果运行异常还是会影响整个进程,整个进程都会终止)
#include <pthread.h>
int pthread_detach(pthread_t thread)
pthread_t thread:线程标识符
返回值:成功返回0,失败返回非零错误码
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
#include <vector>
using namespace std;
const int num=10;
void* threadrun(void* args)
{
pthread_detach(pthread_self());//分离线程
string name=static_cast<const char*>(args);
while(true)
{
cout<<name<<" is running"<<endl;
sleep(1);
break;
}
//return args;
pthread_exit(args);//终止线程
}
int main()
{
//创建多线程:创建10个线程
vector<pthread_t> tids;
for(int i=0;i<num;++i)
{
pthread_t tid;//线程标识符
char* name=new char[128];//线程的名字
snprintf(name,128,"thread-%d",i+1);
pthread_create(&tid,nullptr,threadrun,name);//创建线程
tids.emplace_back(tid);
}
// //多线程等待
// for(auto e:tids)
// {
// void* name=nullptr;
// pthread_join(e,&name);//线程等待
// cout<<(const char*)name<<"quit..."<<endl;
// delete (const char*)name;
// }
return 0;
}
五、线程的tid:地址
Linux系统中没有线程的概念,只有LWP轻量级进程。为了让用户更容易理解,Linux系统提供了pthread线程库将轻量级进程的系统调用接口封装成线程的接口提供给用户。所以操作系统内核提供了LWP值管理轻量级进程,而pthread库提供了tid管理线程。
线程tid是用户级的,轻量级进程LWP的值是内核级的:LWP的值是由操作系统内核分配用于唯一标识一个进程的;而线程的tid是pthread库提供给用户的一个线程ID,由pthread库自行维护,并且pthread库也要对线程进行管理。
那么pthread库如何对线程进行管理呢?
线程相关的方法都在pthread动态库中,因此想要创建线程等操作首先要将pthread动态库加载到内存中并映射到进程地址空间的共享区中
每新建一个线程时,pthread库都会在进程地址空间的共享区中申请一个内存块(tcb),内存块中存储struct pthread、线程局部存储和线程栈。所有的内存块都是连续存放的,如果要管理一个线程只需要找到其对应的内存块即可。而pthread_t tid线程的tid就是内存块的起始地址。
进行线程等待时,本质就是使用tid(即pthread库维护的内存块的地址)找到对应的内存块,从而释放线程所占用的资源。
所以,Linux系统中的线程=pthread库中的线程属性+LWP
将线程的tid以16进制的方式输出:确实是一个地址
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
using namespace std;
//转换为16进制
string ToHex(pthread_t tid)
{
char id[128];
snprintf(id,sizeof(id),"0x%lx",tid);
return id;
}
//新线程启动函数
void* threadrun(void* args)
{
string name=static_cast<const char*>(args);
while(true)
{
cout<<name<<" is running, tid: "<<ToHex(pthread_self())<<endl;
sleep(1);
}
return nullptr;
}
//main函数
int main()
{
//创建新线程
pthread_t tid;
pthread_create(&tid,nullptr,threadrun,(void*)"thread-1");
//线程等待
pthread_join(tid,nullptr);
return 0;
}
六、线程的局部存储
在线程的pcb中存在一个线程局部存储空间,允许每个线程将数据私有化,用于需要保持线程独立性的数据
理论上的全局变量会被各个线程共享,但是使用__thread修饰的变量,将会在线程各自的线程局部存储空间中存储,各线程之间数据互不干扰
(注意:__thread只能用于修饰内置类型数据)
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
using namespace std;
__thread int val=100;
//转换为16进制
string ToHex(pthread_t tid)
{
char id[128];
snprintf(id,sizeof(id),"0x%lx",tid);
return id;
}
//新线程启动函数
void* threadrun(void* args)
{
string name=static_cast<const char*>(args);
while(true)
{
cout<<name<<" is running, tid: "<<ToHex(pthread_self())<<"val: "<<val<<" &val: "<<&val<<endl;
sleep(1);
val++;
}
return nullptr;
}
//main函数
int main()
{
//创建新线程
pthread_t tid;
pthread_create(&tid,nullptr,threadrun,(void*)"thread-1");
while(true)
{
cout<<"main thread, val: "<<val<<" &val: "<<&val<<endl;
sleep(1);
}
//线程等待
pthread_join(tid,nullptr);
return 0;
}
七、线程的封装
Thread.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <string>
using namespace std;
namespace ThreadMoudle
{
typedef void(*func_t)(const string& name);//函数指针,指向线程的启动函数
class Thread
{
public:
void Excute()
{
cout << _name << " is running" << endl;
_isrunning=true;//线程启动后,正在运行
_func(_name);
_isrunning=false;
}
public:
//构造函数
Thread(const string& name,func_t func):_name(name),_func(func)
{
cout<<"create "<<name<<"done"<<endl;
}
//例程函数
//问题1:
//将例程函数用static修饰,它就属于类而不属于对象,就不会有隐藏参数this指针
//由于线程的启动函数只能有一个void*类型的参数,所以必须将例程函数设置为静态函数
//问题2:
//例程函数改为静态函数后,只能调用进程成员变量,不能调用_func
//将this指针作为参数传给例程函数,通过this指针调用_func(将_func函数指针再封装成Excute函数)
static void* ThreadRoutine(void* args)
{
Thread* self=static_cast<Thread*>(args);//this指针
self->Excute();
return nullptr;
}
//线程开始
bool Start()
{
int n=::pthread_create(&_tid,nullptr,ThreadRoutine,this);//创建线程,启动函数为“例程函数”
if(n!=0) return false;//线程创建失败
return true;
}
//线程结束
void Stop()
{
if(_isrunning)
{
pthread_cancel(_tid);//取消线程
_isrunning=false;
cout << _name << " Stop" << endl;
}
}
//线程等待
void Join()
{
if(!_isrunning)//只有线程不在运行时才需要等待
{
pthread_join(_tid,nullptr);
_isrunning=false;
cout << _name << " Joined" << endl;
}
}
//检测线程状态
string Status()
{
if(_isrunning) return "running";
else return "sleep";
}
//返回线程名称
string Name()
{
return _name;
}
//析构函数
~Thread()
{
}
private:
string _name;//线程的名称
pthread_t _tid;//线程的tid
bool _isrunning;//线程是否在运行
func_t _func;//线程的启动函数
};
}
Main.cc
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include "Thread.hpp"
using namespace std;
// void Print(const string& name)
// {
// int cnt=1;
// while(true)
// {
// cout<<name<<" is running, cnt: "<<cnt++<<endl;
// sleep(1);
// }
// }
// int main()
// {
// ThreadMoudle::Thread t("thread-1",Print);
// t.Start();
// cout<<t.Name()<<", status: "<<t.Status()<<endl;
// sleep(10);
// cout<<t.Name()<<", status: "<<t.Status()<<endl;
// t.Stop();
// sleep(1);
// cout<<t.Name()<<", status: "<<t.Status()<<endl;
// t.Join();
// cout<<"join done"<<endl;
// return 0;
// }
void Print(const std::string &name)
{
int cnt = 1;
while (true)
{
std::cout << name << "is running, cnt: " << cnt++ << std::endl;
sleep(1);
}
}
const int gnum = 10;
int main()
{
// 我在管理原生线程, 先描述,在组织
// 构建线程对象
std::vector<ThreadMoudle::Thread> threads;
for (int i = 0; i < gnum; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
threads.emplace_back(name, Print);
sleep(1);
}
// 统一启动
for (auto &thread : threads)
{
thread.Start();
}
sleep(10);
// 统一结束
for (auto &thread : threads)
{
thread.Stop();
}
// 等待线程等待
for (auto &thread : threads)
{
thread.Join();
}
// Thread t("thread-1", Print);
// t.Start();
// std::cout << t.Name() << ", status: " << t.Status() << std::endl;
// sleep(10);
// std::cout << t.Name() << ", status: " << t.Status() << std::endl;
// t.Stop();
// sleep(1);
// std::cout << t.Name() << ", status: " << t.Status() << std::endl;
// t.Join();
// std::cout << "join done" << std::endl;
return 0;
}
八、线程互斥
1.线程互斥的背景
多线程共享的资源叫做临界资源,当多个线程同时访问临界资源时就有可能会出现问题,接下来以模拟抢票场景为例
模拟实现抢票场景:共有100张票,由4个进程一起抢票,最后票被多卖出两张,共卖出102张票。有三个原因:
(1)当一个线程进行if语句判断为真后可以被并发地切换到其他线程,其他线程再次判断为真准备抢票
(2)抢票业务是一个漫长的过程,在该过程中可能会有很多线程都进入到抢票中
(3)ticket--不是一个原子操作,对应的是三条汇编指令:
- load:将共享变量ticket从内存加载到寄存器中
- update:更新寄存器里面的值,执行-1操作
- store:将新值,从寄存器写回共享变量ticket的内存地址
// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg)
{
char *id = (char *)arg;
while (1)
{
if (ticket > 0)
{
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
}
else
{
break;
}
}
return nullptr;
}
int main(void)
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void*)"thread 1");
pthread_create(&t2, NULL, route, (void*)"thread 2");
pthread_create(&t3, NULL, route, (void*)"thread 3");
pthread_create(&t4, NULL, route, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
}
2.互斥锁
互斥锁类型:pthread_mutex_t
定义互斥锁:pthread_mutex_t mutex
初始化互斥锁:
如果互斥锁在全局的或者静态的,pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;如果互斥锁是在栈或者堆上定义的,调用pthread_mutex_init函数。
pthread_mutex_init函数:
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
参数说明:
mutex
:指向pthread_mutex_t
类型的变量的指针,即互斥锁attr
:指向pthread_mutexattr_t
类型的变量的指针,它指定了互斥锁的属性。如果这个参数是NULL
,则使用默认属性。
返回值:成功返回0,失败返回非零错误码
销毁互斥锁:如果互斥锁是全局的或者静态的,不需要手动销毁,互斥锁会随着进程的结束而销毁;如果互斥锁是局部的,调用pthread_mutex_destroy函数
pthread_mutex_destroy函数:
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex)
参数说明:
mutex
:指向pthread_mutex_t
类型的变量的指针,该变量表示要销毁的互斥锁。
返回值:成功返回0,失败返回非零错误码
加锁函数pthread_mutex_lock:
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex)
参数说明:
mutex
:指向pthread_mutex_t
类型的变量的指针,该变量表示要锁定的互斥锁。
返回值:成功返回0,失败返回非零错误码
解锁函数pthread_mutex_unlock:
#include <pthread.h>
int pthread_mutex_unlock(pthread_mutex_t *mutex)
参数说明:
mutex
:指向pthread_mutex_t
类型的变量的指针,该变量表示要解锁的互斥锁
返回值:成功返回0,失败返回非零错误码
3.互斥锁原理
多线程共享的资源叫做临界资源,而访问临界资源的代码叫做临界区。对临界资源的保护,本质是对临界区代码的保护。因此加锁解锁针对的是临界区代码。
当某个线程遇到加锁的临界区代码时,首先会申请锁,如果申请成功则继续运行,否进入阻塞状态。本质上申请锁成功后线程可以继续运行是因为pthread_mutex_lock函数成功返回,而申请锁失败了被阻塞是因为pthread_mutex_unlock函数不返回,从而使线程阻塞。
线程要申请锁,首先要保证锁可以被所有线程都访问,这就说明互斥锁本身也是一个临界资源,也需要受到保护,所以加锁的过程必须是原子性的!
小tips:当线程申请锁成功正在执行临界区代码时,如果CPU时间片到了可以被切换走,但此时其他线程依然不能访问临界区代码,因为被切换走的线程还没有释放锁,其他的所有线程必须等待。这也就相当于一个线程申请锁成功后访问临界区代码,对于其他线程来说是原子的!
解决多线程抢票问题:
// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
pthread_mutex_t gmutex=PTHREAD_MUTEX_INITIALIZER;//创建并初始化互斥锁
void *route(void *arg)
{
char *id = (char *)arg;
while (1)
{
pthread_mutex_lock(&gmutex);//加锁
if (ticket > 0)
{
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
pthread_mutex_unlock(&gmutex);//解锁
}
else
{
pthread_mutex_unlock(&gmutex);//解锁,两个判断情况都要解锁,否则有一种情况不会解锁一直加锁
break;
}
}
return nullptr;
}
int main(void)
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void*)"thread 1");
pthread_create(&t2, NULL, route, (void*)"thread 2");
pthread_create(&t3, NULL, route, (void*)"thread 3");
pthread_create(&t4, NULL, route, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
}
4.互斥锁底层实现原理
(1)线程申请锁(加锁)的底层实现原理
在内存中定义了一个互斥锁mutex,mutex属性为0表示互斥锁被线程申请了,属性为1则表示互斥锁没有线程来申请。初始时mutex的属性值为0,另外CPU中存在一个寄存器%al。
一开始mutex没有线程申请,其属性值为1,当某个线程开始申请锁后经过汇编语句1将寄存器%al中的值初始化为0;汇编语句2将寄存器中的值和mutex的属性值交换,%al=1,mutex=0;如果此时线程没有被切换继续执行汇编语句3,成功返回线程继续运行;相反如果线程此时被切换了,线程会带走CPU和寄存器上的相关数据,其他线程再次来进行申请锁,此时mutex已经被上一个线程申请了,mutex属性值为0,经过汇编语句1、2交换mutex和寄存器%al的值后,寄存器%al的值为0,再继续经过汇编语句3条件判断就会被挂起等待,从而实现对临界资源的加锁。
被切换走的线程再次回到CPU执行时,带来被切换时CPU和寄存器上的数据,寄存器%al的值为1,经过汇编语句3条件判断成功返回,从而继续访问临界资源。
所以真正意义上的加锁其实是汇编语句2:交换不是拷贝,只有一个"1",
(2)线程解锁的底层实现原理
解锁只需要经过汇编语句1,交换寄存器%al和mutex的属性值,这样mutex的属性值就变成了1,表示当前没有线程来申请锁。
九、线程同步
线程同步要保证线程访问临界资源的顺序性,不允许一个线程一直独占临界资源。Linux系统中通常使用条件变量同步机制来完成线程同步。
线程同步有两种方式:条件变量和POSIX信号量
1.条件变量接口
条件变量类型:pthread_cond_t
定义条件变量:pthread_cond_t cond
初始化条件变量:
如果条件变量是全局的或者静态的,pthread_cond_t cond = PTHREAD_COND_INITIALIZER;如果条件变量是在堆或者栈上定义的,调用pthread_cond_init函数
pthread_cond_init函数:
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
参数说明:
cond
:指向pthread_cond_t
类型的变量的指针,即条件变量attr
:指向pthread_condattr_t
类型的变量的指针,它指定了条件变量的属性。如果这个参数是NULL
,则使用默认属性。
返回值:成功返回0,失败返回非零错误码
销毁条件变量:如果条件变量是全局的或者静态的,不需要手动销毁,条件变量会随着进程的结束而销毁;如果条件变量是局部的,调用pthread_cond_destroy函数
pthread_cond_destroy函数:
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond)
参数说明:
cond
:指向pthread_cond_t
类型的变量的指针,该变量表示要销毁的条件变量。
返回值:成功返回0,失败返回非零错误码
等待条件变量函数pthread_cond_wait:
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
参数说明:
cond
:指向pthread_cond_t
类型的变量的指针,该变量表示要等待的条件变量。mutex
:指向pthread_mutex_t
类型的变量的指针,该变量表示与条件变量配合使用的互斥锁(函数被调用时,除了让线程阻塞等待,还会解锁;函数返回时,先参与锁的竞争,重新加锁后才会成功返回)
返回值:成功返回0,失败返回非零错误码
唤醒一个线程函数pthread_cond_signal:
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond)
参数说明:
cond
:指向pthread_cond_t
类型的变量的指针,该变量表示要发送信号的条件变量。
返回值:成功返回0,失败返回非零错误码
唤醒所有线程函数pthread_cond_broadcast:
#include <pthread.h>
int pthread_cond_broadcast(pthread_cond_t *cond)
参数说明:
cond
:指向pthread_cond_t
类型的变量的指针,该变量表示要广播信号的条件变量。
返回值:成功返回0,失败返回非零错误码
2.生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而 通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者 要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队 列就是用来给生产者和消费者解耦的。生产者和消费者互不影响,可以并发运行。
优点:解耦、支持并发、支持忙闲不均
3.基于BolckingQueue生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程 操作时会被阻塞)
(1)单生产者单消费者模型
BlockQueue.hpp
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
const static int defaultcap=5;
template<class T>
class BlockQueue
{
public:
//构造函数
BlockQueue(int cap=defaultcap):_max_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);//初始化互斥锁
pthread_cond_init(&_p_cond,nullptr);//初始化生产者条件变量
pthread_cond_init(&_c_cond,nullptr);//初始化消费者条件变量
}
//析构函数
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);//销毁锁
pthread_cond_destroy(&_p_cond);//销毁生产者条件变量
pthread_cond_destroy(&_c_cond);//销毁消费者条件变量
}
//添加数据
void Equeue(const T& in)//输入型参数
{
pthread_mutex_lock(&_mutex);//加锁
while(IsFull())//阻塞队列满了,生产者不能再添加数据,线程必须等待
{
//阻塞队列满了,生产者线程必须等待,不可以添加数据
//存在问题:当前生产者线程处于临界区,如果在此处等待,没有解锁的话,其他线程无法访问临界区
//解决问题:当调用pthread_wait_cond函数除了让线程阻塞等待,还会自动解锁;
// 函数返回时,线程要重新参与锁的竞争,重新加上锁后再返回
pthread_cond_wait(&_p_cond,&_mutex);
}
//到当前位置时,说明1.阻塞队列没有满 或者 2.生产者线程被唤醒了
_block_queue.push(in);//添加数据
pthread_mutex_unlock(&_mutex);//解锁
pthread_cond_signal(&_c_cond);//生产者线程生产一个数据后,说明阻塞队列一定不为空
//所以让生产者线程唤醒消费者线程
}
//取出数据
void Pop(T* out)//输出型参数
{
pthread_mutex_lock(&_mutex);//加锁
while(IsEmpty())//阻塞队列空了,消费者线程不能取出数据,必须进行等待
{
pthread_cond_wait(&_c_cond,&_mutex);//存在问题:伪唤醒问题
}
//到当前位置时,说明1.阻塞队列不是空的 或者 2.消费者线程被唤醒了
*out=_block_queue.front();
_block_queue.pop();
pthread_mutex_unlock(&_mutex);//解锁
pthread_cond_signal(&_p_cond);//消费者线程取出一个数据后,说明次数阻塞队列一定不是满的
//所以让消费者线程唤醒生产者线程
}
private:
bool IsFull()
{
return _block_queue.size()==_max_cap;
}
bool IsEmpty()
{
return _block_queue.empty();
}
private:
std::queue<T> _block_queue;//阻塞队列中维护的是临界资源,因此要给阻塞队列加锁
pthread_mutex_t _mutex;//互斥锁
int _max_cap;//阻塞队列的最大容量
pthread_cond_t _p_cond;//生产者条件变量
pthread_cond_t _c_cond;//消费者条件变量
};
main.cc
#include "BlockQueue.hpp"
#include <unistd.h>
void* Consumer(void* args)
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
while(true)
{
sleep(2);
//1.获取数据
int data=0;
bq->Pop(&data);
//2.处理数据
std::cout<<"Consumer -> "<<data<<std::endl;
}
}
void* Productor(void* args)
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
while(true)
{
//1.构建数据
int data=rand()%10+1;//数据范围[1,10]
//2.生产数据
bq->Equeue(data);
std::cout<<"Productor -> "<<data<<std::endl;
}
}
int main()
{
BlockQueue<int> *bq=new BlockQueue<int>();
pthread_t c,q;
pthread_create(&c,nullptr,Consumer,bq);
pthread_create(&q,nullptr,Productor,bq);
pthread_join(c,nullptr);
pthread_join(q,nullptr);
return 0;
}
(2)多生产者多消费者模型
单生产者单消费者模型的代码同样适用于多生产者多消费者模型,只需要多创建几个生产者和消费者即可。生产者线程之间会自行进行锁竞争,消费者线程之间也会进行锁竞争。
4.POSIX信号量接口
信号量本质是对临界资源的预定,将一个大的临界资源分为多个小的临界资源,多个线程访问划分出来的小的临界资源,只要不访问同一份临界资源即可。
如果设置信号量的最大值为1,则称为二元信号量,信号量只能取0或1,即把资源当成一个整体来访问使用,相当于互斥锁。
信号量类型:sem_t
定义信号量:sem_t sem
初始化信号量函数sem_init:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
参数说明:
sem
是指向信号量对象的指针。pshared
控制信号量的类型:如果为0,表示信号量在当前进程的线程间共享;如果大于0,表示信号量在多个进程间共享。value
是信号量的初始值。
返回值:成功返回0,失败返回-1并设置errno错误码
销毁信号量函数sem_destroy:
#include <semaphore.h>
int sem_destroy(sem_t *sem)
参数说明:
sem
:指向sem_t
类型的变量的指针,该变量表示要销毁的信号量。
返回值:成功返回0,失败返回-1并设置errno错误码
P操作信号量--函数sem_wait:
#include <semaphore.h>
int sem_wait(sem_t *sem)
参数说明:
sem
:指向sem_t
类型的变量的指针,该变量表示要等待的信号量。
返回值:成功返回0,失败返回-1并设置errno错误码
V操作信号量++函数sem_post:
#include <semaphore.h>
int sem_post(sem_t *sem)
参数说明:
sem
:指向sem_t
类型的变量的指针,该变量表示要释放的信号量。
返回值:成功返回0,失败返回-1并设置errno错误码
5.基于环形队列的生产者消费者模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
(1)单生产者单消费者模型
RingQueue.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
template<class T>
class RingQueue
{
public:
//构造函数
RingQueue(int max_cap):_ringqueue(max_cap),_c_step(0),_p_step(0)
{
sem_init(&_data_sem,0,0);//初始化数据信号量,初始值为0
sem_init(&_space_sem,0,max_cap);//初始化空间信号量,初始值为max_cap
}
//析构函数
~RingQueue()
{
sem_destroy(&_data_sem);//销毁数据信号量
sem_destroy(&_space_sem);//销毁空间信号量
}
//生产者--生产数据资源,消费空间资源
void Push(const T&in)
{
P(_space_sem);//申请空间信号量,空间信号量--(阻塞调用线程,直到信号量的值大于零,然后将其值减一)
_ringqueue[_p_step]=in;
_p_step++;
_p_step%=_max_cap;
V(_data_sem);//释放数据信号量,数据信号量++(增加信号量的值,如果信号量的值增加后有其他线程正在等待该信号量,那么至少有一个等待的线程将被唤醒)
}
//消费者
void Pop(T* out)
{
P(_data_sem);//申请数据信号量,数据信号量--(阻塞调用线程,直到信号量的值大于零,然后将其值减一)
*out=_ringqueue[_c_step];
_c_step++;
_c_step%=_max_cap;
V(_space_sem);//释放空间信号量,空间信号量++(增加信号量的值,如果信号量的值增加后有其他线程正在等待该信号量,那么至少有一个等待的线程将被唤醒)
}
//私有成员函数
private:
//P操作--
void P(sem_t &s)
{
sem_wait(&s);
}
//V操作++
void V(sem_t &s)
{
sem_post(&s);
}
//私有成员变量
private:
std::vector<T> _ringqueue;//环形队列
int _max_cap;//最大容量
int _c_step;//消费者下标
int _p_step;//生产者下标
sem_t _data_sem;//数据信号量--消费者
sem_t _space_sem;//空间信号量--生产者
};
main.cc
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
//消费数据
void* Consumer(void* args)
{
RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);
while(true)
{
int data=0;
//1.消费
rq->Pop(&data);
//2.处理数据
std::cout<<"Consumer-> "<<data<<std::endl;
}
return nullptr;
}
//生产数据
void* Productor(void* args)
{
RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);
while(true)
{
sleep(1);
//1.生产
int data=rand()%10+1;
//2.处理数据
rq->Push(data);
std::cout<<"Productor-> "<<data<<std::endl;
}
return nullptr;
}
int main()
{
//srand(time(nullptr)^getpid());
RingQueue<int>* rq=new RingQueue<int>(5);
pthread_t c,p;
pthread_create(&c,nullptr,Consumer,rq);
pthread_create(&c,nullptr,Productor,rq);
return 0;
}
(2)多生产者多消费者模型
多生产者多消费者模型中,生产者和生产者之间要有互斥关系,消费者和消费者之间也要有互斥关系。所有还另外需要加两把锁。
十、线程池
1.线程池概念
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
2.线程池的应用场景
1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
3.线程池示例
1. 创建固定数量线程池,循环从任务队列中获取任务对象,
2. 获取到任务对象后,执行任务对象中的任务接口
4.线程池代码
ThreadPool.hpp
//ThreadPool.hpp
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <unistd.h>
#include <functional>
#include "Thread.hpp"
using namespace ThreadMoudle;
static const int gdefaultnum=5;//线程池中线程的数量
template<typename T>
class ThreadPool
{
public:
//构造函数
ThreadPool(int thread_num=gdefaultnum):_thread_num(thread_num),_isrunning(false),_sleep_thread_num(0)
{
pthread_mutex_init(&_mutex,nullptr);//初始化互斥锁
pthread_cond_init(&_cond,nullptr);//初始化条件变量
}
//线程池初始化
void Init()
{
func_t func=std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1);//强关联,构建新对象,作为任务队列中对象的参数
for(int i=0;i<_thread_num;++i)
{
std::string threadname="thread-"+std::to_string(i+1);
_threads.emplace_back(threadname,func);//emplace_back会自动根据参数创建对象填入vector容器中
}
}
//启动线程池
void Start()
{
_isrunning=true;
//创建并运行所有线程
for(auto& thread:_threads)
{
thread.Start();
}
}
//关闭线程池
void Stop()
{
LockQueue();
_isrunning=false;
WakeupAll();//唤醒所有线程,防止有线程在休眠不经过判断条件,无法退出
UnlockQueue();
}
//向线程池发送任务
void Equeue(const T& in)
{
LockQueue();//加锁
if(_isrunning)//只有线程池处于运行状态时,才可以向任务队列添加任务
{
_task_queue.push(in);//向任务队列中添加任务
if(_sleep_thread_num>0)//只要有线程在休眠就唤醒线程
{
Wakeup();
}
}
UnlockQueue();//解锁
}
//析构函数
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);//销毁互斥锁
pthread_cond_destroy(&_cond);//销毁条件变量
}
private:
//任务队列加锁
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
//任务队列解锁
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
//唤醒单个线程
void Wakeup()
{
pthread_cond_signal(&_cond);
}
//唤醒所有线程
void WakeupAll()
{
pthread_cond_broadcast(&_cond);
}
//判断任务队列是否为空
bool IsEmpty()
{
return _task_queue.empty();
}
//线程处理任务队列中的任务
void HandlerTask(const std::string& name)
{
while(true)
{
LockQueue();//加锁
while(IsEmpty()&&_isrunning)//如果任务队列为空,线程休眠(使用while防止伪唤醒问题)
{ //任务队列为空但是线程池还要运行时,线程才能去休眠
_sleep_thread_num++;
Sleep();//线程休眠
_sleep_thread_num--;
}
//判定线程池是否要退出
if(IsEmpty()&&!_isrunning)//只有当任务队列为空,并且线程池也要退出时,再退出线程
{
std::cout<<name<<" quit"<<std::endl;
UnlockQueue();
break;
}
//处理任务
T t=_task_queue.front();//取出任务
_task_queue.pop();//删除任务队列中的任务
UnlockQueue();//解锁
t();//处理任务,必须在解锁之后处理,因为任务被取出之后就属于线程了,不属于临界资源了
std::cout<<name<<": "<<t.result()<<std::endl;
}
}
//线程休眠
void Sleep()
{
pthread_cond_wait(&_cond,&_mutex);
}
private:
int _thread_num;//线程池中线程的个数
std::vector<Thread> _threads;//管理线程
std::queue<T> _task_queue;//任务队列
bool _isrunning;//判断线程池是否正在运行
pthread_mutex_t _mutex;//互斥锁,保护临界资源:任务队列
pthread_cond_t _cond;//条件变量,实现线程同步
int _sleep_thread_num;//休眠线程的个数
};
Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
namespace ThreadMoudle
{
// 线程要执行的方法,后面我们随时调整
// typedef void (*func_t)(ThreadData *td); // 函数指针类型
// typedef std::function<void()> func_t;
using func_t = std::function<void(const std::string&)>;
class Thread
{
public:
void Excute()
{
_isrunning = true;
_func(_name);
_isrunning = false;
}
public:
Thread(const std::string &name, func_t func):_name(name), _func(func)
{
}
static void *ThreadRoutine(void *args) // 新线程都会执行该方法!
{
Thread *self = static_cast<Thread*>(args); // 获得了当前对象
self->Excute();
return nullptr;
}
bool Start()
{
int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
if(n != 0) return false;
return true;
}
std::string Status()
{
if(_isrunning) return "running";
else return "sleep";
}
void Stop()
{
if(_isrunning)
{
::pthread_cancel(_tid);
_isrunning = false;
}
}
void Join()
{
::pthread_join(_tid, nullptr);
}
std::string Name()
{
return _name;
}
~Thread()
{
}
private:
std::string _name;
pthread_t _tid;
bool _isrunning;
func_t _func; // 线程要执行的回调函数
};
} // namespace ThreadModle
Task.hpp
#pragma once
#include<iostream>
#include<functional>
// typedef std::function<void()> task_t;
// using task_t = std::function<void()>;
// void Download()
// {
// std::cout << "我是一个下载的任务" << std::endl;
// }
// 要做加法
class Task
{
public:
Task()
{
}
Task(int x, int y) : _x(x), _y(y)
{
}
void Excute()
{
_result = _x + _y;
}
void operator ()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
main.cc
#include "ThreadPool.hpp"
#include "Task.hpp"
int main()
{
ThreadPool<Task> *tp=new ThreadPool<Task>();
tp->Init();//初始化线程池
tp->Start();//启动线程池
int cnt=10;
while(cnt--)
{
//不断地向线程池发送任务
sleep(1);
Task t(1,1);
tp->Equeue(t);
sleep(1);
}
tp->Stop();
std::cout<<"thread pool stop"<<std::endl;
sleep(10);
return 0;
return 0;
}
十一、日志
日志是程序运行信息的记录,可以向显示器打印或者向文件打印
日志有特定的格式:例如[日志等级][pid][filename][filenumber][time] + 日志内容(支持可变参数)
日志等级:DEBUG(调试信息)、INFO(常规信息)、WARNING(警告信息)、ERROR(错误信息)、FATAL(致命信息)
日志实现代码:
#pragma once
#include <iostream>
#include <unistd.h>
#include <sys/wait.h>
#include <ctime>
#include <stdarg.h>
#include <fstream>
#include <cstring>
#include <pthread.h>
namespace log_ns
{
// 日志消息等级
enum
{
DEBUG = 1, // 调试信息
INFO, // 常规信息
WARNING, // 警告信息
ERROR, // 错误信息
FATAL // 致命信息
};
// 日志消息类
class logmessage
{
public:
std::string _level; // 日志等级
pid_t _id; // 进程id
std::string _filename; // 文件名
int _filenumber; // 文件行号
std::string _curr_time; // 时间
std::string _message_info; // 日志内容
};
// 将日志等级转化为字符串
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
std::string GetCurrTime()
{
time_t now = time(nullptr); // 获取当前时间戳,time_t为时间戳类型
struct tm *curr_time = localtime(&now); // 将时间戳转化为struct tm类型,其成员变量为年月日时分秒
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
curr_time->tm_year + 1900,
curr_time->tm_mon + 1,
curr_time->tm_mday,
curr_time->tm_hour,
curr_time->tm_min,
curr_time->tm_sec);
return buffer;
}
#define SCREEN_TYPE 1 // 日志消息向显示器输出
#define FILE_TYPE 2 // 日志消息向文件输出
const std::string glogfile = "./log.txt"; // 默认向该文件中输出日志消息
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER; // 全局锁,防止多线程输出日志消息时出错
// 日志类
class Log
{
private:
int _type; // 决定日志消息向显示器/文件输出
std::string _logfile; // 日志文件路径
public:
// 有参构造函数
Log(const std::string logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE) // 默认向显示器输出
{
}
// 改变日志消息输出位置
void Enable(int type)
{
_type = type;
}
// 日志消息向显示器输出
void FlushLogToScreen(const logmessage &lg)
{
printf("[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
}
// 日志消息向文件输出
void FlushLogToFile(const logmessage &lg)
{
std::ofstream out(_logfile, std::ios::app); // 打开日志文件并追加输入
if (!out.is_open())
return; // 打开文件失败
char logtxt[2048];
snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s", // 将日志消息格式化输入到logtxt缓冲区
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
out.write(logtxt, strlen(logtxt)); // 向日志文件中输出
out.close(); // 关闭文件
}
// 刷新日志消息
void FlushLog(const logmessage &lg)
{
pthread_mutex_lock(&glock);
switch (_type)
{
case SCREEN_TYPE:
FlushLogToScreen(lg);
break;
case FILE_TYPE:
FlushLogToFile(lg);
break;
}
pthread_mutex_unlock(&glock);
}
void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
{
logmessage lg; // 日志消息对象
lg._level = LevelToString(level); // 初始化日志消息:日志等级
lg._id = getpid(); // 初始化日志消息:进程id
lg._filename = filename; // 初始化日志消息:日志文件名
lg._filenumber = filenumber; // 初始化日志消息:日志文件行号
lg._curr_time = GetCurrTime(); // 初始化日志消息:时间
// 将可变参数中的内容格式化为日志消息内容
va_list ap;
va_start(ap, format);
char log_info[1024];
vsnprintf(log_info, sizeof(log_info), format, ap); // 自动提取可变参数,并将其填入log_info缓冲区中
va_end(ap);
lg._message_info = log_info;
// 打印日志消息(刷新日志消息)
FlushLog(lg);
}
// 析构函数
~Log()
{
}
};
Log lg; // 定义一个全局的日志类对象
// 宏,使得日志消息输出时,可以不用带__FILE__和__LINE__,其中__VA_ARGS__用于接受可变参数部分
/*__VA_ARGS__加上##如果没有可变参数,可以自动去掉__VA_ARGS__*/
#define LOG(Level, Format, ...) \
do \
{ \
lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
} while (0)
// 同时定义切换日志消息输出位置的宏
#define EnableScreen() do{lg.Enable(SCREEN_TYPE);} while (0)
#define EnableFILE() do{lg.Enable(FILE_TYPE);} while (0)
}
日志测试代码:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Log.hpp"
using namespace log_ns;
int main()
{
//向显示器输出
EnableScreen();
LOG(INFO,"hello world\n");
LOG(DEBUG,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(ERROR,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(FATAL,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(WARNING,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(INFO,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(DEBUG,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
//向文件输出
EnableFILE();
LOG(DEBUG,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(DEBUG,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(DEBUG,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(DEBUG,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(DEBUG,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
LOG(DEBUG,"hello %d, world: %c,hello:%f\n",1000,'A',3.14);
return 0;
}
十二、在线程池中添加日志
在县城翅中廷加日志本质是将原本使用cout输出的信息,改为使用日志输出,日志可以选择将信息输出到显示器中,也可以选择奖信息输出到日志文件中
ThreadPool.hpp
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <unistd.h>
#include <functional>
#include "Task.hpp"
#include "Thread.hpp"
#include "Log.hpp"
using namespace ThreadMoudle;
using namespace log_ns;
static const int gdefaultnum=5;//线程池中线程的数量
template<typename T>
class ThreadPool
{
public:
//构造函数
ThreadPool(int thread_num=gdefaultnum):_thread_num(thread_num),_isrunning(false),_sleep_thread_num(0)
{
pthread_mutex_init(&_mutex,nullptr);//初始化互斥锁
pthread_cond_init(&_cond,nullptr);//初始化条件变量
}
//线程池初始化
void Init()
{
func_t func=std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1);//强关联,构建新对象,作为任务队列中对象的参数
for(int i=0;i<_thread_num;++i)
{
std::string threadname="thread-"+std::to_string(i+1);
_threads.emplace_back(threadname,func);//emplace_back会自动根据参数创建对象填入vector容器中
LOG(DEBUG,"construct thread %s done,init sucess\n",threadname.c_str());
}
}
//启动线程池
void Start()
{
_isrunning=true;
//创建并运行所有线程
for(auto& thread:_threads)
{
LOG(DEBUG,"start thread %s done.\n",thread.Name().c_str());
thread.Start();
}
}
//关闭线程池
void Stop()
{
LockQueue();
_isrunning=false;
WakeupAll();//唤醒所有线程,防止有线程在休眠不经过判断条件,无法退出
UnlockQueue();
LOG(INFO,"Thread Pool Stop Sucess!\n");
}
//向线程池发送任务
void Equeue(const T& in)
{
LockQueue();//加锁
if(_isrunning)//只有线程池处于运行状态时,才可以向任务队列添加任务
{
_task_queue.push(in);//向任务队列中添加任务
if(_sleep_thread_num>0)//只要有线程在休眠就唤醒线程
{
Wakeup();
}
}
UnlockQueue();//解锁
}
//析构函数
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);//销毁互斥锁
pthread_cond_destroy(&_cond);//销毁条件变量
}
private:
//任务队列加锁
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
//任务队列解锁
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
//唤醒单个线程
void Wakeup()
{
pthread_cond_signal(&_cond);
}
//唤醒所有线程
void WakeupAll()
{
pthread_cond_broadcast(&_cond);
}
//判断任务队列是否为空
bool IsEmpty()
{
return _task_queue.empty();
}
//线程处理任务队列中的任务
void HandlerTask(const std::string& name)
{
while(true)
{
LockQueue();//加锁
while(IsEmpty()&&_isrunning)//如果任务队列为空,线程休眠(使用while防止伪唤醒问题)
{ //任务队列为空但是线程池还要运行时,线程才能去休眠
_sleep_thread_num++;
LOG(INFO,"%s thread sleep begin!\n",name.c_str());
Sleep();//线程休眠
LOG(INFO,"%s thread wake up!\n",name.c_str());
_sleep_thread_num--;
}
//判定线程池是否要退出
if(IsEmpty()&&!_isrunning)//只有当任务队列为空,并且线程池也要退出时,再退出线程
{
//std::cout<<name<<" quit"<<std::endl;
LOG(INFO,"%s thread quit\n",name.c_str());
UnlockQueue();
break;
}
//处理任务
T t=_task_queue.front();//取出任务
_task_queue.pop();//删除任务队列中的任务
UnlockQueue();//解锁
t();//处理任务,必须在解锁之后处理,因为任务被取出之后就属于线程了,不属于临界资源了
//std::cout<<name<<": "<<t.result()<<std::endl;
LOG(DEBUG,"hander task done,task is: %s\n",t.result().c_str());
}
}
//线程休眠
void Sleep()
{
pthread_cond_wait(&_cond,&_mutex);
}
private:
int _thread_num;//线程池中线程的个数
std::vector<Thread> _threads;//管理线程
std::queue<T> _task_queue;//任务队列
bool _isrunning;//判断线程池是否正在运行
pthread_mutex_t _mutex;//互斥锁,保护临界资源:任务队列
pthread_cond_t _cond;//条件变量,实现线程同步
int _sleep_thread_num;//休眠线程的个数
};
main.cc
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Log.hpp"
using namespace log_ns;
int main()
{
ThreadPool<Task> *tp=new ThreadPool<Task>();
tp->Init();//初始化线程池
tp->Start();//启动线程池
int cnt=10;
while(cnt--)
{
//不断地向线程池发送任务
sleep(1);
Task t(1,1);
tp->Equeue(t);
LOG(INFO,"eququ a task,%s\n",t.debug().c_str());
sleep(1);
}
tp->Stop();
LOG(INFO,"thread pool stop!\n");
return 0;
}
十三、线程安全的单例模式
1.懒汉实现单例模式和饿汉实现单例模式介绍
单例模式是一种设计模式,某些类只能有一个对象(实例)就叫做单例模式
实现单例模式有两种方式:饿汉实现方式和懒汉实现方式
以吃饭为例:吃完饭立即洗碗,下次吃饭就可以立即使用碗吃饭就是饿汉实现方式;吃完饭不洗碗,等下次吃饭用到碗时再洗碗就是懒汉实现方式。
实际编写代码时,在栈区开辟对象就是饿汉实现方式,因为在编译期间就会为栈区的对象分配空间;在堆区开辟对象就是懒汉实现方式,在程序运行时才会为堆区的对象分配空间
如何使类只有一个对象?
例如T类,只需要使用一个包装类来使用T类的对象,并将T类的构造函数设置为私有成员,并禁用拷贝构造函数、赋值运算符
饿汉实现方式:
template <typename T>
class Singleton
{
static T data;
public:
static T* GetInstance()//返回对象指针
{
return &data;
}
};
懒汉实现方式:
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance()//返回对象指针
{
if (inst == NULL)
{
inst = new T();
}
return inst;
}
};
2.懒汉方式实现单例模式线程池源代码
ThreadPool.hpp
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <unistd.h>
#include <functional>
#include "Task.hpp"
#include "Thread.hpp"
#include "Log.hpp"
using namespace ThreadMoudle;
using namespace log_ns;
static const int gdefaultnum = 5; // 线程池中线程的数量
template <typename T>
class ThreadPool
{
public:
// 返回单例模式对象指针
// 存在问题:多线程获取单例模式的线程池,如果线程池对象没有被创建,此时多个线程可能会同时执行new创建线程池对象
// 这就不是单例模式的线程池了,所以必须对创建线程池对象的操作加锁!
static ThreadPool<T> *GetInstance()
{
if (_tp == nullptr)//只有当线程池对象没有被创建时才需要加锁,否则直接返回单例模式线程池的对象指针即可
{
pthread_mutex_lock(&_sig_mutex); // 加锁
if (_tp == nullptr) // 说明对象还没有开辟
{
LOG(INFO, "create threadpool\n");
_tp = new ThreadPool(); // 懒汉实现方式
_tp->Init();
_tp->Start();
}
else
{
LOG(INFO, "get threadpool\n");
}
pthread_mutex_unlock(&_sig_mutex); // 解锁
}
return _tp;
}
// 关闭线程池
void Stop()
{
LockQueue();
_isrunning = false;
WakeupAll(); // 唤醒所有线程,防止有线程在休眠不经过判断条件,无法退出
UnlockQueue();
LOG(INFO, "Thread Pool Stop Sucess!\n");
}
// 向线程池发送任务
void Equeue(const T &in)
{
LockQueue(); // 加锁
if (_isrunning) // 只有线程池处于运行状态时,才可以向任务队列添加任务
{
_task_queue.push(in); // 向任务队列中添加任务
if (_sleep_thread_num > 0) // 只要有线程在休眠就唤醒线程
{
Wakeup();
}
}
UnlockQueue(); // 解锁
}
// 析构函数
~ThreadPool()
{
pthread_mutex_destroy(&_mutex); // 销毁互斥锁
pthread_cond_destroy(&_cond); // 销毁条件变量
}
private:
ThreadPool(const ThreadPool<T> &) = delete; // 禁用拷贝构造函数
void operator=(const ThreadPool<T> &) = delete; // 禁用赋值运算符重载
// 构造函数
ThreadPool(int thread_num = gdefaultnum) : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
{
pthread_mutex_init(&_mutex, nullptr); // 初始化互斥锁
pthread_cond_init(&_cond, nullptr); // 初始化条件变量
}
// 线程池初始化
void Init()
{
func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1); // 强关联,构建新对象,作为任务队列中对象的参数
for (int i = 0; i < _thread_num; ++i)
{
std::string threadname = "thread-" + std::to_string(i + 1);
_threads.emplace_back(threadname, func); // emplace_back会自动根据参数创建对象填入vector容器中
LOG(DEBUG, "construct thread %s done,init sucess\n", threadname.c_str());
}
}
// 启动线程池
void Start()
{
_isrunning = true;
// 创建并运行所有线程
for (auto &thread : _threads)
{
LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
thread.Start();
}
}
// 任务队列加锁
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
// 任务队列解锁
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
// 唤醒单个线程
void Wakeup()
{
pthread_cond_signal(&_cond);
}
// 唤醒所有线程
void WakeupAll()
{
pthread_cond_broadcast(&_cond);
}
// 判断任务队列是否为空
bool IsEmpty()
{
return _task_queue.empty();
}
// 线程处理任务队列中的任务
void HandlerTask(const std::string &name)
{
while (true)
{
LockQueue(); // 加锁
while (IsEmpty() && _isrunning) // 如果任务队列为空,线程休眠(使用while防止伪唤醒问题)
{ // 任务队列为空但是线程池还要运行时,线程才能去休眠
_sleep_thread_num++;
LOG(INFO, "%s thread sleep begin!\n", name.c_str());
Sleep(); // 线程休眠
LOG(INFO, "%s thread wake up!\n", name.c_str());
_sleep_thread_num--;
}
// 判定线程池是否要退出
if (IsEmpty() && !_isrunning) // 只有当任务队列为空,并且线程池也要退出时,再退出线程
{
// std::cout<<name<<" quit"<<std::endl;
LOG(INFO, "%s thread quit\n", name.c_str());
UnlockQueue();
break;
}
// 处理任务
T t = _task_queue.front(); // 取出任务
_task_queue.pop(); // 删除任务队列中的任务
UnlockQueue(); // 解锁
t(); // 处理任务,必须在解锁之后处理,因为任务被取出之后就属于线程了,不属于临界资源了
// std::cout<<name<<": "<<t.result()<<std::endl;
LOG(DEBUG, "hander task done,task is: %s\n", t.result().c_str());
}
}
// 线程休眠
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
private:
int _thread_num; // 线程池中线程的个数
std::vector<Thread> _threads; // 管理线程
std::queue<T> _task_queue; // 任务队列
bool _isrunning; // 判断线程池是否正在运行
pthread_mutex_t _mutex; // 互斥锁,保护临界资源:任务队列
pthread_cond_t _cond; // 条件变量,实现线程同步
int _sleep_thread_num; // 休眠线程的个数
// 单例模式
static ThreadPool<T> *_tp;
// 如果多个线程同时调用单例模式的线程池,会创建多个对象,这样就不是单例模式了,所以要在
static pthread_mutex_t _sig_mutex; // 单例的锁,
};
template <class T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr; // 静态成员类外初始化
template <class T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER; // 静态成员类外初始化
main.cc
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Log.hpp"
using namespace log_ns;
int main()
{
//ThreadPool<Task> *tp=new ThreadPool<Task>();
// tp->Init();//初始化线程池
// tp->Start();//启动线程池
int cnt=10;
while(cnt--)
{
//不断地向线程池发送任务
sleep(1);
Task t(1,1);
ThreadPool<Task>::GetInstance()->Equeue(t);//使用单例模式:初始化线程池、启动线程池
LOG(INFO,"eququ a task,%s\n",t.debug().c_str());
sleep(1);
}
ThreadPool<Task>::GetInstance()->Stop();//关闭线程池
//tp->Stop();
LOG(INFO,"thread pool stop!\n");
return 0;
}
十四、死锁
1.死锁的概念
死锁是指各个进程各自占有不会释放的资源,并且互相申请其他进程所占有的不会释放的资源,从而处于的一种永久等待的状态
2.死锁的四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
3.避免死锁的方法
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
4.避免死锁的算法
- 死锁检测算法
- 银行家算法