当前位置: 首页 > article >正文

Linux从0到1——线程池【利用日志Debug】

Linux从0到1——线程池

  • 1. 简易日志系统
    • 1.1 可变参数列表
    • 1.2 设计日志
    • 1.3 改进
  • 2. 线程池
    • 2.1 需要用到的头文件
    • 2.2 需求分析
    • 2.3 实现细节
    • 2.4 完整代码
  • 3. 改造单例版本
  • 4. 改进思路


1. 简易日志系统


1.1 可变参数列表


1. printf

  • 我们其实一直在使用可变参数列表,printf函数就使用了它。

在这里插入图片描述

  • 第一个参数format是一个字符串,指定可变参数的数据格式。第二个参数...就是可变参数。

2. 自己设计一个可变参数的接口,计算任意整数的和

#include <iostream>
#include <cstdarg> // 包含处理可变参数的宏

// 函数声明
int sum(int count, ...);

// 函数定义
int sum(int count, ...) 
{
    int total = 0;
    va_list args; // 定义一个 va_list 对象,这是一个char *类型(或者void *)的指针

    // 初始化 va_list 对象
    va_start(args, count);

    // 遍历所有参数
    for (int i = 0; i < count; ++i) 
    {
        total += va_arg(args, int); // 获取下一个参数并累加
    }

    // 清理 va_list 对象
    va_end(args);

    return total;
}

int main() 
{
    // 测试函数
    int result = sum(5, 1, 2, 3, 4, 5);
    std::cout << "Sum: " << result << std::endl;

    result = sum(3, 10, 20, 30);
    std::cout << "Sum: " << result << std::endl;

    return 0;
}
  • va_list:是一个char*void*的指针类型,不同编译器可能有所差别。
  • va_start:初始化 va_list 对象,使其指向第一个可变参数。第二个参数固定传可变参数的前一个参数。
  • va_arg:从可变参数列表中获取下一个参数。需要指定参数的类型(在这里是 int)。
  • va_end:清理 va_list 对象,确保资源被正确释放(就是将该指针置空)。

在这里插入图片描述


1.2 设计日志


1. 需求分析

  • 首先,这个日志中的信息要有各种等级,来表明各个信息的重要程度,和他们的意义;
  • 日志信息要能向显示器写入,也能向文件写入。向文件写入时,可以向一个文件写入,也可以按信息不同等级,向不同等级文件写入;
  • 每一条日志信息的组成:[等级][写入时间][进程id] 用户自定义信息。

2. 大致思路

  • 设计两个枚举类型,一个来表示信息等级,一个来表示信息输出的方式。
  • log类对外只暴漏一个接口,功能是写入日志信息。

3. 完整的日志类实现Log.hpp

#pragma once

#include<iostream>
#include<string>
#include<cstdarg>
#include<ctime>
#include<fstream>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>

enum
{
    Debug = 0,
    Info,       // 正常信息
    Warning,    // 告警
    Error,      // 错误
    Fatal       // 致命错误
};

enum
{
    Screen = 0,    // 向显示器打印
    OneFile,        // 向一个文件打印
    ClassFile       // 向多个文件打印
};

// 将日志等级转换为string
std::string LevelToString(int level)
{
    switch(level)
    {
    case Debug:
        return "Debug";
    case Info:
        return "Info";
    case Warning:
        return "Warning";
    case Error:
        return "Error";
    case Fatal:
        return "Fatal";
    default:
        return "Unknown";
    }
}

const int defaultstyle = Screen;                // 默认风格是向显示器打印
const std::string default_filename = "log."; // 默认文件名
const std::string logdir = "log";              // 默认日志文件夹

class Log
{
public:
    Log()
        :_style(defaultstyle)
        ,_filename(default_filename)
    {
        mkdir(logdir.c_str(), 0775);
    }

    void Enable(int style)
    {
        _style = style;
    }

    std::string TimeStampExLocalTime()
    {
        time_t currtime = time(nullptr);
        struct tm *curr = localtime(&currtime);
        char time_buffer[128];
        snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \
            curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\
            curr->tm_hour, curr->tm_min, curr->tm_sec);
        return time_buffer;
    }

    void WriteLogToOneFile(const std::string &logname, const std::string &message)
    {
        umask(0);
        int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);
        if(fd < 0) return;
        write(fd, message.c_str(), message.size());
        close(fd);
    }

    void WriteLogToClassFile(const std::string &levelstr, const std::string &message)
    {
        std::string logname = logdir;
        logname += "/";
        logname += _filename;
        logname += levelstr;
        WriteLogToOneFile(logname, message);
    }

    void WriteLog(const std::string &levelstr, const std::string &message)
    {
        switch(_style)
        {
            case Screen:
                std::cout << message;
                break;
            case OneFile:
                WriteLogToClassFile("all", message);
                break;
            case ClassFile:
                WriteLogToClassFile(levelstr, message);
                break;
            default:
                break;
        }
    }

    void LogMessage(int level, const char* format, ...) // 类C的一个日志接口
    {
        char rightbuffer[1024];
        va_list args;    // 这是一个char *类型(或者void *)的指针
        va_start(args, format);     // 让arg指向可变部分

        vsnprintf(rightbuffer, sizeof(rightbuffer), format, args);  // 将可变部分按照指定格式写入到content中
        va_end(args);   // 释放args, args = nullptr

        char leftbuffer[1024];
        std::string levelstr = LevelToString(level);
        std::string currtime = TimeStampExLocalTime();      // 获取当前时间
        std::string idstr = std::to_string(getpid());
        
        snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \
            levelstr.c_str(), currtime.c_str(), idstr.c_str());
        
        std::string loginfo = leftbuffer;
        loginfo += rightbuffer;
        
        WriteLog(levelstr, loginfo);
    }

    ~Log()
    {}
private:
    int _style;
    std::string _filename;
};
  • LogMessage是对外暴漏的接口,用户可以通过该接口,写入格式化的日志信息。第一个参数是信息等级,第二个是信息格式,第三个是可变参数列表。
  • 日志在使用时需要Enable使能,指定日志的输出方式。
  • 日志信息统一放在一个的文件夹中,名字默认是log。向文件中写入时,文件名的格式是:log.等级。其中如果指定向一个文件中写入,文件名是log.all

4. 测试代码:

int main()
{
    Log log;
    log.Enable(OneFile);

    log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);
    log.LogMessage(Error, "hello %d world %lf\n", 10, 3.14);
    log.LogMessage(Warning, "hello %d world %lf\n", 10, 3.14);
    log.LogMessage(Info, "hello %d world %lf\n", 10, 3.14);
    log.LogMessage(Fatal, "hello %d world %lf\n", 10, 3.14);
    log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);
    log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);

	return 0;
}

在这里插入图片描述


1.3 改进


1. 线程安全问题

  • 向文件或显示器写入时,不是线程安全的,可以考虑对WriteLog方法上锁。
#include"LockGuard.hpp"

...

class Log
{
public:
    Log()
        :_style(defaultstyle)
        ,_filename(default_filename)
    {
        mkdir(logdir.c_str(), 0775);
        pthread_mutex_init(&_mutex, nullptr);
    }

	...

    void LogMessage(int level, const char* format, ...) // 类C的一个日志接口
    {
        ...
        
        {
            LockGuard lockguard(&_mutex);
            WriteLog(levelstr, loginfo);
        }
    }

    ~Log()
    {}
private:
    int _style;
    std::string _filename;
    pthread_mutex_t _mutex;
};
  • LockGuard.hpp是我们之前实现过的守护锁。

2. 配置log

  • 以下代码可以写在Log.hpp头文件中,配置log只需要修改Config类的构造函数即可。
// 配置log
Log log;

class Config
{
public:
    Config()
    {
        // 在此配置
        log.Enable(ClassFile);
    }

    ~Config()
    {}
};

Config config;

2. 线程池


2.1 需要用到的头文件


1. Log.hpp

#pragma once

#include<iostream>
#include<string>
#include<cstdarg>
#include<ctime>
#include<fstream>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include"LockGuard.hpp"

enum
{
    Debug = 0,
    Info,       // 正常信息
    Warning,    // 告警
    Error,      // 错误
    Fatal       // 致命错误
};

enum
{
    Screen = 0,    // 向显示器打印
    OneFile,        // 向一个文件打印
    ClassFile       // 向多个文件打印
};

// 将日志等级转换为string
std::string LevelToString(int level)
{
    switch(level)
    {
    case Debug:
        return "Debug";
    case Info:
        return "Info";
    case Warning:
        return "Warning";
    case Error:
        return "Error";
    case Fatal:
        return "Fatal";
    default:
        return "Unknown";
    }
}

const int defaultstyle = Screen;                // 默认风格是向显示器打印
const std::string default_filename = "log."; // 默认文件名
const std::string logdir = "log";              // 默认日志文件夹

class Log
{
public:
    Log()
        :_style(defaultstyle)
        ,_filename(default_filename)
    {
        mkdir(logdir.c_str(), 0775);
        pthread_mutex_init(&_mutex, nullptr);
    }

    void Enable(int style)
    {
        _style = style;
    }

    std::string TimeStampExLocalTime()
    {
        time_t currtime = time(nullptr);
        struct tm *curr = localtime(&currtime);
        char time_buffer[128];
        snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \
            curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\
            curr->tm_hour, curr->tm_min, curr->tm_sec);
        return time_buffer;
    }

    void WriteLogToOneFile(const std::string &logname, const std::string &message)
    {
        umask(0);
        int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);
        if(fd < 0) return;
        write(fd, message.c_str(), message.size());
        close(fd);
    }

    void WriteLogToClassFile(const std::string &levelstr, const std::string &message)
    {
        std::string logname = logdir;
        logname += "/";
        logname += _filename;
        logname += levelstr;
        WriteLogToOneFile(logname, message);
    }

    void WriteLog(const std::string &levelstr, const std::string &message)
    {
        switch(_style)
        {
            case Screen:
                std::cout << message;
                break;
            case OneFile:
                WriteLogToClassFile("all", message);
                break;
            case ClassFile:
                WriteLogToClassFile(levelstr, message);
                break;
            default:
                break;
        }
    }

    void LogMessage(int level, const char* format, ...) // 类C的一个日志接口
    {
        char rightbuffer[1024];
        va_list args;    // 这是一个char *类型(或者void *)的指针
        va_start(args, format);     // 让arg指向可变部分

        vsnprintf(rightbuffer, sizeof(rightbuffer), format, args);  // 将可变部分按照指定格式写入到content中
        va_end(args);   // 释放args, args = nullptr

        char leftbuffer[1024];
        std::string levelstr = LevelToString(level);
        std::string currtime = TimeStampExLocalTime();      // 获取当前时间
        std::string idstr = std::to_string(getpid());
        
        snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \
            levelstr.c_str(), currtime.c_str(), idstr.c_str());
        
        std::string loginfo = leftbuffer;
        loginfo += rightbuffer;
        
        {
            LockGuard lockguard(&_mutex);
            WriteLog(levelstr, loginfo);
        }
    }

    ~Log()
    {}
private:
    int _style;
    std::string _filename;
    pthread_mutex_t _mutex;
};


// 配置log
Log log;

class Config
{
public:
    Config()
    {
        // 在此配置
        log.Enable(ClassFile);
    }

    ~Config()
    {}
};

Config config;

2. Thread.hpp

#pragma once

#include <iostream>
#include <string>
#include <functional>

template<class T>
using func_t = std::function<void(T&)>;

template<class T>
class Thread
{
public:
    Thread(const std::string &threadname, func_t<T> func, T &data)
        :_tid(0), _threadname(threadname), _isrunning(false), _func(func), _data(data)
    {}

    static void *ThreadRoutine(void* args)
    {
        Thread *ts = static_cast<Thread *>(args);

        ts->_func(ts->_data);

        return nullptr;
    }

    bool Start()
    {
        int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);
        if(n == 0) 
        {
            _isrunning = true;
            return true;
        }
        else return false;
    }

    bool Join()
    {
        if(!_isrunning) return false;
        int n = pthread_join(_tid, nullptr);
        if(n == 0)
        {
            _isrunning = false;
            return true;
        }
        return false;
    }

    bool IsRunning()
    {
        return _isrunning;
    }

    std::string ThreadName()
    {
        return _threadname;
    }

    ~Thread()
    {}

private:
    pthread_t _tid;             // 库级别线程id
    std::string _threadname;    // 线程名
    bool _isrunning;             // 运行状态
    func_t<T> _func;               // 线程执行的回调方法
    T _data;
};

3. LockGuard.hpp

#pragma once

#include <pthread.h>

// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
    Mutex(pthread_mutex_t *lock):_lock(lock)
    {}

    void Lock()
    {
        pthread_mutex_lock(_lock);
    }

    void Unlock()
    {
        pthread_mutex_unlock(_lock);
    }

    ~Mutex()
    {}

private:
    pthread_mutex_t *_lock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *lock):_mutex(lock)
    {
        _mutex.Lock();
    }

    ~LockGuard()
    {
        _mutex.Unlock();
    }

private:
    Mutex _mutex;
};

4. Task.hpp

#pragma once
#include <iostream>
#include <string>
#include <unistd.h>

const int defaultvalue = 0;

enum
{
    ok = 0,
    div_zero, // 除0错误
    mod_zero, // 模0错误
    unknow    // 未知错误
};

const std::string opers = "+-*/%)(&";

class Task
{
public:
    Task()
    {
    }

    Task(int x, int y, char op)
        : data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok)
    {
    }

    void Run()
    {
        switch (oper)
        {
        case '+':
            result = data_x + data_y;
            break;
        case '-':
            result = data_x - data_y;
            break;
        case '*':
            result = data_x * data_y;
            break;
        case '/':
        {
            if (data_y == 0)
                code = div_zero;
            else
                result = data_x / data_y;
        }
        break;
        case '%':
        {
            if (data_y == 0)
                code = mod_zero;
            else
                result = data_x % data_y;
        }
        break;
        default:
            code = unknow;
            break;
        }
    }

    std::string PrintTask()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=?";

        return s;
    }

    std::string PrintResult()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=";
        s += std::to_string(result);
        s += " [";
        s += std::to_string(code);
        s += "]";

        return s;
    }

    void operator()()
    {
        Run();
    }
    
    ~Task()
    {
    }

private:
    int data_x;
    int data_y;
    char oper; // + - * / %

    int result;
    int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
};

2.2 需求分析


在这里插入图片描述

这个线程池要有一个任务队列queue,和一个线程数组vector,任务由外部Push进线程池内部的任务队列中,多个线程并发的从任务队列拿任务,并执行。

并发访问任务队列时,会有线程安全问题,需要控制同步和互斥:

  • Push操作一定是互斥的,Pop操作一定是互斥的;
  • 如果任务队列为空,则让线程等待,Push进去一个任务,就激活一个线程。

每一个线程需要执行的逻辑是,先从任务队列中拿任务,如果没拿到(说明任务队列为空),就先等待;拿到了就立即执行该任务。


2.3 实现细节


1. 线程池的内部成员变量

  • 一个任务队列;
  • 一个线程数组;
  • 一个整数类型_thread_num记录线程池中的线程数量;
  • 一个互斥量和一个条件变量,用来控制访问任务队列时的同步和互斥。
template <class T>
class ThreadPool
{
	...
private:
    std::queue<T> _q;
    std::vector<Thread<ThreadData>> _threads;	// ThreadData是线程信息类
    int _thread_num; // 线程个数
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

2. 设计一个线程信息类,存储每个线程的名字(可扩展)

class ThreadData
{
public:
    ThreadData(const std::string &threadname)
        :_threadname(threadname)
    {}

    ~ThreadData()
    {}

public:
    std::string _threadname;
};

3. 构造函数和析构函数

  • 构造函数需要初始化互斥量和条件变量,并且创建指定个数的线程,不启动;
  • 析构函数只需要释放互斥量和条件变量即可。
static const int default_num = 5; // 默认线程池中的线程个数

template <class T>
class ThreadPool
{
public:
    ThreadPool(int thread_num = default_num)
        : _thread_num(thread_num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);

        // 构建指定个数的线程
        for (int i = 0; i < _thread_num; i++)
        {
            // 待优化
            std::string threadname = "thread-";
            threadname += std::to_string(i + 1);

            ThreadData td(threadname);

            Thread<ThreadData> t(threadname, \
                std::bind(&ThreadPool<T>::ThreadRun, \
                this, std::placeholders::_1), td);

            _threads.push_back(t);
            log.LogMessage(Info, "%s is created...\n", threadname.c_str());
        }
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    
	...

private:
    std::queue<T> _q;
    std::vector<Thread<ThreadData>> _threads;
    int _thread_num; // 线程个数
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};
  • std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1)是一个函数模版对象,其中ThreadRun为每一个线程要执行的方法,这个方法是一个成员函数。

4. ThreadRun方法

  • 向队列中拿任务时,需要上锁;队列中没有任务时,需要让线程等待。
template <class T>
class ThreadPool
{
public:
    void ThreadWait(const ThreadData &td)
    {
        log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());
        pthread_cond_wait(&_cond, &_mutex);
    }

    void ThreadRun(ThreadData &td)
    {
        while (true)
        {
            // 取任务
            T t;
            {
                LockGuard lockguard(&_mutex);
                while (_q.empty())
                {
                    ThreadWait(td);
                    log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());
                }

                t = _q.front();
                _q.pop();
            }

            // 处理任务
            t();
            log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \
                td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
        }
    }

	...

private:
    std::queue<T> _q;
    std::vector<Thread<ThreadData>> _threads;
    int _thread_num; // 线程个数
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

5. Push方法,向任务队列中放任务

  • 访问队列时上锁,Push完成后激活一个等待的线程。
template <class T>
class ThreadPool
{
public:
    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void Push(T &in)
    {
        log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());
        LockGuard lockgurad(&_mutex);
        _q.push(in);
        ThreadWakeup();
    }

	...

private:
    std::queue<T> _q;
    std::vector<Thread<ThreadData>> _threads;
    int _thread_num; // 线程个数
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

6. 启动线程池,join线程池中的线程

template <class T>
class ThreadPool
{
public:
    bool Start()
    {
        // 启动
        for (auto &thread : _threads)
        {
            thread.Start();
            log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
        }

        return true;
    }

    // for debug
    void Wait()
    {
        for (auto &thread : _threads)
        {
            thread.Join();
        }
    }

	...

private:
    std::queue<T> _q;
    std::vector<Thread<ThreadData>> _threads;
    int _thread_num; // 线程个数
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

2.4 完整代码


#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include <vector>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"

static const int default_num = 5; // 默认线程池中的线程个数

class ThreadData
{
public:
    ThreadData(const std::string &threadname)
        :_threadname(threadname)
    {}

    ~ThreadData()
    {}

public:
    std::string _threadname;
};

template <class T>
class ThreadPool
{
public:
    ThreadPool(int thread_num = default_num)
        : _thread_num(thread_num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);

        // 构建指定个数的线程
        for (int i = 0; i < _thread_num; i++)
        {
            // 待优化
            std::string threadname = "thread-";
            threadname += std::to_string(i + 1);

            ThreadData td(threadname);

            Thread<ThreadData> t(threadname, \
                std::bind(&ThreadPool<T>::ThreadRun, \
                this, std::placeholders::_1), td);

            _threads.push_back(t);
            log.LogMessage(Info, "%s is created...\n", threadname.c_str());
        }
    }

    bool Start()
    {
        // 启动
        for (auto &thread : _threads)
        {
            thread.Start();
            log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
        }

        return true;
    }

    void ThreadWait(const ThreadData &td)
    {
        log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());
        pthread_cond_wait(&_cond, &_mutex);
    }

    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadRun(ThreadData &td)
    {
        while (true)
        {
            // 取任务
            T t;
            {
                LockGuard lockguard(&_mutex);
                while (_q.empty())
                {
                    ThreadWait(td);
                    log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());
                }

                t = _q.front();
                _q.pop();
            }

            // 处理任务
            t();
            log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \
                td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
        }
    }

    void Push(T &in)
    {
        log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());
        LockGuard lockgurad(&_mutex);
        _q.push(in);
        ThreadWakeup();
    }

    // for debug
    void Wait()
    {
        for (auto &thread : _threads)
        {
            thread.Join();
        }
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    std::queue<T> _q;
    std::vector<Thread<ThreadData>> _threads;
    int _thread_num; // 线程个数
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

测试代码

#include<iostream>
#include<memory>
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"

int main()
{
    std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
    tp->Start();

    srand((uint64_t)time(nullptr) ^ getpid());

    while(true)
    {
        int x = rand() % 100 + 1;
        usleep(1234);
        int y = rand() % 200;
        usleep(1234);
        char oper = opers[rand() % opers.size()];

        Task t(x, y, oper);
        tp->Push(t);

        sleep(1);
    }

    tp->Wait();

    return 0;
}

3. 改造单例版本


线程池一般在整个程序中只有唯一一份,适合用单例模式实现,下面实现“懒汉”线程池。

第一步:构造函数私有化,并禁掉拷贝构造和赋值

template <class T>
class ThreadPool
{
private:
    ThreadPool(int thread_num = default_num)
        : _thread_num(thread_num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);

        // 构建指定个数的线程
        for (int i = 0; i < _thread_num; i++)
        {
            // 待优化
            std::string threadname = "thread-";
            threadname += std::to_string(i + 1);

            ThreadData td(threadname);

            Thread<ThreadData> t(threadname, \
                std::bind(&ThreadPool<T>::ThreadRun, \
                this, std::placeholders::_1), td);

            _threads.push_back(t);
            log.LogMessage(Info, "%s is created...\n", threadname.c_str());
        }
    }

    ThreadPool(const ThreadPool<T> &tp) = delete;
    const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;

public:
   	...

private:
    ...
};

第二步:提供静态指针instance,并提供静态方法GetInstance获取静态指针

template <class T>
class ThreadPool
{
private:
    // 构造函数私有化,并禁掉拷贝构造和赋值
	...
	
public:
    // 有线程安全问题
    static ThreadPool<T> *GetInstance()
    {
        LockGuard lockguard(&sig_lock);
        if(instance == nullptr)
        {
            log.LogMessage(Info, "创建单例成功...\n");
            instance = new ThreadPool<T>();
        }

        return instance;
    }

private:
    // 成员变量
    ...
    
	static ThreadPool<T> *instance;
    static pthread_mutex_t sig_lock;
};

template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;

template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

特别注意:

  • GetInstance函数存在线程安全问题,同时可能有多个线程满足instance == nullptr,从而错误的调用多次new方法,所以需要对GetInstance函数整个上锁;
  • 这把锁保护的是instance,所以需要我们设计一个新的互斥量,并且因为GetInstance方法是静态的,没有this指针,所以这个新的互斥量也要是静态的;
  • 可是如果是上面这种写法,效率是很低的,在instance不为空时,任何线程想要调用GetInstance应该是立即返回的,但是此时却要回回上锁;
  • 下面是GetInstance的改良版,保证了在instance不为空时,任何线程想要调用GetInstance会立即返回,不需要上锁。
template <class T>
class ThreadPool
{
private:
  	...

public:
    // 有线程安全问题
    static ThreadPool<T> *GetInstance()
    {
        if(instance == nullptr)
        {
            LockGuard lockguard(&sig_lock);
            if(instance == nullptr)
            {
                log.LogMessage(Info, "创建单例成功...\n");
                instance = new ThreadPool<T>();
            }
        }

        return instance;
    }

private:
    ...

    static ThreadPool<T> *instance;
    static pthread_mutex_t sig_lock;
};

template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;

template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

完整单例线程池代码

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include <vector>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"

static const int default_num = 5; // 默认线程池中的线程个数

class ThreadData
{
public:
    ThreadData(const std::string &threadname)
        :_threadname(threadname)
    {}

    ~ThreadData()
    {}

public:
    std::string _threadname;
};

template <class T>
class ThreadPool
{
private:
    ThreadPool(int thread_num = default_num)
        : _thread_num(thread_num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);

        // 构建指定个数的线程
        for (int i = 0; i < _thread_num; i++)
        {
            // 待优化
            std::string threadname = "thread-";
            threadname += std::to_string(i + 1);

            ThreadData td(threadname);

            Thread<ThreadData> t(threadname, \
                std::bind(&ThreadPool<T>::ThreadRun, \
                this, std::placeholders::_1), td);

            _threads.push_back(t);
            log.LogMessage(Info, "%s is created...\n", threadname.c_str());
        }
    }

    ThreadPool(const ThreadPool<T> &tp) = delete;
    const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;

public:
    // 有线程安全问题
    static ThreadPool<T> *GetInstance()
    {
        if(instance == nullptr)
        {
            LockGuard lockguard(&sig_lock);
            if(instance == nullptr)
            {
                log.LogMessage(Info, "创建单例成功...\n");
                instance = new ThreadPool<T>();
            }
        }

        return instance;
    }

    bool Start()
    {
        // 启动
        for (auto &thread : _threads)
        {
            thread.Start();
            log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
        }

        return true;
    }

    void ThreadWait(const ThreadData &td)
    {
        log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());
        pthread_cond_wait(&_cond, &_mutex);
    }

    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadRun(ThreadData &td)
    {
        while (true)
        {
            // 取任务
            T t;
            {
                LockGuard lockguard(&_mutex);
                while (_q.empty())
                {
                    ThreadWait(td);
                    log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());
                }

                t = _q.front();
                _q.pop();
            }

            // 处理任务
            t();
            log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \
                td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
        }
    }

    void Push(T &in)
    {
        log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());
        {
            LockGuard lockgurad(&_mutex);
            _q.push(in);
            ThreadWakeup();
        }
    }

    // for debug
    void Wait()
    {
        for (auto &thread : _threads)
        {
            thread.Join();
        }
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    std::queue<T> _q;   // 任务队列
    std::vector<Thread<ThreadData>> _threads;
    int _thread_num; // 线程个数
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    static ThreadPool<T> *instance;
    static pthread_mutex_t sig_lock
};

template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;

template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

测试代码

#include<iostream>
#include<memory>
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"

int main()
{
    ThreadPool<Task>::GetInstance()->Start();

    srand((uint64_t)time(nullptr) ^ getpid());

    while(true)
    {
        int x = rand() % 100 + 1;
        usleep(1234);
        int y = rand() % 200;
        usleep(1234);
        char oper = opers[rand() % opers.size()];

        Task t(x, y, oper);
        ThreadPool<Task>::GetInstance()->Push(t);

        sleep(1);
    }

    ThreadPool<Task>::GetInstance()->Wait();

    return 0;
}

4. 改进思路


设计一个动态线程池,可以根据当前的线程池中的线程数量和任务数量,动态创建或销毁线程。

具体思路以伪代码的形式呈现给大家,感兴趣的小伙伴可以自己实现:

template <class T>
class ThreadPool
{
private:
  	...

public:
  	...

    void CheckSelf()
    {
        // 1. _task_num > _task_num_high_water && _thread_num < _thread_num_high_water
        // 创建更多线程,并更新_thread_num

        // 2. _task_num == _task_num_low_water && _thread_num >= _thread_num_high_water
        // 把自己退出了,并且更新_thread_num
    }

    void ThreadRun(ThreadData &td)
    {
        while (true)
        {
            // 取任务
            CheckSelf();
            ...
        }
    }

private:
    ...

    // 扩展
    int _task_num;	// 任务数量

	// 高低水位线
    int _thread_num_low_water;
    int _thread_num_high_water;
    int _task_num_low_water;
    int _task_num_high_water;
};


http://www.kler.cn/a/549396.html

相关文章:

  • nlf 3d pose 部署学习笔记
  • 在使用 uni.getLocation 步骤和一些坑
  • Django简介
  • RedisTimeSeries
  • vscode本地和远程对应分支没有同步提交数量
  • 深度学习实战道路裂缝缺陷识别
  • 【大模型】DeepSeek 高级提示词技巧使用详解
  • LeetCodeHot100(普通数组和矩阵篇)
  • 计算机网络知识速记 :HTTP多个TCP连接的实现方式
  • 大语言模型评判者是什么,有什么应用领域
  • 【数据采集】基于Selenium爬取猫眼Top100电影信息
  • Tetragon:一款基于eBPF的运行时环境安全监控工具
  • 【机器学习】线性回归 线性回归模型的损失函数 MSE RMSE MAE R方
  • 【Elasticsearch】multi_match查询
  • 【微服务学习四】gateway网关的使用
  • 切换git仓库远程地址
  • SaaS 平台开发要点
  • 如何在Servlet容器中使用HttpServletResponse?
  • 【Three.js】JS 3D library(一个月进化史)
  • 根据deepseek模型微调训练自动驾驶模型及数据集的思路