当前位置: 首页 > article >正文

linux网络编程11——线程池

1. 线程池

1.1 池化技术原理

池化技术

当一个资源或对象的创建或者销毁的开销较大时,可以使用池化技术来保持一定数量的创建好的对象以供随时取用,于是就有了池式结构。常见的池式结构包括线程池、内存池和连接池。

池化技术应用的前提条件主要包括三个,总结一下,以供记忆:

  1. 资源是可复用的。在计算机中,大部分资源都是可复用的,例如内存、线程、进程、tcp连接等。
  2. 资源可以长时间保存。像分配的内存块、已创建的线程、创建的TCP连接,都是可以长时间存在。
  3. 资源创建和销毁的开销较大。如动态内存分配,涉及系统调用和上下文切换,开销较大。

1.2 线程池原理

线程池

线程池是一种维持管理固定数量线程的池式结构。本文讲解了线程池的结构设计和实现。

下图是一个线程池的典型机构:

在这里插入图片描述

其中主要包括三个部件:

  • 生产者线程,是线程池的使用者,它通过向任务队列传递任务交给消费者线程执行。
  • 任务队列。是一个先进先出的队列,由于插入和删除元素都只需要在一段进行读写,因此临界区很短,比较高效。
  • 消费者线程,负责从任务队列中取出任务并执行。

下图是一个消费者线程的状态转换图。从图中可以看到,当任务队列不为空时,消费者线程就不断从中取出任务并执行,否则就会陷入等待态。

在这里插入图片描述

线程池的作用

  1. 性能优化,异步执行耗时任务,不过度占用核心线程,能够提高响应速度和性能。耗时任务可来自于:IO密集型任务(如网络IO和磁盘IO)、CPU密集型任务。
  2. 并发执行,充分利用多核,并发执行核心业务。
  3. 复用线程资源,减少创建和销毁线程的开销。

线程数量如何确定

线程数量取决于任务类型cpu核心数量

  • cpu密集型:cpu核心数目
  • io密集型: cpu核心数目 * (线程等待时间+cpu运算时间) / cpu运算时间

1.3 线程池的应用场景

1.3.1 nginx中的线程池

nginx中的线程池被用于处理文件IO。nginx主要有两个功能,分别是反向代理和提供静态文件资源。

在文件IO中,需要通过内核调用发送数据,涉及到内核态和用户态之间的上下文切换以及数据向内核缓冲区的传输,如果这些操作放在主线程执行将会导致一定的等待时间,减低了响应速度。

因此nginx使用线程池来分离耗时的操作,尤其时文件IO,避免其对主线程的干扰,从而提升 Nginx 的整体性能和响应效率。

1.3.2 redis中的线程池

Redis 默认情况下是单线程的,即它仅开启一个主线程来处理所有的请求。这也是 Redis 一直以来的设计理念,即使用单线程模型实现高性能。

不过,Redis 在 6.0 版本中引入了多线程支持,用于处理客户端的网络读写(如数据的读取、协议解析等),这可以提升在高并发环境下的网络 I/O 性能。

redis的线程池主要用于IO读写和协议解析,不负责执行操作。

1.4 线程池的实现

下面是一个使用C++11写的使用阻塞队列实现的线程池。

先看代码,然后进行讲解。

#include <stdio.h>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <queue>
#include <vector>

typedef std::function<void(void)> Task;

class ThreadPool
{
public:
    ThreadPool(size_t thread_count, size_t max_queue_size) :
        stop_(false), max_queue_size_(max_queue_size)
    {
        for (size_t i = 0; i < thread_count; ++i)
        {
            workers_.emplace_back(std::bind(&ThreadPool::worker_run, this));
        }
    }

    void enqueue(Task task)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cond2_.wait(lock, [this] { return stop_ || queue_.size() < max_queue_size_; });
        if (stop_) return;

        queue_.emplace(std::move(task));
        cond1_.notify_one();
    }

    ~ThreadPool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        
        cond1_.notify_all();
        cond2_.notify_all();
        for (auto &thread : workers_)
        {
            thread.join();
        }
    }

private:
    void worker_run()
    {
        while (true)
        {
            Task task;
            {
                // 出队操作
                std::unique_lock<std::mutex> lock(mutex_);
                cond1_.wait(lock, [this] { return stop_ || !queue_.empty(); });
                if (stop_) return;

                task = std::move(queue_.front());
                queue_.pop();
                cond2_.notify_one();
            }

            task();
        }
    }

private:
    std::vector<std::thread> workers_;  // 消费者线程

    std::mutex mutex_;
    std::condition_variable cond1_;
    std::condition_variable cond2_;
    
    bool stop_;                 // 是否停止运行
    size_t max_queue_size_;     // 任务队列最大长度
    std::queue<Task> queue_;    // 任务队列
};

内部主要组件

ThreadPool类中,在功能方面主要包括一个vector<thread>和一个queue<Task>,前者用来保存消费者线程,后者用来保存线程的任务。

另外,需要保证线程安全,因此需要一个mutex和两个条件变量,两个条件变量分别对应的消费者线程的等待队列和生产者线程的等待队列。

最后,stop_成员变量用来通知消费者线程结束执行,max_queue_size_成员变量用来指出queue_的最大尺寸。

对外接口enqueue

对外主要提供一个入队操作enqueue,在本实现中,设置了queue队列最大尺寸,因此该操作可能阻塞线程。

消费者线程主函数worker_run_

消费者线程的主要工作就是不断从任务队列中取出任务并执行。但任务队列为空时,需要消费者线程阻塞。

线程安全保证

通过std::mutex和std::unique_lock,以及std::condition_variable,可以保证线程安全。具体是这样做的:

  1. 生产者线程提交任务时,先持有锁,然后检查任务队列是否已满,如果已满则陷入阻塞,否则将任务插入队列。然后通知消费者线程的等待队列。
  2. 消费者线程获取任务时,先持有锁,然后检查任务队列是否为空,如果为空则陷入阻塞,否则获取一个任务,然后通知生产者线程的等待队列。

线程池的关闭

线程池的关闭,需要先保证每个线程都被关闭,可以通过一个共享变量值的变化通知消费者线程和生产者线程,即设置一个stop变量。

当需要关闭线程池时,设置stop为true,然后当生产者消费者线程看到stop为true时就不再阻塞,并且消费者线程会退出执行,而生产者线程为退出入队函数。

现在的问题在于,我们要保证stop的线程安全以及当stop的值发生变化时,其它线程能够即使看到。对于第一点,我们可以使用std::mutex和std::lock_guard来保证。对于第二点,我们可以让生产者和消费者线程在进入阻塞之前和退出阻塞之后都检查stop的值,如果stop值为true则直接return。

最后在关闭线程时,需要先更改stop的值为true,然后唤醒所有的等待中的线程。

1.5 总结

那么这个线程池就讲到这里。坦白说,这是一个非常基础的线程池,没有尝试使用无锁编程。线程池关闭可能被无限延迟(关闭一般是在程序结束运行时,所以一般问题不大)。另外对于任务类型,设计的也比较简单,没有考虑future和promise等,这个可以留给用户提供的task去自定义。

学习参考

学习更多相关知识请参考零声 github。


http://www.kler.cn/a/510411.html

相关文章:

  • OpenMP并行编程实例系列2 —— 并行结构
  • Web前端第一次作业
  • 第22篇 基于ARM A9处理器用汇编语言实现中断<四>
  • 汽车网络信息安全-ISO/SAE 21434解析(上)
  • 【JavaEE】Spring Web MVC
  • 差分(前缀和的逆运算)
  • 【MySQL】事务(二)
  • 二叉树OJ题:挑战与突破
  • springboot自动配置原理(高低版本比较)spring.factories文件的作用
  • RISC-V精简指令集
  • 雷电9最新版安装Magisk+LSPosd(新手速通)
  • 基于SSM的家庭记账本小程序设计与实现(LW+源码+讲解)
  • Git实用指南:忽略文件、命令别名、版本控制、撤销修改与标签管理
  • 国产编辑器EverEdit - 文字对齐
  • Golang学习笔记_27——单例模式
  • S4 HANA凭证更改记录
  • Xilinx FPGA :开发使用及 Tips 总结
  • K8S-Pod资源清单的编写,资源的增删改查,镜像的下载策略
  • 基于无线传感器网络的森林防火设备远程监控系统(论文+源码)
  • 根据进程id查看服务使用的垃圾收集器
  • 论文阅读:CosAE Learnable Fourier Series for Image Restoration
  • 大数据面试——引入
  • 【NextJS】PostgreSQL 遇上 Prisma ORM
  • 单链表的删除实战
  • NEC纪实 :2024全国机器人大赛 Robocon 常州工学院团队首战国三
  • QT笔记- Qt6.8.1 Android编程 添加AndroidManifest.xml文件以支持修改权限