C++:Github开源7.8Kstar的线程池介绍
目录
源码展示
1. 头文件和依赖
1.
2.
3.
4.
5. 和
6.
7.
8.
2. ThreadPool 类的定义
1. std::vector workers;
2. std::queue> tasks;
3. std::mutex queue_mutex;
4. std::condition_variable condition;
5. bool stop;
3. 构造函数
1. 初始化 stop 变量
2. 创建 threads 个线程
3. 等待任务
4. 取出并执行任务
4. enqueue方法
1. 模板参数和返回类型推导
2. 创建任务
3. 任务入队
4. 返回 std::future
总结
5. 析构函数
1. 设置停止标志
2. 通知所有线程
3. 等待所有线程结束
4. 代码总结
5. 总结
总结
多线程管理的有效实践
任务调度与线程同步
资源管理与安全退出
实际应用中的启示
使用实例
总结
理解这个线程池的实现,有助于掌握多线程编程中一些重要的概念和技术,如任务调度、线程同步和并发控制。
github地址:
ThreadPool/ThreadPool.h at master · progschj/ThreadPool · GitHub
源码展示
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
#endif
1. 头文件和依赖
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
这部分代码包含了实现线程池所需的标准库头文件:
vector
:用于存储线程对象。queue
:用于任务队列。memory
:用于智能指针管理任务。thread
:用于线程管理。mutex
和condition_variable
:用于线程同步。future
:用于处理异步任务和返回值。functional
:用于存储和调用任意可调用对象(函数、lambda等)。stdexcept
:用于抛出异常。
1. <vector>
- 作用:
std::vector
是一个动态数组容器,能够在需要时自动扩展大小。 - 在代码中的作用:用于存储线程对象 (
std::vector<std::thread> workers
)。线程池在初始化时会创建多个线程,并将这些线程对象存储在std::vector
中,以便在析构时可以遍历和管理这些线程。
2. <queue>
- 作用:
std::queue
是一个基于 FIFO(先进先出)原则的容器适配器,提供了标准的队列操作,比如push
、pop
和front
。 - 在代码中的作用:用于实现任务队列 (
std::queue<std::function<void()>> tasks
)。当新的任务通过enqueue
方法添加时,它们会被放入这个队列中,并由线程池中的线程按顺序取出并执行。
3. <memory>
- 作用:
std::shared_ptr
和其他智能指针的定义包含在<memory>
中,用于自动管理动态分配的对象生命周期,避免手动管理内存。 - 在代码中的作用:用于管理包装任务的
std::packaged_task
对象 (std::make_shared<std::packaged_task<return_type()>>
) 的生命周期。std::shared_ptr
确保任务对象在没有引用时自动销毁,避免内存泄漏。
4. <thread>
- 作用:
std::thread
提供了一个类,用于创建和管理线程。线程是独立的执行路径,可以与主线程并发运行。 - 在代码中的作用:用于创建线程对象 (
std::vector<std::thread> workers
)。在线程池中,多个线程对象被创建并存储在workers
容器中,每个线程在初始化时启动,执行从任务队列中获取的任务。
5. <mutex>
和 <condition_variable>
- 作用:
std::mutex
:提供互斥锁,用于保护共享数据的访问,以防止数据竞争(race conditions)。std::condition_variable
:提供条件变量,允许线程等待某个条件发生(例如任务队列非空)并唤醒。
- 在代码中的作用:
std::mutex
用于保护任务队列的访问 (std::mutex queue_mutex
),确保同一时间只有一个线程可以对任务队列进行操作(如添加或移除任务)。std::condition_variable
用于同步线程,当任务队列为空时,线程会等待任务添加 (condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
);当有新任务添加时,线程会被唤醒执行任务 (condition.notify_one()
或condition.notify_all()
).
6. <future>
- 作用:
std::future
提供了一种机制,用于异步获取结果。它与std::promise
或std::packaged_task
配合使用,实现任务的异步返回值。 - 在代码中的作用:
- 在
enqueue
方法中,std::packaged_task<return_type()>
被用于包装一个可调用对象(如函数),并返回一个std::future
对象,调用者可以使用这个std::future
来获取任务的执行结果。这样,线程池的使用者可以提交任务并继续执行其他工作,然后在将来某个时刻检查任务的执行结果。
- 在
7. <functional>
- 作用:
std::function
是一个通用的多态函数封装器,能够存储和调用任意可调用对象,包括函数指针、lambda 表达式、bind 表达式或其他函数对象。 - 在代码中的作用:用于存储要在线程池中执行的任务 (
std::queue<std::function<void()>> tasks
)。任务被封装成std::function<void()>
对象并存储在任务队列中,线程池的工作线程会从队列中取出这些任务并执行。
8. <stdexcept>
- 作用:提供了标准的异常类,例如
std::runtime_error
,用于报告运行时错误。 - 在代码中的作用:在
enqueue
方法中,当线程池已经停止(stop
为true
)时,如果试图添加新的任务,代码会抛出一个std::runtime_error
异常 (throw std::runtime_error("enqueue on stopped ThreadPool");
),以防止在停止的线程池上继续执行任务。
2. ThreadPool 类的定义
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
std::vector<std::thread> workers; // 用于存储线程对象
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex; // 用于保护任务队列的互斥锁
std::condition_variable condition; // 用于线程间同步的条件变量
bool stop; // 控制线程池停止的标志
};
workers
:这是一个存储线程对象的vector
,线程池中的每个线程都会在这个容器中保存。tasks
:这是一个任务队列,存储着需要线程执行的任务。queue_mutex
:互斥锁,用于保护对任务队列的访问,以防止多个线程同时修改队列导致数据竞争。condition
:条件变量,用于阻塞线程并唤醒它们(例如,当任务队列有新任务时,唤醒空闲线程去处理任务)。stop
:布尔变量,用于指示线程池是否应该停止。它在析构函数中被设为true
,并且在任务入队和线程工作时检查它的状态。
1. std::vector<std::thread> workers;
-
作用:
workers
是一个std::vector
容器,用于存储线程池中的所有线程对象。std::thread
是 C++ 标准库提供的线程类,表示一个可执行的线程。- 在
ThreadPool
构造函数中,线程池会根据指定的大小(即线程数)创建多个线程对象,并将它们存储在这个vector
中。
-
如何工作:
- 每个线程对象在创建时都会绑定一个任务循环,线程在这个循环中不断地从任务队列中取出任务并执行。
- 这些线程对象一直存在于
workers
容器中,直到线程池的析构函数被调用,析构函数会等待所有线程执行完毕后再退出(通过worker.join()
)。
-
总结:
workers
是线程池的核心部分,它保存了线程池中所有的线程,这些线程在创建后会一直运行,等待任务队列中的新任务。
2. std::queue<std::function<void()>> tasks;
-
作用:
tasks
是一个任务队列,用于存储需要线程执行的任务。每个任务被封装成一个std::function<void()>
对象。std::function<void()>
是一个通用的函数包装器,可以存储任意可调用对象(如普通函数、lambda 表达式、绑定表达式等),并提供一致的调用接口。
-
如何工作:
- 当调用
enqueue
方法时,新的任务会被添加到这个队列中(通过tasks.emplace()
)。 - 每个线程都会从这个队列中取出任务进行执行(通过
tasks.pop()
),如果任务队列为空,线程会等待直到有新任务加入。
- 当调用
-
总结:
tasks
是线程池的任务存储区,线程池中的线程会从这个队列中取出任务并执行。
3. std::mutex queue_mutex;
-
作用:
queue_mutex
是一个互斥锁(std::mutex
),用于保护对tasks
队列的访问。互斥锁的作用是防止多个线程同时访问和修改共享数据(在这里是任务队列),从而避免数据竞争问题。
-
如何工作:
- 每当一个线程需要向任务队列中添加任务或从任务队列中取出任务时,它首先会锁定这个互斥锁(通过
std::unique_lock<std::mutex> lock(queue_mutex);
)。 - 锁定互斥锁后,线程可以安全地访问和修改任务队列。操作完成后,互斥锁会被释放,以便其他线程可以继续访问任务队列。
- 每当一个线程需要向任务队列中添加任务或从任务队列中取出任务时,它首先会锁定这个互斥锁(通过
-
总结:
queue_mutex
确保了任务队列的安全访问,防止多个线程同时操作队列时出现的竞态条件。
4. std::condition_variable condition;
-
作用:
condition
是一个条件变量(std::condition_variable
),用于在线程之间进行同步。条件变量允许线程等待特定的条件发生,并在条件满足时被唤醒。
-
如何工作:
- 当线程池中的一个线程发现任务队列为空时,它会进入等待状态(通过
condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
)。在等待过程中,线程会释放queue_mutex
,这样其他线程仍然可以向任务队列中添加任务。 - 一旦有新任务加入队列,
enqueue
方法会调用condition.notify_one()
或condition.notify_all()
,唤醒一个或所有等待的线程。 - 被唤醒的线程会重新检查队列状态,如果队列中有任务,它们将继续执行;否则会再次进入等待状态。
- 当线程池中的一个线程发现任务队列为空时,它会进入等待状态(通过
-
总结:
condition
是线程池中线程之间的信号机制,确保当任务队列为空时,线程可以进入等待状态,当有新任务时,线程能及时被唤醒。
5. bool stop;
-
作用:
stop
是一个布尔标志,用于控制线程池的停止。当stop
被设置为true
时,线程池会停止接收新的任务,并让所有线程在完成当前任务后退出。
-
如何工作:
- 在线程池的构造函数中,
stop
初始化为false
,表示线程池正在运行。 - 在
enqueue
方法中,会检查stop
标志的状态,如果stop
为true
,则抛出异常,防止在停止的线程池中添加新任务。 - 在线程池的析构函数中,
stop
被设置为true
,并调用condition.notify_all()
唤醒所有等待中的线程。这些线程在被唤醒后会检查stop
标志,并在发现标志为true
且任务队列为空时退出。
- 在线程池的构造函数中,
-
总结:
stop
控制线程池的生命周期,确保线程池在停止时能够安全地退出,并且不再接收新任务。
3. 构造函数
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0; i < threads; ++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
- 初始化
stop
:构造函数中首先初始化stop
为false
,表示线程池运行中。 - 创建
threads
个线程:通过emplace_back
向workers
向量中添加threads
个线程对象。每个线程执行一个无限循环(for(;;)
),不断从任务队列中取出任务执行。 - 等待任务:线程会等待,直到条件变量触发(即有任务入队或线程池停止)。
condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
,这个条件表达式会在以下两种情况下触发:- 线程池被停止(
this->stop
为true
)。 - 任务队列不为空。
- 线程池被停止(
- 取任务执行:如果线程池停止并且任务队列为空,线程退出循环(
return
);否则从任务队列中取出任务并执行
这段代码实现了 ThreadPool
类的构造函数,其主要任务是初始化线程池并启动指定数量的工作线程。我们将逐步详细解析这段代码的每个部分,以更好地理解其工作原理。
1. 初始化 stop
变量
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
...
}
stop(false)
:在初始化列表中,stop
被设置为 false
。这是为了确保线程池在创建时处于运行状态,而不是停止状态。stop
变量是一个布尔值,用来控制线程池的生命周期。在构造函数中将其初始化为 false
,表示线程池可以接收任务并处理它们。
2. 创建 threads
个线程
for(size_t i = 0; i < threads; ++i)
workers.emplace_back(
[this]
{
for(;;)
{
...
}
}
);
-
workers.emplace_back(...)
:这里使用了emplace_back
方法向workers
向量中添加线程对象。emplace_back
方法直接在容器末尾构造对象,从而避免了不必要的拷贝或移动操作。这个方法会创建一个线程,并将线程对象存储在workers
容器中。 -
[this]
捕获列表:[this]
表示捕获当前对象的指针,从而在 lambda 表达式中可以访问ThreadPool
的成员变量和方法。 -
for(;;)
无限循环:线程的主循环,通过无限循环的形式(for(;;)
)实现。这个循环确保线程始终运行,直到线程池停止并且所有任务都处理完毕。在这个循环中,线程会不断地从任务队列中取出任务并执行。
3. 等待任务
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
-
std::function<void()> task
:定义一个std::function<void()>
类型的变量task
,用于保存从任务队列中取出的任务。std::function<void()>
是一个通用的函数包装器,可以存储任意类型的可调用对象,并提供一致的调用接口。 -
std::unique_lock<std::mutex> lock(this->queue_mutex)
:创建一个std::unique_lock
对象,并将其与queue_mutex
关联。unique_lock
是一个智能锁,它会在构造时锁定互斥锁,在析构时自动释放锁。这里的作用是保护对tasks
队列的访问,确保只有一个线程能访问或修改队列,防止数据竞争。 -
condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
:- 线程在调用
condition.wait
时,会自动释放上面获取的queue_mutex
,并进入等待状态,直到被通知或满足某个条件。 - 这里的条件是一个 lambda 表达式,它会在以下两种情况下返回
true
,从而唤醒线程:this->stop == true
:表示线程池已经停止,不再接收新任务。!this->tasks.empty()
:表示任务队列中有任务可供执行。
- 线程在调用
-
if(this->stop && this->tasks.empty()) return;
:- 当线程被唤醒时,会首先检查
stop
标志。如果stop
为true
且任务队列为空,线程会退出循环并终止执行。 - 这意味着当线程池被停止且所有任务都已处理完毕时,线程将不再执行任何任务并结束。
- 当线程被唤醒时,会首先检查
-
task = std::move(this->tasks.front()); this->tasks.pop();
:- 当满足
condition.wait
的条件时,线程会从任务队列中取出一个任务并将其赋值给task
变量。这里使用了std::move
,将队列中的任务移动到task
,避免了不必要的拷贝。 - 取出任务后,线程会调用
pop
方法将任务从队列中移除。
- 当满足
4. 取出并执行任务
task();
task()
:一旦线程获取了一个任务,它会直接调用 task()
执行任务。这是一个调用操作符,执行 std::function<void()>
对象中存储的函数或可调用对象。
4. enqueue
方法
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
- 模板参数:这个方法是一个模板方法,允许将任意可调用对象(如函数、lambda表达式)及其参数传递给线程池。
- 创建任务:通过
std::bind
将函数和参数绑定成一个可调用对象,并将其封装在std::packaged_task
中,生成一个共享指针task
。std::packaged_task
可以异步调用,并将结果保存在std::future
中。 - 任务入队:将任务添加到任务队列中(
tasks.emplace
),并通知一个等待中的线程来执行任务(condition.notify_one
)。 - 返回
std::future
:返回一个std::future
对象,调用者可以通过它获取任务的执行结果。
这个 enqueue
方法是 ThreadPool
类中一个非常重要的功能,它允许用户将任意的任务提交到线程池中执行,并且返回一个 std::future
对象,用于获取任务的执行结果。让我们逐步解析这段代码,理解它的工作原理和每个步骤的意义。
1. 模板参数和返回类型推导
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
-
模板参数:
enqueue
方法是一个模板函数,F
表示任意的可调用对象类型,Args...
表示零个或多个参数。这意味着你可以传递任何函数、lambda 表达式、函数对象,以及任意数量的参数给这个方法。 -
返回类型推导:
- 该方法的返回类型被定义为
std::future<typename std::result_of<F(Args...)>::type>
。 std::result_of<F(Args...)>::type
是一个类型萃取器(type trait),它用于推导出调用F
(带参数Args...
)时返回的结果类型。- 例如,如果你传递一个返回
int
的函数,std::result_of<F(Args...)>::type
将会是int
,因此enqueue
方法的返回类型将是std::future<int>
。
- 该方法的返回类型被定义为
2. 创建任务
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
-
return_type
:这是任务的返回类型,它等同于std::result_of<F(Args...)>::type
。这个类型用于定义任务的std::packaged_task
。 -
std::packaged_task
:std::packaged_task<return_type()>
是一个模板类,它封装了一个可调用对象,并允许在独立的线程中异步执行该对象。std::packaged_task
可以与std::future
一起工作,以便在任务完成时检索返回值。- 在这里,
std::packaged_task<return_type()>
被用来封装任务函数f
和它的参数args...
,以便任务可以在线程池中的某个线程中被执行。
-
std::bind
和std::forward
:std::bind
用于将函数f
与其参数args...
绑定在一起,生成一个可调用对象(函数对象),并将其传递给std::packaged_task
。std::bind
的返回值是一个std::function
,它表示一个可以调用的函数。std::forward
用于完美转发参数,确保保持参数的左右值属性。这样,当args...
是右值时,它们仍然会被以右值的方式传递。
-
共享指针
task
:std::make_shared
用于创建std::packaged_task
的共享指针task
。这样做是为了确保task
的生命周期被正确管理,并且可以在线程池中的多个线程之间安全地共享。
3. 任务入队
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
-
获取
std::future
:- 在任务执行之前,通过
task->get_future()
获取与std::packaged_task
关联的std::future<return_type>
对象res
。这个std::future
对象允许调用者在任务完成后获取其返回值。
- 在任务执行之前,通过
-
锁定互斥锁:
- 使用
std::unique_lock<std::mutex> lock(queue_mutex);
锁定queue_mutex
互斥锁,确保只有一个线程可以安全地访问和修改任务队列tasks
。这避免了多个线程同时访问任务队列导致的数据竞争问题。
- 使用
-
检查线程池状态:
- 在任务入队之前,首先检查
stop
标志。如果stop
为true
,说明线程池已经停止,这时再尝试添加任务将抛出std::runtime_error
异常,防止在已停止的线程池中继续执行任务。
- 在任务入队之前,首先检查
-
任务入队:
- 通过
tasks.emplace([task](){ (*task)(); });
将任务添加到任务队列tasks
中。这里使用了 lambda 表达式[task](){ (*task)(); }
将task
传递给队列。 - 当某个线程从队列中取出任务时,它会执行
task()
,从而实际运行任务的内容。
- 通过
-
通知线程:
- 调用
condition.notify_one();
通知一个等待中的线程有新任务被加入队列。被通知的线程会从等待状态唤醒,取出队列中的任务并执行它。
- 调用
4. 返回 std::future
return res;
- 最后,
enqueue
方法返回res
,即与task
关联的std::future<return_type>
对象。调用者可以使用这个std::future
来等待任务的完成,并获取任务的返回值。
总结
-
模板参数与完美转发:
enqueue
方法是一个模板函数,可以接收任意的可调用对象及其参数。通过完美转发,确保传递给std::bind
的参数能够保留其原始的左右值属性。 -
任务封装与异步执行:通过
std::packaged_task
封装任务,并将其放入任务队列。这种封装允许任务在异步执行时生成一个std::future
,使得调用者可以获取任务的结果。 -
线程安全与同步:在任务入队时,使用互斥锁确保对任务队列的访问是线程安全的。同时,通过条件变量通知线程池中的线程,当有新任务可执行时,唤醒等待的线程。
-
异常处理:在线程池停止后,防止新的任务被添加,以避免在停止的线程池上进行操作,确保线程池的安全关闭。
5. 析构函数
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
- 设置停止标志:在析构函数中,首先锁定互斥锁并将
stop
设置为true
,这会使所有线程在完成当前任务后退出。 - 通知所有线程:通过
condition.notify_all()
唤醒所有等待中的线程,以便它们可以检查stop
标志并退出。 - 等待线程结束:使用
join
等待所有线程执行完毕,确保线程池中的所有线程都被正确销毁。
ThreadPool
类的析构函数负责清理线程池中的资源,确保所有线程都能安全地结束并且不再处理任何新任务。我们将逐步详细解析这段代码,以理解其工作原理和每个步骤的意义。
1. 设置停止标志
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
-
锁定互斥锁:
- 通过
std::unique_lock<std::mutex> lock(queue_mutex);
,析构函数首先获取了对queue_mutex
的独占访问权。这个锁定操作是必要的,以确保stop
标志的设置和对任务队列的访问是线程安全的。 std::unique_lock
提供了一种灵活的锁定机制,它在构造时自动锁定互斥锁,并在销毁时自动释放锁。
- 通过
-
设置
stop
标志:stop = true;
这一行代码将stop
标志设置为true
。这个标志对于线程池的所有线程来说是一个信号,指示线程池正在关闭,不再接受新任务。- 线程池中的工作线程会定期检查这个标志,发现
stop
为true
后,会终止任务循环并退出。这就保证了线程池在析构时能够正确地停止所有线程的执行。
2. 通知所有线程
condition.notify_all();
-
通知所有等待的线程:
condition.notify_all();
会唤醒所有因condition.wait
而被阻塞的线程。通常,这些线程正在等待任务队列中有新任务到来,或者线程池被停止。- 由于
stop
标志已经被设置为true
,被唤醒的线程在检查stop
后,将不再继续等待或执行新任务,而是会立即退出它们的任务循环。
-
确保线程及时退出:
- 如果有多个线程在等待任务,此时调用
notify_all()
可以确保所有等待的线程都能及时被唤醒并退出,从而避免析构函数中的join
操作陷入长时间的等待。 - 这一步骤至关重要,因为它确保了线程池能够快速、干净地关闭。
- 如果有多个线程在等待任务,此时调用
3. 等待所有线程结束
for(std::thread &worker: workers)
worker.join();
-
遍历线程集合:
for(std::thread &worker: workers)
这一行代码开始遍历workers
向量中存储的所有线程对象。- 每个线程对象代表线程池中一个正在运行的线程。
-
等待线程结束:
worker.join();
用于等待线程执行完毕。join()
会阻塞调用它的线程(在这里是主线程),直到被join
的线程完成执行。- 在
join()
被调用之前,析构函数会等待该线程执行完当前任务并退出任务循环。 - 一旦
join()
返回,表示该线程已经成功完成并且可以安全地销毁。
-
确保线程池完全关闭:
- 通过遍历
workers
向量并调用join()
,析构函数确保了所有线程都已经完成其任务并且不再运行。这避免了线程资源泄漏,确保程序的稳定性。
- 通过遍历
4. 代码总结
-
线程池停止:
- 首先,析构函数通过锁定互斥锁并设置
stop
标志,指示线程池已经停止,任何剩余的任务不会再被执行。
- 首先,析构函数通过锁定互斥锁并设置
-
唤醒所有线程:
- 通过
condition.notify_all()
,析构函数唤醒了所有正在等待任务的线程,使它们能够及时退出。
- 通过
-
线程销毁:
- 通过遍历
workers
并调用join()
,析构函数确保了所有线程都已经正确地完成并被销毁,从而安全地释放线程池资源。
- 通过遍历
5. 总结
ThreadPool
类的析构函数负责安全地停止线程池中的所有线程并清理资源。它通过以下步骤实现了这一点:
- 锁定互斥锁并设置
stop
标志,通知线程池停止接受新任务并准备关闭。 - 调用
condition.notify_all()
唤醒所有等待中的线程,使它们能够检查stop
标志并退出任务循环。 - 遍历所有线程对象,调用
join()
,等待每个线程完成执行并销毁它们,确保没有悬挂的线程和资源泄漏。
通过这种设计,ThreadPool
可以安全、优雅地关闭,避免了线程资源的泄露和不必要的线程阻塞问题。
总结
这篇文章详细解析了一个用C++编写的线程池实现,透过对代码的逐步讲解,我不仅加深了对多线程编程的理解,还学到了在实际项目中如何有效地管理并发任务。
多线程管理的有效实践
在这篇文章中,我学到的第一个关键点是如何通过线程池来管理多线程任务。相比于为每个任务单独创建一个线程,线程池的设计显得更加高效。线程池提前创建一定数量的线程,然后将任务交给这些线程处理,避免了频繁创建和销毁线程的开销。这种方法不仅提升了程序的性能,还减少了系统资源的浪费。
任务调度与线程同步
文章通过对enqueue
方法的解析,让我了解到如何在多线程环境中进行任务调度。这个方法使用了模板参数,使得线程池可以接受任意类型的任务并将其异步执行。同时,文章介绍了如何通过std::future
来获取异步任务的结果,这让我对C++中的异步编程有了更深的认识。
此外,线程同步在多线程编程中至关重要。文章通过讲解互斥锁(std::mutex
)和条件变量(std::condition_variable
)的使用,展示了如何防止数据竞争,并确保线程之间的正确通信。这些技术在保证程序正确性的同时,也提高了并发执行的效率。
资源管理与安全退出
析构函数的设计也是文章的一大亮点。在析构函数中,通过设置stop
标志并唤醒所有等待的线程,线程池能够安全地停止运行,并确保所有线程在完成当前任务后正确退出。这种设计不仅避免了资源泄露,还保证了程序在多线程环境下的稳定性。
实际应用中的启示
通过学习这个线程池的实现,我意识到在实际项目中,如何设计一个高效且健壮的多线程系统非常重要。线程池提供了一种解决方案,使得我们可以在多核处理器上更好地利用并发能力,同时确保资源的有效管理和任务的正确调度。
使用实例
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <random>
#include "ThreadPool.h" // 假设 ThreadPool 的实现保存在一个头文件中
// 一个模拟的复杂任务,计算平方和,模拟长时间计算
int complexTask(int num) {
std::this_thread::sleep_for(std::chrono::milliseconds(100 + (num % 10) * 50)); // 模拟计算时间
return num * num;
}
int main() {
// 创建一个具有 8 个线程的线程池
ThreadPool pool(8);
// 将任务结果存储在 futures 向量中
std::vector<std::future<int>> futures;
// 提交 100 个任务到线程池
for (int i = 0; i < 100; ++i) {
futures.emplace_back(
pool.enqueue([i] {
return complexTask(i);
})
);
}
// 处理并显示每个任务的结果
int total_sum = 0;
for (auto &&result : futures) {
int value = result.get(); // 获取任务的结果
total_sum += value;
std::cout << "Task result: " << value << std::endl;
}
std::cout << "Total sum of all tasks: " << total_sum << std::endl;
return 0;
}
总结
不仅帮助我理解了C++线程池的内部机制,还让我掌握了多线程编程中的一些重要技巧,如任务调度、线程同步和资源管理。在未来的编程实践中,我可以将这些知识应用到更复杂的多线程项目中,提高程序的性能和稳定性。通过学习这个线程池的实现,我深刻体会到,良好的代码设计对于解决复杂问题、提升软件质量至关重要。