当前位置: 首页 > article >正文

高阶开发基础——快速入门C++并发编程6——大作业:实现一个超级迷你的线程池

目录

实现一个无返回的线程池

完全代码实现

Reference


实现一个无返回的线程池

实现一个简单的线程池非常简单,我们首先聊一聊线程池的定义:

线程池(Thread Pool) 是一种并发编程的设计模式,用于管理和复用多个线程,以提高程序的性能和资源利用率。它的核心思想是预先创建一组线程,并将任务分配给这些线程执行,而不是为每个任务单独创建和销毁线程。线程池广泛应用于需要处理大量短期任务的场景,例如 Web 服务器、数据库连接池、任务调度系统等。换而言之,线程池说白了就是一种饿汉思维——直接预先提供若干的线程,由线程池内部控制调度,确保我们可以只关心任务的提交以及完成。

我们下面要做的是设计一个任务是不返回的线程池。所以,我们约束我们的函数是:

using supportive_task_type = std::function<void()>;

下一步,就是构造我们的线程池的线程。注意的是——线程和任务是解耦合的,意味着我们需要一个中间函数解耦合任务派发。笔者决定,将任务派发分到一个私有函数完成:

    CCThreadPool(const int workers_num) {
        for(int i = 0; i < workers_num; i++){
            internal_threads.emplace_back(
            [this](){
                __scheduled();
            }
        );
        }
    }

上面这个代码很简单,就是将每一个线程都分配一个调度函数,这个调度函数来委派分发任务,办法说简单也很简单:

void __scheduled(){
        while(1){
            // sources protections
            std::unique_lock<std::mutex> locker(internal_mutex);
            // waiting for the access of the task resources
            controlling_cv.wait(locker, [this]{
                return thread_pool_status || !tasks_queue.empty();}
            );
            // quit if requried
            if(thread_pool_status && tasks_queue.empty()){
                return;
            }
            
            // 现在我们可以取到任务执行了
            supportive_task_type task(std::move(tasks_queue.front()));
            tasks_queue.pop();
            locker.unlock();
            task();
        }
    }

当析构的时候,我们也要通知所有线程的cv不要睡眠了,由于设置了thread_pool_status是true,直接线程跳出来结束全文。

    ~CCThreadPool(){
        thread_pool_status = true;
        controlling_cv.notify_all();
        for(auto& thread : internal_threads){
            thread.join();
        }
    }

完全代码实现

#include <condition_variable>
#include <functional>
#include <mutex>
#include <print>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
​
class CCThreadPool {
  public:
    CCThreadPool()                          = delete;
    CCThreadPool(const CCThreadPool &)      = delete;
    CCThreadPool &operator=(CCThreadPool &) = delete;
​
    CCThreadPool(const int workers_num) {
        for(int i = 0; i < workers_num; i++){
            internal_threads.emplace_back(
            [this](){
                __scheduled();
            }
        );
        }
    }
​
    ~CCThreadPool(){
        thread_pool_status = true;
        controlling_cv.notify_all();
        for(auto& thread : internal_threads){
            thread.join();
        }
    }
​
    template<typename F, typename... Args>
    void enTask(F&& f, Args&&... args){
        supportive_task_type task(
            std::bind(std::forward<F&&>(f), std::forward<Args&&>(args)...));
        {
            std::unique_lock<std::mutex> locker(internal_mutex);
            tasks_queue.emplace(std::move(task));
        }
        controlling_cv.notify_one();
    }
​
  private:
    void __scheduled(){
        while(1){
            std::unique_lock<std::mutex> locker(internal_mutex);
            controlling_cv.wait(locker, [this]{
                return thread_pool_status || !tasks_queue.empty();}
            );
            // quit
            if(thread_pool_status && tasks_queue.empty()){
                return;
            }
            supportive_task_type task(std::move(tasks_queue.front()));
            tasks_queue.pop();
            locker.unlock();
            task();
        }
    }
​
    using supportive_task_type = std::function<void()>;
    std::vector<std::thread> internal_threads;
    std::queue<supportive_task_type> tasks_queue;
    std::mutex internal_mutex;
    std::condition_variable controlling_cv;
    bool thread_pool_status = false;
};
​
​
int main()
{
    std::println("Task start");
    CCThreadPool pool(4);
    
    for (int i = 0; i < 8; ++i) {
        pool.enTask([i] {
            std::println("Task {} is started at thread with id {}", i, std::this_thread::get_id());
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::println("Task {} is done", i);
        });
    }
    return 0;
}

Reference

8. C++11 跨平台线程池-See的编程日记 (seestudy.cn)


http://www.kler.cn/a/531278.html

相关文章:

  • 探秘Linux IO虚拟化:virtio的奇幻之旅
  • 传输层协议 UDP 与 TCP
  • Java集合+并发(部分)
  • VMware安装win10记录
  • 在CentOS服务器上部署DeepSeek R1
  • 深入解析 Linux 内核内存管理核心:mm/memory.c
  • Java:日期时间范围的处理
  • leetcode15-三数之和
  • 【AudioClassificationModelZoo-Pytorch】基于Pytorch的声音事件检测分类系统
  • Rust中的切片类型:灵活的数据视图
  • LeetCode 0680.验证回文串 II:两侧向中间,不同就试删
  • 订单状态监控实战:基于 SQL 的状态机分析与异常检测
  • 树莓派pico入坑笔记,睡眠
  • go-zero学习笔记(三)
  • 院校联合以项目驱动联合培养医工计算机AI人才路径探析
  • 【Linux网络编程】:守护进程,前台进程,后台进程
  • C++哈希表深度解析:从原理到实现,全面掌握高效键值对存储
  • Mac M1 Comfyui 使用MMAudio遇到的问题解决?
  • 【C++】B2122 单词翻转
  • 【C++篇】位图与布隆过滤器
  • 毫秒级响应的VoIP中的系统组合推荐
  • 【DeepSeek背后的技术】系列一:混合专家模型(MoE)
  • 从零开始实现一个双向循环链表:C语言实战
  • Java多线程——对象的组合
  • FPGA|例化生成的PLL功能IP核
  • 为什么在Rust中要用Struct和Enum组织数据?