高阶开发基础——快速入门C++并发编程6——大作业:实现一个超级迷你的线程池
目录
实现一个无返回的线程池
完全代码实现
Reference
实现一个无返回的线程池
实现一个简单的线程池非常简单,我们首先聊一聊线程池的定义:
线程池(Thread Pool) 是一种并发编程的设计模式,用于管理和复用多个线程,以提高程序的性能和资源利用率。它的核心思想是预先创建一组线程,并将任务分配给这些线程执行,而不是为每个任务单独创建和销毁线程。线程池广泛应用于需要处理大量短期任务的场景,例如 Web 服务器、数据库连接池、任务调度系统等。换而言之,线程池说白了就是一种饿汉思维——直接预先提供若干的线程,由线程池内部控制调度,确保我们可以只关心任务的提交以及完成。
我们下面要做的是设计一个任务是不返回的线程池。所以,我们约束我们的函数是:
using supportive_task_type = std::function<void()>;
下一步,就是构造我们的线程池的线程。注意的是——线程和任务是解耦合的,意味着我们需要一个中间函数解耦合任务派发。笔者决定,将任务派发分到一个私有函数完成:
CCThreadPool(const int workers_num) {
for(int i = 0; i < workers_num; i++){
internal_threads.emplace_back(
[this](){
__scheduled();
}
);
}
}
上面这个代码很简单,就是将每一个线程都分配一个调度函数,这个调度函数来委派分发任务,办法说简单也很简单:
void __scheduled(){
while(1){
// sources protections
std::unique_lock<std::mutex> locker(internal_mutex);
// waiting for the access of the task resources
controlling_cv.wait(locker, [this]{
return thread_pool_status || !tasks_queue.empty();}
);
// quit if requried
if(thread_pool_status && tasks_queue.empty()){
return;
}
// 现在我们可以取到任务执行了
supportive_task_type task(std::move(tasks_queue.front()));
tasks_queue.pop();
locker.unlock();
task();
}
}
当析构的时候,我们也要通知所有线程的cv不要睡眠了,由于设置了thread_pool_status
是true,直接线程跳出来结束全文。
~CCThreadPool(){
thread_pool_status = true;
controlling_cv.notify_all();
for(auto& thread : internal_threads){
thread.join();
}
}
完全代码实现
#include <condition_variable>
#include <functional>
#include <mutex>
#include <print>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
class CCThreadPool {
public:
CCThreadPool() = delete;
CCThreadPool(const CCThreadPool &) = delete;
CCThreadPool &operator=(CCThreadPool &) = delete;
CCThreadPool(const int workers_num) {
for(int i = 0; i < workers_num; i++){
internal_threads.emplace_back(
[this](){
__scheduled();
}
);
}
}
~CCThreadPool(){
thread_pool_status = true;
controlling_cv.notify_all();
for(auto& thread : internal_threads){
thread.join();
}
}
template<typename F, typename... Args>
void enTask(F&& f, Args&&... args){
supportive_task_type task(
std::bind(std::forward<F&&>(f), std::forward<Args&&>(args)...));
{
std::unique_lock<std::mutex> locker(internal_mutex);
tasks_queue.emplace(std::move(task));
}
controlling_cv.notify_one();
}
private:
void __scheduled(){
while(1){
std::unique_lock<std::mutex> locker(internal_mutex);
controlling_cv.wait(locker, [this]{
return thread_pool_status || !tasks_queue.empty();}
);
// quit
if(thread_pool_status && tasks_queue.empty()){
return;
}
supportive_task_type task(std::move(tasks_queue.front()));
tasks_queue.pop();
locker.unlock();
task();
}
}
using supportive_task_type = std::function<void()>;
std::vector<std::thread> internal_threads;
std::queue<supportive_task_type> tasks_queue;
std::mutex internal_mutex;
std::condition_variable controlling_cv;
bool thread_pool_status = false;
};
int main()
{
std::println("Task start");
CCThreadPool pool(4);
for (int i = 0; i < 8; ++i) {
pool.enTask([i] {
std::println("Task {} is started at thread with id {}", i, std::this_thread::get_id());
std::this_thread::sleep_for(std::chrono::seconds(1));
std::println("Task {} is done", i);
});
}
return 0;
}
Reference
8. C++11 跨平台线程池-See的编程日记 (seestudy.cn)