当前位置: 首页 > article >正文

Linux 手撕线程池

前言

线程池池化技术 中很典型的一个,它旨在高效的管理复用线程资源!在现在的计算机体系中,线程是执行任务(调度)的基本单位。然而,频繁的创建和销毁线程也会带来较大的开销,包括系统资源和性能等的下降!为了解决这个问题线程池应运而生!

目录

前言

1、线程池相关概念

1.1 池化技术

1.2 线程池

1.3 线程池的优点

1.4 线程池的应用场景

2、线程池的实现

2.1 封装Thread

• 全部源码 

2.2 线程池_V1版(朴素版)

• V1版全部源码

2.3 线程池_V2版 (优化版)

• 日志介绍

• 日志实现

• RAII 风格的锁 

• 日志全部源码

• 引入日志版全部源码

• V2版全部源码

3、单例模式

3.1 什么是设计模式

3.2 什么是单例模式

3.3 单例模式的特点

3.4 单例模式的实现

3.4.1 饿汉模式

3.4.2 懒汉模式

3.5 线程池_V3版(最终版)

• V3版全部源码

4、周边知识补充

4.1 STL 的线程问题

4.2 智能指针线程安全问题


1、线程池相关概念

1.1 池化技术

池化技术 是提前准备一些未来高频的使用资源放存一个容器中,当未来需要的时候直接从容器中获取以及重复利用!

池化技术 可以极大地提高任务调度时的性能。最典型的就是线程池,还常用于各种涉及网络相关的服务中,例如:MySQL连接池、HTTP连接池、Redis连接池等。除了线程池外还有 内存池,比如STL中的容器在申请空间时,都是直接从 空间配置器 allocator 中获取的!

说白了,就是我提前把你未来要的东西准备好,让你未来直接用减少了你未来的申请时间!所以,池化技术的本质就是 空间换时间

池化技术的优点

提高资源利用率:通过复用资源,减少了资源的浪费和不必要的开销。

提高系统性能:避免了频繁地向操作系统申请和释放资源的开销。

简化资源管理:通过集中管理资源池,简化了资源的管理和维护工作。

1.2 线程池

线程池 就是通过预先的创建并维护一定数量的线程,当有任务需要执行时,线程池会分配一个空闲的线程来处理该任务,而不是直接的创建一个新的,当任务执行完毕后,线程不会销毁而是回到线程池继续等待下一个任务!

1.3 线程池的优点

线程池也是池化技术的一种,所以池化技术的优点他都有!但最重要的两点是,1、高效 2、方便

高效体现在,线程会被合理的调度,可以确保 任务与线程 间做到负载均衡

方便体现在,线程在使用的时候已经创建好了,直接用即可,用完后也不需要销毁

当然也可以接上 生产消费者模型 来实现 解耦 操作

注意:线程池中的线程数量并不是越多越好,因为线程的数量过多会导致调度变得复杂化,具体创建多少线程取决于业务,比如 处理器内核、剩余内存、网络中的 socket 数量等

我们把 任务队列 换成 生产消费者模型

1.4 线程池的应用场景

1. 存在大量且短小的任务请求,比如 Web 服务器中的网页请求,使用 线程池 就非常合适,因为网页点击量众多,并且大多都没有长时间连接访问,你可以想象一个热门网站的点击次数!
2. 对性能要求苛刻,力求快速响应需求,比如游戏服务器,要求对玩家的操作做出快速响应
3. 突发大量请求,但不至于使服务器产生过多的线程,短时间内,在服务器创建大量线程会使得内存达到极限,造成出错,可以使用 线程池 规避问题

2、线程池的实现

我们这里实现三个版本的线程池;第一个版本不加任何的优化,第二个版本加上组件,第三个版本引入单例模式!

2.1 封装Thread

这里我们就不在直接使用pthread的原生接口了,我们对原生接口封装一个Thread类:

Thread类的设计

1、用户在创建时,直接给一个线程的执行任务函数+参数 

2、给用户提供 start、stop、和 join等接口方便用户操作

3、Thread 的成员应该有,线程的名字、tid、执行的函数、运行状态

OK,基于上述,我们先搭建一个框架出来:

#pragma once

#include <pthread.h>
#include <string>
#include <functional>

namespace ThreadModule
{
    template<typename T>
    using func_t = std::function<void(T&);// 外部想要让线程执行的函数

    template<typename T>
    class Thread
    {
    public:
        Thread(std::string name, func_t<T> func)
            :_name(name),_func(func),_is_running(false)
        {}

        // 启动线程
        bool start()
        {
            // ...
        }

        // 终止线程
        void stop()
        {
            // ...
        }

        // 等待线程
        bool join()
        {
            // ...
        }

        // 获取线程的名称
        std::string get_name()
        {
            return _name;
        }

        // 获取线程的状态
        std::string get_status()
        {
            return _is_running == true ? "start" : "stop";
        }

        ~Thread()
        {}    

    private:
        std::string _name;
        pthread_t _tid;
        func_t<T> _fun;
        bool _is_running;
    };
}

下面我们就来完善一下,上面的三个核心接口

start 接口的实现

在调用 start 时, start 内部去调用 pthread_create 去创建线程,但是pthread_create 需要有一个返回值void*参数void*的函数,这和用户的_func不符,在类里面实现一个函数是无法调用到的,原因是成员函数有一个,this指针!

所以我们选择在加一层,增加一个ThreadRoutine(返回值static void*)和 Extuce(返回值void),在调用pthread_create时传递ThreadRoutine,在它内部调用Excute,在Excute内部调用用户的_func!

// 启动线程
bool start()
{
    int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);
    return n == 0 ? true : false;
}


private:
    void Excute()
    {
        _is_running = true;
        _func(_name);
        _is_running = false;
    }
    
    static void *ThreadRoutine(void *args)
    {
        Thread<T> *self = static_cast<Thread<T> *>(args);
        self->Excute();
        return nullptr;
    }

stop 接口的实现

判断当线程的状态为, 启动时,将线程调用pthread_cancle取消掉,然后将状态设为停止!

// 终止线程
void stop()
{
    if (_is_running)
    {
        pthread_cancel(_tid);
        _is_running = false;
    }
}

join 接口的实现

直接调用pthread_join即可

// 等待线程
void join()
{
    ::pthread_join(_tid, nullptr);
}

测试一下:

#include "Thread.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>
using namespace ThreadModule;

void Task(std::string& name)
{
    while (true)
    {
        std::cout << name << ", is running..." << std::endl;
        sleep(1);
    }
}

const int num = 5;// 创建线程的数量

int main()
{
    std::vector<Thread<std::string>> tids;
    // 创建
    for(int i = 0; i < num; i++)
    {   
        std::string name = "thread_" + std::to_string(i+1);
        Thread<std::string>* t = new Thread<std::string>(name, Task);
        tids.emplace_back(name, Task);
    }
    sleep(2);

    // 启动
    for(auto& t : tids)
        t.start();
    sleep(10);

    // 终止
    for(auto& t : tids)
        t.stop();
    
    // 等待
    for(auto& t : tids)
        t.join();


    return 0;
}

• 全部源码 

#pragma once

#include <pthread.h>
#include <string>
#include <functional>

namespace ThreadModule
{
    template<typename T>
    using func_t = std::function<void(T&)>;// 外部想要让线程执行的函数

    template<typename T>
    class Thread
    {
    private:
        void Excute()
        {
            _is_running = true;
            _func(_name);
            _is_running = false;
        }

        static void* ThreadRoutine(void*args)
        {
            Thread<T>* self = static_cast<Thread<T>*>(args);
            self->Excute();
            return nullptr;
        }

    public:
        Thread(std::string name, func_t<T> func)
            :_name(name),_func(func),_is_running(false)
        {}

        // 启动线程
        bool start()
        {
            int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);
            return n == 0 ? true : false;
        }

        // 终止线程
        void stop()
        {
            if(_is_running)
            {
                pthread_cancel(_tid);
                _is_running = false;
            }
        }

        // 等待线程
        void join()
        {
            ::pthread_join(_tid,nullptr);
        }

        // 获取线程的名称
        std::string get_name()
        {
            return _name;
        }

        // 获取线程的状态
        std::string get_status()
        {
            return _is_running == true ? "start" : "stop";
        }

        ~Thread()
        {}    

    private:
        std::string _name; // 线程名字
        pthread_t _tid; // 线程 tid
        func_t<T> _func; // 线程执行的任务
        bool _is_running; // 线程的运行状态
    };
}

2.2 线程池_V1版(朴素版)

朴素版的线程池,我们不加任何的中间组件!

实现思路:

1、线程池中包含属性:线程的数量(外部可指定),管理线程的容器、休眠线程的数量、任务队列、以及互斥锁和条件变量

2、外部使用时,直接调用相关的接口,向任务队列推送任务,线程池派休眠的线程去执行,然后外部可以终止线程池

3、为了使用方便,我们对互斥锁和条件变量进行简单的封装!

4、由于我们在创建线程池是需要向线程池中推送任务,我们可以将线程池写成模版,但是Thread已经是模版,有两种解决方法,1、搞两个模版参数 2、将Thread简化  这里为了代码的简化,我们选择后者!也就是将Thread的类模板去了

OK,基于上述,我们还是先搭建一个框架:

Thread.hpp

#pragma once

#include <pthread.h>
#include <string>
#include <functional>

namespace ThreadModule
{
    using func_t = std::function<void()>; // 外部想要让线程执行的函数

    class Thread
    {
    private:
        void Excute()
        {
            _is_running = true;
            _func();
            _is_running = false;
        }

        static void *ThreadRoutine(void *args)
        {
            Thread *self = static_cast<Thread *>(args);
            self->Excute();
            return nullptr;
        }

    public:
        Thread(std::string name, func_t func)
            : _name(name), _func(func), _is_running(false)
        {
        }

        // 启动线程
        bool start()
        {
            int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);
            return n == 0 ? true : false;
        }

        // 终止线程
        void stop()
        {
            if (_is_running)
            {
                pthread_cancel(_tid);
                _is_running = false;
            }
        }

        // 等待线程
        void join()
        {
            ::pthread_join(_tid, nullptr);
        }

        // 获取线程的名称
        std::string get_name()
        {
            return _name;
        }

        // 获取线程的状态
        std::string get_status()
        {
            return _is_running == true ? "start" : "stop";
        }

        ~Thread()
        {
        }

    private:
        std::string _name; // 线程名字
        pthread_t _tid;    // 线程 tid
        func_t _func;   // 线程执行的任务
        bool _is_running;  // 线程的运行状态
    };
}

ThreadPool.hpp

#ifndef _M_T_P_
#define _M_T_P_

#include "Thread.hpp"
#include <pthread.h>
#include <vector>
#include <queue>
#include <iostream>
#include <unistd.h>

using namespace ThreadModule;

const static int g_default = 5;

template <class T>
class ThreadPool
{
private:
    // 给任务队列加锁
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    // 给任务队列解锁
    void UnLockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    // 在 _cond 条件下阻塞等待
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    // 唤醒一个休眠的线程
    void WakeUp()
    {
        pthread_cond_signal(&_cond);
    }
    // 唤醒所有休眠的线程
    void WakeUpAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    // 判断任务队列是否为空
    bool IsEmpty()
    {
        return _task_queue.empty();
    }
    // 处理任务
    void HandlerTask()
    {
        // ...
    }

public:
    ThreadPool(int thread_num = g_default)
        : _thread_num(thread_num), _sleep_thread_num(0), _is_running(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    void Init()
    {
    }

    void Start()
    {
    }

    void Stop()
    {
    }

    // 向任务队列推送任务
    void PushTask(T &task)
    {
    }

private:
    int _thread_num;              // 线程的数目
    std::vector<Thread> _threads; // 管理线程的容器
    std::queue<T> _task_queue;    // 任务队列
    int _sleep_thread_num;        // 休眠线程的数目
    bool _is_running;             // 线程池的状态
    pthread_mutex_t _mutex;       // 互斥锁
    pthread_cond_t _cond;         // 条件变量
};

#endif

接下来就实现这些接口:

init 接口实现

init 接口我们先创建一批Thread线程对象,放在管理线程的容器中,等未来启动的时候去调用线程池中的HandlerTask函数去处理函数!

我们暂时这里先不让HandlerTask去处理,这里有坑,我们先写一个全局的测试Test函数,测试一下是否跑的起来!

void Init()
{
    for(int i = 0; i < _thread_num; i++)
    {
        std::string threadname = "thread_"+std::to_string(i+1);
        _threads.emplace_back(threadname, test/*TODO*/);
    }
}

Start 接口实现

直接让线程池中的Thread对象调用他们自己的Start即可

void Start()
{
    for(auto& t : _threads)
    {
        t.start();
    }
}

我们先测试一下,当前的代码时候可以跑全局的test:

void test()
{
    while (true)
    {
        std::cout << "thread is running..." << std::endl;
        sleep(1);
    }
}

OK,没问题!当然这种方式不是我们想要的效果!我们想要的是:外部给任务队列推送一个任务,线程池选择一个休眠的线程去执行每个线程都是去执行HandlerTask,然后在它内部取出队列的任务并执行!但是当前的Thread对象需要一个返回值和参数都是void的函数,我们的HandlerTask多一个this指针,如何解决呢?这里不在搞成静态的了,直接上一种优雅的新玩法:使用std::bind将HandlerTask和this绑定然后构造出一种返回值void参数void的新函数类型:

修改后的 Init

void Init()
{
    func_t func = std::bind(&ThreadPool::HandlerTask, this);
    for(int i = 0; i < _thread_num; i++)
    {
        std::string threadname = "thread_"+std::to_string(i+1);
        _threads.emplace_back(threadname,func);
    }
}

PushTask 接口实现

因为任务队列将来被多线程访问所以是临界资源,所以得加锁保护!

加锁->推送任务->判断有休眠的线程唤醒->解锁

// 向任务队列推送任务
void PushTask(T &task)
{
    LockQueue();
    _task_queue.push(task);
    if(_sleep_thread_num > 0)// ?
    {
        WakeUp();
    }
    UnLockQueue();
}

HandlerTask 接口实现

线程池中的线程要重复的使用,所以这里应该是一个死循环!

每个线程进入临界区后如果没有任务就等待,并且将休眠线程数++,当醒来之后--

然后获取任务解锁!一定在临界区外边处理任务(处理任务本身也花费时间)~!

// 处理任务 -> 消费者
void HandlerTask()
{
    while (true)
    {
        LockQueue();
        // 任务队列为空
        // if(IsEmpty()) -> while 防止伪唤醒
        while(IsEmpty())// ? 存疑
        {
            _sleep_thread_num++;
            Sleep();// 阻塞等待
            _sleep_thread_num++;
        }
        // 获取任务
        T t = _task_queue.front();
        _task.pop();
        UnLockQueue();
        // 处理任务
        t(); //注意这里的处理任务不应该放在临界区因为处理任务也费时间
    }
}

为了演示,我们把我们以生产消费者写的那个Task类拿过来直接接入:

Task.hpp

#pragma once

#include <iostream>

class Task
{
public:
    Task(int x, int y)
        :_x(x),_y(y)
    {}

    Task(){}

    std::string debug()
    {
        return std::to_string(_x) + "+"+std::to_string(_y) +"=" +"?";
    }

    void Excute()
    {
        _result = _x + _y;
    }

    void operator()()
    {
        Excute();
    }

    std::string result()
    {
        std::string msg = std::to_string(_x) + "+"+std::to_string(_y) +"=" + std::to_string(_result);
        return msg;
    }
    
private:
    int _x;
    int _y;
    int _result;
};

Main.cc

#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>

int main()
{
    ThreadPool<Task>* tp = new ThreadPool<Task>();
    tp->Init();
    tp->Start();
    srand(time(nullptr));
    while (true)
    {
        int x = rand() % 10 + 1;
        usleep(100);// 降低x y 的重复率
        int y = rand() % 10 + 1;
        Task t(x,y);
        std::cout << t.debug() << std::endl;
        tp->PushTask(t);
        sleep(1);
    }

    return 0;
}

符合预期!

但是这样写我们不知道哪个线程执行的,所以我们改一下让线程执行时带上名字:

OK,我们look一眼效果:

Stop 接口的实现

我们期望在将线程池关掉时,想让它里面的所有任务都执行完!

具体做法就是,将_is_running设置为false;然后唤醒所有的线程去执行HandlerTask,执行完所有的任务完退出即可,

当线程池在启动状态且任务队列为空时,线程才去休眠,

当任务队列为空且线程池为关闭时,则解锁退出执行即可!

所以我们以前写的HandlerTask缺少了一个判断条件:

// 处理任务 -> 消费者
void HandlerTask(const std::string &name)
{
    while (true)
    {
        LockQueue();
        // 任务队列为空
        while (IsEmpty() && _is_running)
        {
            _sleep_thread_num++;
            Sleep(); // 阻塞等待
            _sleep_thread_num++;
        }
        // 如果任务队列为空 && 线程池的状态为 退出
        if (IsEmpty() && !_is_running)
        {
            UnLockQueue();
            break;
        }
        // 获取任务
        T t = _task_queue.front();
        _task_queue.pop();
        UnLockQueue();
        // 处理任务
        t(); // 注意这里的处理任务不应该放在临界区因为处理任务也费时间
        std::cout << name << ": " << t.result() << std::endl;
    }
}

由于Stop是基于_is_ruunning 的状态实现的,所以他也是临界资源得加锁!另外防止,在线程池关闭的时候还有线程向任务队列推送任务,所以也得限制一下,当线程池是启动状态的时候才允许推送!并且也需要把start的_is_running 进行保护!

OK,我们在测试一下:这次让他执行10次就终止:

没问题!这就是我们的朴素版的线程池实现!

• V1版全部源码

#ifndef _M_T_P_
#define _M_T_P_

#include "Thread.hpp"
#include <pthread.h>
#include <vector>
#include <queue>
#include <iostream>
#include <unistd.h>

using namespace ThreadModule;

const static int g_default = 5;

void test()
{
    while (true)
    {
        std::cout << "thread is running..." << std::endl;
        sleep(1);
    }
}

template <class T>
class ThreadPool
{
private:
    // 给任务队列加锁
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    // 给任务队列解锁
    void UnLockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    // 在 _cond 条件下阻塞等待
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    // 唤醒一个休眠的线程
    void WakeUp()
    {
        pthread_cond_signal(&_cond);
    }
    // 唤醒所有休眠的线程
    void WakeUpAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    // 判断任务队列是否为空
    bool IsEmpty()
    {
        return _task_queue.empty();
    }
    // 处理任务 -> 消费者
    void HandlerTask(const std::string &name)
    {
        while (true)
        {
            LockQueue();
            // 任务队列为空
            while (IsEmpty() && _is_running)
            {
                _sleep_thread_num++;
                Sleep(); // 阻塞等待
                _sleep_thread_num++;
            }
            // 如果任务队列为空 && 线程池的状态为 退出
            if (IsEmpty() && !_is_running)
            {
                UnLockQueue();
                break;
            }
            // 获取任务
            T t = _task_queue.front();
            _task_queue.pop();
            UnLockQueue();
            // 处理任务
            t(); // 注意这里的处理任务不应该放在临界区因为处理任务也费时间
            std::cout << name << ": " << t.result() << std::endl;
        }
    }

public:
    ThreadPool(int thread_num = g_default)
        : _thread_num(thread_num), _sleep_thread_num(0), _is_running(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _thread_num; i++)
        {
            std::string threadname = "thread_" + std::to_string(i + 1);
            _threads.emplace_back(threadname, func);
        }
    }

    void Start()
    {
        LockQueue();
        _is_running = true;
        for (auto &t : _threads)
        {
            t.start();
        }
        UnLockQueue();
    }

    void Stop()
    {
        LockQueue();
        _is_running = false;
        WakeUpAll();
        UnLockQueue();
    }

    // 向任务队列推送任务 -> 生产者
    void PushTask(T &task)
    {
        LockQueue();
        // 当线程池是启动的时候才允许推送任务
        if (_is_running)
        {
            _task_queue.push(task);
            if (_sleep_thread_num > 0)
            {
                WakeUp();
            }
        }
        UnLockQueue();
    }

private:
    int _thread_num;              // 线程的数目
    std::vector<Thread> _threads; // 管理线程的容器
    std::queue<T> _task_queue;    // 任务队列
    int _sleep_thread_num;        // 休眠线程的数目
    bool _is_running;             // 线程池的状态
    pthread_mutex_t _mutex;       // 互斥锁
    pthread_cond_t _cond;         // 条件变量
};

#endif

2.3 线程池_V2版 (优化版)

V2版的线程池要在V1版本的基础上加上 日志 和 以前的 RAII 风格的半自动加锁组件 并 将任务队列用我们自己的阻塞队列

关于 RAII 风格的半自动化加锁组件,我们不在介绍,在生产消费模型已经介绍过了!这里直接拿过来用到 日志 以及合适的地方即可!这里重点介绍一下 日志 以及实现一个简易的日志组件!

• 日志介绍

日志 是一种记录事件或信息的文件或数据集合,通常用来跟踪/记录程序或系统在运行时的状态、错误、事务处理等信息日志 的应用十分广泛,包括但不限于 软件开发、系统运维、大数据分析等!

日志通常包含如下内容:

1、时间

2、日志等级(后面实现会介绍)

3、日志来源(例如哪个文件的第几行)

4、附加信息(比如相关操作干了啥)

日志的作用

1、调试和问题排查

2、监控和性能优化

3、历史记录与溯源

4、....

说白了,日志就是记录程序运行时的状态信息,等有问题了可以利用它的记录信息快速定位问题并解决!日志的简单介绍就到这里,下面我们手搓一个简单的日志~!

• 日志实现

我们这里实现一个简单的日志组件,我们想要实现的日志格式如下:

这里首先就是日志等级,所以我们把上面没有说的日志等级先给介绍了:

日志等级 表示日志信息的严重程度或重要性

常见的日志等级包括:

1、DEBUG 表示调试信息

2、INFO 表示一般信息

3、WARNING 表示警告信息

4、ERROR 表示错误信息

5、FATAL 表示严重的错误

有了日志等级我们就可以着手写我们的小组件了,关于如何获取行号和当前文件我们后面是现实会介绍,这里我们先根据我们的格式搭建一个框架出来:

我们未来的日志想要的效果是:用户创建日志对象调用日志操作接口就也可以使用日志了

,另外我想将日志的操作信息设置成可变的,也就是设置为 三个点,未来可传递多个

namespace LogModule
{
#define FLUSH_SCREEN 1
#define FLUSH_FILE 2

    const std::string g_file = "./log.txt";              // 默认的日志文件


    // 日志的等地枚举
    enum
    {
        DEBUG = 1, // 调试
        INFO,      // 正常消息
        WARNING,   // 警告
        ERROR,     // 错误
        FATAL      // 致命错误
    };

    // 描述日志的属性
    struct log_message
    {
        int _level;             // 日志登记
        pid_t _id;              // 当前进程的pid
        std::string _file_name; // 文件名称
        int _file_line;         // 在文件的第几行
        std::string _curr_time; // 当前的时间
        std::string _log_msg;   // 日志信息
    };

    class Log
    {
    public:
        // 默认的日志文件是全局的log.txt 默认的刷新方式是 向屏幕刷新
        Log(const std::string file = g_file)
            : _type(FLUSH_SCREEN), _file(file)
        {
        }

        ~Log()
        {
        }

        // 构建日志信息
        void LogMessage(int level, const std::string &file_name, int file_line, const char *fomat, ...)
        {
            // ...
        }

        // 指定刷新的类型
        void Enable(int type)
        {
            _type = type;
        }

    private:
        int _type;         // 日志刷新类型
        std::string _file; // 刷新日志的文件
    };
}

构建日志信息其实就是将日志的属性和操作信息整合成我们的上述格式,然后刷新到指定的文件即可!所以我们在LogMessage中先得搞一个log_message对象,这些参数就是该对象的属性!

现在有两个棘手的问题:

1、如何获取当前的时间?

1、我们可以先通过time函数获取一个时间戳

2、然后再使用localhost函数获取一个struct tm* 的对象,通过这个对象就可以获取时间了

struct tm 结构体原型

struct tm
{
    int tm_sec;   /* Seconds (0-60) */
    int tm_min;   /* Minutes (0-59) */
    int tm_hour;  /* Hours (0-23) */
    int tm_mday;  /* Day of the month (1-31) */
    int tm_mon;   /* Month (0-11) */
    int tm_year;  /* Year - 1900 */
    int tm_wday;  /* Day of the week (0-6, Sunday = 0) */
    int tm_yday;  /* Day in the year (0-365, 1 Jan = 0) */
    int tm_isdst; /* Daylight saving time */
};

可以通过:

tm_year  获取年、tm_mon获取月、tm_mday获取天

tm_hour  获取时、tm_min获取分、 tm_sec获取秒

注意:

tm_year的值是减了1900所以为了时间正确我们得自己加上1900

tm_mon 是0-11的范围,我们也得就加一保证正确

std::string GetCurrTime()
{
    time_t now = time(nullptr);
    struct tm *t = localtime(&now);
    char buffer[128];
    // 这里为了保证例如:9月显示09我们将域宽设置为2位,不够两位用0填充
    snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
                t->tm_year + 1900,
                t->tm_mon + 1,
                t->tm_mday,
                t->tm_hour,
                t->tm_min,
                t->tm_sec);

    return buffer;
}

2、如何获取可变的参数包

我们可以用预处理操作 va_list、va_start,va_end配合 vsnprintf 函数将可变的参数提取到,一个缓冲区里面!最后将他构造给_lg_msg!

void LogMessage(int level, const std::string &file_name, int file_line, const char *fomat, ...)
{
    log_message lg;
    lg._level = level;
    lg._id = getpid();
    lg._file_name = file_name;
    lg._file_line = file_line;
    lg._curr_time = GetCurrTime();

    va_list ap;
    va_start(ap, fomat);
    char buffer[128];
    vsnprintf(buffer, sizeof(buffer), fomat, ap);
    va_end(ap);
    lg._log_msg = buffer;
}

关于可变的参数的获取,这里的va_list定义一个ap就是定义一个char*的指针,由于传参的时候是在压栈在两栈帧的中间的,而可以通过va_start将fomat指向的赋值给ap,此时ap不就指向参数了,然后vsnprintf就是将通过特定的方式(指针偏移)获取到了!如果有需要请自行搜索预处理相关的操作,这里不再介绍~!

3、日志等级是int如何转成字符串

这个没有技巧,直接硬转即可!

std::string LevelToString(int level)
{
    switch (level)
    {
    case DEBUG:
        return "DEBUG";
    case INFO:
        return "INFO";
    case WARNING:
        return "WARNING";
    case ERROR:
        return "ERROR";
    case FATAL:
        return "FATAL";
    default:
        return "UNKNOWN";
    }
}

我们先来手动的传递行号和文件,测试一下当前的逻辑:

void LogMessage(int level, const std::string &file_name, int file_line, const char *fomat, ...)
{
    log_message lg;
    lg._level = level;
    lg._id = getpid();
    lg._file_name = file_name;
    lg._file_line = file_line;
    lg._curr_time = GetCurrTime();

    va_list ap;
    va_start(ap, fomat);
    char buffer[128];
    vsnprintf(buffer, sizeof(buffer), fomat, ap);
    va_end(ap);
    lg._log_msg = buffer;

    // 测试
    printf("[%s][%d][%s][%d][%s] %s",
            LevelToString(lg._level).c_str(),
            lg._id,
            lg._file_name.c_str(),
            lg._file_line,
            lg._curr_time.c_str(),
            lg._log_msg.c_str());
}
#include "Log.hpp"
using namespace LogModule;

int main()
{
    Log lg;
    lg.LogMessage(DEBUG, "main", 7, "hello %ld, i am hero %d\n", 6.66, 666);
    sleep(1);
    lg.LogMessage(DEBUG, "main", 9, "hello %ld, i am hero %d\n", 6.66, 666);
    sleep(1);
    lg.LogMessage(DEBUG, "main", 11, "hello %ld, i am hero %d\n", 6.66, 666);
    sleep(1);
    lg.LogMessage(DEBUG, "main", 13, "hello %ld, i am hero %d\n", 6.66, 666);
    sleep(1);

    return 0;
}

OK,没有问题!下面我们加入刷新日志的逻辑:

刷新日志有两种:1、向显示器刷 2、向文件刷新(其实两种本质都是向文件,因为显示器也是文件!)

1、向显示器刷新,就直接使用printf输出即可

2、向文件刷新,打开文件,因为要控制格式所以就先写到一个缓冲区,最后将整个缓冲区写入文件即可!

void FlushToScreen(const log_message &lg)
{
    printf("[%s][%d][%s][%d][%s] %s",
            LevelToString(lg._level).c_str(),
            lg._id,
            lg._file_name.c_str(),
            lg._file_line,
            lg._curr_time.c_str(),
            lg._log_msg.c_str());
}

void FlushToFile(const log_message &lg)
{
    std::ofstream out(_file, std::ios::app);
    if (!out.is_open())
        return;
    char buffer[1024];
    snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s] %s",
                LevelToString(lg._level).c_str(),
                lg._id,
                lg._file_name.c_str(),
                lg._file_line,
                lg._curr_time.c_str(),
                lg._log_msg.c_str());
    out << buffer;
    out.close();
}

void FlushLog(const log_message &lg)
{
    // 将来多执行流访问时,防止其他执行流修改_type所以将他保护
    LockGuard lock(&g_mutex);
    switch (_type)
    {
    case FLUSH_SCREEN:
        FlushToScreen(lg);
        break;
    case FLUSH_FILE:
        FlushToFile(lg);
        break;
    }
}

• RAII 风格的锁 

另外刷新日志未来可能是多个执行流并发访问的,有的向往显示器刷新,有的向往文件写,这就势必可能导致多个执行流对同一个_type修改,这可能有问题,为了避免这种问题,我们所以在刷新时加锁!

这里的枷锁,我们也不直接使用原生接口了,我们使用RAII风格的锁,我们这里实现一下这个小的组件:

#ifndef __LG__
#define __LG__
#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mutex)
        :_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }

    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }

private:
    pthread_mutex_t* _mutex;
};

#endif

OK,测试一下:

OK,没有问题~!上面的日志虽然已经可以了,但是我们不够优雅,我们想要的效果是:未来只要有人引用了我的头文件,就可以用,且不用我传递文件名和行号!可以使用来改善,可以写几个全局的宏,未来是直接使用宏即可!

我们可以定义一个全局的Log的对象,且让外面的用户只需要传递日志等级和操作信息即可,关于文件名可以使用预处理操作符 __FILE__获取,行号由 __LINE__获取,我们未来可能信息里面只有一句话,没有格式%d什么的格式控制,所以我们在__VA_ARGS__前面加上##代表的是当后面没有格式控制的时候将前面的 给去掉

#define LOG(Level, Fomat, ...)                                           \
    do                                                                   \
    {                                                                    \
        log.LogMessage(Level, __FILE__, __LINE__, Fomat, ##__VA_ARGS__); \
    } while (0)

为了方便用户指定刷新日志的位置,所以我们也提供两个宏!

#define ENABLE_SCREEN()           \
    do                            \
    {                             \
        log.Enable(FLUSH_SCREEN); \
    } while (0)

#define ENABLE_FILE()           \
    do                          \
    {                           \
        log.Enable(FLUSH_FILE); \
    } while (0)

OK,没有问题!

• 日志全部源码

#pragma once
#include <string>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include "LockGuard.hpp"

namespace LogModule
{
#define FLUSH_SCREEN 1
#define FLUSH_FILE 2

    const std::string g_file = "./log.txt";              // 默认的日志文件
    pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; // 定义一把全局的互斥锁

    // 日志的等地枚举
    enum
    {
        DEBUG = 1, // 调试
        INFO,      // 正常消息
        WARNING,   // 警告
        ERROR,     // 错误
        FATAL      // 致命错误
    };

    // 描述日志的属性
    struct log_message
    {
        int _level;             // 日志登记
        pid_t _id;              // 当前进程的pid
        std::string _file_name; // 文件名称
        int _file_line;         // 在文件的第几行
        std::string _curr_time; // 当前的时间
        std::string _log_msg;   // 日志信息
    };

    class Log
    {
    private:
        std::string GetCurrTime()
        {
            time_t now = time(nullptr);
            struct tm *t = localtime(&now);
            char buffer[128];
            snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
                     t->tm_year + 1900,
                     t->tm_mon + 1,
                     t->tm_mday,
                     t->tm_hour,
                     t->tm_min,
                     t->tm_sec);

            return buffer;
        }

        std::string LevelToString(int level)
        {
            switch (level)
            {
            case DEBUG:
                return "DEBUG";
            case INFO:
                return "INFO";
            case WARNING:
                return "WARNING";
            case ERROR:
                return "ERROR";
            case FATAL:
                return "FATAL";
            default:
                return "UNKNOWN";
            }
        }

    void FlushToScreen(const log_message &lg)
    {
        printf("[%s][%d][%s][%d][%s] %s",
                LevelToString(lg._level).c_str(),
                lg._id,
                lg._file_name.c_str(),
                lg._file_line,
                lg._curr_time.c_str(),
                lg._log_msg.c_str());
    }

    void FlushToFile(const log_message &lg)
    {
        std::ofstream out(_file, std::ios::app);
        if (!out.is_open())
            return;
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s] %s",
                    LevelToString(lg._level).c_str(),
                    lg._id,
                    lg._file_name.c_str(),
                    lg._file_line,
                    lg._curr_time.c_str(),
                    lg._log_msg.c_str());
        out << buffer;
        out.close();
    }

    void FlushLog(const log_message &lg)
    {
        // 将来多执行流访问时,防止其他执行流修改_type所以将他保护
        LockGuard lock(&g_mutex);
        switch (_type)
        {
        case FLUSH_SCREEN:
            FlushToScreen(lg);
            break;
        case FLUSH_FILE:
            FlushToFile(lg);
            break;
        }
    }

    public:
        // 默认的日志文件是全局的log.txt 默认的刷新方式是 向屏幕刷新
        Log(const std::string file = g_file)
            : _type(FLUSH_SCREEN), _file(file)
        {
        }

        ~Log()
        {
        }

        void LogMessage(int level, const std::string &file_name, int file_line, const char *fomat, ...)
        {
            log_message lg;
            lg._level = level;
            lg._id = getpid();
            lg._file_name = file_name;
            lg._file_line = file_line;
            lg._curr_time = GetCurrTime();

            va_list ap;
            va_start(ap, fomat);
            char buffer[128];
            vsnprintf(buffer, sizeof(buffer), fomat, ap);
            va_end(ap);
            lg._log_msg = buffer;

            // 刷新日志
            FlushLog(lg);
        }

        void Enable(int type)
        {
            _type = type;
        }

    private:
        int _type;         // 日志刷新类型
        std::string _file; // 刷新日志的文件
    };

    Log log;

#define LOG(Level, Fomat, ...)                                           \
    do                                                                   \
    {                                                                    \
        log.LogMessage(Level, __FILE__, __LINE__, Fomat, ##__VA_ARGS__); \
    } while (0)

#define ENABLE_SCREEN()           \
    do                            \
    {                             \
        log.Enable(FLUSH_SCREEN); \
    } while (0)

#define ENABLE_FILE()           \
    do                          \
    {                           \
        log.Enable(FLUSH_FILE); \
    } while (0)
} // namespace LogMoudle

下面我们就将上述的日志接入到我们前面的线程池当中,用它来充当打印和记录程序信息:

• 引入日志版全部源码

这里就是直接包一个头文件,哪里用哪里直接LOG用就可以了~!

#ifndef _M_T_P_
#define _M_T_P_

#include "Thread.hpp"
#include "Log.hpp"
#include <pthread.h>
#include <vector>
#include <queue>
#include <iostream>
#include <unistd.h>

using namespace ThreadModule;
using namespace LogModule;

const static int g_default = 5;

void test()
{
    while (true)
    {
        std::cout << "thread is running..." << std::endl;
        sleep(1);
    }
}

template <class T>
class ThreadPool
{
private:
    // 给任务队列加锁
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    // 给任务队列解锁
    void UnLockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    // 在 _cond 条件下阻塞等待
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    // 唤醒一个休眠的线程
    void WakeUp()
    {
        pthread_cond_signal(&_cond);
    }
    // 唤醒所有休眠的线程
    void WakeUpAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    // 判断任务队列是否为空
    bool IsEmpty()
    {
        return _task_queue.empty();
    }
    // 处理任务 -> 消费者
    void HandlerTask(const std::string &name)
    {
        while (true)
        {
            LockQueue();
            // 任务队列为空
            while (IsEmpty() && _is_running)
            {
                LOG(INFO,"%s sleep begin\n", name.c_str());
                _sleep_thread_num++;
                Sleep(); // 阻塞等待
                _sleep_thread_num++;
                LOG(INFO,"%s wake up\n", name.c_str());
            }
            // 如果任务队列为空 && 线程池的状态为 退出
            if (IsEmpty() && !_is_running)
            {
                UnLockQueue();
                LOG(INFO,"%s quit...\n", name.c_str());
                break;
            }
            // 获取任务
            T t = _task_queue.front();
            _task_queue.pop();
            UnLockQueue();
            // 处理任务
            t(); // 注意这里的处理任务不应该放在临界区因为处理任务也费时间
            //std::cout << name << ": " << t.result() << std::endl;
            LOG(DEBUG,"%s handler task: %s\n", name.c_str(), t.result().c_str());
        }
    }

public:
    ThreadPool(int thread_num = g_default)
        : _thread_num(thread_num), _sleep_thread_num(0), _is_running(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _thread_num; i++)
        {
            std::string threadname = "thread_" + std::to_string(i + 1);
            _threads.emplace_back(threadname, func);
            LOG(INFO, "%s is init success!\n", threadname.c_str());
        }
    }

    void Start()
    {
        LockQueue();

        _is_running = true;
        UnLockQueue();
        for (auto &t : _threads)
        {
            t.start();
            LOG(INFO, "%s is start...\n", t.get_name().c_str());
        }
    }

    void Stop()
    {
        LockQueue();
        LOG(INFO, "threadpool is stop...\n");
        _is_running = false;
        WakeUpAll();
        UnLockQueue();
    }

    // 向任务队列推送任务 -> 生产者
    void PushTask(T &task)
    {
        LockQueue();
        // 当线程池是启动的时候才允许推送任务
        if (_is_running)
        {
            _task_queue.push(task);
            if (_sleep_thread_num > 0)
            {
                WakeUp();
            }
        }
        UnLockQueue();
    }

private:
    int _thread_num;              // 线程的数目
    std::vector<Thread> _threads; // 管理线程的容器
    std::queue<T> _task_queue;    // 任务队列
    int _sleep_thread_num;        // 休眠线程的数目
    bool _is_running;             // 线程池的状态
    pthread_mutex_t _mutex;       // 互斥锁
    pthread_cond_t _cond;         // 条件变量
};

#endif

测试一下:

#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>

int main()
{
    ThreadPool<Task>* tp = new ThreadPool<Task>();
    tp->Init();
    tp->Start();
    srand(time(nullptr));
    int cnt = 10;
    while (cnt--)
    {
        int x = rand() % 10 + 1;
        usleep(100);// 降低x y 的重复率
        int y = rand() % 10 + 1;
        Task t(x,y);
        //std::cout << t.debug() << std::endl;
        LOG(INFO, "Main thread push a task: %s\n", t.debug().c_str());
        tp->PushTask(t);
        sleep(1);
    }

    tp->Stop();
    sleep(2);
    return 0;
}

OK 加入日之后没有问题,我们下面就进行加入最后一个组件:阻塞队列

• V2版全部源码

BlockQueue.hpp

#pragma once

#include <pthread.h>
#include <queue>

const static int default_cap = 5;

template <class T>
class BlockingQueue
{
public:
    // 判断阻塞队列是否为空
    bool IsEmpty()
    {
        return _block_queue.empty();
    }

    // 判断阻塞队列是否为满
    bool IsFull()
    {
        return _max_cap == _block_queue.size();
    }

public:
    // 构造
    BlockingQueue(int cap = default_cap)
        : _max_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }

    // 析构
    ~BlockingQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }

    // 生产者 生产(入队)
    void Push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为满
        while (IsFull()) // if ?
        {
            // 在生产者的条件下等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        // 1、不为满 || 2、重新竞争到锁了
        _block_queue.push(in);
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的消费者
        pthread_cond_signal(&_c_cond);
    }

    // 消费者 消费(出队)
    void Pop(T *out)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为空
        while (IsEmpty()) // if ?
        {
            // 在消费者的条件下等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        // 1、不为空 || 2、重新竞争到锁了
        *out = _block_queue.front();
        _block_queue.pop();
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的生产者
        pthread_cond_signal(&_p_cond);
    }

private:
    std::queue<T> _block_queue;
    int _max_cap;           // 阻塞队列的容量
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _c_cond; // 消费者条件变量
    pthread_cond_t _p_cond; // 生产者条件变量
};

优化版线程池:ThreadPool.hpp

#ifndef _M_T_P_
#define _M_T_P_

#include "Thread.hpp"
#include "Log.hpp"
#include "BlockingQueue.hpp"
#include <pthread.h>
#include <vector>
#include <queue>
#include <iostream>
#include <unistd.h>

using namespace ThreadModule;
using namespace LogModule;

const static int g_default = 5;

void test()
{
    while (true)
    {
        std::cout << "thread is running..." << std::endl;
        sleep(1);
    }
}

template <class T>
class ThreadPool
{
private:
    // 给任务队列加锁
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    // 给任务队列解锁
    void UnLockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    // 在 _cond 条件下阻塞等待
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    // 唤醒一个休眠的线程
    void WakeUp()
    {
        pthread_cond_signal(&_cond);
    }
    // 唤醒所有休眠的线程
    void WakeUpAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    // 判断任务队列是否为空
    bool IsEmpty()
    {
        // return _task_queue.empty();
        return _task_queue.IsEmpty();
    }
    // 处理任务 -> 消费者
    void HandlerTask(const std::string &name)
    {
        while (true)
        {
            LockQueue();
            // 任务队列为空
            while (IsEmpty() && _is_running)
            {
                LOG(INFO, "%s sleep begin\n", name.c_str());
                _sleep_thread_num++;
                Sleep(); // 阻塞等待
                _sleep_thread_num++;
                LOG(INFO, "%s wake up\n", name.c_str());
            }
            // 如果任务队列为空 && 线程池的状态为 退出
            if (IsEmpty() && !_is_running)
            {
                UnLockQueue();
                LOG(INFO, "%s quit...\n", name.c_str());
                break;
            }
            // 获取任务
            // T t = _task_queue.front();
            // _task_queue.pop();
            T t;
            _task_queue.Pop(&t);
            UnLockQueue();
            // 处理任务
            t(); // 注意这里的处理任务不应该放在临界区因为处理任务也费时间
            // std::cout << name << ": " << t.result() << std::endl;
            LOG(DEBUG, "%s handler task: %s\n", name.c_str(), t.result().c_str());
        }
    }

public:
    ThreadPool(int thread_num = g_default)
        : _thread_num(thread_num), _sleep_thread_num(0), _is_running(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _thread_num; i++)
        {
            std::string threadname = "thread_" + std::to_string(i + 1);
            _threads.emplace_back(threadname, func);
            LOG(INFO, "%s is init success!\n", threadname.c_str());
        }
    }

    void Start()
    {
        LockQueue();

        _is_running = true;
        UnLockQueue();
        for (auto &t : _threads)
        {
            t.start();
            LOG(INFO, "%s is start...\n", t.get_name().c_str());
        }
    }

    void Stop()
    {
        LockQueue();
        LOG(INFO, "threadpool is stop...\n");
        _is_running = false;
        WakeUpAll();
        UnLockQueue();
    }

    // 向任务队列推送任务 -> 生产者
    void PushTask(T &task)
    {
        LockQueue();
        // 当线程池是启动的时候才允许推送任务
        if (_is_running)
        {
            _task_queue.Push(task);
            if (_sleep_thread_num > 0)
            {
                WakeUp();
            }
        }
        UnLockQueue();
    }

private:
    int _thread_num;              // 线程的数目
    std::vector<Thread> _threads; // 管理线程的容器
    // std::queue<T> _task_queue;    // 任务队列
    BlockingQueue<T> _task_queue; // 阻塞队列
    int _sleep_thread_num;        // 休眠线程的数目
    bool _is_running;             // 线程池的状态
    pthread_mutex_t _mutex;       // 互斥锁
    pthread_cond_t _cond;         // 条件变量
};

#endif

测试一下:

OK,至此我们V2版本的线程池就实现完毕了!下面我们来介绍最终版本即引入单例模式的版本~!再来实现最终版之前必须得做的一件事情就是介绍单例设计模式!

3、单例模式

3.1 什么是设计模式

IT 行业这么火, 涌入的人很多,俗话说林子大了啥鸟都有。大佬和菜鸡们两极分化的越
来越严重,为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给
定了一些对应的解决方案, 这个就是 设计模式
简单来说,设计模式就是经典的编程套路,常见的设计模式有:单例模式工厂模式建造者模式代理模式等!

3.2 什么是单例模式

代码构建类,类实例化出对象,这个实例化出的对象也可以称为 实例,比如常见的 STL 容器,在使用时,都是先根据库中的类,形成一个 实例 以供使用;正常情况下,一个类可以实例化出很多很多个对象,但对于某些场景来说,是不适合创建出多个对象的

比如本文中提到的 线程池,当程序运行后,仅需一个 线程池对象 来进行高效任务计算,因为多个 线程池对象 无疑会大大增加调度成本,因此需要对 线程池类 进行特殊设计,任何时刻使其只能创建一个 对象,换句话说就是我创建了就不能让别人再创建对象!

正如 一山不容二虎 一样,线程池 对象在一个程序中是不推荐出现多个的

在一个程序中只允许实例化出一个对象,可以通过 单例模式 来实现,单例模式 是非常 经典、常用、常考 的设计模式

3.3 单例模式的特点

单例的最大特点就是:在任何时刻只允许有一个实例对象,这就很像我国的 一婚一妻制 

在很多的服务器开发场景中,经常需要让服务器加载很多的数据(上百G)到内存中,此时往往要用一个单例的类来管理这些数据~!在我们当前的场景中就需要一个单例的线程池来管理这些线程!

3.4 单例模式的实现

单例模式 有两种实现的方式: 饿汉式 懒汉式!他们避免类被再次创建对象的方式都是一样的:构造函数私有化、删除(禁用)赋值拷贝 和 拷贝构造

只有外面无法获取构造函数,那也就无法构建对象了,比如下面的这个类:

class Test
{
private:
	// 构造私有化
	Test()
	{}
	// 删除/禁用 拷贝后早和赋值拷贝
	Test(const Test& t) = delete;
	Test operator=(const Test& t) = delete;
};

此时外界一定是创建不了对象的!但是现在的问题来了,你不是说单例模式任何时候只有一个对象的,但是我现在在外面一个对象也创不了了!这可咋办,这还把人古住了?别着急,我们的单例到这里只是实现了一半,另一半就是如何保证只创建一个对象!既然类外面创建不了对象,那里面是不是可以啊!所以,另一半就是:在内里面创建一个静态的对象指针或者静态的对象,在初始化就好了!因为这个对象在外面获取,所以我们还是得提供一个静态的函数 getInstance() 获取这个单例对象的句柄:

class Test
{
private:
	// 构造私有化
	Test()
	{}
	// 删除/禁用 拷贝后早和赋值拷贝
	Test(const Test& t) = delete;
	Test& operator=(const Test& t) = delete;
public:
	// 静态函数获取句柄
	static Test* getInStance()
	{
		return _test;
	}

private:
	// 指向单例对象的静态指针
	static Test* _test;
};

为什么删除拷贝构造和赋值拷贝?

如果不删除拷贝构造/赋值拷贝,那么外部利用句柄获之后就可以通过拷贝构造/赋值拷贝,拷贝出一个一模一样的,这不符合单例模式

为什么需要创建一个静态的函数?

单例模式至少需要一个对象,外部需要获取并使用!

整体调用:外部获取句柄->通过句柄调用该类的其他函数!

至于如何获取这个对象,具体实现得看是饿汉式还是懒汉式了! 

3.4.1 饿汉模式

你的舍友张三驴很饿,尽管菜还没有上来,他已经早早的把洗干净,等待开饭,开饭时直接干:

饿汉模式 正是如此,在程序加载到内存时,就已经早早的把单例对象创建好了(此时程序服务还没有完全启动),也就是在类外直接 new 个实例对象,具体如下:

namespace cp
{
	class Test
	{
	private:
		// 构造私有化
		Test()
		{}
		// 删除/禁用 拷贝后早和赋值拷贝
		Test(const Test& t) = delete;
		Test& operator=(const Test& t) = delete;
	public:
		// 静态函数获取句柄
		static Test* getInStance()
		{
			return _test;
		}

		void print()
		{
			std::cout << "hello world" << std::endl;
		}
	private:
		// 只想单例对象的静态指针
		static Test* _test;
	};

	// 饿汉式 注意这里是c++98/03规定的,静态百年来那个必须类外定义,所以这里不算违反单例
	Test* Test::_test = new Test();
}

注意:这里你可能会想,你不是说在类外不能 new 创还能对象吗,你这不是创建了吗?

这里确实在类外面 new 了但是单例static的,所以这里是在初始化静态变量不算是违反单例模式的规则!

饿汉模式 是一种相对简单的实现单例的方式,只需要在类中静态申明,在类外面定义即可!但是他也有一定的弊端:延缓服务启动的速度

完全的启动项目是需要时间的,尤其一些很大的项目,但是创建单例对象也是需要时间的,饿汉模式 在启动时,是先创建的单例对象(静态的),如果这个单例类很大,先创建单例对象务必会延缓项目的启动,如果后期使用这个单例对象还好,如果不使用那就白创建了,且耽误了项目的启动时间!

综上所述,饿汉模式 不是很推荐,除非图他的简单,并且服务规模较小;既然饿汉模式优缺点,那就要解决缺点,所以 懒汉模式 就来了!

3.4.2 懒汉模式

你的舍友王二狗,很懒,每次吃完饭之后的碗都不想洗,等到下一次再吃的时候再洗,王二狗的这种做法比较轻松~!

懒汉模式 也是一样的,他并不会在程序加载时创建对象,而是第一次调用句柄的时候创建,后续无需创建,直接使用即可~!

class Test
{
private:
	// 构造私有化
	Test()
	{}
	// 删除/禁用 拷贝后早和赋值拷贝
	Test(const Test& t) = delete;
	Test& operator=(const Test& t) = delete;
public:
	// 静态函数获取句柄
	static Test* getInStance()
	{
		if (_test == nullptr)
		{
			_test = new Test();// 第一次使用的时候创建
		}
		return _test;
	}

	void print()
	{
		std::cout << "hello world" << std::endl;
	}

private:
	// 指向静态单例对象的指针
	static Test* _test;
};
// 初始化为空指针
Test* Test::_test = nullptr;

注意: 此时的静态指针需要初始化为 nullptr,方便第一次判断

饿汉模式中出现的问题这里全部都解决了:

• 创建耗时  - >  只有第一次使用时才创建

• 占用资源  - >  如果不使用,就不会创建

懒汉模式 的核心在于延时加载,可以优化服务器的速度以及资源的占用

延时加载就有点像我们以前的 写实拷贝、动态库、以及地址空间等!就赌你不使用~!

这样下来,懒汉模式 确实优秀,实现起来也不麻烦吧,为什么说 饿汉模式 简单呢?

这是因为当前只是单线程的场景,如果是多线程的场景下,此时的 懒汉模式 会有大问题,如果过过个线程同时执行 getInstance() 同时认为 _test = nullptr 那不就每个线程都创建了一个单例对象吗,这显然是不合理的,不符合单例模式!也就是说当前实现的 懒汉模式 存在严重的 线程安全 的问题!

namespace cp
{
	class Test
	{
	private:
		// 构造私有化
		Test()
		{}
		// 删除/禁用 拷贝后早和赋值拷贝
		Test(const Test& t) = delete;
		Test& operator=(const Test& t) = delete;
	public:
		// 静态函数获取句柄
		static Test* getInStance()
		{
			if (_test == nullptr)
			{
                std::cout << "获取了一个 Test 对象..." << std::endl;
				_test = new Test();// 第一次使用的时候创建
			}
			return _test;
		}

		void print()
		{
			std::cout << "hello world" << std::endl;
		}

	private:
		// 指向静态单例对象的指针
		static Test* _test;
	};
	// 初始化为空指针
	Test* Test::_test = nullptr;
}

int main()
{
    pthread_t tids[5];
    for(int i = 0; i < 5; i++)
    {
        pthread_create(tids+i, nullptr, [](void*)->void*{
            auto ptr = cp::Test::getInStance();
            ptr->print();
            return nullptr;
        }, nullptr);
    }

    for(int i = 0; i < 5; i++)
        pthread_join(tids[i], nullptr);
        
	return 0;
}

有了线程安全的问题如何解决呢?

解决并发访问造成的安全问题的利器那必然是 互斥锁!只需要在获取句柄时,加锁,获取完了解锁,当一个线程发现没有锁,就等待~!

namespace cp
{
	class Test
	{
	private:
		// 构造私有化
		Test()
		{}
		// 删除/禁用 拷贝后早和赋值拷贝
		Test(const Test& t) = delete;
		Test& operator=(const Test& t) = delete;
	public:
		// 静态函数获取句柄
		static Test* getInStance()
		{
            // 在每次获取句柄时先加锁
            pthread_mutex_lock(&_mutex);
			if (_test == nullptr)
			{
                std::cout << "获取了一个 Test 对象..." << std::endl;
				_test = new Test();// 第一次使用的时候创建
			}
            // 获取完解锁
            pthread_mutex_unlock(&_mutex);
			return _test;
		}

		void print()
		{
			std::cout << "hello world" << std::endl;
		}

	private:
		// 指向静态单例对象的指针
		static Test* _test;
        static pthread_mutex_t _mutex;
	};
	// 初始化为空指针
	Test* Test::_test = nullptr;
    pthread_mutex_t Test::_mutex = PTHREAD_MUTEX_INITIALIZER;
}

OK,结果没有问题了! 但是这里还有一个效率问题

当前的代码确实可以只创建一个单例对象了,但是即使后面来的线程不会在创建了,他也要进行申请锁、加锁、等待、解锁这个过程,而我们知道加锁的过程本身也是有资源消耗的,所以这样写还是不优雅

解决方案双检查加锁 DoubleCheck

static Test *getInStance()
{
    // 双检查加锁
    if (_test == nullptr)
    {
        // 在每次获取句柄时先加锁
        pthread_mutex_lock(&_mutex);
        if (_test == nullptr)
        {
            std::cout << "获取了一个 Test 对象..." << std::endl;
            _test = new Test(); // 第一次使用的时候创建
        }
        // 获取完解锁
        pthread_mutex_unlock(&_mutex);
    }

    return _test;
}

这样写就非常的优雅了,因为只有第一次会进行加锁的行为,之后就不在加锁了!因为 if 本身和加锁的消耗比可以忽略~!

饿汉模式没有现成安全吗?

没有!饿汉模式下,单例对象一开始就被创建了,因为是 static 的所以后期即便是多线程并发访问,线程池的对象始终只有一个,也就不存在线程安全了!

这样看下来 饿汉模式 确实比 懒汉模式 简单,懒汉模式 还得考虑单例对象的线程安全问题,饿汉模 式直接单例对象创建一个,生命周期随进程~!

当然,C++11或者更高的版本,懒汉模式有更为简洁的做法:当调用 getInStance() 时创建一个静态的对象并返回,这样因为是动态的只会创建一次,所以是可行的。并且在 C++11 之后,可以保证静态变量初始化时的线程安全问题,也就不需要 双检查加锁 了,实现起来非常简单

class Test
{
private:
    // 构造私有化
    Test() {}
    // 删除拷贝构造函数和赋值运算符,防止复制
    Test(const Test &) = delete;
    Test &operator=(const Test &) = delete;
public:
    static Test* getInstance()
    {
        static Test _test;
        return &_test;// C++11 以及更高的版本
    }

    void print()
    {
        std::cout << "hello world" << std::endl;
    }
};

这样写起来就是非常简单了~!

注意: 静态变量创建时的线程安全问题,在 C++11 之前是不被保障的


3.5 线程池_V3版(最终版)

我们这里就直接使用 懒汉模式 实现了,他比较优秀~!

• V3版全部源码

#ifndef _M_T_P_
#define _M_T_P_

#include "Thread.hpp"
#include "Log.hpp"
#include "BlockingQueue.hpp"
#include "LockGuard.hpp"
#include <pthread.h>
#include <vector>
#include <queue>
#include <iostream>
#include <unistd.h>

using namespace ThreadModule;
using namespace LogModule;

const static int g_default = 5;

void test()
{
    while (true)
    {
        std::cout << "thread is running..." << std::endl;
        sleep(1);
    }
}

template <class T>
class ThreadPool
{
private:
    // 给任务队列加锁
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    // 给任务队列解锁
    void UnLockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    // 在 _cond 条件下阻塞等待
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    // 唤醒一个休眠的线程
    void WakeUp()
    {
        pthread_cond_signal(&_cond);
    }
    // 唤醒所有休眠的线程
    void WakeUpAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    // 判断任务队列是否为空
    bool IsEmpty()
    {
        // return _task_queue.empty();
        return _task_queue.IsEmpty();
    }
    // 处理任务 -> 消费者
    void HandlerTask(const std::string &name)
    {
        while (true)
        {
            LockQueue();
            // 任务队列为空
            while (IsEmpty() && _is_running)
            {
                LOG(INFO, "%s sleep begin\n", name.c_str());
                _sleep_thread_num++;
                Sleep(); // 阻塞等待
                _sleep_thread_num++;
                LOG(INFO, "%s wake up\n", name.c_str());
            }
            // 如果任务队列为空 && 线程池的状态为 退出
            if (IsEmpty() && !_is_running)
            {
                UnLockQueue();
                LOG(INFO, "%s quit...\n", name.c_str());
                break;
            }
            // 获取任务
            // T t = _task_queue.front();
            // _task_queue.pop();
            T t;
            _task_queue.Pop(&t);
            UnLockQueue();
            // 处理任务
            t(); // 注意这里的处理任务不应该放在临界区因为处理任务也费时间
            // std::cout << name << ": " << t.result() << std::endl;
            LOG(DEBUG, "%s handler task: %s\n", name.c_str(), t.result().c_str());
        }
    }

    // 私有化构造
    ThreadPool(int thread_num = g_default)
        : _thread_num(thread_num), _sleep_thread_num(0), _is_running(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    // 删除或禁用赋值拷贝和拷贝构造
    ThreadPool(const ThreadPool &tp) = delete;
    ThreadPool &operator=(const ThreadPool &tp) = delete;

public:
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    // 创建获取单例对象的句柄静态函数 -> 懒汉式
    static ThreadPool *getInstance()
    {
        // 双重检查加锁
        if (_tp == nullptr)
        {
            // 加锁 -> RAII风格
            LockGuard lock(&_static_mutex);
            if (_tp == nullptr)
            {
                _tp = new ThreadPool<T>();
                _tp->Init();
                _tp->Start();
                LOG(INFO, "Create ThreadPool...\n");
            }
            else
            {
                LOG(INFO, "Get ThreadPool...\n");
            }
        }

        return _tp;
    }

    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _thread_num; i++)
        {
            std::string threadname = "thread_" + std::to_string(i + 1);
            _threads.emplace_back(threadname, func);
            LOG(INFO, "%s is init success!\n", threadname.c_str());
        }
    }

    void Start()
    {
        LockQueue();

        _is_running = true;
        UnLockQueue();
        for (auto &t : _threads)
        {
            t.start();
            LOG(INFO, "%s is start...\n", t.get_name().c_str());
        }
    }

    void Stop()
    {
        LockQueue();
        LOG(INFO, "threadpool is stop...\n");
        _is_running = false;
        WakeUpAll();
        UnLockQueue();
    }

    // 向任务队列推送任务 -> 生产者
    void PushTask(T &task)
    {
        LockQueue();
        // 当线程池是启动的时候才允许推送任务
        if (_is_running)
        {
            _task_queue.Push(task);
            if (_sleep_thread_num > 0)
            {
                WakeUp();
            }
        }
        UnLockQueue();
    }

private:
    int _thread_num;              // 线程的数目
    std::vector<Thread> _threads; // 管理线程的容器
    // std::queue<T> _task_queue;    // 任务队列
    BlockingQueue<T> _task_queue; // 阻塞队列
    int _sleep_thread_num;        // 休眠线程的数目
    bool _is_running;             // 线程池的状态
    pthread_mutex_t _mutex;       // 互斥锁
    pthread_cond_t _cond;         // 条件变量

    static ThreadPool<T> *_tp;            // 单例模式
    static pthread_mutex_t _static_mutex; // 单例锁
};

// 类外初始化
template <class T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;

template <class T>
pthread_mutex_t ThreadPool<T>::_static_mutex = PTHREAD_MUTEX_INITIALIZER;

#endif

先来测试一下:

OK没有问题!

如何验证当前是单例模式呢?

这里最简单的方式就是,将每一个线程获取的句柄都给打印出来,看看地址是否一样就可以了,如果式样的就证明了是单例对象:

这里显然是一样的,所以当前的线程池就是单例模式的~!

至此我们的线程池就算封装完毕了,以下是一些注意事项:

1、注意加锁解锁的位置,尽可能提高效率

2、使用双检查加锁,避免不必要的竞争

3、可以使用 volatile 修饰静态单例对象指针,避免被编译器优化覆盖

4、周边知识补充

4.1 STL 的线程问题

STL库中的容器是线程安全的呢?

答案是:不是

因为STL设计的初衷是打造出极致的性能,而加锁和解锁势必会影响性能,因此, STL 中的容器并没有考虑线程安全问题,在前面写 生产消费者模型、线程池 我们都是对使用的STL容器进行手动的加锁,确保多线程并发访问的线程安全问题~!

所以在多线程场景中使用 STL 库时,需要自己确保线程安全

4.2 智能指针线程安全问题

C++ 标准提供的智能指针有三种:unique_ptr、shared_ptr、weak_ptr

首先来说 unique_ptr,这是个功能单纯的智能指针,只具备基本的 RAII 风格,不支持拷贝,因此无法作为参数传递,也就不涉及线程安全问题

其次是 shared_ptr,得益于 引用计数,这个智能指针支持拷贝,可能被多线程并发访问,但标准库在设计时考虑到了这个问题,索性将 shared_ptr 对于引用计数的操作设计成了 原子操作 CAS,这就确保了它的 线程安全

至于 weak_ptr,这个就是 shared_ptr 的小弟,名为弱引用智能指针,具体实现与 shared_ptr 一脉相承,因此它也是线程安全


OK,这就是本期的所有内容,好兄弟,我是cp我们下期再见!


http://www.kler.cn/news/356519.html

相关文章:

  • vcenter的使用
  • linux查看系统类型
  • Sqli-labs less-27
  • Oracle 19C VIP跑到了PUBLIC IP上?
  • 学SQL第一天
  • 【C++】- STL之vector模拟实现
  • wifi、热点密码破解 - python
  • 8-基于双TMS320C6678 + XC7K420T的6U CPCI Express高速数据处理平台
  • oracle数据库---基本查询(单表查询、多表查询、子查询、分页查询、oracle内置函数、行列转换、集合运算)
  • 通过docker镜像安装elasticsearch和kibana
  • 充电桩高压快充发展趋势
  • JPA、Hibernate入门及实战
  • Vmware一步安装win7系统
  • 从零开始搭建:基于在线教育系统源码的线上网校开发详解
  • 软考-软件设计师(10)-专业英语词汇汇总与新技术知识点
  • 基于x86_64汇编语言简单教程1: 环境预备与尝试
  • 25计软新增考研院校!或可捡漏上岸!
  • 「C++」内存管理
  • linux centos7系统ARM架构下安装最新版docker 27.3.1及docker-compose v2.3.4
  • 【ESP32】Arduino开发 | LED PWM控制器+呼吸灯例程