Linux —— 多线程
一、本篇重点
1.了解线程概念,理解线程与进程区别与联系
2.理解和学会线程控制相关的接口和操作
3.了解线程分离与线程安全的概念
4.学会线程同步。
5.学会互斥量,条件变量,posix信号量,以及读写锁
6.理解基于读写锁的读者写者问题
二、线程概念
1. 什么是线程
在一个程序里的一个执行路线就叫做线程,线程是一个进程内部的控制序列,线程是CPU调度的基本单位。
我们都知道进程可以理解为就是一个正在执行的程序,进程具有它自己独立的虚拟地址空间,并且有这各种相关的资源,线程是进程中的一个独立执行路径,可以看作是一个轻量级的“子任务”。线程共享进程的内存空间和其他资源,但每个线程都有自己的栈、程序计数器和寄存器。
线程是CPU调度的基本单位,进程是承担分配系统资源的实体。
线程在Linux中的实现,由于线程需要被管理,因此也需要有相关的结构去对其进行先描述,后组织,但不同于Windows系统,Linux系统并没有单独为线程这个概念去重新创造一个类去描述管理,而是直接复用进程的PCB去描述和管理线程,所以线程也被叫做轻量级进程。
2. 线程的优缺点
2.1 优点
2.1.1 资源消耗较少
线程创建和销毁的成本都比进程要低,线程贡献进程的内存和资源,不需要为每个线程都分配新的内存空间和资源,尤其是操作系统资源。
2.1.2 更快的上下文切换
线程切换只需要保存和恢复线程的上下文,而不需要更换内存地址空间,切换成本低
2.1.3 更高的并发性能
适合并行计算,多线程可以充分的利用多核处理器,线程可以并发执行,增加程序的吞吐量
2.1.4 更简单的线程通信
由于线程是共享同一份虚拟地址空间,可以直接访问和修改共享内存中的数据,相比于进程间的通信,线程之间的通信成本要低很多。
2.2 缺点
2.2.1 复杂性增加
存在并发问题,即线程间共享内存可能导致竞争条件、死锁、同步问题等等,需要使用锁或者其他同步机制来管理对共享数据的访问
调试困难,多线程程序中的问题可能难以重现和调试,尤其是涉及到并发问题的时候。
2.2.2 线程安全问题
数据竞争,多个线程同时访问和修改共享数据可能导致数据竞争,造成不可预期的结果,需要小心设计线程安全的数据访问模式
2.2.3 有限的资源隔离
错误传播,由于线程共享进程的资源,一个线程的错误可能会影响到同一个进程中的其他线程,甚至导致整个进程崩溃。
二、线程控制
1. Linux系统调用接口
1.1 线程创建 phread_create
接口说明
该接口用于创建一个新线程,首先我们可以定义一个或者一批phread_t类型的指针,它作为该函数的第一个输出型参数,能够在成功创建好新线程后,将该线程的id返回,该id标识这这个新创建的线程,并且用于后续对该线程的各项操作,然后是第二个参数,该参数用于设置线程的属性,一般直接用NULL表示默认,第三个参数则是要交给该线程执行的任务,是个函数指针类型,并且该函数的返回值是void*,参数也是void*,该设计方便我们将我们需要交给该线程的信息通过对象的方式去传递,并且在需要该线程的执行结果等等信息时,同样以对象的方式传递出来。第四个参数就是任务函数的参数指针
头文件
#include<pthread.h>
函数声明
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
pthread_t
是一个用于表示线程标识符(线程ID)的数据类型,它由POSIX标准定义,通常用于在C/C++程序中与线程相关的操作。它的具体实现取决于操作系统和编译器,但在Linux系统中,pthread_t
通常是一个无符号整数类型或者结构体类型,用于唯一标识系统中的一个线程。thread
:指向线程标识符的指针,用于存储新线程的ID。attr
:指向线程属性对象的指针,一般使用默认属性,传递NULL
。start_routine
:新线程开始执行的函数。arg
:传递给start_routine
函数的参数。
返回值:成功返回0,错误返回错误码
1.2 线程终止 pthread_exit
接口说明
终止当前线程,并且将该线程执行任务的结果等等信息,可以通过类对象的方式去将指针作为返回值,由pthread_join这个接口去接受到这个指针,也可以直接用return的方式去将结果传出,需要注意的是,该指针不能定义在这个线程的栈空间上,而是要定义在堆上,否则线程退出后,该线程栈内的内容就会被释放。
头文件
#include<pthread.h>
函数声明
void pthread_exit(void *retval);
retval
:返回值,传递给 pthread_join
的 retval
参数。
1.3 线程等待 pthread_join
接口说明
线程在结束后,里面的资源并不会被释放,仍然在进程的空间中,因此我们需要将该空间释放,并且,通过该函数可以获取线程最后结束后,返回的指针,由于这里的设计是希望通过输出型参数去获得一个void*类型的一个指针,因此这里的参数类型必须是void**,其第一个参数是指定要等待线程的id,等待方式是阻塞等待。
函数声明
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
thread
:需要等待的线程的线程ID(pthread_t
类型)。这是目标线程的标识符,通常是在创建线程时由pthread_create
返回的。retval
:指向一个指针的指针,用于存储目标线程的退出状态(返回值)。可以为NULL
,表示不关心目标线程的退出状态。
1.4 线程分离 pthread_detach
接口说明
线程分离的概念是指,被分离的线程与创建该线程完全独立,当一个线程被分离时,在该线程执行结束后,系统会自动回收该线程的资源,此时就无需再调用pthread_join去回收,因此在一些线程任务中,我们不关注该线程的执行结果或者不需要改线程返回某些信息时,我们可以将该线程进行分离。
线程的分离可以是在线程被创建出来时,在第二个参数中设置线程属性时直接设置(PTHREAD_CREATE_DETACHED),也可以在线程中对其进行设置,使用接口pthread_detach也可以将指定id的线程进行分离,这里介绍的接口就是pthread_detach
函数声明
#include <pthread.h>
int pthread_detach(pthread_t thread);
-
参数:
thread
:需要设置为分离状态的线程的线程ID(pthread_t
类型)。
-
返回值:
- 成功时返回
0
。 - 失败时返回错误码(例如
ESRCH
表示线程不存在,EINVAL
表示线程已经是分离状态)。
- 成功时返回
2. 简单封装一个线程
我们将线程的接口进行简单的封装,这样可以使用时更加方便,以及得到更多的线程信息
我们将线程封装成类,每一个对象就是一个线程,类成员中包含线程的id和各种相关信息,这里可以自行设计,然后就是把pthread_create接口封装成run()函数,在我们创建线程对象的时候,就把关键信息给初始化到类内创建对象,run函数就是启动线程,需要注意的细节是:
在类内需要去封装pthread_create时,回调函数的类型需要void* fun(void*),而类内有个隐藏的this指针,所以需要把该函数设为静态成员,再将this指针传给它
#pragma once
#include<iostream>
#include<pthread.h>
#include<string>
#include<stdlib.h>
using namespace std;
class Thread
{
public:
typedef enum//线程状态
{
NEW = 0,
RUNNING,
EXITED
}ThreadStatu;
typedef void (*func_t)(void*);//函数指针
public:
Thread(int num,func_t func,void* args):_tid(0),_status(NEW),_func(func),_args(args)
{
char name[128];
snprintf(name,sizeof(name),"thread-%d",num);
_name = name;
}
int status() { return _status; }
string threadname() { return _name; }
pthread_t threadid()
{
if(_status == RUNNING)
{
return _tid;
}
else
{
return 0;
}
}
void operator()()// 仿函数,让线程执行任务的
{
if(_func != nullptr) _func(_args);
}
static void* runHelper(void* args)//注意,这里的args和用户传入的args不一样,这里是this指针,用于调用函数的
{
Thread* pt = (Thread*)args;
(*pt)();
return nullptr;
}
void run()//启动线程
{
int n = pthread_create(&_tid,nullptr,runHelper,this);
if(n != 0) exit(1);
_status = RUNNING;
}
void join()//这里设计简单一点,默认不需要获取函数的返回值
{
int n = pthread_join(_tid,nullptr);
if(n!=0)
{
cerr << "join error" << endl;
return;
}
_status = EXITED;
}
~Thread()
{}
private:
pthread_t _tid;//线程id
string _name;
func_t _func; //线程未来要执行的回调
ThreadStatu _status;
void* _args;
};
三、 线程同步
线程同步是用于解决线程安全问题的一种机制,确保多个线程正确有序地访问共享资源。常见的方法有互斥锁(mutex)、条件变量(Condition Variable)等等。
线程安全指的是,当多个线程同时访问共享资源时,如果没有正确的同步机制,就会导致竞争条件,造成数据错误或程序崩溃。线程安全就是保证多个线程访问同一个资源时不会出现冲突。
- 同步关系:解决的是线程间“谁先谁后”或“何时进行”的问题,让线程在适当的条件下协调工作。
- 互斥关系:解决的是“不能同时进行”的问题,确保共享资源在任何时候只被一个线程操作。
1. 互斥锁
1.1 介绍 pthread_t
互斥锁是一个常用的线程同步机制,用于保护共享资源,防止多个线程同时访问临界资源导致的数据竞争,互斥锁能够保证临界资源一次只能被一个线程访问,互斥锁pthread_t 的使用依赖于pthread库,并且,通常我们需要将其定义在全局中,且对于多个线程去竞争同一份临界资源时,需要对该临界资源上同一把锁。
1.2 初始化 pthread_mutex_init
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
-
参数:
mutex
:指向要初始化的互斥锁对象的指针。attr
:指向互斥锁属性对象的指针,如果使用默认属性可以传入NULL
。
-
返回值:
- 成功返回
0
,失败返回错误码。
- 成功返回
1.3 销毁 pthread_mutex_destroy
int pthread_mutex_destroy(pthread_mutex_t *mutex);
-
参数:
mutex
:指向要销毁的互斥锁对象的指针。
-
返回值:
- 成功返回
0
,失败返回错误码。
- 成功返回
1.4 加锁和解锁
1.4.1 加锁 pthread_mutex_lock()
int pthread_mutex_lock(pthread_mutex_t *mutex);
-
参数:
mutex
:指向要加锁的互斥锁对象的指针。
-
返回值:
- 成功返回
0
,失败返回错误码。
- 成功返回
1.4.2 解锁 pthread_mutex_unlock()
int pthread_mutex_unlock(pthread_mutex_t *mutex);
-
参数:
mutex
:指向要解锁的互斥锁对象的指针。
-
返回值:
- 成功返回
0
,失败返回错误码。
- 成功返回
1.5 示例
#include <pthread.h>
#include <stdio.h>
pthread_mutex_t mutex; // 定义一个全局互斥锁
int shared_data = 0; // 共享资源
void *thread_function(void *arg) {
pthread_mutex_lock(&mutex); // 加锁
shared_data++; // 访问共享资源
printf("Thread %d incremented shared data to %d\n", *(int *)arg, shared_data);
pthread_mutex_unlock(&mutex); // 解锁
return NULL;
}
int main() {
pthread_t thread1, thread2;
int thread_id1 = 1, thread_id2 = 2;
// 初始化互斥锁
pthread_mutex_init(&mutex, NULL);
// 创建两个线程
pthread_create(&thread1, NULL, thread_function, &thread_id1);
pthread_create(&thread2, NULL, thread_function, &thread_id2);
// 等待线程结束
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
// 销毁互斥锁
pthread_mutex_destroy(&mutex);
return 0;
}
1.6 简单封装一个互斥锁
利用构造和析构的特性,去构建一个锁的类,这个类只需要传入锁的指针,当定义出该对象的时候,构造会自动上锁,而在该临时对象被销毁的时候,会利用析构将锁自动解除。
#pragma once
#include<pthread.h>
#include<iostream>
class Mutex
{
public:
Mutex(pthread_mutex_t* pm):_pm(pm)
{}
void lock()
{
pthread_mutex_lock(_pm);
}
void unlock()
{
pthread_mutex_unlock(_pm);
}
~Mutex()
{}
private:
pthread_mutex_t* _pm;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t* pm):_mutex(pm)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
};
2. 死锁
2.1 概念
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
例如:临界资源有AB两个锁,而1号线程申请了A锁,成功,并申请B锁,失败等待,而此时2号线程的状态是申请了B锁成功,而A锁失败等待,此时就形成了死锁
2.2 死锁的必要条件
互斥条件:一个资源每次只能被一个执行流使用
请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
不剥夺条件: 一个执行流已获得的资源,在末使用完之前,不能强行剥夺
循环等待条件: 若干执行流之间形成一种头尾相接的循环等待资源的关系
3. 可重入函数和线程安全的概念
3.1 概念
线程安全:多个线程并发访问同一个资源的时候,我们需要对该资源进行保护,利用互斥等方式,让多线程有序的,按照现实逻辑的去访问。
可重入函数:可重入函数指的是允许多个线程(进程)对该函数同时进行访问,而不会导致预料不到的错误的函数
3.2 常见的线程不安全情况
3.2.1 不保护共享变量的函数
在多线程访问该变量时,可能会由于中断等原因,造成该变量出现数据竞争的情况
3.2.2 函数状态随着被调用,状态发生变化的函数
函数状态会随着被调用发生状态变化,也是会导致并发访问后数据竞争等结果不可预测的问题
3.2.3 返回指向静态变量指针的函数
由多个线程共同维护一个静态变量,可能会导致潜在的数据竞争,加大维护的成本
3.2.4 调用线程不安全函数的函数
线程不安全的函数本身被其他函数调用,该函数自然也是线程不安全的
3.3 常见线程安全的情况
每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的,类或者接口对于线程来说都是原子操作,多个线程之间的切换不会导致该接口的执行结果存在二义性。
4. 条件变量
4.1 什么是条件变量?
条件变量(Condition Variable) 是一种用于线程同步的机制,它允许线程在等待某个特定条件满足时挂起(即阻塞),并在条件满足后被唤醒。条件变量通常与互斥锁(mutex)配合使用,以确保线程在修改共享数据时的原子性操作。
我们可以简单理解成,条件变量就是一个给线程排队的队列,我们可以在某些情况下,把线程先挂起排队,该线程等待的资源满足时,再把其放出来,搭配互斥锁使用的话,这里的资源就是互斥锁内的资源,我们可以利用条件变量去有序的申请和使用互斥锁去访问临界资源
4.2 为什么要有条件变量?
条件变量通常用于维护线程同步,什么是线程同步?线程同步指的就是多线程并发访问时,我们需要设计合理的,符合现实逻辑的,高效的策略去进行多线程并发访问,条件变量就是用于设计线程同步的一种机制
4.3 怎么用条件变量?
4.3.1 条件变量类型 pthread_cond_t
使用的方式大致与互斥锁的设计理念差不多,也是需要定义出一个条件变量对象,把其看作为一个存放线程的队列去使用。
4.3.2 初始化 pthread_cond_init
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
第一个参数是条件变量的指针,第二个参数用于设置属性,返回值为整形,返回0表示成功,返回其他表示失败。
4.3.3 销毁 pthread_cond_destroy
int pthread_cond_destroy(pthread_cond_t *cond);
将变量指针作为参数,销毁对创建出来的对象资源进行释放,成功返回0
4.3.4 等待挂起 pthread_cond_wait
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
第一个参数是变量指针,第二个是互斥锁指针,我们通常需要配合互斥锁的使用去对临界资源进行线程同步,这是为了保证临界资源部分的原子性,所以通常是在互斥锁的后面使用该接口去挂起线程,挂起线程时,同时会将锁给短暂的释放给其他线程使用
4.3.5 唤醒一个线程 pthread_cond_signal
int pthread_cond_signal(pthread_cond_t *cond);
将一个“队头线程”给唤醒
4.3.6 唤醒全部被挂起的线程 pthread_cond_broadcast
int pthread_cond_broadcast(pthread_cond_t *cond);
唤醒全部的被该条件变量挂起的线程
5. 生产消费模型(Producer-Consumer Model)
5.1 什么是生产消费模型?
生成消费模型是在处理多线程或多进程任务的一种经典模式,用于解决多个实体(生产者和消费者)之间的资源共享和协作问题。这个模型帮助管理和协调生产者和消费者对共享资源(通常是一个缓冲区或队列)的访问,以确保系统的稳定性和高效性。
- 生产者(Producer):负责生产数据或任务,将其放入共享的缓冲区或队列中。
- 消费者(Consumer):负责从共享的缓冲区或队列中取出数据或任务,并进行相应的处理或消费。
- 共享缓冲区(Buffer):一个中间存储区域,用于临时存放生产者产生的数据,供消费者取用。缓冲区通常有固定的大小(容量),可以是有限的(有界缓冲区)或无限的(无界缓冲区)。
- 同步机制:生产者和消费者需要通过某种方式进行同步,以协调它们对共享缓冲区的访问。这通常通过互斥锁(mutex)和条件变量(condition variable)来实现,以确保数据的一致性和线程间的正确执行顺序。
5.2 基于BlockingQueue的生产消费模型
5.2.1 什么是BlockingQueue?
阻塞队列(BlockingQueue)是一种数据结构,它在队列这种基本数据结构的基础上,增加了在特定情况下阻塞线程的功能。
我们这里可以通过简单的对基本队列进行封装,去实现一个简单的生产消费模型
5.2.2 设计思路
参数设计
首先就是对参数的设计,队列是肯定的,此外我们还需要知道其容量, 这个队列作为生产者和消费者共同访问的共享缓存区,我们肯定需要维护这个缓存区,所以需要用到互斥锁,避免多个线程并发访问时带来的数据竞争问题,然后是需要维护消费者和生产者的同步关系,当生产者生成满时,我们需要将生产者线程进行挂起等待,而消费者将数据消费空时,我们需要将消费者线程挂起等待,所以我们至少需要两个条件变量,综上考虑,我们设计类成员参数如下
template <class T>
class BlockQueue
{
private:
queue<T> _q;//队列
int _cap;//容量
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _full;//生产者对应条件变量
pthread_cond_t _empty;//消费者对应条件变量
};
基本框架
设计好参数后,我们开始设计基本的一个结构,首先是类内我们需要构造和析构函数,构造上,根据测试要求先简单设计一下参数,然后在main函数中去先用单生产单消费者进行测试,把基本的多线程模版写好
main.c
#include<time.h>
#include<sys/types.h>
#include<unistd.h>
#include"BlockQueue.hpp"
void* consumer(void* arg)//消费者
{
BlockQueue<int>* bq = static_cast<BlockQueue<int> *>(arg);
while(true)
{
//sleep(1);
int data = 0;
// 1. 将数据从blockqueue中获取 -- 获取到了数据
bq->pop(&data);
// 2. 结合某种业务逻辑,处理数据
cout << "consumer data: " << data << endl;
}
}
void* producer(void* arg)//生产者
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>* >(arg);
while(true)
{
sleep(1);
//1. 先通过某种渠道获取数据
int data = rand() % 10 + 1;
//2. 将数据推送到blockqueue -- 完成生产过程
bq->push(data);
cout << "productor data: " << data << endl;
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid());
pthread_t c,p;
BlockQueue<int>* bq = new BlockQueue<int>;
pthread_create(&c,nullptr,consumer,bq);
pthread_create(&p,nullptr,producer,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
类的设计
构造,析构完成以后,我们要考虑,设计基本的接口,对于生产者而言,需要把数据放入缓存区,而消费者需要往缓存区获取数据,至少需要Push和Pop接口,并且维护好互斥和同步关系
#include<iostream>
using namespace std;
#include<queue>
#include<pthread.h>
const int gcap = 5;
template <class T>
class BlockQueue
{
private:
queue<T> _q;//队列
int _cap;//容量
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _full;//生产者对应条件变量
pthread_cond_t _empty;//消费者对应条件变量
public:
BlockQueue(const int cap = gcap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_full,nullptr);
pthread_cond_init(&_empty,nullptr);
_cap = cap;
}
bool isFull() { return _q.size() == _cap; }
bool isEmpty() { return _q.empty(); }
void push(const T& in)
{
pthread_mutex_lock(&_mutex);
while(isFull())
{
pthread_cond_wait(&_full,&_mutex);
}
_q.push(in);
//由于生产者进行了生产,此时一定可以进行消费,我们可以自定义一些策略去唤醒消费者,这里测试选择当生产超过货物的1/2后再唤醒消费者
if(_q.size() > _cap/2) pthread_cond_signal(&_empty);
pthread_mutex_unlock(&_mutex);
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);
while(isEmpty())
{
pthread_cond_wait(&_empty,&_mutex);
}
*out = _q.front();
_q.pop();
//由于消费者进行了消费,我们可以自定义策略去将生产者唤醒
pthread_cond_signal(&_full);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
};
细节
在设计Push和Pop接口时,我们为了维护同步关系,需要对当前队列是否为空或者满进行检测,若消费者消费时,当前为空,则需要将消费者挂起等待,但这里的检测要考虑到有多个消费者时,当前为空,都被挂起等待,货物补充好后,可能同时进行唤醒,此时程序会在挂起位置进行执行,如果此时只生产了比较少的货物,而全部消费者同时被唤起,部分消费者成功获取到了货物,而一部分被唤醒后,此时货物空了,若是不进行循环语句判断,则会出现不可预测的错误,生产者也是同理。
测试
控制消费者消费的比较快,生产较慢的情况是否能够预期的出现将消费者挂起,生产一个消费一个的情况,测试多个生产消费同时出现的情况等等
6. 信号量
6.1 什么是信号量?
信号量是一种用于同步线程或者进程的机制,它和互斥锁与条件变量的方式有所区别,我们可以先简单的理解信号量就是一种描述当前资源数量的计数器,当信号量为1是,即当前资源只有一个时,被称为二元信号量,类似于互斥锁。
操作系统根据信号量的值来决定是否允许线程或进程继续执行,线程在访问某个资源时,先申请信号量,申请成功才可以访问资源,失败则进行阻塞等待资源就绪,我们将用信号量去模拟实现一个基于循坏队列的生产消费模型。
信号量有两个基本的操作:
- 等待(Wait, P操作):请求访问资源。如果信号量值大于0,则减1,并继续执行;如果信号量值为0,则线程进入等待状态,直到信号量的值大于0。
- 信号(Signal, V操作):释放资源。当线程释放资源时,信号量的值加1,如果有等待线程,则唤醒等待线程。
6.2 基本的接口介绍
6.2.1 初始化 sem_init
信号量的数据类似是sem_t,可以认为就是一个资源计数器
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem:参数指针
pshared:若是0则表示信号量用于线程间共享,非0则是进程间共享
value:表示起始的资源的数量
返回值:成功返回0,失败返回-1并设置
errno
。
6.2.2 销毁 sem_destroy
int sem_destroy(sem_t *sem);
sem:参数指针
返回值:成功返回0,失败返回-1并设置
errno
。
6.2.3 等待信号量(P操作)
int sem_wait(sem_t *sem);
向信号量申请资源,信号量当前大于0,则申请成功,信号量减一,若失败则阻塞
6.2.4 释放信号量(V操作)
int sem_post(sem_t *sem);
表示该信号量表示的资源加一,信号量++
6.3 基于RingQueue的生产消费模型
main.cc
RingQueue.hpp
Task.hpp
我们创建一个简单的任务类型参数,该类执行简单的加减乘除取模运算,生产者生产任务,也就是构建任务对象,向缓冲区生产放入任务,消费者获取任务并进行处理
#pragma once
#include<iostream>
#include<string>
#include<unistd.h>
class Task
{
public:
Task()
{}
Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0)
{}
void operator()()
{
switch(_op)
{
case '+':
{
_result = _x +_y;
break;
}
case '-':
{
_result = _x -_y;
break;
}
case '*':
{
_result = _x *_y;
break;
}
case '/':
{
if(_y == 0)
{
_exitCode = -1;
break;
}
_result = _x /_y;
break;
}
case '%':
{
if(_y == 0)
{
_exitCode = -1;
break;
}
_result = _x %_y;
break;
}
default:
break;
}
sleep(1);
}
std::string formatArg()//任务内容信息,方便测试
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")" ;
}
~Task()
{}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
7. 线程池
7.1 什么是线程池?
线程池就是我们通过管理一批创建好的线程,让这批线程处于等待某种任务的状态,并且管理好一个任务队列,客户端向任务队列发布任务后,我们可以用当前空闲的线程去立刻执行该项任务,这就避免了需要收到任务后才创建线程等一系列消耗,本质也可以看作是一个生产消费模型,共享区就是任务队列,生产者对应的就是客户端,消费者对应的就是各个待机的线程。
7.2 简单的线程池设计
首先我们需要一批被管理起来的线程,我们可以用vector容器,然后是需要一个任务队列,这个任务队列的进出以及判断都需要进行互斥管理,也就是加锁,然后是需要让vector内的线程进入等待挂起的状态,只有任务队列push进任务后,我们才将某个在挂起等待的线程唤醒,并让其执行该任务。
所以首先确定好基本的类成员,然后是设计好适合的构造析构,接着我们至少需要设计一个对外提供服务端向线程池内部任务队列push数据的一个接口,然后就是内部进行处理,需要设计获取任务的接口pop,还有任务的后续处理
还有对于一些上锁,条件变量等操作,我们可以简单做个封装,这样更加规范好看
ps:这里的参考代码用的线程是之前封装过的线程
#pragma once
#include<iostream>
#include<vector>
#include<queue>
#include"task.hpp"
#include<pthread.h>
#include"Thread.hpp"
static int N = 5;
template<class T>
class ThreadPool
{
public:
//封装一下上锁和线程等待操作
void lockQueue()
{
pthread_mutex_lock(&_lock);
}
void unlockQueue()
{
pthread_mutex_unlock(&_lock);
}
void threadWait()
{
pthread_cond_wait(&_cond,&_lock);
}
void threadWakeUp()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _Task.empty();
}
public:
ThreadPool(int num = N):_num(num)
{
pthread_mutex_init(&_lock,nullptr);
pthread_cond_init(&_cond,nullptr);
}
static void threadRoutine(void* args)
{
//pthread_detach(pthread_self());
ThreadPool<T>* tp = static_cast<ThreadPool<T>* >(args);
while(true)
{
tp->lockQueue();
while(tp->isEmpty())
{
tp->threadWait();
}
// 到这说明当前任务队列有任务需要被执行
// 该线程需要获取到任务并执行
T t = tp->popTask();
// 获取到任务后,应该及时归还锁,后续执行应当在临界区外
tp->unlockQueue();
//处理获取到的任务
t();
//测试
std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
}
}
void init()
{
// 创建一批线程对象,去构建线程池
for(int i = 0;i<_num;i++)
{
Thread t(i,threadRoutine,this);
_threads.push_back(t);
}
}
void start()
{
//启动线程池
for(int i = 0;i<_num;i++)
{
_threads[i].run();
}
}
T popTask()
{
T t = _Task.front();
_Task.pop();
return t;
}
void pushTask(const T& t)
{
lockQueue();
_Task.push(t);
threadWakeUp();
unlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
//回收线程
for(int i = 0;i<_num;i++)
{
_threads[i].join();
}
}
private:
std::vector<Thread> _threads;
int _num;
std::queue<T> _Task;
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
总结
本篇学习总结了关于多线程的相关知识,首先是了解了线程的概念以及相关的接口使用,然后我们自行封装一个线程,加深对线程的理解,接着我们谈到了关于多线程并发访问的问题,我们要如何解决这个问题,就需要学习到关于线程同步等概念,学习到互斥锁、条件变量等等,用这些接口去维护多线程的安全问题,然后是谈到了多线程编程中常见的一种模型,生产消费模型,并且用阻塞队列和循环队列,分别的用条件变量和信号量这两种方式去分别的实现了一个简单的生产消费模型的模板,最后学到了一个最常见的一个基于生产消费模型设计理念的例子——进程池的设计,并且我们也简单设计了一个简单的线程池加深各个接口的理解