【C++】线程池实现
目录
- 一、线程池简介
- 线程池的核心组件
- 实现步骤
- 二、C++11实现线程池
- 源码
- 三、线程池源码解析
- 1. 成员变量
- 2. 构造函数
- 2.1 线程初始化
- 2.2 工作线程逻辑
- 3. 任务提交(enqueue方法)
- 3.1 方法签名
- 3.2 任务封装
- 3.3 任务入队
- 4. 析构函数
- 4.1 停机控制
- 5. 关键技术点解析
- 5.1 完美转发实现
- 5.2 异常传播机制
- 5.3 内存管理模型
- 四、 性能特征分析
- 五、 扩展优化方向
- 六、 典型问题排查指南
- 七、 测试用例
- 如果这篇文章对你有所帮助,渴望获得你的一个点赞!
一、线程池简介
线程池是一种并发编程技术,通过预先创建一组线程并复用它们来执行多个任务,避免了频繁创建和销毁线程的开销。它特别适合处理大量短生命周期任务的场景(如服务器请求、并行计算)。
线程池的核心组件
1. 任务队列(Task Queue)
存储待执行的任务(通常是函数对象或可调用对象)。
2. 工作线程(Worker Threads)
一组预先创建的线程,不断从队列中取出任务并执行。
3. 同步机制
互斥锁(Mutex):保护任务队列的线程安全访问。
条件变量(Condition Variable):通知线程任务到达或线程池终止。
实现步骤
1. 初始化线程池
创建固定数量的线程,每个线程循环等待任务。
2. 提交任务
将任务包装成函数对象,加入任务队列。
3. 任务执行
工作线程从队列中取出任务并执行。
4. 终止线程池
发送停止信号,等待所有线程完成当前任务后退出。
二、C++11实现线程池
源码
#include <vector>
#include <queue>
#include <future>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>
class ThreadPool
{
public:
//构造函数:根据输入的线程数(默认硬件并发数)创建工作线程。
//每个工作线程执行一个循环,不断从任务队列中取出并执行任务。
//explicit关键字防止隐式类型转换
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
: stop(false)
{
if (threads == 0)
{
threads = 1;
}
for (size_t i = 0; i < threads; ++i)
{
workers.emplace_back([this]
{
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
//等待条件:线程通过条件变量等待任务到来或停止信号。(CPU使用率:休眠时接近0%,仅在任务到来时唤醒)
//lambda表达式作为谓词,当条件(停止信号为true 或 任务队列非空)为真时,才会解除阻塞。
this->condition.wait(lock, [this] {
return (this->stop || !this->tasks.empty());
});
/* 传统忙等待:while (!(stop || !tasks.empty())) {} // 空循环消耗CPU */
if (this->stop && this->tasks.empty())
{
//如果线程池需要终止且任务队列为空则直接return
return;
}
//任务提取:从队列中取出任务并执行,使用std::move避免拷贝开销。
task = std::move(this->tasks.front());
this->tasks.pop();
}
//执行任务
task();
}
});
}
}
//任务提交(enqueue方法)
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
//任务封装:使用std::packaged_task包装用户任务,支持异步返回结果。
//智能指针管理:shared_ptr确保任务对象的生命周期延续至执行完毕。
//完美转发:通过std::forward保持参数的左值/右值特性。
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop)
{
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); });
/* push传入的对象需要事先构造好,再复制过去插入容器中;
而emplace则可以自己使用构造函数所需的参数构造出对象,并直接插入容器中。
emplace相比于push省去了复制的步骤,则使用emplace会更加节省内存。*/
}
condition.notify_one();
return res;
}
~ThreadPool()
{
//设置stop标志,唤醒所有线程,等待任务队列清空。
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)
{
worker.join();
}
}
private:
std::vector<std::thread> workers; //存储工作线程对象
std::queue<std::function<void()>> tasks; //任务队列,存储待执行的任务
std::mutex queue_mutex; //保护任务队列的互斥锁
std::condition_variable condition; //线程间同步的条件变量
bool stop; //线程池是否停止标志
};
三、线程池源码解析
1. 成员变量
std::vector<std::thread> workers; // 工作线程容器
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex; // 队列互斥锁
std::condition_variable condition; // 条件变量
bool stop; // 停机标志
设计要点:
-
采用生产者-消费者模式,任务队列作为共享资源
-
组合使用
mutex
+condition_variable
实现线程同步 -
vector
存储线程对象便于统一管理生命周期
2. 构造函数
2.1 线程初始化
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
: stop(false)
{
if (threads == 0)
{
threads = 1;
}
for (size_t i = 0; i < threads; ++i)
{
workers.emplace_back([this] { /* 工作线程逻辑 */ });
}
}
设计要点:
-
explicit
防止隐式类型转换(如ThreadPool pool = 4;
) -
默认使用硬件并发线程数(通过
hardware_concurrency()
) -
最少创建1个线程避免空池
-
使用
emplace_back
直接构造线程对象
2.2 工作线程逻辑
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty())
{
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
}
核心机制:
-
unique_lock
配合条件变量实现自动锁管理 -
双重状态检查(停机标志+队列非空)
-
任务提取使用移动语义避免拷贝
-
任务执行在锁作用域外进行
3. 任务提交(enqueue方法)
3.1 方法签名
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
类型推导:
- 使用尾置返回类型声明
std::result_of
推导可调用对象的返回类型- 完美转发参数(
F&&
+Args&&...
)
3.2 任务封装
auto task = std::make_shared<std::packaged_task<return_type()>>
(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
封装策略:
packaged_task
包装任务用于异步获取结果shared_ptr
管理任务对象生命周期std::bind
绑定参数(注意C++11的参数转发限制)
3.3 任务入队
tasks.emplace([task]() { (*task)(); });
优化点:
- 使用
emplace
直接构造队列元素 Lambda
捕获shared_ptr
保持任务有效性- 显式解引用执行
packaged_task
4. 析构函数
4.1 停机控制
~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (auto& worker : workers)
{
worker.join();
}
}
停机协议:
- 设置停机标志原子操作
- 广播唤醒所有等待线程
- 等待所有工作线程退出
5. 关键技术点解析
5.1 完美转发实现
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- 保持参数的左右值特性
- 支持移动语义参数的传递
- C++11的限制:无法完美转发所有参数类型
5.2 异常传播机制
- 任务异常通过
future
对象传播 packaged_task
自动捕获异常- 用户通过
future.get()
获取异常
5.3 内存管理模型
[任务提交者]
|
v
[packaged_task] <---- shared_ptr ---- [任务队列]
|
v
[future]
- 三重生命周期保障:
- 提交者持有
future
- 队列持有任务包装器
- 工作线程执行任务
- 提交者持有
四、 性能特征分析
1. 时间复杂度
操作 | 时间复杂度 |
---|---|
任务提交(enqueue) | O(1)(加锁开销) |
任务提取 | O(1) |
线程唤醒 | 取决于系统调度 |
2. 空间复杂度
组件 | 空间占用 |
---|---|
线程栈 | 每线程MB级 |
任务队列 | 与任务数成正比 |
同步原语 | 固定大小 |
五、 扩展优化方向
1. 任务窃取(Work Stealing)
- 实现多个任务队列
- 空闲线程从其他队列窃取任务
2. 动态线程池
void adjust_workers(size_t new_size)
{
if (new_size > workers.size())
{
// 扩容逻辑
}
else
{
// 缩容逻辑
}
}
3. 优先级队列
using Task = std::pair<int, std::function<void()>>; // 优先级+任务
std::priority_queue<Task> tasks;
4. 无锁队列
moodycamel::ConcurrentQueue<std::function<void()>> tasks;
六、 典型问题排查指南
现象 | 可能原因 | 解决方案 |
---|---|---|
任务未执行 | 线程池提前析构 | 延长线程池生命周期 |
future.get() 永久阻塞 | 任务未提交/异常未处理 | 检查任务提交路径 |
CPU利用率100% | 忙等待或锁竞争 | 优化任务粒度/使用无锁结构 |
内存持续增长 | 任务对象未正确释放 | 检查智能指针使用 |
该实现完整展现了现代C++线程池的核心设计范式,开发者可根据具体需求在此基础进行功能扩展和性能优化。理解这个代码结构是掌握更高级并发模式的基础。
七、 测试用例
使用实例(C++11兼容):
#include <iostream>
int main()
{
ThreadPool pool(4);
// 提交普通函数
auto future1 = pool.enqueue([](int a, int b) {
return a + b;
}, 2, 3);
// 提交成员函数
struct Calculator
{
int multiply(int a, int b)
{
return a * b;
}
} calc;
auto future2 = pool.enqueue(std::bind(&Calculator::multiply, &calc,
std::placeholders::_1,
std::placeholders::_2), 4, 5);
// 异常处理示例
auto future3 = pool.enqueue([]() -> int {
throw std::runtime_error("example error");
return 1;
});
std::cout << "2+3=" << future1.get() << std::endl;
std::cout << "4*5=" << future2.get() << std::endl;
try
{
future3.get();
}
catch(const std::exception& e)
{
std::cout << "Caught exception: " << e.what() << std::endl;
}
return 0;
}