手写线程池
一 项目概念
1.1 并发与并行
并发:在同一时间上有多个任务进行,每个任务都分配了时间片,cpu切换速度较快,从宏观上可以看作是一起执行的,实际上一段时间,只有一个任务
并行:指在同一时刻,有多个任务在同时执行,这需要有多个处理单元(如多个 CPU 核心、多个处理器或多台计算机)同时工作,每个任务可以分配到不同的处理单元上,真正地同时进行处理。
1.2 IO密集型和CPU密集型
IO密集型: 程序里面的指令,涉及了一些IO操作,比如设备,网络,文件操作,对于计算操作较少
CPU密集型:CPU 密集型任务或应用程序是指在其执行过程中,主要的时间和资源消耗在 CPU 的计算处理上
那对于这两个来说,单核和多核,使用多线程有什么不同呢?
对于IO密集型来说,无论是单核还是多核使用多线程都是可以的,IO密集型更加时候多线程的模式,因为会有等待外部输入的情况的
对于CPU密集型来说如果是单核多线程,那么就会出现一个线程算一段,算完,再组合起来,其实线程的调度是有额外消费的(线程的上下文切换)
所以并不是所有的情况都适合用多线程
1.3 创建很多线程可以吗?线程真的是越多越好?
1.线程的创建和销毁都是非常“重”的操作
2.线程本身占用大量内存
3.线程的上下文的切换需要占用大量时间
4.线程的大量唤醒会导致系统负载很大,可能导致宕机
每个线程都有线程函数,而我们的用户空间在虚拟地址上只有3G,那么每个函数都会开一个栈帧,那么空间被大量占用,运行的空间就会少
1.4 线程同步
线程通信包括线程互斥和线程通信
线程互斥包括了互斥锁和atomic原子类型
线程通信包括条件变量和信号量
二 项目整体架构
这张图片就是整个项目的整体架构了,这个项目的使用就是外部写出需求,把需求扔进线程池里面,里面的线程会去拿取任务,然后通过返回值得到结果
所以ThreadPool里面有两个队列,一个是任务队列,一个是线程队列
这里我们需要注意的是任务队列会被多个线程进入,所以要保证是线程安全的,线程队列不用,因为没有线程竞争的情况
2.1 ThreadPool类
这个类需要包含两个队列,一个是任务队列,一个是线程队列
线程用的map,任务用的queue,对于线程容器来说,在后面细节的设计中涉及了进程id的使用,所以我们使用map这样操作比较方便,对于任务队列来说,先进先出的特性,很快就想到了队列
我们使用的时候需要有开启线程池的接口,同时我们还需要设计一下线程池的启动模式,比如是动态增长的,还是固定不变的
关于队列的大小,不是固定不变的
对于 Result 我们后面再做解析
对于start函数的详解
void ThreadPool::start(int initThreadSize)
{
isPoolRunning_ = true;
curThreadSize_ = initThreadSize;
this->initThreadSize_ = initThreadSize;
for (int i = 0; i < initThreadSize_; i++)
{
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));
int threadId = ptr->getThreadId();
threads_.emplace(threadId, std::move(ptr));
/* this->threads_.emplace_back(std::move(ptr));*/
}
for (int i = 0; i < initThreadSize_; i++)
{
this->threads_[i]->start();
idleThreadSize_++;
}
}
对于线程池的开始,我们应该先把线程队列创建好,也就是创建一个一个的线程,放入到容器中去,其中线程对象使用的智能指针,传入了一个函数对象,使用了bind绑定了一个函数,然后加入到容器中,其中需要注意的是,因为是unique_ptr是不能拷贝和赋值的,所以我们得使用move(ptr)
然后每个线程开始启动,这里的启动还没拿取任务,所以是空闲的,设置了一个变量去记录它
对于 submitTask函数详解
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
//获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
//线程通信
if (!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]()->bool {return taskQue_.size() < taskQueMaxThreadHold_; }))
{
std::cout << "提交超时" << std::endl;
return Result(sp, false);
}
//如果有空余把任务放入
taskQue_.emplace(sp);
taskSize_++;
//通知消费者消费
notNull_.notify_all();
//根据任务的数量和线程的数量,判断是否需要创建新的线程
if (poolMode_ == PoolMode::MODE_CACHED
&&taskSize_>idleThreadSize_
&&curThreadSize_<threadSizeThresHold_)
{
//创建新线程
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));
int threadId = ptr->getThreadId();
threads_.emplace(threadId, std::move(ptr));
threads_[threadId]->start();
curThreadSize_++;
}
return Result(sp);
}
这里的参数设置成了智能指针,因为我们没办法判断用户提交的任务是不是已经销毁了,如果是单纯的指针,那么就会出现野指针的情况
提交任务涉及了任务队列,所以我们需要保持该队列的线程安全,这里我们用互斥锁和条件变量进行操作
这里线程会拿到锁然后判断队列是不是满的,如果是满的那么该线程会进入等待,同时放出锁,让其他线程去抢,也就是每个线程,如果不是满的,那么就会往下走,放入用户提交的任务,同时通知消费者消费
然后就是模式的选择,如果是动态增长的就需要考虑线程数量的大小了
对于 线程函数的详解
ThreadPool::threadFunc
是线程池中的工作线程执行的函数,它的主要职责是从任务队列中取出任务并执行。同时,该函数还处理了线程的超时回收、线程池关闭等情况。
void ThreadPool::threadFunc(int threadid)
{
auto lastTime = std::chrono::high_resolution_clock().now();
for (;;)
{
std::shared_ptr<Task> task;
{//获取锁
std::unique_lock<std::mutex>lock(taskQueMtx_);
//判断是不是超时返回,还是有任务阻塞
while (taskQue_.size() == 0)
{
if (poolMode_ == PoolMode::MODE_CACHED)
{
if (std::cv_status::timeout ==
notFull_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if (dur.count() >= 60 &&
curThreadSize_ > initThreadSize_)
{
//开始回收当前线程
threads_.erase(threadid);
curThreadSize_--;
idleThreadSize_--;
return;
}
}
}
else
{
notNull_.wait(lock);
}
//线程池要结束
if (!isPoolRunning_)
{
threads_.erase(threadid);
curThreadSize_--;
idleThreadSize_--;
exitCond_.notify_all();
return;
}
}
idleThreadSize_--;
//消费任务
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
//通知
if (taskQue_.size() > 0)
{
notNull_.notify_all();
}
notFull_.notify_all();
}
if (task != nullptr)
{
task->exec();
}
idleThreadSize_++;
}
threads_.erase(threadid);
curThreadSize_--;
idleThreadSize_--;
/*std::cout << "begin threadFunc" << std::endl;
std::cout << std::this_thread::get_id() << std::endl;
std:: cout<< "end threadFunc" << std::endl;*/
}
这里就是消费者的模型,同时涉及了两个模式的转换,还有一些小细节
当任务队列中有任务时,将空闲线程数量 idleThreadSize_
减 1,表示当前线程不再空闲。然后从任务队列的头部取出一个任务,将其存储在 task
变量中,并将任务从队列中移除,同时减少任务数量 taskSize_
。
- 如果任务队列中还有剩余任务,使用
notNull_.notify_all()
通知所有等待在notNull_
条件变量上的线程,表明任务队列中还有任务可供消费。 - 使用
notFull_.notify_all()
通知所有等待在notFull_
条件变量上的线程,表明任务队列中有空闲空间,可以继续提交任务。
2.2 Thread类
线程类就是一个start函数比较重要
void Thread::start()
{
std::thread t(func_,threadId_);
t.detach();
}
这里我们设置了一个线程分离,因为如果不调用 t.detach()
,并且 std::thread
对象 t
在其生命周期结束时线程还在运行,就会有问题。std::thread
对象在销毁时,会检查它所关联的线程是否还在运行。如果线程还在运行且没有被分离(即没有调用 detach
),也没有调用 join
等待线程结束,那么程序会调用 std::terminate
来终止整个程序。
2.3 返回值的考虑
用户提交的任务千奇百怪,,所以我们需要用一个类型去把他们全部收起来,这里很容易想到继承,我们写一个父类Task,用户提交的任务继承父类,这样就很好解决了
但是这样还不行,因为有的用户需要返回值,那么我们就需要提供一个可以接收任意类型的类,模板?我一开始也想这个,这个显然不想,因为我们子类继承父类肯定要重写run函数,虚函数带模板肯定是不行的,因为模板不能正确实例化
这里给出代码
class Any
{
public:
Any() = default;
~Any() = default;
//这里用了unique_ptr所以要
Any(const Any&) = delete;
Any& operator=(const Any&) = delete;
Any(Any&&) = default;
Any& operator=(Any&&) = default;
template <class T>
Any(T data):base_(std::make_unique<Derive<T>>(data))
{}
private:
class Base
{
public:
virtual ~Base() = default;
private:
};
template <class T>
class Derive :public Base
{
public:
Derive(T data)
{
data_ = data;
}
T data_;
};
template<class T>
T cast_()
{
//从成员变量中提取data
Derive<T>* pd = dynamic_cast<Derive<T>*>(base_.get());
if (pd == nullptr)
{
throw"type is unmatch!";
}
return pd->data_;
}
private:
std::unique_ptr<Base> base_;
};
这里在Any类里面放了一个父类Base和子类Derive,这样就可以接收到各式各样的返回值了,同时Derive用模板,这样Derive就可以存储各种值了
模板函数 cast_
:用于将存储在 Any
对象中的数据转换回原始类型。使用 dynamic_cast
进行类型转换,如果转换失败(即 pd
为 nullptr
),则抛出一个异常,表示类型不匹配。
这里的cast_是为了获取返回值的类型,好返回给外界
Result类
这里为什么需要这个呢?
Any
类的局限性:Any
类的核心功能是存储任意类型的数据,它本身并不具备对任务执行状态进行管理的能力。当向线程池提交一个任务时,任务可能因为各种原因(如任务队列已满导致提交超时)无法成功提交,或者处于正在执行、执行完成等不同状态,Any
无法表示这些状态信息。Result
类的优势:Result
类通过构造函数中的isValid
参数,能够明确标识任务是否有效提交。在ThreadPool::submitTask
函数中,如果任务提交超时,会创建一个Result
对象并将isValid
设为false
。调用者可以通过检查这个标志来判断任务是否成功提交,从而采取相应的处理措施。
还有就是,当用户在获取返回值的时候,可能线程还没执行完毕或者在阻塞,那么我们就需要根据这个情况去判断
Result
类使用了信号量(sem_
)来实现线程同步。当调用 Result::get
方法时,如果任务还没有执行完,调用线程会被阻塞,直到任务执行完毕并设置了结果。这是通过信号量的 wait
操作实现的。而当任务执行完毕,调用 Result::setVal
方法设置结果时,会通过信号量的 post
操作释放资源,唤醒等待的线程,确保调用者能安全地获取任务结果。
这里的一些小细节就是Task类里面有一个Result指针,这样在任务执行完毕的时候,我们可以把返回值放到Result里面去,因为Result里面有一个Any类型,Task::run函数返回值也是any,这样我们就可以拿到返回值了
这里还有个重点,也算是我的心得
Task里面有个Result指针,这个指针的作用主要是用来给Result对象设置返回值的,因为我们submit函数里面有Result返回,在构造Result对象的时候,该对象里面有一个Task的指针指针,构造的时候就会构造Task,在Task的构造中会传入Result的指针,让他们两个建立联系
在 Result
类的构造函数中,会调用 task_->setResult(this)
方法,将当前 Result
对象的指针传递给 Task
对象,从而建立两者之间的关联
三 总结
整个项目总共二百多行,里面的细节值得细细品味,整个项目的精髓也在里面,特别是Result类的作用,它怎么和Task联系起来的,这点很重要