当前位置: 首页 > article >正文

Linux 线程池

1.概念介绍

线程池是一种多线程处理形式,它维护着多个线程,这些线程处于等待状态,随时准备接受任务并执行。线程池的主要目的是为了提高系统的性能和资源利用率,避免在处理短时间任务时频繁创建和销毁线程所带来的开销。

线程池的优点

  1. 提高性能:避免了频繁创建和销毁线程的开销,因为线程的创建和销毁是比较耗时的操作。
  2. 控制资源:可以限制线程的数量,防止过多的线程竞争系统资源,导致系统性能下降甚至崩溃。
  3. 提高响应性:能够更快地响应新的任务请求,因为线程已经准备好,无需等待线程创建。
  4. 可管理性:线程池可以统一管理、分配、调优和监控其中的线程。

线程池的应用场景

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

2.线程池的实现

首先,为了方便使用互斥锁,条件变量和线程,我们需要将这些封装起来。

Mutex.hpp对线程进行封装,代码如下:

封装互斥锁

#include <iostream>
#include <pthread.h>
using namespace std;

class Mutex
{
public:
    Mutex(const Mutex&)=delete;
    const Mutex& operator=(const Mutex&)=delete;
    Mutex()
    {
        pthread_mutex_init(&_lock,nullptr);
    }
    ~Mutex()
    {
        pthread_mutex_destroy(&_lock);
    }
    void Lock()
    {
        pthread_mutex_lock(&_lock);
    }
    pthread_mutex_t * LockPtr()
    {
        return &_lock;
    }
    void Unlock()
    {
        pthread_mutex_unlock(&_lock);
    }
private:
    pthread_mutex_t _lock;
};
class LockGuard
{
    public:
    LockGuard(Mutex& m)
    :_mutex(m)
    {
        _mutex.Lock();
    }
    ~LockGuard()
    {
        _mutex.Unlock();
    }
    private:
    Mutex& _mutex;
};

封装条件变量

Cond.hpp对线程进行封装,代码如下:

#include"Mutex.hpp"
class Cond
{
    public:
    Cond()
    {
        pthread_cond_init(&_cond,nullptr);
    }
    ~Cond()
    {
        pthread_cond_destroy(&_cond);
    }
    void Wait(Mutex& mutex)
    {
        pthread_cond_wait(&_cond,mutex.LockPtr());
    }
    void Notify()
    {
        pthread_cond_signal(&_cond);
    }
    void NotifyAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    private:
    pthread_cond_t _cond;
};

封装线程

Thread.hpp对线程进行封装,代码如下:

#include <pthread.h>
#include <iostream>
#include <functional>
#include <string>
#include <unistd.h>
using namespace std;
using func_t = function<void(string)>;
static int number = 1;
enum STATUS
{
    NEW,
    RUNNING,
    STOP
};
class Thread
{
private:
    static void *Routine(void *arg)
    {
        Thread *t = static_cast<Thread *>(arg);
        t->_func(t->_name);
        return nullptr;
    }

public:
    Thread(func_t func)
        : _func(func), _status(NEW), _joinable(true)
    {
        _name = "Thread-" + to_string(number++);
        _pid = getpid();
    }
    bool Start()
    {
        if (_status != RUNNING)
        {
            _status = RUNNING;
            int n = pthread_create(&_tid, nullptr, Routine, this);
            if (n != 0)
            {
                return false;
            }
            return true;
        }
        return false;
    }
    bool Stop()
    {
        if (_status == RUNNING)
        {
            _status = STOP;
            int n = pthread_cancel(_tid);
            if (n != 0)
            {
                return false;
            }
            return true;
        }
        return false;
    }
    bool Join()
    {
        if (_joinable)
        {
            _status = STOP;
            int n = pthread_join(_tid, nullptr);
            if (n != 0)
            {
                return false;
            }
            return true;
        }
        return false;
    }
    void Detach()
    {
        _joinable = false;
        pthread_detach(_tid);
    }
    string Name()
    {
        return _name;
    }
private:
    string _name;
    pthread_t _tid;
    pid_t _pid;
    STATUS _status;
    bool _joinable;
    func_t _func;
};

线程的成员变量

    string _name;
    pthread_t _tid;
    pid_t _pid;
    STATUS _status;
    bool _joinable;
    func_t _func;

我们知道创建线程的时候 pthread_create 函数原型如下

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                   void *(*start_routine) (void *), void *arg);

其中执行的函数 start_routine 类型为void*(* ) (void*),即函数参数和返回值都为 void* 类型,而我们封装类传入的函数_func为 void(string) 类型,所以我们不能直接使用_func,而需要进行二次封装,通过类型为void*(* ) (void*) 的Routine函数封装使用_func函数,而 pthread_create 可以使用Routine函数。

    static void *Routine(void *arg)
    {
        Thread *t = static_cast<Thread *>(arg);
        t->_func(t->_name);
        return nullptr;
    }
    bool Start()
    {
        if (_status != RUNNING)
        {
            _status = RUNNING;
            int n = pthread_create(&_tid, nullptr, Routine, this);
            if (n != 0)
            {
                return false;
            }
            return true;
        }
        return false;
    }

但是为什么写代码时我们要将Routine函数定义为静态函数呢?因为在类内定义时,Routine无形中多了一个参数,即this指针,实际上Routine函数类型为void*(* ) (thread<T>*,void*),这样就不满足使用 pthread_create 时需要的函数类型,所以我们需要把Routine函数定义为静态函数从而去掉第一个隐含参数。

但这样Routine函数就不可以访问类内的成员变量_func,所以我们在使用 pthread_create 时需要把 this 参数传过去,从而让Runfunc函数能访问到_func,从而执行_func函数。

封装线程池

线程池的成员变量

    vector<thread_t> _threads;
    int _num;
    int _wait_num;
    std::queue<T> _taskq; // 临界资源
    //控制器
    Mutex _lock;
    Cond _cond;

    bool _isrunning;

    static ThreadPool<T> *instance;
    static Mutex mutex; // 只用来保护单例

线程池的主要组件包括线程数组、任务队列和控制器。线程数组用来存放被创建的线程,任务队列将新任务添加到队列最后,并通知空闲线程可以从队列最前端取用任务执行,控制器管理着一个队列锁,其保护临界资源任务队列,保证线程间的互斥关系,以及一个信号量,保证线程间的同步关系。 

线程池的实现通常包括线程池初始化、任务提交、线程调度和线程销毁等步骤。

线程池初始化

private:
    bool IsEmpty() { return _taskq.empty(); }
    void HandlerTask(string name)
    {
        cout << "线程: " << name << ", 进入HandlerTask的逻辑";
        while (true)
        {
            // 1. 拿任务
            T t;
            {
                LockGuard lockguard(_lock);
                while (IsEmpty() && _isrunning)
                {
                    _wait_num++;
                    _cond.Wait(_lock);
                    _wait_num--;
                }
                // 2. 任务队列为空 && 线程池退出了
                if (IsEmpty() && !_isrunning)
                    break;
                t = _taskq.front();
                _taskq.pop();
            }
            // 2. 处理任务
            t(); // 规定,未来所有的任务处理,全部都是必须提供()方法!
        }
        cout << "线程: " << name << " 退出";
    }
    ThreadPool(const ThreadPool<T> &) = delete;
    ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

    ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
    {
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(make_shared<Thread>(bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
            cout << "构建线程" << _threads.back()->Name() << "对象 ... 成功";
        }
    }
public:
    static ThreadPool<T> *getInstance()
    {
        if (instance == NULL)
        {
            LockGuard lockguard(mutex);
            if (instance == NULL)
            {
               cout << "单例首次被执行,需要加载对象...";
                instance = new ThreadPool<T>();
                instance->Start();
            }
        }
        return instance;
    }

任务提交

    void Equeue(T &in)
    {
        LockGuard lockguard(_lock);
        if (!_isrunning)
            return;
        _taskq.push(in);
        if (_wait_num > 0)
            _cond.Notify();
    }

线程调度

    void Start()
    {
        if (_isrunning)
            return;
        _isrunning = true; 
        for (auto &thread_ptr : _threads)
        {
            cout << "启动线程" << thread_ptr->Name() << " ... 成功";
            thread_ptr->Start();
        }
    }

停止调度

    void Stop()
    {
        LockGuard lockguard(_lock);
        if (_isrunning)
        {
            _isrunning = false; // 不工作
            // 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了
            if (_wait_num > 0)
                _cond.NotifyAll();
        }
    }

ThreadPool.hpp完整代码如下

#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <memory>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
using thread_t = std::shared_ptr<Thread>;
const static int defaultnum = 5;

template <class T>
class ThreadPool
{
private:
    bool IsEmpty() { return _taskq.empty(); }
    void HandlerTask(string name)
    {
        cout << "线程: " << name << ", 进入HandlerTask的逻辑";
        while (true)
        {
            // 1. 拿任务
            T t;
            {
                LockGuard lockguard(_lock);
                while (IsEmpty() && _isrunning)
                {
                    _wait_num++;
                    _cond.Wait(_lock);
                    _wait_num--;
                }
                // 2. 任务队列为空 && 线程池退出了
                if (IsEmpty() && !_isrunning)
                    break;
                t = _taskq.front();
                _taskq.pop();
            }
            // 2. 处理任务
            t(); // 规定,未来所有的任务处理,全部都是必须提供()方法!
        }
        cout << "线程: " << name << " 退出";
    }
    ThreadPool(const ThreadPool<T> &) = delete;
    ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

    ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
    {
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(make_shared<Thread>(bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
            cout << "构建线程" << _threads.back()->Name() << "对象 ... 成功";
        }
    }

public:
    static ThreadPool<T> *getInstance()
    {
        if (instance == NULL)
        {
            LockGuard lockguard(mutex);
            if (instance == NULL)
            {
               cout << "单例首次被执行,需要加载对象...";
                instance = new ThreadPool<T>();
                instance->Start();
            }
        }
        return instance;
    }

    void Equeue(T &in)
    {
        LockGuard lockguard(_lock);
        if (!_isrunning)
            return;
        _taskq.push(in);
        if (_wait_num > 0)
            _cond.Notify();
    }
    void Start()
    {
        if (_isrunning)
            return;
        _isrunning = true; 
        for (auto &thread_ptr : _threads)
        {
            cout << "启动线程" << thread_ptr->Name() << " ... 成功";
            thread_ptr->Start();
        }
    }
    void Wait()
    {
        for (auto &thread_ptr : _threads)
        {
            thread_ptr->Join();
            cout << "回收线程" << thread_ptr->Name() << " ... 成功";
        }
    }
    void Stop()
    {
        LockGuard lockguard(_lock);
        if (_isrunning)
        {
            _isrunning = false; // 不工作
            // 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了
            if (_wait_num > 0)
                _cond.NotifyAll();
        }
    }

private:
    vector<thread_t> _threads;
    int _num;
    int _wait_num;
    std::queue<T> _taskq; // 临界资源

    Mutex _lock;
    Cond _cond;

    bool _isrunning;

    static ThreadPool<T> *instance;
    static Mutex mutex; // 只用来保护单例
};

template <class T>
ThreadPool<T> *ThreadPool<T>::instance = NULL;
template <class T>
Mutex ThreadPool<T>::mutex; // 只用来保护单例


http://www.kler.cn/a/454127.html

相关文章:

  • 简单讲解关于微信小程序调整 miniprogram 后, tabbar 找不到图片的原因之一
  • 使用sam进行零样本、零学习的分割实践
  • 谷歌浏览器 Chrome 提示:此扩展程序可能很快将不再受支持
  • 开发微信小程序的过程与心得
  • QT-【常用容器类】-QList类 QLinkedList类
  • 微信小程序中遇到过的问题
  • RocketMQ的集群架构是怎样的?
  • 深度学习中的并行策略概述:4 Tensor Parallelism
  • 【ES6复习笔记】模板字符串(3)
  • 运行Zr.Admin项目(前端)
  • C++软件设计模式之类型模式和对象型模式
  • PDF书籍《手写调用链监控APM系统-Java版》第9章 插件与链路的结合:Mysql插件实现
  • 基于 Python 大数据的拼团购物数据分析系统的设计与实现
  • 路由器的原理
  • Axure10
  • 【无标题】学生信息管理系统界面
  • GitLab安装及使用
  • C++打造局域网聊天室第十三课: 任务栏托盘功能的实现
  • [2003].第2-01节:关系型数据库表及SQL简介
  • [bug]java导出csv用Microsoft Office Excel打开乱码解决
  • 【K8S问题系列 | 20 】K8S如何删除异常对象(Pod、Namespace、PV、PVC)
  • 各种网站(学习资源、常用工具及其他,持续更新中~)
  • Python - 获取当前函数中的所有参数信息(名称和值)
  • 【Rust自学】7.3. 路径(Path)Pt.2:访问父级模块、pub关键字在结构体和枚举类型上的使用
  • Redis基础知识分享(含5种数据类型介绍+增删改查操作)
  • c#自定义事件