linux网络编程11——线程池
1. 线程池
1.1 池化技术原理
池化技术
当一个资源或对象的创建或者销毁的开销较大时,可以使用池化技术来保持一定数量的创建好的对象以供随时取用,于是就有了池式结构。常见的池式结构包括线程池、内存池和连接池。
池化技术应用的前提条件主要包括三个,总结一下,以供记忆:
- 资源是可复用的。在计算机中,大部分资源都是可复用的,例如内存、线程、进程、tcp连接等。
- 资源可以长时间保存。像分配的内存块、已创建的线程、创建的TCP连接,都是可以长时间存在。
- 资源创建和销毁的开销较大。如动态内存分配,涉及系统调用和上下文切换,开销较大。
1.2 线程池原理
线程池
线程池是一种维持管理固定数量线程的池式结构。本文讲解了线程池的结构设计和实现。
下图是一个线程池的典型机构:
其中主要包括三个部件:
- 生产者线程,是线程池的使用者,它通过向任务队列传递任务交给消费者线程执行。
- 任务队列。是一个先进先出的队列,由于插入和删除元素都只需要在一段进行读写,因此临界区很短,比较高效。
- 消费者线程,负责从任务队列中取出任务并执行。
下图是一个消费者线程的状态转换图。从图中可以看到,当任务队列不为空时,消费者线程就不断从中取出任务并执行,否则就会陷入等待态。
线程池的作用
- 性能优化,异步执行耗时任务,不过度占用核心线程,能够提高响应速度和性能。耗时任务可来自于:IO密集型任务(如网络IO和磁盘IO)、CPU密集型任务。
- 并发执行,充分利用多核,并发执行核心业务。
- 复用线程资源,减少创建和销毁线程的开销。
线程数量如何确定
线程数量取决于任务类型和cpu核心数量。
- cpu密集型:cpu核心数目
- io密集型: cpu核心数目 * (线程等待时间+cpu运算时间) / cpu运算时间
1.3 线程池的应用场景
1.3.1 nginx中的线程池
nginx中的线程池被用于处理文件IO。nginx主要有两个功能,分别是反向代理和提供静态文件资源。
在文件IO中,需要通过内核调用发送数据,涉及到内核态和用户态之间的上下文切换以及数据向内核缓冲区的传输,如果这些操作放在主线程执行将会导致一定的等待时间,减低了响应速度。
因此nginx使用线程池来分离耗时的操作,尤其时文件IO,避免其对主线程的干扰,从而提升 Nginx 的整体性能和响应效率。
1.3.2 redis中的线程池
Redis 默认情况下是单线程的,即它仅开启一个主线程来处理所有的请求。这也是 Redis 一直以来的设计理念,即使用单线程模型实现高性能。
不过,Redis 在 6.0 版本中引入了多线程支持,用于处理客户端的网络读写(如数据的读取、协议解析等),这可以提升在高并发环境下的网络 I/O 性能。
redis的线程池主要用于IO读写和协议解析,不负责执行操作。
1.4 线程池的实现
下面是一个使用C++11写的使用阻塞队列实现的线程池。
先看代码,然后进行讲解。
#include <stdio.h>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <queue>
#include <vector>
typedef std::function<void(void)> Task;
class ThreadPool
{
public:
ThreadPool(size_t thread_count, size_t max_queue_size) :
stop_(false), max_queue_size_(max_queue_size)
{
for (size_t i = 0; i < thread_count; ++i)
{
workers_.emplace_back(std::bind(&ThreadPool::worker_run, this));
}
}
void enqueue(Task task)
{
std::unique_lock<std::mutex> lock(mutex_);
cond2_.wait(lock, [this] { return stop_ || queue_.size() < max_queue_size_; });
if (stop_) return;
queue_.emplace(std::move(task));
cond1_.notify_one();
}
~ThreadPool()
{
{
std::lock_guard<std::mutex> lock(mutex_);
stop_ = true;
}
cond1_.notify_all();
cond2_.notify_all();
for (auto &thread : workers_)
{
thread.join();
}
}
private:
void worker_run()
{
while (true)
{
Task task;
{
// 出队操作
std::unique_lock<std::mutex> lock(mutex_);
cond1_.wait(lock, [this] { return stop_ || !queue_.empty(); });
if (stop_) return;
task = std::move(queue_.front());
queue_.pop();
cond2_.notify_one();
}
task();
}
}
private:
std::vector<std::thread> workers_; // 消费者线程
std::mutex mutex_;
std::condition_variable cond1_;
std::condition_variable cond2_;
bool stop_; // 是否停止运行
size_t max_queue_size_; // 任务队列最大长度
std::queue<Task> queue_; // 任务队列
};
内部主要组件
ThreadPool类中,在功能方面主要包括一个vector<thread>和一个queue<Task>,前者用来保存消费者线程,后者用来保存线程的任务。
另外,需要保证线程安全,因此需要一个mutex和两个条件变量,两个条件变量分别对应的消费者线程的等待队列和生产者线程的等待队列。
最后,stop_成员变量用来通知消费者线程结束执行,max_queue_size_成员变量用来指出queue_的最大尺寸。
对外接口enqueue
对外主要提供一个入队操作enqueue,在本实现中,设置了queue队列最大尺寸,因此该操作可能阻塞线程。
消费者线程主函数worker_run_
消费者线程的主要工作就是不断从任务队列中取出任务并执行。但任务队列为空时,需要消费者线程阻塞。
线程安全保证
通过std::mutex和std::unique_lock,以及std::condition_variable,可以保证线程安全。具体是这样做的:
- 生产者线程提交任务时,先持有锁,然后检查任务队列是否已满,如果已满则陷入阻塞,否则将任务插入队列。然后通知消费者线程的等待队列。
- 消费者线程获取任务时,先持有锁,然后检查任务队列是否为空,如果为空则陷入阻塞,否则获取一个任务,然后通知生产者线程的等待队列。
线程池的关闭
线程池的关闭,需要先保证每个线程都被关闭,可以通过一个共享变量值的变化通知消费者线程和生产者线程,即设置一个stop变量。
当需要关闭线程池时,设置stop为true,然后当生产者消费者线程看到stop为true时就不再阻塞,并且消费者线程会退出执行,而生产者线程为退出入队函数。
现在的问题在于,我们要保证stop的线程安全以及当stop的值发生变化时,其它线程能够即使看到。对于第一点,我们可以使用std::mutex和std::lock_guard来保证。对于第二点,我们可以让生产者和消费者线程在进入阻塞之前和退出阻塞之后都检查stop的值,如果stop值为true则直接return。
最后在关闭线程时,需要先更改stop的值为true,然后唤醒所有的等待中的线程。
1.5 总结
那么这个线程池就讲到这里。坦白说,这是一个非常基础的线程池,没有尝试使用无锁编程。线程池关闭可能被无限延迟(关闭一般是在程序结束运行时,所以一般问题不大)。另外对于任务类型,设计的也比较简单,没有考虑future和promise等,这个可以留给用户提供的task去自定义。
学习参考
学习更多相关知识请参考零声 github。