当前位置: 首页 > article >正文

[多线程]基于单例懒汉模式的线程池的实现

标题:[多线程]基于单例懒汉模式的线程池的实现
@水墨不写bug


在这里插入图片描述

文章目录

  • 一、锁的RAII设计
  • 二、任务类举例
  • 三、日志系统的设计
  • 四、基于POSIX线程封装自己的线程模块
    • 设计要点
    • 主要功能
    • 使用示例
  • 五、基于懒汉模式的线程池主体逻辑设计
    • 类 `ThreadPool` 的成员函数详解
    • 静态成员初始化
  • 六、总结


本文我们将要实现一个线程池,其中用到的一些模块如LockGuard,Log等在之前的文章中有所讲述,所以本文就不再赘述。


一、锁的RAII设计

#pragma once
#include <pthread.h>

// 使用锁需要频繁调用系统调用,十分麻烦
// 于是实现锁的RAII设计
class LockGuard
{
private:
	pthread_mutex_t *GetMutex() { return _mutex; }

public:
	LockGuard(pthread_mutex_t *mutex)
		: _mutex(mutex)
	{
		pthread_mutex_lock(_mutex);
	}
	~LockGuard()
	{
		pthread_mutex_unlock(_mutex);
	}

private:
	pthread_mutex_t *_mutex;
};

/*
 *之前的理解错误了:不需要init和destroy,因为锁本身就是存在的!
 *锁在一般情况下会内置在类的内部,需要使用(加锁)的时候,把锁的地址传进来就行了
 *在构造函数内加锁,析构函数内解锁。
 *e.g.
 while(ture)
 {
	LockGuard lockguard(td->_mutex);
	if(tickets > 0)
	{
		//抢票
	}
	else
	{
		break;
	}
 }
 *while内每一次进行循环,都需要创建一个新的锁,创建即加锁,if代码块结束即为解锁
 */

/*
	void UnLock() { pthread_mutex_unlock(&_mutex); }
	void Lock() { pthread_mutex_lock(&_mutex); }
*/

以上代码是基于RAII(Resource Acquisition Is Initialization)设计模式的锁管理类LockGuard

该类用于简化对pthread库的互斥锁(pthread_mutex_t)的使用,确保在加锁和解锁过程中的安全性和简便性。


二、任务类举例

#pragma once
#include <functional>
#include <iostream>

using std::cout;
using std::endl;

// void DownLoad()
// {
//     cout<<"I am a DownTask"<<endl;
// }

// 任务类,包含各种任务
class Task
{
private:
    using task_t = std::function<void()>;

    void Excute()
    {
        _result = _x + _y;
    }

public:
    Task()
        : _x(0), _y(0)
    {
    }
    Task(int a, int b)
        : _x(a), _y(b)
    {
    }

    void operator()()
    {
        Excute();
    }

    void Debug()
    {
        cout<<"Debug : "<<_x<<" + "<<_y<<" = "<<_result<<endl;
    }

private:
    int _x;
    int _y;
    int _result;
};

这个类封装了一个具体的任务:两数相加。
尽管这是一项简单的任务(这样设计是为了便于理解线程池的部分),但是其实任务类可以根据用户的需要来自行设计,可以是下载资源,请求网页html等。

任务类的设计目的就是一个演示作用。


三、日志系统的设计

#pragma once
#include <string>
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include "LockGuard.hpp"

namespace log_ddsm
{
    using std::cout;
    using std::endl;

    // 日志信息管理

    enum LEVEL
    {
        DEBUG = 1,   // 调试信息
        INFO = 2,    // 提示信息
        WARNING = 3, // 警告
        ERROR = 4,   // 错误,但是不影响服务正常运行
        FATAL = 5,   // 致命错误,服务无法正常运行
    };
    enum SIZE
    {
        TIMESIZE = 128,
        LOGSIZE = 1024,

        FILE_TYPE_LOG_SIZE = 2048
    };
    enum TYPE
    {
        SCREEN_TYPE = 8,
        FILE_TYPE = 16
    };

    // 默认日志文件名称
    const char *DEFAULT_LOG_NAME = "./log.txt";
    // 全局锁,保护打印日志
    pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;

    struct logMessage
    {
        std::string _level; // 日志等级
        int _level_num;     // 日志等级的int格式

        pid_t _pid;             // 这条日志的进程id
        std::string _file_name; // 文件名
        int _file_number;       // 行号
        std::string _cur_time;  // 当时的时间
        std::string _log_info;  // 日志正文
    };

    // 通过int获取日志等级
    std::string getLevel(int level)
    {
        switch (level)
        {
        case 1:
            return "DEBUG";
        case 2:
            return "INFO";
        case 3:
            return "WARNING";
        case 4:
            return "ERROR";
        case 5:
            return "FATAL";
        default:
            return "UNKNOWN";
        }
    }

    std::string getCurTime()
    {
        time_t cur = time(nullptr);
        struct tm *curtime = localtime(&cur);
        char buf[TIMESIZE] = {0};
        snprintf(buf, sizeof(buf), "%d-%d-%d %d:%d:%d",
                 curtime->tm_year + 1900,
                 curtime->tm_mon + 1,
                 curtime->tm_mday,
                 curtime->tm_hour,
                 curtime->tm_min,
                 curtime->tm_sec);
        return buf;
    }

    class Log
    {
    private:
        void FlushMessage(const logMessage &lg)
        {
            // 互斥锁,保护Print过程
            LockGuard lockguard(&gmutex);

            // 过滤逻辑
            // 致命错误到文件逻辑
            if (lg._level_num >= _ignore_level)
            {
                if (_print_type == SCREEN_TYPE)
                    PrintToScreen(lg);
                else if (_print_type == FILE_TYPE)
                    PrintToFile(lg);
                else
                    std::cerr << __FILE__ << " " << __LINE__ << ":" << " UNKNOWN_TYPE " << std::endl;
            }
        }
        void PrintToScreen(const logMessage &lg)
        {
            printf("[%s][%d][%s][%d][%s] %s", lg._level.c_str(),
                   lg._pid,
                   lg._file_name.c_str(),
                   lg._file_number,
                   lg._cur_time.c_str(),
                   lg._log_info.c_str());
        }

        void PrintToFile(const logMessage &lg)
        {
            std::ofstream out(_log_file_name, std::ios::app); // 追加打印
            if (!out.is_open())
            {
                std::cerr << __FILE__ << " " << __LINE__ << ":" << " LOG_FILE_OPEN fail " << std::endl;
                return;
            }
            char log_txt[FILE_TYPE_LOG_SIZE] = {0}; // 缓冲区
            snprintf(log_txt, sizeof(log_txt), "[%s][%d][%s][%d][%s] %s", lg._level.c_str(),
                     lg._pid,
                     lg._file_name.c_str(),
                     lg._file_number,
                     lg._cur_time.c_str(),
                     lg._log_info.c_str());

            out.write(log_txt, strlen(log_txt)); // 写入文件
        }

    public:
        // 打印方式——默认向显示器打印
        // 如果选择文件类型打印,默认的文件名称是当前目录的log.txt
        //
        Log(int print_type = SCREEN_TYPE)
            : _print_type(print_type), _log_file_name(DEFAULT_LOG_NAME), _ignore_level(DEBUG)
        {
        }

        // 只有一个全局对象,多个构造函数多余,不如设计一个构造,添加一个设置log文件名称的接口
        //  Log(int print_type, const char *logname = DEFAULT_LOG_NAME)
        //      : _print_type(print_type), _log_file_name(logname), _ignore_level(1)
        //  {
        //  }

        void Enable(int type)
        {
            _print_type = type;
        }

        /// @brief 加载日志信息
        /// @param format 格式化输出
        /// @param  可变参数
        /*区分了本层和商城之后,就容易设置参数了*/
        void load_message(int level, std::string filename, int filenumber, const char *format, ...)
        {
            logMessage lg;
            lg._level = getLevel(level);
            lg._level_num = level;

            lg._pid = getpid();
            lg._file_name = filename;
            lg._file_number = filenumber;
            lg._cur_time = getCurTime();

            // valist + vsnprintf 处理可变参数
            va_list ap;
            va_start(ap, format);
            char log_info[LOGSIZE] = {0};
            vsnprintf(log_info, sizeof(log_info), format, ap);
            va_end(ap);
            lg._log_info = log_info;

            // 打印逻辑
            FlushMessage(lg);
        }

        /// @brief
        /// @param ignorelevel
        /* DEBUG = 1,   调试信息
         *INOF = 2,     提示信息
         *WARNING = 3,  警告
         *ERROR = 4,    错误,但是不影响服务正常运行
         *FATAL = 5,    致命错误,服务无法正常运行 */
        void SetIgnoreLevel(int ignorelevel)
        {
            _ignore_level = ignorelevel;
        }
        void SetLogFileName(const char *newlogfilename)
        {
            _log_file_name = newlogfilename;
        }
        ~Log() {}

    private:
        int _print_type;
        std::string _log_file_name;
        int _ignore_level;
    };

    // 全局的Log对象
    Log lg;

    // 打印日志信息
    // e.g.
    // LOG(DEBUG,"this is a test message %d %f %c", 13 , 9.81 , 'A' );
    //

// 使用日志的一般格式
#define LOG(Level, Format, ...)                                            \
    do                                                                     \
    {                                                                      \
        lg.load_message(Level, __FILE__, __LINE__, Format, ##__VA_ARGS__); \
    } while (0)

    /// 无法设计为inlne——__VA_ARGS只能出现在宏替换中
    // inline void LOG(int level,const char* format, ...)
    // {
    //     lg.load_message(level,__FILE__,__LINE__,format,__VA_ARGS__);
    // }

// 往文件打印
#define ENABLE_FILE()         \
    do                        \
    {                         \
        lg.Enable(FILE_TYPE); \
    } while (0)
// 往显示器打印
#define ENABLE_SCREEN()         \
    do                          \
    {                           \
        lg.Enable(SCREEN_TYPE); \
    } while (0)
//设置日志忽略等级
#define SET_IGNORE_LEVEL(Level)  \
    do                           \
    {                            \
        lg.SetIgnoreLevel(Level) \
    } while (0)
//设置日志文件名称
#define SET_LOG_FILENAME(Name)  \
    do                          \
    {                           \
        lg.SetLogFileName(Name) \
    } while (0)

}; // namespace log_ddsm

以上是日志系统的设计,具体的讲述请参考我之前的这篇文章:

《[HTTP协议]应用层协议HTTP从入门到深刻理解并落地部署自己的云服务(2)实操部署》:
https://blog.csdn.net/2301_79465388/article/details/146114051?spm=1001.2014.3001.5501

四、基于POSIX线程封装自己的线程模块

#pragma once
#include <pthread.h>
#include <vector>
#include <functional>
#include <iostream>
#include <cassert>
#include <cstring>
#include "Log.hpp"

using namespace log_ddsm;
// 理解C++11做的工作,封装自己的原生线程
// 创建线程,执行任务
namespace ThreadMoudle
{
    // 支持模板
    using func_t = std::function<void(const std::string &name)>;

    // 线程控制-自主管理原生线程
    class Thread
    {
    private:
        // 线程创建后例行执行的函数
        static void *Routine(void *args) // 设置为static,防止this指针干扰类型匹配
        {
            Thread *self = static_cast<Thread *>(args);
            self->Excute();
            return nullptr;
        }

        void Excute()
        {
            _isrunning = true;
            _func(_name);
            _isrunning = false;
        }

    public:
        /// @brief 构造线程
        /// @param name 线程名
        /// @param func 线程要执行的任务
        Thread(const std::string &name, func_t func)
            : _name(name), _tid(), _isrunning(false), _func(func)
        {
        }

        bool Start()
        {
            int res = ::pthread_create(&_tid, nullptr, Routine, this); // 要求类型:void*(void*)——有this指针导致类型不匹配
            if (res != 0)
            {
                LOG(ERROR, "pthread_create failed with error: %s\n", strerror(res));

                // std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_creat: error!" << std::endl;
                // printf("pthread_create failed with error: %s\n", strerror(res));

                return false;
            }
            else
            {
                return true;
            }
        }

        bool Stop()
        {
            if (_isrunning)
            {
                int res = ::pthread_cancel(_tid);
                if (res != 0)
                {
                    // std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_cancel: error!" << std::endl;
                    // printf("pthread_cancel failed with error: %s\n", strerror(res));
                    LOG(ERROR, "pthread_cancel failed with error: %s\n", strerror(res));
                    return false;
                }
                else
                {
                    return true;
                }
            }
            //走到这里,说明线程已经stop了
            return true;
        }

        // Stop之后join
        bool Join()
        {
            if (!_isrunning)
            {
                int res = ::pthread_join(_tid, nullptr);
                if (res != 0)
                {
                    // std::cout<<"Debug res:"<<res<<std::endl;

                    // std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_join: error!" << std::endl;
                    // printf("pthread_join failed with error: %s\n", strerror(res));
                    LOG(ERROR, "pthread_join failed with error: %s\n", strerror(res));

                    return false;
                }
                else
                {
                    return true;
                }
            }
            else
            {
                // Running时join
                LOG(INFO, "join while Running\n");

                // std::cerr << "join while Running" << std::endl;
                return false;
            }
        }
        ~Thread() {}

        bool Status() { return _isrunning; }
        std::string Name() { return _name; }

    private:
        std::string _name; // 线程的名字
        pthread_t _tid;    // 线程的类型
        bool _isrunning;   // 线程是否在运行
        func_t _func;      // 线程将要执行的回调函数
    };

}; // namespace ThreadMoudle

以上代码实现了一个线程管理模块 ThreadMoudle,其中包含一个线程类 Thread。它封装了POSIX线程(pthread)的创建、启动、停止和加入(join)操作,并提供了一些辅助功能。

设计要点

  1. 线程管理

    • 该模块封装了POSIX线程的基本操作,提供了一个易于使用的接口来管理线程的创建、启动、停止和加入操作。
  2. 可重用性和可扩展性

    • 通过使用std::function<void(const std::string &name)>,可以将任意符合该签名的回调函数传递给线程对象,从而实现任务的多样性和灵活性。
  3. 线程安全性

    • 通过内部状态标志(_isrunning)和适当的日志记录,确保线程的状态和操作的一致性,避免了多次启动或停止同一线程的问题。

主要功能

  1. 线程创建和启动

    • Thread::Start():调用pthread_create创建一个新线程,并启动执行Routine函数。Routine函数将调用实例的Excute方法执行具体任务。
    • Routine函数被声明为静态函数,以避免this指针干扰类型匹配。
  2. 线程执行具体任务

    • Thread::Excute():设置线程状态为运行中,并调用回调函数_func执行具体任务。
  3. 线程停止

    • Thread::Stop():调用pthread_cancel取消运行中的线程。对于不符合逻辑的情况:线程未运行,则记录错误日志。
  4. 线程加入

    • Thread::Join():调用pthread_join等待线程终止。对于不符合逻辑的情况:线程仍在运行,则记录错误日志。
  5. 日志记录

    • 使用日志模块(log_ddsm)记录线程操作中的错误信息,便于调试和问题排查。

使用示例

#include "Thread.hpp"

void MyTask(const std::string &name)
{
    std::cout << "Thread " << name << " is running." << std::endl;
    int count = 5;
    while(count--)
    {
        std::cout<< count<<std::endl;
    }
}

int main()
{
    ThreadMoudle::Thread myThread("TestThread", MyTask);
    if (myThread.Start())
    {
        std::cout << "Thread started successfully." << std::endl;
    }

    // 等待线程完成任务
    myThread.Join();

    return 0;
}

结果:
在这里插入图片描述

在这个示例中:

  • 创建一个名为"TestThread"的线程对象,并传递任务函数MyTask
  • 调用myThread.Start()启动线程。
  • 等待线程完成任务后,调用myThread.Join()等待线程终止。

五、基于懒汉模式的线程池主体逻辑设计

#pragma once
#include <queue>
#include <unistd.h>
#include "Log.hpp"
#include "Thread.hpp" //using pthread lib

using std::cout;
using std::endl;

using ThreadMoudle::func_t;
using namespace log_ddsm;

enum NUM
{
    DEFAULTNUM = 10
};

/// @brief
/// @tparam T:任务类型
//懒汉模式设计单例ThreadPool

template <typename T>
class ThreadPool
{
private:
    void LockQueue() // 加锁
    {
        pthread_mutex_lock(&_taskqueue_mutex);
    }
    void UnLockQueue() // 解锁
    {
        pthread_mutex_unlock(&_taskqueue_mutex);
    }
    void Sleep() // 在条件变量_taskqueue_cond下等待
    {
        pthread_cond_wait(&_taskqueue_cond, &_taskqueue_mutex);
    }

    void WakeUp() // 叫醒一个进程
    {
        pthread_cond_signal(&_taskqueue_cond);
    }
    void WakeUpAll() // 叫醒所有线程
    {
        pthread_cond_broadcast(&_taskqueue_cond);
    }
    bool IsEmpty()
    {
        return _taskqueue.empty();
    }

    // thread将要执行的函数逻辑
    void HandlerTask(const std::string &name)
    {
        // 一直循环:等待任务——执行任务——等待任务,直到运行到break逻辑
        while (true)
        {
            LockQueue();

            /*  优先级上,先考虑任务队列不为空,只有任务队列为空,再考虑是否要线程退出*/
            while (IsEmpty() && _isrunning) // 没有任务则等待,while——在唤醒之后先判断是否队列仍然有任务,否则再次等待
            {
                _sleep_num++;
                Sleep();
                _sleep_num--;
            }
            if (IsEmpty() && !_isrunning)
            {
                UnLockQueue();

                LOG(DEBUG, "thread quit\n");
                // cout << "thread quit" << endl;
                break;
            }

            T t; // 从taskqueue中获取任务
            t = _taskqueue.front();
            _taskqueue.pop();
            UnLockQueue();

            // 执行任务,在解锁之后进行,执行任务要与从队列中获取任务并发
            // 不能在临界区中进行
            t();

            LOG(DEBUG, "%s", name.c_str());
            // cout << name << " : ";
            t.Debug();
        }
    }
    // 缺省个数——10
    ThreadPool(size_t threadnum)
        : _threadnum(threadnum), _taskqueue(), _isrunning(false), _sleep_num(0) // 初始化列中调用has a的构造??
    {
        // 初始化
        pthread_mutex_init(&_taskqueue_mutex, nullptr);
        pthread_cond_init(&_taskqueue_cond, nullptr);
    }
    // 初始化线程池
    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _threadnum; ++i)
        {
            std::string name = "thread - " + std::to_string(i + 1);

            _threads.emplace_back(name, func); // 减少一次拷贝,直接在vector内构造
            // usleep(10000);
            // cout << "emplaced i+1 :" << i + 1 << endl;
        }
    }

    // 开始运行
    void Start()
    {
        _isrunning = true; // 单进程访问不需要保护

        for (int i = 0; i < _threadnum; ++i)
        {
            _threads[i].Start();
            LOG(DEBUG, "thread - %d started \n", i + 1);
        }
    }

public:
    static ThreadPool<T> *GetInstance()
    {
        if (_ptp == nullptr)
        {
            LockGuard lockguard(&_sig_mutex);
            if (_ptp == nullptr)
            {
                LOG(INFO, "create threadspool\n");
                _ptp = new ThreadPool<T>(DEFAULTNUM);//线程安全问题
                _ptp->Init();
                _ptp->Start();
            }
        }
        else
        {
            LOG(INFO, "get threadspool\n");
        }
        return _ptp;
    }

    // 停止运行
    void Stop()
    {
        LockQueue();
        _isrunning = false;
        WakeUpAll();
        UnLockQueue();
    }

    // 向任务队列加入任务
    void PushTask(const T &in)
    {
        LockQueue();
        if (_isrunning == true) // 防止出现线程池想要退出但是 却被一直分配任务,导致无法退出
        {
            _taskqueue.push(in); // 加入任务
            if (_sleep_num > 0)
                WakeUp();
        }
        UnLockQueue();
    }

    ~ThreadPool()
    {
        for (int i = 0; i < _threadnum; ++i)
        {
            LOG(INFO, "thread-%d joined\n", i + 1);
            _threads[i].Join();
        }
        pthread_mutex_destroy(&_taskqueue_mutex);
        pthread_cond_destroy(&_taskqueue_cond);
    }

private:
    size_t _threadnum;                          // 线程池中线程的个数
    std::vector<ThreadMoudle::Thread> _threads; // vector维护线程池

    std::queue<T> _taskqueue; // 任务队列
    bool _isrunning;          // 标识线程池是否在运行

    /*多线程对taskqueue访问需要整体使用——加锁*/
    pthread_mutex_t _taskqueue_mutex; // 用于维护taskqueue的mutex cond
    pthread_cond_t _taskqueue_cond;

    int _sleep_num; // 休眠的线程个数

    static ThreadPool<T> *_ptp; // 单例对象
    static pthread_mutex_t _sig_mutex;
};

// 静态成员在类外初始化
template <class T>
ThreadPool<T> *ThreadPool<T>::_ptp = nullptr;

// 第一次保护获取单例的mutex
template <class T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;

ThreadPool 的成员函数详解

  1. LockQueue()UnLockQueue()

    • 这两个函数分别用来加锁和解锁 _taskqueue_mutex 互斥锁。
    • 目的是保护任务队列 _taskqueue 的访问,确保在多线程环境下对任务队列的操作是线程安全的。
    void LockQueue() // 加锁
    {
        pthread_mutex_lock(&_taskqueue_mutex);
    }
    
    void UnLockQueue() // 解锁
    {
        pthread_mutex_unlock(&_taskqueue_mutex);
    }
    
  2. Sleep()WakeUp()WakeUpAll()

    • Sleep() 函数使线程在条件变量 _taskqueue_cond 上等待,等待条件满足时被唤醒。
    • WakeUp() 函数唤醒一个在 _taskqueue_cond 上等待的线程。
    • WakeUpAll() 函数唤醒所有在 _taskqueue_cond 上等待的线程。
    • 这些函数用于实现线程休眠和唤醒机制,当任务队列为空时,线程进入休眠状态;当有新任务加入时,唤醒线程执行任务。
    void Sleep() // 在条件变量_taskqueue_cond下等待
    {
        pthread_cond_wait(&_taskqueue_cond, &_taskqueue_mutex);
    }
    
    void WakeUp() // 叫醒一个进程
    {
        pthread_cond_signal(&_taskqueue_cond);
    }
    
    void WakeUpAll() // 叫醒所有线程
    {
        pthread_cond_broadcast(&_taskqueue_cond);
    }
    
  3. IsEmpty()

    • 检查任务队列是否为空。
    • 返回布尔值,表示任务队列是否为空。
    bool IsEmpty()
    {
        return _taskqueue.empty();
    }
    
  4. HandlerTask(const std::string &name)

    • 线程执行的主要函数逻辑。
    • 线程在循环中等待任务队列中的任务,当有任务时,从任务队列中取出任务并执行。
    • 当任务队列为空且线程池不再运行时,线程退出循环,结束执行。
    // thread将要执行的函数逻辑
    void HandlerTask(const std::string &name)
    {
        // 一直循环:等待任务——执行任务——等待任务,直到运行到break逻辑
        while (true)
        {
            LockQueue();
    
            /*  优先级上,先考虑任务队列不为空,只有任务队列为空,再考虑是否要线程退出*/
            while (IsEmpty() && _isrunning) // 没有任务则等待,while——在唤醒之后先判断是否队列仍然有任务,否则再次等待
            {
                _sleep_num++;
                Sleep();
                _sleep_num--;
            }
            if (IsEmpty() && !_isrunning)
            {
                UnLockQueue();
    
                LOG(DEBUG, "thread quit\n");
                break;
            }
    
            T t; // 从taskqueue中获取任务
            t = _taskqueue.front();
            _taskqueue.pop();
            UnLockQueue();
    
            // 执行任务,在解锁之后进行,执行任务要与从队列中获取任务并发
            // 不能在临界区中进行
            t();
    
            LOG(DEBUG, "%s", name.c_str());
            // cout << name << " : ";
            t.Debug();
        }
    }
    
  5. 构造函数 ThreadPool(size_t threadnum)

    • 初始化线程池,包括线程数量、任务队列、互斥锁和条件变量。
    • 设置 _threadnum 为线程池中的线程数量,初始化其他成员变量。
    ThreadPool(size_t threadnum)
        : _threadnum(threadnum), _taskqueue(), _isrunning(false), _sleep_num(0)
    {
        pthread_mutex_init(&_taskqueue_mutex, nullptr);
        pthread_cond_init(&_taskqueue_cond, nullptr);
    }
    
  6. Init()

    • 初始化线程池中的线程。
    • 创建 thread 对象,并将 HandlerTask 绑定为线程执行的任务函数。这一操作的目的是为了使emplace_back传入Thread构造函数的类型匹配。
    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _threadnum; ++i)
        {
            std::string name = "thread - " + std::to_string(i + 1);
            _threads.emplace_back(name, func);
        }
    }
    
  7. Start()

    • 启动线程池中的线程。
    • 设置 _isrunningtrue,然后启动线程池中每个线程。
    void Start()
    {
        _isrunning = true;
        for (int i = 0; i < _threadnum; ++i)
        {
            _threads[i].Start();
            LOG(DEBUG, "thread - %d started \n", i + 1);
        }
    }
    
  8. GetInstance()

    • 获取线程池的单例实例。
    • 使用双重检查锁定机制确保线程安全。
    static ThreadPool<T> *GetInstance()
    {
        if (_ptp == nullptr)
        {
            LockGuard lockguard(&_sig_mutex);
            if (_ptp == nullptr)
            {
                LOG(INFO, "create threadspool\n");
                _ptp = new ThreadPool<T>(DEFAULTNUM);
                _ptp->Init();
                _ptp->Start();
            }
        }
        else
        {
            LOG(INFO, "get threadspool\n");
        }
        return _ptp;
    }
    
  9. Stop()

    • 停止线程池的运行。
    • 设置 _isrunningfalse,并唤醒所有休眠的线程。
    void Stop()
    {
        LockQueue();
        _isrunning = false;
        WakeUpAll();
        UnLockQueue();
    }
    
  10. PushTask(const T &in)

    • 向任务队列中添加任务。
    • 如果线程池正在运行,添加任务到任务队列,并唤醒一个休眠的线程。
    void PushTask(const T &in)
    {
        LockQueue();
        if (_isrunning == true)
        {
            _taskqueue.push(in);
            if (_sleep_num > 0)
                WakeUp();
        }
        UnLockQueue();
    }
    
  11. 析构函数 ~ThreadPool()

    • 销毁线程池,等待所有线程结束并释放资源。
    • 销毁互斥锁和条件变量。
    ~ThreadPool()
    {
        for (int i = 0; i < _threadnum; ++i)
        {
            LOG(INFO, "thread-%d joined\n", i + 1);
            _threads[i].Join();
        }
        pthread_mutex_destroy(&_taskqueue_mutex);
        pthread_cond_destroy(&_taskqueue_cond);
    }
    

静态成员初始化

  • 静态成员变量 _ptp_sig_mutex 在类外进行初始化。
template <class T>
ThreadPool<T> *ThreadPool<T>::_ptp = nullptr;

template <class T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;

六、总结

ThreadPool 类实现了一个线程池的基本功能,当然你也可以在这个线程池的基础上扩充实现自己需要的功能。
设计中使用单例模式确保只有一个线程池实例,使用互斥锁和条件变量保证多线程环境下的线程安全,使用日志记录监控线程池的运行状态。


完~
转载请注明出处

在这里插入图片描述


http://www.kler.cn/a/582796.html

相关文章:

  • Redis 2025/3/9
  • nextJs在DOM视图中渲染未转为状态值的localStorage导致报错
  • mac 被禁用docker ui后,如何使用lima虚拟机启动docker
  • 【实战ES】实战 Elasticsearch:快速上手与深度实践-8.1.2近似最近邻(ANN)算法选型
  • 【Synchronized】不同的使用场景和案例
  • 信号处理之插值、抽取与多项滤波
  • 【C++】C++11新特性
  • ELK traceId 通过A服务调用B服务举例
  • Hive SQL 精进系列:COALESCE 手册
  • 跨境电商IP安全怎么做?从基础到高级防护的实战经验分享
  • 信息学奥赛c++语言:整数去重
  • idea maven 编译报错Java heap space解决方法
  • 华为欧拉操作系统安装Docker服务
  • 基于 GEE 利用 Sentinel-2 数据反演叶绿素与冠层水分含量
  • Android Glide 的显示与回调模块原理源码级深度剖析
  • Vue+Node.js+MySQL+Element-Plus实现一个账号注册与登录功能
  • FPGA 实现 OV5640 摄像头视频图像显示
  • 如何制作Windows系统盘、启动盘?(MediaCreationTool_22H2)
  • Banana Pi 与瑞萨电子携手共同推动开源创新:BPI-AI2N
  • Java 大视界 -- Java 大数据在智能安防视频摘要与检索技术中的应用(128)