Linux·线程控制
1. 线程控制
创建多线程要用到 pthread_create() 函数
这个函数不是系统调用,而是一个库函数,Linux下没有真正意义上的线程,都是用LWP模拟的,这意味着Linux操作系统并没有线程的系统调用接口,只会提供创建LWP轻量级进程的系统调用接口。
创建LWP的接口是 vfork() 函数
vfork函数是创建一个和父进程共享地址空间的子进程。
而vfork和fork创建进程底层调的都是一个叫 clone() 的函数
这个接口才是真正意义上的系统调用,但是这个系统调用太难调了,我们没办法使用。这个接口可以根据flag参数的选项,选择是创建一个子进程还是创建LWP。
因为LWP是Linux自己搞的一个概念将进程和线程进行代码复用造成二者之间关系在内核层面上的模糊化,但是主流操作系统知识是将二者明确区分开的,如果只学主流操作系统知识后是无法使用clone并创建LWP的,因此Linux设计者在系统调用上又封装了一层库,将clone封装进库中的pthread_create()函数里,此时用户可以直接使用该函数创建用户层面上的"线程”,从而降低了学习LWP的成本。
Linux下线程的概念是在用户层而非内核层提出的,我们把这种概念叫做用户及线程。
但是Windows操作系统下在内核中真的有进程和线程的区分,真的存在PCB和TCB,因此Windows真的提供了创建进程和线程的系统调用。
Linux设计者封装的这套线程库就被称为pthread库。它是用户级别的库,由Linux系统自带的,原生线程库。
说回创建线程的函数pthread_create()
第一个参数线程唯一标识符tid
第二个参数是线程属性,暂时不管设为nullptr
第三个参数是,返回值 void* 参数类型 void* 的函数指针,也是我们想让线程执行的任务
第四个参数是喂给线程函数的参数
返回值,创建成功为0。创建失败返回对应失败的原因,数字代表不同原因,同时设置错误码。
不过现在C++11标准下也支持了多线程,有一个<thread>库,本质上其还是封装了pthread库,因为高级语言封装了每个平台下的thread库,因此语言就具有了跨平台性,我们用thread库的函数就可以写出既能在Linux下跑也能在Windows下跑的代码了。
1.1 线程创建 create
我们可以把新线程的tid打印出来看看
可以看到是一个很长的数字,与我们能查到的LWP明显是不一样的。
在代码中如果想获取自己线程的线程id还可以使用man pthread_self查看
可以看到pthread_self()函数将线程tid打印出来了。
1.1.1 重入问题暴露
我们将代码稍加修改,新创建几个线程,让他们都去循环打印自己的名称和tid
很幸运,一上来就发生了打印错乱的情况,这就是因为发生了函数重入的情况。
因为向显示器打印都要访问同样的显示器文件缓冲区,而线程之间的运行顺序是不确定的,于是打印就出现了问题。
routine内部回访问公共资源,因此它是不可重入函数,入过我们想解决这类问题,就要用到同步或加锁的方式。
1.2 线程等待 join
线程跟进程是类似的,主线程也要等待子线程执行,得知子线程的运行结果以及回收子线程。man pthread_join 查看
等待或同步一个退出的线程,一般都是由主线程去join新线程,如果新线程一直不死就有主线程一直阻塞等待
第一个参数 thread,等待哪给新线程
第二个参数 retval,一个二级指针,接收新线程的返回值
返回值,如果等待成功返回0,如果出错返回错误码
因为子线程一直没退,因此主线程就一直阻塞在join的位置,不再向后走打印逻辑了。
1.2.1 线程参数及返回值
线程等待没啥好说的,跟进程等待一样。但是线程的返回值和参数就有的说了。
我们在使用pthread_create函数创建线程的时候最后一个参数void*arg就是我们要传给线程的入口参数,因为这个参数是void*类型的,因此我们可以传任意类型的参数,包扩结构体对象。
如果能传对对象,那就可以给一个线程传任意多个参数了。
下面说pthread_jion函数的返回值,因为join只能等待void*类型的返回值,这里join接收返回值是用一个输出型参数接收的。
因此主线程接收子线程返回值的方案看起来就有些繁琐,同时因为我们的机器是64位的,因此在强转ret的时候要注意类型。
如果想返回一个对象也可以
我们给ThreadData类稍加修改,让Excute只计算结果并记录不返回任何数据,新增Result函数返回记录好的计算数据
之所以要把这个返回搞的这么繁琐就是因为,只要主线程等待成功了子线程能返回,那结果就一定是可信的。如果搞一个全局变量,那其值就是不安全的,也无法判断其正确性,是不是我们想要的线程算出的数据。
下面我们可以基于上面学到创建线程和等待线程的知识写一个多线程的任务,目标就是创建10个线程,让它们分别用同一个函数进行数据加和计算工作,最后汇总输出它们的加和结果
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstring>
#include <string>
#define NUM 10
class ThreadData
{
public:
ThreadData() {}
void Init(const std::string &name, int a, int b)
{
_name = name;
_a = a;
_b = b;
}
void Excute() { _result = _a + _b; }
int Result() { return _result; }
std::string Getname() { return _name; }
void SetId(pthread_t tid) { _tid = tid; }
pthread_t Id() { return _tid; }
int A() { return _a; }
int B() { return _b; }
private:
std::string _name;
int _a;
int _b;
int _result;
pthread_t _tid;
};
std::string toHex(pthread_t tid)
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "0x%lx", tid);
return buffer;
}
void *routine(void *args)
{
ThreadData *td = (ThreadData *)args;
while (true)
{
std::cout << "新线程:" << td->Getname() << " tid: " << toHex(pthread_self()) << std::endl;
td->Excute();
sleep(1);
break;
}
return td;
}
int main()
{
ThreadData td[NUM];
// 准备要处理的数据
for (int i = 0; i < NUM; i++)
{
char id[64];
snprintf(id, sizeof(id), "thread-%d", i);
td[i].Init(id, i * 10, i * 20);
}
// 创建多线程
for (int i = 0; i < NUM; i++)
{
pthread_t id;
pthread_create(&id, nullptr, routine, &td[i]);
td[i].SetId(id);
}
// 等待多线程
for (int i = 0; i < NUM; i++)
{
pthread_join(td[i].Id(), nullptr);
}
// 汇总处理结果
for (int i = 0; i < NUM; i++)
{
printf("td[%d]: %d+%d=%d [%ld]\n", i, td[i].A(), td[i].B(), td[i].Result(), td[i].Id());
}
pause();
return 0;
}
最后的运行结果
我们上面这段代码没有给join函数设置进程返回值的回收任务,因为进程返回值其实就是td数组每个元素的地址,因此没必要接收。
下面我们看这样一段代码
我们跑了两个线程,一个线程创建变量a,并将a的地址放到一个全局变量中去了,第二个线程从全局变量中拿到a的地址,并对该地址下的数据做修改
结果我们发现线程二对数据的改变确确实实的影响到了线程一,因此虽然我们说线程有各自独立的栈空间,但也是因为在代码层面上的隔阂,这块代码用不了那块代码的变量,实际上我们可以通过特殊手段让两线程之间的栈空间互相看到并影响。但是我们还是不要随便瞎搞,因为线程之间的调度顺序是不确定的。
1.3 线程终止 exit(thread版本)
man pthread_exit 查看
哪个线程掉 pthread_exit() 哪个线程就结束,注意这个函数是让线程结束,而如果调用 exit() 则是让整个进程结束,所有线程都会退出。
这个参数我们在join等待函数中见过了,不过join函数是二级指针,这里是一级指针。
其实就是线程结束之后向等待它的主线程发一个返回值指针,出去之后join用二级指针保存一级指针的地址,之后在主线程中使用就解引用就好了。
代码跟前面很像,不过要注意的是把ret转回数字的时候,因为是64位的机器因此要转成长整型,否则会有精度损失报错。
下面还有一种对出方式 man pthread_cancel 查看
发送线程tid来取消一个线程,但线程之间不能乱取消,一般都是主控线程取消别的线程。
取消一个线程一定是这个要被取消的线程已经跑起来了。
这边让主线程在子线程生成5秒之后取消掉
可以看到取消线程拿到的返回值是-1,这是内核定义的一个宏值
不建议使用cancel取消线程,如果有需要可以使用杀进程或杀线程。
1.4 线程分离
如果主线程想在子线程运行的时候做自己的事情,也就是有没有非阻塞等待的方案。
很遗憾线程只提供阻塞等待的方式,但是如果主线程不想等,可以将目标子线程线程设置为分离状态。
线程有两种被等待的状态。1. joined状态,线程默认创建出来后就是要被join等待的。2.detach状态,线程分离状态,一旦线程被设置为分离状态,就可以要求主线程不需要等待该线程。
线程分离的状态就是通过标志位的设置让两个执行流不再有任何关系,当然它们还是要在同一个地址空间中运行的。
在提出线程分离之后,我们就一定要保证主执行流一定是最后推出的,因为线程不像进程,即使父进程退出后子进程还能被当成孤儿托管。如果线程的主控流退出了,被分离的流还没退出,那后果是不可预测的,跟每个线程库各自的处理方案有关。
那如果我们不能保证主执行流一定最后退,就不能创建分离线程,当然后面的代码会有很多设计软件的,这种代码主执行流会一直在一个死循环中,因此它一定是最后退的,线程就可以做分离操作了。
线程分离函数 man pthread_detach() 查看
给一个线程 tid 就可以分离了,线程分离既可以由自己分离自己,也可以由主控线程控制要分离的新线程。
无论是哪种分离方案,最后都等待join不到
可以看到join的返回值是22,也就是说join函数出错了,没有等到目标线程的返回值。
我们知道进程是可以进行程序替换的,但是如果一个线程调用的程序替换,那其他线程就都坏掉了,因此我们肯定不能直接替换。那方法就是让一个新线程去创建一个子进程,子进程中再进行程序替换,子进程的PCB一开始只有新线程一个。
2. 线程封装
还是这个创建一个新线程的代码,这次我们关心打印出来的tid。
tid和我们预想的LWP值是不一样的,tid明显长的多,这是因为tid实际上是一个地址。
我们上节说过了,为了避免用户不会使用LWP的系统调用,因此Linux提供了一个线程库,将LWP的操作方案模拟成了线程的操作方案。
这个库给用户看到的样子就是Linux提供了线程操作的方案,既然是线程的操作,一定就有线程的id、优先级、状态等等属性,与之对应的就是所谓的TCB线程控制块,这个库会给用户一个有线程控制块的假像。
那么动态库是要加载到每个进程的虚拟地址空间中,既然TCB是线程库提供的,那它就要被线程库管理起来。每一个进程中会有若干线程,都在自己的虚拟地址空间中被管理着。
那么在物理内存中,线程库中就包含着机器中所有线程的属性集合,也就是机器中所有的TCB都被线程库给管理起来了。也就是说线程库本身就要对所有线程进行先描述再组织的管理。
线程库的管理方案入上,mmap区就是共享区,映射着原生线程库。
在库中所有的TCB都按数组结构存放,在线程库中TCB本名叫struct pthread,一个线程数据结构就是数组的一个元素,库的任务就是把每一个线程结构体维护好,随时等待内核来调度。这样在库中用一个数据将所有线程都管理起来就完成了组织工作。
每一个元素的地址就是我们之前使用的tid。
我们上节说过每一个线程都有自己独立的栈空间,但是本节我们在线程终止那里使用一个线程通过全局变量的方式访问并修改了另一个线程的栈空间,那这又是怎么回事。
线程的栈空间是独立的这句话没错,但是因为线程们是要共享同一块虚拟地址空间的,因此如果一个线程线程的栈即使是独立的,但也无法脱离这块虚拟地址空间,此时如果另一个线程能拿到这个线程栈的地址,就一定的修改该地址下的数据。但是这么做就是在写bug,我们要避免这种情况,不仅要在逻辑结构上,也要在代码层面上做到线程栈空间的独立。
主线程的栈就是进程创立时分配的栈,后来的新线程的栈都是在共享区又动态申请来一块区域,给了新线程做独立栈空间。
线程局部存储是跟编译器有关的一个东西,在变量定义的时候再最前面加上__thread关键字
此时就会把这变量在每个使用它的线程中另存一份,也就是说一个线程改变这个变量的内容后,另一个线程不会被影响。同时这个变量在两个线程中的地址也不在一个位置了,这也很明显表示了这个变量在每个线程的存储空间中都单独拷了一份。
我们上节说过一个进程中的线程之间大部分数据都是共享的,但是也有私有的一部分,其中包括线程错误码errno,这个变量才C标准库中定义的时候就被__thread修饰,这样就让它变成了一个局部存储的变量,每个线程都有自己独立的错误码了。
这里要注意,线程局部存储只能用来修饰内置类型。
2.1 clone() 系统调用
我们上节讲过Linux内核中只会提供创建轻量级进程LWP的接口clone,它的作用默认就是在地址空间内创建一个新的执行流。下面我们看看这个系统调用。
我们先看前四个参数
第一个参数明显是函数指针,就是我们选择线程的入口地址
第二个参数stack是线程库申请的线程的独立栈的起始地址
第三个参数flags明显是一个选项,这里要选择CLONE_PARENT 或 CLONE_THREAD很明显如果选parent就是重新构建一个新的进程,选thread就是构建一个新线程,这个我们上节也提过一点,fork中选择的就是parent,vfork中选择的就是thread
第四个参数arg是入口函数的参数
2.2 线程库原理
这里我们总结一下线程是怎么在库中封装的,我们知道pthread_create()函数是Linux原生线程库中封装的线程创建函数,我们也从这个函数入手刨析一下。
pthread_create()函数一旦被调用,首先会初始化线程的属性,然后创建一个struct pthread类型的指针*pd,这个指针就指向线程的TCB。
在pthread结构体中会记录线程所在 进程pid 和 tid 这里的tid就是内核中的LWP,一个bool类型user_stack表示用户是否提供栈,还一个bool类型stopped_start表示线程的运行状态,一个void*类型的变量result用来记录该线程最后的返回值,线程的入口函数的地址和参数地址,线程独立栈的起始地址和栈的大小。
线程的TCB创建好之后pthread_create()函数的下一步就是申请独立栈空间,申请时会调用mmap()系统调用这个函数可以简单理解成malloc函数,但是它是malloc函数的底层实现。在申请好独立栈空间后,会把独立栈地址和大小都设置回TCB中。
接下来把线程的方法和参数设置进TCB中。
然后将pd指针强转成pthread_t类型赋给newthread变量,之后我们pthread_self()函数拿到的tid包括pthread_create()函数的返回值都是从newthread中拿到的。这里表明了我们之前拿到的tid都是TCB的起始地址了。
最后前面这些预备工作都做好之后,回调用一个creaete_thread()函数,这个函数的底层就是clone()函数,此时完成了一个LWP的创建,那么在上层的表现就是一个线程创建好了。
因此我们得到的线程tid其实是在库中的TCB起始地址,而TCB是在pthread_create()时就给我们创建好了的,它里面会把栈申请好,把栈的起始地址写入到TCB中,最底层调用clone()创建的轻量级进程。
其实上层用户调用线程库的方法,很多时候都在查看TCB中的各种数据,比如pthread_join()函数,就是拿着要等待的目标线程TCB的地址,去访问result变量拿到线程的返回值。
到这里我们明显可以看出线程库对LWP的封装过程,因此我们把Linux下这种线程称为用户级线程,因为内核中根本没有线程的物理实现,所有线程创建、线程属性等都是通过线程库在系统调用之上模拟出来的。
2.3 手搓线程封装
下面我们基于前面学到的原理用C++风格封装一下pthread_create()
这里提供了两种封装方案,一种是要线程入口函数传参的,一种是不要线程入口函数传参的,main测试函数只提供不传参的创建多线程任务
Thread.hpp
#ifndef _THREAD_HPP_
#define _THREAD_HPP_
#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <functional>
// 不传参版本
namespace atl
{
using func_t = std::function<void()>;
static int number = 1; // 定义静态变量用于给线程起名
enum class TSTATUS
{
NEW,
RUNNING,
STOP
};
class Thread
{
private:
// 因为成员函数有this指针,因此需要用static修饰函数来取消this指针
static void *Routine(void *args)
{
Thread *t = static_cast<Thread *>(args);
t->_status = TSTATUS::RUNNING; // 线程启动执行任务
t->_func();
return nullptr;
}
public:
Thread(func_t func)
: _func(func), _status(TSTATUS::NEW), _joined(true)
{
_name = "thread-" + std::to_string(number++);
_pid = ::getpid();
}
bool Start() // 线程启动
{
if (_status != TSTATUS::RUNNING) // 线程不是启动状态再启动
{
int n = ::pthread_create(&_tid, nullptr, Routine, this);
if (n != 0)
return false;
return true;
}
return false;
}
bool Stop() // 线程终止
{
if (_status == TSTATUS::RUNNING)
{
int n = ::pthread_cancel(_tid);
if(n!=0)return false;
_status = TSTATUS::STOP;
return true;
}
return false;
}
bool Join() // 线程等待
{
if (_joined) // 线程可被等待,如果线程已经被分离了就不能等
{
int n = ::pthread_join(_tid, nullptr); // 不关心线程返回值了
if (n != 0)
return false;
_status = TSTATUS::STOP; // 如果等到了说明被等的线程也运行完停下来了
return true;
}
return false;
}
void Detach() // 线程分离
{
_joined = false;
pthread_detach(_tid);
}
bool IsDetach() // 当前线程是否已经分离
{
return !_joined;
}
std::string Name()
{
return _name;
}
~Thread(){}
private:
std::string _name;
pthread_t _tid;
pid_t _pid;
bool _joined; // 线程没分离
func_t _func; // 线程执行的入口
TSTATUS _status; // 线程运行状态
};
}
// 传参版本
// namespace atl
// {
// static int number = 1; // 定义静态变量用于给线程起名
// enum class TSTATUS
// {
// NEW,
// RUNNING,
// STOP
// };
// template <class T>
// class Thread
// {
// using func_t = std::function<void(T)>;
// private:
// // 因为成员函数有this指针,因此需要用static修饰函数来取消this指针
// static void *Routine(void *args)
// {
// Thread<T> *t = static_cast<Thread<T> *>(args);
// t->_status = TSTATUS::RUNNING; // 线程启动执行任务
// t->_func(t->_data);
// return nullptr;
// }
// public:
// Thread(func_t func, T data)
// : _func(func), _data(data), _status(TSTATUS::NEW), _joined(true)
// {
// _name = "thread-" + std::to_string(number++);
// _pid = ::getpid();
// }
// bool Start() // 线程启动
// {
// if (_status != TSTATUS::RUNNING) // 线程不是启动状态再启动
// {
// int n = ::pthread_create(&_tid, nullptr, Routine, this);
// if (n != 0)
// return false;
// return true;
// }
// return false;
// }
// bool Stop() // 线程终止
// {
// if (_status == TSTATUS::RUNNING)
// {
// int n = ::pthread_cancel(_tid);
// if (n != 0)
// return false;
// _status = TSTATUS::STOP;
// return true;
// }
// return false;
// }
// bool Join() // 线程等待
// {
// if (_joined) // 线程可被等待,如果线程已经被分离了就不能等
// {
// int n = ::pthread_join(_tid, nullptr); // 不关心线程返回值了
// if (n != 0)
// return false;
// _status = TSTATUS::STOP; // 如果等到了说明被等的线程也运行完停下来了
// return true;
// }
// return false;
// }
// void Detach() // 线程分离
// {
// _joined = false;
// pthread_detach(_tid);
// }
// bool IsDetach() // 当前线程是否已经分离
// {
// return !_joined;
// }
// std::string Name()
// {
// return _name;
// }
// ~Thread() {}
// private:
// std::string _name;
// pthread_t _tid;
// pid_t _pid;
// bool _joined; // 线程没分离
// func_t _func; // 线程执行的入口
// TSTATUS _status; // 线程运行状态
// T _data;
// };
// }
#endif
main.cc
#include "Thread.hpp"
#include <unordered_map>
#include <memory>
#define NUM 10
using thread_ptr_t = std::shared_ptr<atl::Thread>;
int main()
{
std::unordered_map<std::string, thread_ptr_t> threads;
for (int i = 0; i < NUM; i++)
{
thread_ptr_t t = std::make_shared<atl::Thread>([]()
{
while(true)
{
std::cout <<"hello word"<<std::endl;
sleep(1);
} });
threads[t->Name()] = t;
}
for(auto& thread:threads)
{
thread.second->Start();
}
for(auto& thread:threads)
{
thread.second->Join();
}
return 0;
}