《线程池最终版:使用可变参模板和future优化重构设计》
之前设计的线程池,我们设计了很多复杂的东西,比如线程池里提供了一个抽象的Task基类,用户使用的时候,继承基类,再重写run方法,如果要传参数,可以通过派生类的构造函数传,又设计了Result类,Any类,Semaphore类等。
事实上,C++14、C++17提供了Any和Semaphore很多高级的用法,用起来非常简单。
之前设计的线程池提交任务要使用智能指针,定义派生类,还需要传参等等。如何让线程池提交任务更加方便?
int sum1(int a, int b)
{
return a + b;
}
int sum2(int a, int b, int c)
{
return a + b + c;
}
int main()
{
thread t1(sum1, 10, 20);
thread t2(sum2, 1, 2, 3);
t1.join();
t2.join();
return 0;
}
是不是直接可以将线程要执行的任务,传给线程呢?
/*
如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
pool.submitTask(sum2,1,2,3);
submitTask:可变参模板编程
*/
就不需要设计Mytask继承基类Task,重写Run方法,传参的时候,还需要Mytask定义成员变量,构造函数传参,不需要这么麻烦了。直接将用户要执行的方法,作为参数传递给线程,同时使用可变参模板传参数,不受参数的限制。
我们自己写Result及相关类型,有点复杂。
C++11,提供了线程库,thread,可以将任务作为参数传递给线程,但是没办法接收返回值。可以使用packaged_task(类似于function函数对象)来包装一下任务。
#include<iostream>
#include<functional>
#include<thread>
#include<future>
using namespace std;
/*
如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
C++11 线程库 thread packaged_task(function函数对象)
使用future代替Result,节省线程池代码
*/
int sum1(int a, int b)
{
return a + b;
}
int sum2(int a, int b, int c)
{
return a + b + c;
}
int main()
{
packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
// future 类似于我们自己实现的 Result
future<int> res = task.get_future(); // 返回一个future类型的对象
task(10, 20);
cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法
//thread t1(sum1, 10, 20);
//thread t2(sum2, 1, 2, 3);
//t1.join();
//t2.join();
return 0;
}
packgaed_task与function包装函数不同的是,packaged_task提供了get_future方法可以得到future类型的返回对象。
当任务比较耗时的时候,res.get()方法会阻塞。这也涉及到线程间通信的安全问题,我们自己设计的线程池又实现了Semaphore信号量的wait和post。
我们自己实现的Result类型接收任务对象。
我们自己实现的Result的setVal方法和get方法。
我们自己实现的用于线程间通信安全的Semaphore信号量的wait和post方法。
其实我们自己所实现的就是类似于future的机制。
看到future也有类似setVal和getVal的方法还有信号量的wait方法。既然C++11库中提供了future(异步编程机制),我们直接使用future来代替Result就可以了,节省线程池代码。
future机制
std::future提供了一种在异步操作完成后获取其结果的方式。它可以与多种异步执行机制配合使用,比std::async、std::thread、std::packaged_task等。通过std::future,主线程可以暂停执行,等待异步任务完成并获取其返回值,从而实现异步操作与主线程之间的同步。
#include<iostream>
#include<functional>
#include<thread>
#include<future>
using namespace std;
/*
如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
C++11 线程库 thread packaged_task(function函数对象)
*/
int sum1(int a, int b)
{
return a + b;
}
int sum2(int a, int b, int c)
{
return a + b + c;
}
int main()
{
packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
// future 类似于我们自己实现的 Result
future<int> res = task.get_future(); // 返回一个future类型的对象
// task(10, 20);
thread t1(std::move(task), 10, 20);
t1.detach();
cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法
//thread t1(sum1, 10, 20);
//thread t2(sum2, 1, 2, 3);
//t1.join();
//t2.join();
return 0;
}
基于可变参数模板以及future,下面我们来优化重构线程池,这次线程池设计成开源的。
线程池项目最终版
threadpool.hpp:
#pragma once
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include<iostream>
#include<vector>
#include<queue>
#include<memory>
#include<atomic>
#include<mutex>
#include<condition_variable>
#include<functional>
#include<unordered_map>
#include<thread>
#include<future>
using namespace std;
// 任务队列数量的上限阈值
const int TASK_MAX_THRESHHOLD = INT32_MAX;
// 线程数量的上限阈值
const int THREAD_MAX_THRESHHOLD = 1024;
// 线程最大空闲时间
const int THREAD_MAX_IDLE_TIME = 10; // 单位:秒
// 线程池支持的模式
enum PoolMode
{
MODE_FIXED, // 1.fixed模式 线程数量固定
MODE_CACHED,// 2.cached模式 线程数量可动态增长
};
// 线程类型
class Thread
{
public:
// 线程函数对象类型
using ThreadFunc = std::function<void(int)>;
// 线程构造
Thread(ThreadFunc func)
:func_(func)
, threadId_(generateId_++)
{}
// 线程析构
~Thread() = default;
// 线程启动
void start()
{
// 创建一个线程来执行线程函数
std::thread t(func_, threadId_);
t.detach();// 设置线程分离
}
// 获取线程id
int getId() const
{
return threadId_;
}
private:
static int generateId_;
ThreadFunc func_;
int threadId_; // 保存线程id
};
// 线程池类型
class ThreadPool
{
public:
// 线程池构造
ThreadPool()
: initThreadSize_(0)
, taskSize_(0)
, idleThreadSize_(0)
, curThreadSize_(0)
, taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD)
, threadSizeHold_(THREAD_MAX_THRESHHOLD)
, poolMode_(PoolMode::MODE_FIXED)
, isPoolRunning_(false)
{}
// 线程池析构
~ThreadPool()
{
isPoolRunning_ = false;
// 等待线程池里面所有的线程返回
std::unique_lock<std::mutex> lock(taskQueMtx_);
notEmpty_.notify_all();
// 唤醒所有阻塞等待的线程
exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
// 设置线程池的工作模式
void setMode(PoolMode mode)
{
if (checkRunningState())
return;
poolMode_ = mode;
}
// 设置task任务队列的上线阈值
void setTaskQueMaxThreshHold(int threshhold)
{
if (checkRunningState())
return;
taskQueMaxThreshHold_ = threshhold;
}
// 设置线程池cached模式下线程阈值
void setThreadSizeThreshHold(int threshhold)
{
if (checkRunningState())
return;
if (poolMode_ == PoolMode::MODE_CACHED)
{
threadSizeHold_ = threshhold;
}
}
// 给线程池提交任务
// 使用可变参模板编程,让submitTask可以接收任意任务函数和任意数量的参数
template<typename Func,typename... Args>
auto submitTask(Func&& func, Args&&... args) -> std::future<decltype(func(args...))> // 推导submitTask的返回值
{
// 打包任务,放入任务队列里面
using RType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<RType()>> (std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<RType> result = task->get_future();
// 获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
// 用户提交任务,最长不能阻塞超过1s,否则判断提交任务失败,返回
if (!notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))
{
// 表示notFull_等待1s钟,条件仍然不满足
std::cerr << "task queue is full, submit task faile." << std::endl;
auto task = std::make_shared<std::packaged_task<RType()>>([]()->RType{return RType(); });
return task->get_future();
}
// 如果有空余,把任务放进任务队列
// taskQue_.emplace(sp);
// using Task = std::function<void()>
// 不能直接将task放进任务队列,因为task是有返回值的,返回值类型不同。
// 我们任务队列设计的时候,接收的任务对象就是返回值为void的任务。
// 但是我们怎么把带返回值的任务传进去呢?
// 增加一个中间层,返回值是void不带参数的Lambda表达式函数对象,将实际要执行的任务封装起来即可。
taskQue_.emplace([task]() {(*task)(); });
taskSize_++;
// 因为新放了任务,任务队列肯定不为空了,在notEmpty_上进行通知,赶快分配线程执行任务吧
notEmpty_.notify_all();
// cached模式 任务处理比较紧急 场景:小而快的任务 需要根据任务数量和空闲线程数量,判断是否需要创建新的线程出来
if (poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeHold_)
{
// 这行打印用于测试
std::cout << ">>> create new thread..." << std::endl;
// 创建新线程
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
// threads_.emplace_back(std::move(ptr));
// threads_[curThreadSize_]->start();
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
// 启动线程
threads_[threadId]->start();
// 修改线程数量相关变量
curThreadSize_++;
idleThreadSize_++;
}
// 返回任务的Result对象
// return task->getResult();
return result;
}
// 开启线程池
void start(int initThreadSize = std::thread::hardware_concurrency())
{
// 设置线程池的运行状态
isPoolRunning_ = true;
// 记录初始线程个数
initThreadSize_ = initThreadSize;
// 记录线程池里面线程的总数量
curThreadSize_ = initThreadSize;
// 创建线程对象 std::vector<Thread*> threads_;
for (int i = 0; i < initThreadSize_; i++)
{
// 创建thread线程对象的时候,把线程函数给到thread线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
// threads_.emplace_back(std::move(ptr));
}
// 启动所有线程 std::vector<Thread*> threads_;
for (int i = 0; i < initThreadSize_; i++)
{
threads_[i]->start();// 需要去执行一个线程函数
idleThreadSize_++;
}
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
private:
// 线程函数
void threadFunc(int threadid)
{
auto lastTime = std::chrono::high_resolution_clock().now();
// while (isPoolRunning_)
for (;;)
{
Task task;
{
// 先获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
// 下面测试用的
std::cout << "tid: " << std::this_thread::get_id() << "尝试获取任务..." << std::endl;
// cached模式下,有可能已经创建了很多线程,但是空闲线程如果超过了60s,就应该把多余的线 程
// 结束回收掉(超过initThreadSize_的线程要进行回收)
// 当前时间 - 上一次线程执行完的时间 > 60s
// 每秒钟返回一次 怎么区分超时返回,还是有任务待执行返回?
// 锁+双重判断
while (taskQue_.size() == 0)
{
if (!isPoolRunning_)
{
// 2.线程正在执行任务 线程池要结束 回收线程资源
threads_.erase(threadid);
std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;
// 唤醒线程池
exitCond_.notify_all();
return;
}
if (poolMode_ == PoolMode::MODE_CACHED)
{
// 条件变量 超时返回了
if (std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
// 当前时间
auto now = std::chrono::high_resolution_clock().now();
// 空闲时间
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if (dur.count() >= THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_)
{
// 开始回收当前线程
// 记录线程数量的相关变量的值修改
// 把线程对象从线程列表删除
threads_.erase(threadid);
curThreadSize_--;
idleThreadSize_--;
std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;
return;
}
}
}
else
{
// 等待任务队列不为空 notEmpty条件
notEmpty_.wait(lock);
}
}
idleThreadSize_--;
// 下面测试用的
std::cout << "tid: " << std::this_thread::get_id() << "获取任务成功..." << std::endl;
// 从任务队列获取任务
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
// 如果依然有剩余任务,继续通知其他线程来获取执行任务,提高了多线程同时并发获取任务、处理任务的能力
if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}
// 取出一个任务之后,进行通知,通知其他用户可以继续提交任务
notFull_.notify_all();
}// 一个线程取完任务之后,此刻就应该把锁释放掉,让多线程可以同时并发获取任务、处理任务
// 当前线程负责执行该任务
if (task != nullptr)
{
// task->run(); // 执行任务,把任务的返回值通过setVal方法给Result
// task->exec();
task(); // 执行function<void()>
}
idleThreadSize_++;
lastTime = std::chrono::high_resolution_clock().now(); // 更新线程执行完任务的时间
}
}
// 检查pool的运行状态
bool checkRunningState() const
{
return isPoolRunning_;
}
private:
// std::vector<std::unique_ptr<Thread>> threads_; // 线程列表
std::unordered_map<int, std::unique_ptr<Thread>> threads_; // 线程列表
int initThreadSize_; // 初始的线程数量
int threadSizeHold_; // 线程数量上限阈值
std::atomic_int curThreadSize_; // 记录当前线程池里面的线程总数量
std::atomic_int idleThreadSize_; // 记录空闲线程的数量
// Task任务 =》函数对象
// 放进任务队列里的任务返回值设计成void,因为不确定任务的返回值。参数可以不传,可以直接绑定给要执行的函数。
using Task = std::function<void()>;
std::queue<Task> taskQue_; // 不需要使用智能指针了,因为之前是用户传入的任务,生命周期不确定。现在这个任务是我们自己线程池封装维护的。
std::atomic_int taskSize_; // 任务的数量
int taskQueMaxThreshHold_; // 任务队列数量的上线阈值
std::mutex taskQueMtx_; // 保证任务队列的线程安全
std::condition_variable notFull_; // 表示任务队列不满
std::condition_variable notEmpty_; // 表示任务队列不空
std::condition_variable exitCond_; // 等待线程资源全部回收
PoolMode poolMode_; // 当前线程池的工作模式
std::atomic_bool isPoolRunning_; // 表示线程池的启动状态
};
#endif
线程池项目-最终版.cpp:
#include<iostream>
#include<functional>
#include<thread>
#include<future>
using namespace std;
#include"threadpool.hpp"
int Thread::generateId_ = 0;
/*
如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
C++11 线程库 thread packaged_task(function函数对象)
*/
int sum1(int a, int b)
{
return a + b;
}
int sum2(int a, int b, int c)
{
return a + b + c;
}
int main()
{
ThreadPool pool;
pool.start(4);
future<int> res1 = pool.submitTask(sum1, 1, 2);
future<int> res2 = pool.submitTask(sum2, 1, 2, 3);
future<int> res3 = pool.submitTask([](int begin, int end)->int {
int sum = 0;
for (int i = begin; i <= end; i++)
sum += i;
return sum;
}, 1, 100);
cout << res1.get() << endl;
cout << res2.get() << endl;
cout << res3.get() << endl;
//packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
future 类似于我们自己实现的 Result
//future<int> res = task.get_future(); // 返回一个future类型的对象
task(10, 20);
//thread t1(std::move(task), 10, 20);
//t1.detach();
//cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法
//thread t1(sum1, 10, 20);
//thread t2(sum2, 1, 2, 3);
//t1.join();
//t2.join();
return 0;
}
再来测试一下cached模式。
#include<iostream>
#include<functional>
#include<thread>
#include<future>
#include<chrono>
using namespace std;
#include"threadpool.hpp"
int Thread::generateId_ = 0;
/*
如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
C++11 线程库 thread packaged_task(function函数对象)
*/
int sum1(int a, int b)
{
this_thread::sleep_for(std::chrono::seconds(2));
return a + b;
}
int sum2(int a, int b, int c)
{
this_thread::sleep_for(std::chrono::seconds(2));
return a + b + c;
}
int main()
{
ThreadPool pool;
pool.setMode(PoolMode::MODE_CACHED);
pool.start(2);
future<int> res1 = pool.submitTask(sum1, 1, 2);
future<int> res2 = pool.submitTask(sum2, 1, 2, 3);
future<int> res3 = pool.submitTask([](int begin, int end)->int {
int sum = 0;
for (int i = begin; i <= end; i++)
sum += i;
return sum;
}, 1, 100);
cout << res1.get() << endl;
cout << res2.get() << endl;
cout << res3.get() << endl;
//packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
future 类似于我们自己实现的 Result
//future<int> res = task.get_future(); // 返回一个future类型的对象
task(10, 20);
//thread t1(std::move(task), 10, 20);
//t1.detach();
//cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法
//thread t1(sum1, 10, 20);
//thread t2(sum2, 1, 2, 3);
//t1.join();
//t2.join();
return 0;
}
再来测试一次任务提交失败情况。
#include<iostream>
#include<functional>
#include<thread>
#include<future>
#include<chrono>
using namespace std;
#include"threadpool.hpp"
int Thread::generateId_ = 0;
/*
如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
C++11 线程库 thread packaged_task(function函数对象)
*/
int sum1(int a, int b)
{
this_thread::sleep_for(std::chrono::seconds(2));
return a + b;
}
int sum2(int a, int b, int c)
{
this_thread::sleep_for(std::chrono::seconds(2));
return a + b + c;
}
int main()
{
ThreadPool pool;
pool.start(2);
future<int> res1 = pool.submitTask(sum1, 1, 2);
future<int> res2 = pool.submitTask(sum2, 1, 2, 3);
future<int> res3 = pool.submitTask([](int begin, int end)->int {
int sum = 0;
for (int i = begin; i <= end; i++)
sum += i;
return sum;
}, 1, 100);
future<int> res4 = pool.submitTask([](int begin, int end)->int {
int sum = 0;
for (int i = begin; i <= end; i++)
sum += i;
return sum;
}, 1, 100);
future<int> res5 = pool.submitTask([](int begin, int end)->int {
int sum = 0;
for (int i = begin; i <= end; i++)
sum += i;
return sum;
}, 1, 100);
cout << res1.get() << endl;
cout << res2.get() << endl;
cout << res3.get() << endl;
cout << res4.get() << endl;
cout << res5.get() << endl;
//packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
future 类似于我们自己实现的 Result
//future<int> res = task.get_future(); // 返回一个future类型的对象
task(10, 20);
//thread t1(std::move(task), 10, 20);
//t1.detach();
//cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法
//thread t1(sum1, 10, 20);
//thread t2(sum2, 1, 2, 3);
//t1.join();
//t2.join();
return 0;
}
自此,我们将线程池改造成了基于可变参模板和future机制的线程池,使线程池变得更简洁,完成了最终版本的线程池。