当前位置: 首页 > article >正文

【C++】线程池实现

目录

  • 一、线程池简介
    • 线程池的核心组件
    • 实现步骤
  • 二、C++11实现线程池
    • 源码
  • 三、线程池源码解析
    • 1. 成员变量
    • 2. 构造函数
      • 2.1 线程初始化
      • 2.2 工作线程逻辑
    • 3. 任务提交(enqueue方法)
      • 3.1 方法签名
      • 3.2 任务封装
      • 3.3 任务入队
    • 4. 析构函数
      • 4.1 停机控制
    • 5. 关键技术点解析
      • 5.1 完美转发实现
      • 5.2 异常传播机制
      • 5.3 内存管理模型
  • 四、 性能特征分析
  • 五、 扩展优化方向
  • 六、 典型问题排查指南
  • 七、 测试用例
    • 如果这篇文章对你有所帮助,渴望获得你的一个点赞!

一、线程池简介

线程池是一种并发编程技术,通过预先创建一组线程并复用它们来执行多个任务,避免了频繁创建和销毁线程的开销。它特别适合处理大量短生命周期任务的场景(如服务器请求、并行计算)。

线程池的核心组件

1. 任务队列(Task Queue)
存储待执行的任务(通常是函数对象或可调用对象)。

2. 工作线程(Worker Threads)
一组预先创建的线程,不断从队列中取出任务并执行。

3. 同步机制
互斥锁(Mutex):保护任务队列的线程安全访问。
条件变量(Condition Variable):通知线程任务到达或线程池终止。

实现步骤

1. 初始化线程池
创建固定数量的线程,每个线程循环等待任务。

2. 提交任务
将任务包装成函数对象,加入任务队列。

3. 任务执行
工作线程从队列中取出任务并执行。

4. 终止线程池
发送停止信号,等待所有线程完成当前任务后退出。

二、C++11实现线程池

源码

#include <vector>
#include <queue>
#include <future>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>

class ThreadPool 
{
public:
    //构造函数:根据输入的线程数(默认硬件并发数)创建工作线程。
    //每个工作线程执行一个循环,不断从任务队列中取出并执行任务。
    //explicit关键字防止隐式类型转换
    explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
        : stop(false) 
    {
        if (threads == 0) 
        {
            threads = 1;
        }

        for (size_t i = 0; i < threads; ++i) 
        {
            workers.emplace_back([this] 
            {
                for (;;) 
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        
                        //等待条件:线程通过条件变量等待任务到来或停止信号。(CPU使用率:休眠时接近0%,仅在任务到来时唤醒)
                        //lambda表达式作为谓词,当条件(停止信号为true 或 任务队列非空)为真时,才会解除阻塞。
                        this->condition.wait(lock, [this] {
                            return (this->stop || !this->tasks.empty());
                            });

                        /* 传统忙等待:while (!(stop || !tasks.empty())) {} // 空循环消耗CPU */

                        if (this->stop && this->tasks.empty())
                        {
                            //如果线程池需要终止且任务队列为空则直接return
                            return;
                        }

                        //任务提取:从队列中取出任务并执行,使用std::move避免拷贝开销。
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    //执行任务
                    task();
                }
            });
        }
    }

    //任务提交(enqueue方法)
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> 
    {
        using return_type = typename std::result_of<F(Args...)>::type;

        //任务封装:使用std::packaged_task包装用户任务,支持异步返回结果。
        //智能指针管理:shared_ptr确保任务对象的生命周期延续至执行完毕。
        //完美转发:通过std::forward保持参数的左值/右值特性。
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            if (stop)
            {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }  

            tasks.emplace([task]() { (*task)(); });
            /* push传入的对象需要事先构造好,再复制过去插入容器中;
               而emplace则可以自己使用构造函数所需的参数构造出对象,并直接插入容器中。
               emplace相比于push省去了复制的步骤,则使用emplace会更加节省内存。*/
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() 
    {
        //设置stop标志,唤醒所有线程,等待任务队列清空。
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers)
        {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;        //存储工作线程对象
    std::queue<std::function<void()>> tasks; //任务队列,存储待执行的任务

    std::mutex queue_mutex;                  //保护任务队列的互斥锁
    std::condition_variable condition;       //线程间同步的条件变量
    bool stop;                               //线程池是否停止标志
};

三、线程池源码解析

1. 成员变量

std::vector<std::thread> workers;        // 工作线程容器
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex;                  // 队列互斥锁
std::condition_variable condition;       // 条件变量
bool stop;                               // 停机标志

设计要点:

  • 采用生产者-消费者模式,任务队列作为共享资源

  • 组合使用mutex+condition_variable实现线程同步

  • vector存储线程对象便于统一管理生命周期


2. 构造函数

2.1 线程初始化

explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
    : stop(false)
{
    if (threads == 0) 
    {
        threads = 1;
    }
        
    for (size_t i = 0; i < threads; ++i) 
    {
        workers.emplace_back([this] { /* 工作线程逻辑 */ });
    }
}

设计要点:

  • explicit防止隐式类型转换(如ThreadPool pool = 4;

  • 默认使用硬件并发线程数(通过hardware_concurrency()

  • 最少创建1个线程避免空池

  • 使用emplace_back直接构造线程对象


2.2 工作线程逻辑

for (;;)
{
    std::function<void()> task;
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        condition.wait(lock, [this] {
            return stop || !tasks.empty();
        });

        if (stop && tasks.empty()) 
        {
           return; 
        }

        task = std::move(tasks.front());
        tasks.pop();
    }
    task();
}

核心机制:

  • unique_lock配合条件变量实现自动锁管理

  • 双重状态检查(停机标志+队列非空)

  • 任务提取使用移动语义避免拷贝

  • 任务执行在锁作用域外进行


3. 任务提交(enqueue方法)

3.1 方法签名

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>

类型推导:

  • 使用尾置返回类型声明
  • std::result_of推导可调用对象的返回类型
  • 完美转发参数(F&&+Args&&...

3.2 任务封装

auto task = std::make_shared<std::packaged_task<return_type()>>
    (std::bind(std::forward<F>(f), std::forward<Args>(args)...));

封装策略:

  • packaged_task包装任务用于异步获取结果
  • shared_ptr管理任务对象生命周期
  • std::bind绑定参数(注意C++11的参数转发限制)

3.3 任务入队

tasks.emplace([task]() { (*task)(); });

优化点:

  • 使用emplace直接构造队列元素
  • Lambda捕获shared_ptr保持任务有效性
  • 显式解引用执行packaged_task

4. 析构函数

4.1 停机控制

~ThreadPool() 
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (auto& worker : workers)
    {
        worker.join();
    }  
}

停机协议:

  1. 设置停机标志原子操作
  2. 广播唤醒所有等待线程
  3. 等待所有工作线程退出

5. 关键技术点解析

5.1 完美转发实现

std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  • 保持参数的左右值特性
  • 支持移动语义参数的传递
  • C++11的限制:无法完美转发所有参数类型

5.2 异常传播机制

  • 任务异常通过future对象传播
  • packaged_task自动捕获异常
  • 用户通过future.get()获取异常

5.3 内存管理模型

         [任务提交者]
             |
             v
       [packaged_task] <---- shared_ptr ---- [任务队列]
             |
             v
         [future]
  • 三重生命周期保障:
    1. 提交者持有future
    2. 队列持有任务包装器
    3. 工作线程执行任务

四、 性能特征分析

1. 时间复杂度

操作时间复杂度
任务提交(enqueue)O(1)(加锁开销)
任务提取O(1)
线程唤醒取决于系统调度

2. 空间复杂度

组件空间占用
线程栈每线程MB级
任务队列与任务数成正比
同步原语固定大小

五、 扩展优化方向

1. 任务窃取(Work Stealing)

  • 实现多个任务队列
  • 空闲线程从其他队列窃取任务

2. 动态线程池

void adjust_workers(size_t new_size) 
{
    if (new_size > workers.size()) 
    {
     	// 扩容逻辑
    } 
    else 
    {
        // 缩容逻辑
    }
}

3. 优先级队列

using Task = std::pair<int, std::function<void()>>; // 优先级+任务
   std::priority_queue<Task> tasks;

4. 无锁队列

moodycamel::ConcurrentQueue<std::function<void()>> tasks;

六、 典型问题排查指南

现象可能原因解决方案
任务未执行线程池提前析构延长线程池生命周期
future.get()永久阻塞任务未提交/异常未处理检查任务提交路径
CPU利用率100%忙等待或锁竞争优化任务粒度/使用无锁结构
内存持续增长任务对象未正确释放检查智能指针使用

该实现完整展现了现代C++线程池的核心设计范式,开发者可根据具体需求在此基础进行功能扩展和性能优化。理解这个代码结构是掌握更高级并发模式的基础。

七、 测试用例

使用实例(C++11兼容):

#include <iostream>

int main() 
{
    ThreadPool pool(4);
    
    // 提交普通函数
    auto future1 = pool.enqueue([](int a, int b) {
        return a + b;
    }, 2, 3);
    
    // 提交成员函数
    struct Calculator 
    {
        int multiply(int a, int b) 
        { 
            return a * b; 
        }
    } calc;
    auto future2 = pool.enqueue(std::bind(&Calculator::multiply, &calc, 
                                        std::placeholders::_1, 
                                        std::placeholders::_2), 4, 5);
    
    // 异常处理示例
    auto future3 = pool.enqueue([]() -> int {
        throw std::runtime_error("example error");
        return 1;
    });
    
    std::cout << "2+3=" << future1.get() << std::endl;
    std::cout << "4*5=" << future2.get() << std::endl;
    
    try 
    {
        future3.get();
    } 
    catch(const std::exception& e)
    {
        std::cout << "Caught exception: " << e.what() << std::endl;
    }
    
    return 0;
}

如果这篇文章对你有所帮助,渴望获得你的一个点赞!

在这里插入图片描述


http://www.kler.cn/a/530773.html

相关文章:

  • C语言教学第四课:控制结构
  • 享元模式——C++实现
  • Hive之数据定义DDL
  • 年化18%-39.3%的策略集 | backtrader通过xtquant连接qmt实战
  • python算法和数据结构刷题[5]:动态规划
  • UE学习日志#19 C++笔记#5 基础复习5 引用1
  • fpga系列 HDL:XILINX Vivado 常见错误 “在线逻辑分析Debug时ALL_CLOCK没有选项”
  • Rust语言进阶之文件处理:BufReader用法实例(一百零三)
  • React常见状态管理工具详解
  • 【数据结构】(4) 线性表 List
  • 【数据结构-字典树】力扣211. 添加与搜索单词 - 数据结构设计
  • 利用腾讯云cloud studio云端免费部署deepseek-R1
  • 浅析JWT
  • MySQL高效指南:视图、事务、PyMySQL操作与查询优化全解析!
  • ieee模版如何修改参考文献的格式以及多作者省略等
  • 从1号点到n号点最多经过k条边的最短距离
  • Python教学:文档处理及箱线图等
  • 优化 PHP-FPM 参数配置:实现服务器性能提升
  • 手机上运行AI大模型(Deepseek等)
  • 第27节课:安全审计与防御—构建坚固的网络安全防线
  • 蓝桥杯刷题DAY3:Horner 法则 前缀和+差分数组 贪心
  • Spring Boot 2 快速教程:WebFlux 集成 Mongodb(三)
  • FPGA|IP核PLL调用测试:调用IP核
  • 关于贪心学习的文笔记录
  • DBASE DBF数据库文件解析
  • LLM - 基于LM Studio本地部署DeepSeek-R1的蒸馏量化模型