当前位置: 首页 > article >正文

线程池以及日志、线程总结

一、线程池以及日志

1、基础线程池写法

主线程在main函数中构建一个线程池,初始化(Init)后开始工作(Start)

此时线程池中每个线程都已经工作起来了,只是任务队列中任务为空,所有线程处于休眠状态(通过线程同步中的条件变量实现,即pthread_cond_wait)

当主线程向任务队列中添加任务后,唤醒一个在休眠中的线程,让其执行任务,执行完后再进入休眠状态。

基础的线程池的写法:

ThreadPool.hpp:

#pragma once

#include <vector>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "mythreadlib.hpp"
#define DEFAULT_THREAD_NUMS 5

template <typename T> // 什么类型的任务
class ThreadPool
{

private:
    void test(const std::string name)
    {
        while (1)
        {
            std::cout << name << " is running" << std::endl;
            sleep(1);
        }
    }
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    bool QueueIsEmpty()
    {
        return _task_queue.empty();
    }
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void Wake()
    {
        pthread_cond_signal(&_cond);
    }
    void WakeAll()
    {
        pthread_cond_broadcast(&_cond);
    }

    void HandlerTask(const std::string name)
    {
        while (1)
        {
            LockQueue();
            while (QueueIsEmpty() && _isrunning)
            {
                _sleep_nums++;
                Sleep();
                _sleep_nums--;
            }
            if (QueueIsEmpty() && !_isrunning)
            {
                UnlockQueue();
                break;
            }
            // 苏醒过来 执行任务
            T t = _task_queue.front();
            _task_queue.pop();

            UnlockQueue();
            std::cout<<name<<" is running:";
            t(); // 执行任务和获取任务一定要分开,一个在锁内,一个在锁外
        }
    }

public:
    ThreadPool(int threadnums = DEFAULT_THREAD_NUMS)
        : _threadnums(threadnums), _isrunning(false), _sleep_nums(0)
    {
        //_my_threads.resize(_threadnums); 不能resize会有问题
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    void Init()
    {
        fun_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        // fun_t func = std::bind(&ThreadPool::test, this, std::placeholders::_1);

        for (int i = 0; i < _threadnums; i++)
        {
            std::string name = "thread-" + std::to_string(i + 1);
            _my_threads.emplace_back(name, func); // 要将test改成处理Task的函数
        }
    }

    void Start()
    {
        _isrunning = true;
        for (auto &thread : _my_threads)
        {
            thread.Start();
        }
    }

    void Stop()
    {
        LockQueue();
        _isrunning = false;

        WakeAll();

        UnlockQueue();
    }

    void Enqueue(const T &in)
    {
        // 主线程向线程池中派发任务,让线程池中的线程执行任务

        LockQueue();
        if (_isrunning)
        {
            _task_queue.push(in);
            if(_sleep_nums>0) Wake();//只要有休眠的线程就进行唤醒
            
        }

        UnlockQueue();
    }

private:
    int _threadnums;
    std::vector<mythread> _my_threads;
    int _sleep_nums;
    bool _isrunning;           // 也是临界资源
    std::queue<T> _task_queue; // 临界资源
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

mythreadlib.hpp:

#pragma once

#include <pthread.h>
#include <iostream>
#include <string>
#include <functional>

using fun_t = std::function<void(const std::string &)>;

class mythread
{

    // typedef void (*fun_t)(ThreadData *td);

public:
    mythread(const std::string name, fun_t func)
        : _name(name), _func(func)
    {
        _isrunning = false;
    }
    static void *threadRun(void *args)
    {
        mythread *thread = static_cast<mythread *>(args);
        // thread->_func(thread->_td);
        thread->_func(thread->_name);
        thread->_isrunning = false; // 运行结束
        return nullptr;
    }

    bool Start()
    {
        int n = pthread_create(&_tid, nullptr, threadRun, (void *)this);
        if (n != 0)
            return false;
        else
        {
            _isrunning = true;
            return true;
        }
    }
    void Stop()
    {
        if (_isrunning)
        {
            _isrunning = false;
            ::pthread_cancel(_tid);
        }
    }
    void Join()
    {
        pthread_join(_tid, nullptr);
    }

    ~mythread()
    {
        if (_isrunning)
        {
            Stop();
            Join();
        }
    }

private:
    pthread_t _tid;
    std::string _name;
    bool _isrunning;
    fun_t _func;
};

task.hpp:

#pragma once

#include<iostream>
#include<functional>

// typedef std::function<void()> task_t;
// using task_t = std::function<void()>;

// void Download()
// {
//     std::cout << "我是一个下载的任务" << std::endl;
// }


// 要做加法
class Task
{
public:
    Task()
    {
    }
    Task(int x, int y) : _x(x), _y(y)
    {
    }
    void Excute()
    {
        _result = _x + _y;
        std::cout<<result()<<std::endl;
    }
    void operator ()()
    {
        Excute();
    }
    std::string debug()
    {
        std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
        return msg;
    }
    std::string result()
    {
        std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
        return msg;
    }

private:
    int _x;
    int _y;
    int _result;
};

main.cpp:

#include"ThreadPool.hpp"
#include"task.hpp"
#include<ctime>
#include<unistd.h>

int main()
{
    //线程池
    srand(time(nullptr)^getpid());
    ThreadPool<Task> *thread_pool=new ThreadPool<Task>();
    thread_pool->Init();
    thread_pool->Start();
    while(1)
    {
        int num1=rand()%9+1;
        int num2=rand()%9+1;
        Task ADD(num1,num2);
        thread_pool->Enqueue(ADD);
        
        sleep(1);

    }
    return 0;
}

2、日志

(1)日志概念

日志:软件运行的记录信息,可以向显示器打印,也可以向文件中打印,可以根据信息的重要程度使用不同的打印格式。

(2)基本格式

【日志等级】【pid】【filename】【filenumber】【time】日志内容(支持可变参数)

其中日志等级可分为:DEBUG(调试)、INFO(信息)、 WARNING(警告) 、ERROR(错误) 、FATAL(致命的)

可变参数的处理:用va_list指向可变部分, vsnprintf()

29. C语言 可变参数详解-CSDN博客

使用宏优化日志格式。

最终形成的日志:

#pragma once

#include <ctime>
#include <string>
#include <unistd.h>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include "LockGuard.hpp"
#include <pthread.h>
#define FLUSHSCREEN 1
#define FLUSHFILE 2
const std::string default_file = "log.txt";

enum
{
    DEBUG = 1,
    INFO,
    WARNING,
    ERROR,
    FATAL
};

class logmessage
{
public:
    std::string _level;
    int _id;
    std::string _filename;
    int _filenumber;
    std::string _time;
    std::string _content;
};

std::string LevelToString(int level)
{
    switch (level)
    {
    case 1:
        return "DEBUG";
        break;
    case 2:
        return "INFO";
        break;
    case 3:
        return "WARNING";
        break;
    case 4:
        return "ERROR";
        break;
    case 5:
        return "FATAL";
        break;
    }
}

std::string CurrentTime()
{
    time_t now = time(nullptr);
    struct tm *curr_time = localtime(&now);
    char time_string[128];
    snprintf(time_string, sizeof(time_string), "%d-%02d-%02d %02d:%02d:%02d", curr_time->tm_year + 1900, // 02d表示的是控制输出格式为2位整数,不够2位的用0填充
             curr_time->tm_mon + 1,
             curr_time->tm_mday,
             curr_time->tm_hour,
             curr_time->tm_min,
             curr_time->tm_sec);
    return time_string;
}

pthread_mutex_t _mutex=PTHREAD_MUTEX_INITIALIZER;
class Log
{
public:
    Log(const std::string flush_file = default_file, int flush_type = FLUSHSCREEN)
        : _flush_file(flush_file), _flush_type(flush_type)
    {
        //pthread_mutex_init(&_mutex, nullptr);
    }
    ~Log()
    {
        //pthread_mutex_destroy(&_mutex);
    }

    void logMessage(int level, const std::string filename, int filenumber, const char *format, ...) // 可变参数
    {
        logmessage logm;
        logm._level = LevelToString(level);
        logm._id = getpid();
        logm._filename = filename;
        logm._filenumber = filenumber;
        va_list vl;
        va_start(vl, format);
        char log_info[1024];
        vsnprintf(log_info, sizeof(log_info), format, vl);
        va_end(vl);
        logm._content = log_info;
        logm._time = CurrentTime();
        FlushLog(logm);
    }
    void Enable(int type)
    {
        _flush_type = type;
    }
    void FlushLog(logmessage &log)
    {
        //pthread_mutex_lock(&_mutex);
        lockguard lock(&_mutex);
        switch (_flush_type)
        {
        case FLUSHSCREEN:
            FlushToScreen(log);
            break;
        case FLUSHFILE:
            FlushToFile(log);
            break;
        }
        //pthread_mutex_unlock(&_mutex);

    }
    void FlushToScreen(logmessage &log)
    {
        printf("[%s][%d][%s][%d][%s] %s", log._level.c_str(), log._id, log._filename.c_str(), log._filenumber, log._time.c_str(), log._content.c_str());
    }
    void FlushToFile(logmessage &log)
    {
        std::ofstream out(_flush_file, std::ios::app);
        char message[1024];
        snprintf(message, sizeof(message), "[%s][%d][%s][%d][%s] %s", log._level.c_str(), log._id, log._filename.c_str(), log._filenumber, log._time.c_str(), log._content.c_str());
        out.write(message, strlen(message));
        out.close();
    }

private:
    std::string _flush_file;
    int _flush_type;
    
};

// 优化
Log glog;

#define LOG(level, format, ...)                                            \
    do                                                                     \
    {                                                                      \
        glog.logMessage(level, __FILE__, __LINE__, format, ##__VA_ARGS__); \
    } while (0) // 带两个#是为了处理可变参数为空的情况

#define EnableScreen()  \
    do                  \
    {                   \
        glog.Enable(1); \
    } while (0)

#define EnableFile()    \
    do                  \
    {                   \
        glog.Enable(2); \
    } while (0)

3、单例模式

单例模式:一个类只创建一个对象

单例模式实现方式有两种:懒汉模式和饿汉模式。

4、最终代码

经过单例模式的修改,最终形成的线程池代码:

Threadpool.hpp:

#pragma once

#include <vector>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "mythreadlib.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"
#define DEFAULT_THREAD_NUMS 5

template <typename T> // 什么类型的任务
class ThreadPool
{

private:
    void test(const std::string name)
    {
        while (1)
        {
            std::cout << name << " is running" << std::endl;
            sleep(1);
        }
    }
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    bool QueueIsEmpty()
    {
        return _task_queue.empty();
    }
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void Wake()
    {
        pthread_cond_signal(&_cond);
    }
    void WakeAll()
    {
        pthread_cond_broadcast(&_cond);
    }

    void HandlerTask(const std::string name)
    {
        while (1)
        {
            LockQueue();
            while (QueueIsEmpty() && _isrunning)
            {
                _sleep_nums++;
                LOG(DEBUG, "%s begin sleep\n", name.c_str());
                Sleep();
                LOG(DEBUG, "%s wake up\n", name.c_str());

                _sleep_nums--;
            }
            if (QueueIsEmpty() && !_isrunning)
            {
                UnlockQueue();
                break;
            }
            // 苏醒过来 执行任务
            T t = _task_queue.front();
            _task_queue.pop();

            UnlockQueue();

            t(); // 执行任务和获取任务一定要分开,一个在锁内,一个在锁外
            LOG(DEBUG, "%s Handler task done\n", name.c_str());
        }
    }
    ThreadPool(int threadnums = DEFAULT_THREAD_NUMS)
        : _threadnums(threadnums), _isrunning(false), _sleep_nums(0)
    {
        //_my_threads.resize(_threadnums); 不能resize会有问题
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    ThreadPool(const ThreadPool<T> &) = delete;
    void operator=(const ThreadPool<T> &) = delete;//设置拷贝构造和=操作符重载为delete

    void Init()
    {
        fun_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        // fun_t func = std::bind(&ThreadPool::test, this, std::placeholders::_1);

        for (int i = 0; i < _threadnums; i++)
        {
            std::string name = "thread-" + std::to_string(i + 1);
            _my_threads.emplace_back(name, func); // 要将test改成处理Task的函数
            LOG(DEBUG, "construct %s done,init success\n", name.c_str());
        }
    }

    void Start()
    {
        _isrunning = true;
        for (auto &thread : _my_threads)
        {
            thread.Start();
            LOG(DEBUG, "start %s done\n", thread.getname().c_str());
        }
    }

public:
    static ThreadPool<T> *Get_Instance()
    {
        if (_tp == nullptr)
        {
            lockguard lock(&_tp_mutex);
            if (_tp == nullptr)
            {
                _tp = new ThreadPool<T>;
                _tp->Init();
                _tp->Start();
                LOG(DEBUG,"Thread pool create success!\n");
            }
        }

        return _tp;
    }
    void Stop()
    {
        LockQueue();
        _isrunning = false;

        WakeAll();

        UnlockQueue();
        LOG(DEBUG,"all thread stop success!\n");
    }

    void Enqueue(const T &in)
    {
        // 主线程向线程池中派发任务,让线程池中的线程执行任务

        LockQueue();
        if (_isrunning)
        {
            _task_queue.push(in);
            if (_sleep_nums > 0)
                Wake(); // 只要有休眠的线程就进行唤醒
        }

        UnlockQueue();
    }

private:
    int _threadnums;
    std::vector<mythread> _my_threads;
    int _sleep_nums;
    bool _isrunning;           // 也是临界资源
    std::queue<T> _task_queue; // 临界资源
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
    static ThreadPool<T> *_tp;
    static pthread_mutex_t _tp_mutex;
};

template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;

template <typename T>
pthread_mutex_t ThreadPool<T>::_tp_mutex = PTHREAD_MUTEX_INITIALIZER;

main.cpp:

#include "ThreadPool.hpp"
#include "task.hpp"
#include "Log.hpp"
#include <ctime>
#include <unistd.h>

/*int main()
{
    //线程池
    srand(time(nullptr)^getpid());
    ThreadPool<Task> *thread_pool=new ThreadPool<Task>();
    thread_pool->Init();
    thread_pool->Start();
    while(1)
    {
        int num1=rand()%9+1;
        int num2=rand()%9+1;
        Task ADD(num1,num2);
        thread_pool->Enqueue(ADD);

        sleep(1);

    }
    return 0;
}*/

/*int main()
{
    //测试未优化的日志
    Log log1;
    log1.logMessage(DEBUG, __FILE__, __LINE__, "this is a logmessage of %d,%f\n", 30, 3.14);

    sleep(1);
    log1.logMessage(DEBUG, __FILE__, __LINE__, "this is a logmessage of %d,%f\n", 30, 3.14);
    log1.Enable(FLUSHFILE);
    sleep(1);
    log1.logMessage(DEBUG, __FILE__, __LINE__, "this is a logmessage of %d,%f\n", 30, 3.14);
    sleep(1);
    log1.logMessage(DEBUG, __FILE__, __LINE__, "this is a logmessage of %d,%f\n", 30, 3.14);
    return 0;
}*/

/*int main()
{
    EnableScreen();
    //LOG(DEBUG,"YES");//依然支持
    LOG(DEBUG,"this is a logmessage of %d,%f\n", 30, 3.14);
    LOG(WARNING,"this is a logmessage of %d,%f\n", 30, 3.14);
    LOG(ERROR,"this is a logmessage of %d,%f\n", 30, 3.14);
    LOG(INFO,"this is a logmessage of %d,%f\n", 30, 3.14);


    EnableFile();
    LOG(DEBUG,"this is a logmessage of %d,%f\n", 30, 3.14);
    LOG(WARNING,"this is a logmessage of %d,%f\n", 30, 3.14);
    LOG(ERROR,"this is a logmessage of %d,%f\n", 30, 3.14);
    LOG(INFO,"this is a logmessage of %d,%f\n", 30, 3.14);

    return 0;
}*/

/*int main()
{
    //线程池
    srand(time(nullptr)^getpid());
    ThreadPool<Task> *thread_pool=new ThreadPool<Task>();
    thread_pool->Init();
    thread_pool->Start();
    while(1)
    {
        int num1=rand()%9+1;
        int num2=rand()%9+1;
        Task ADD(num1,num2);
        thread_pool->Enqueue(ADD);
        LOG(INFO,"task is product:%s\n",ADD.debug().c_str());

        sleep(1);

    }
    return 0;
}*/

int main()
{
    // 单例模式线程池的测试
    srand(time(nullptr) ^ getpid());
    int cnt=10;
    while (cnt--)
    {
        int num1 = rand() % 9 + 1;
        int num2 = rand() % 9 + 1;
        Task ADD(num1, num2);
        ThreadPool<Task>::Get_Instance()->Enqueue(ADD);
        sleep(1);
    }
    ThreadPool<Task>::Get_Instance()->Stop();

    return 0;
}

二、线程概念拓展

1、可重入和线程安全

(1)概念

(2)理解

一个函数是可重入的,那么它一定是线程安全的;但一个线程是安全的,不表示该线程内的函数是可重入的,其内部的函数可能是不可重入的,例如加锁但不释放的函数,是线程安全但不可重入函数。

2、死锁

(1)死锁概念

一个线程一把锁也可能形成死锁,例如对同一个锁加锁两次而未释放。

但更多的情况是俩个执行流,两把锁,要推进代码需要两把锁都申请到,但俩个执行流分别申请一把锁,而不释放,都同时申请对方的锁。

(2)死锁的必要条件

(3)避免死锁

破坏必要条件:

①尽量不使用锁

②当某优先级比较高的线程申请不到锁的时候强制释放锁

③申请锁的代码的顺序要一致

3、STL,智能指针和线程安全


http://www.kler.cn/a/540461.html

相关文章:

  • WidowX-250s 机械臂学习记录
  • Spring Boot Actuator(官网文档解读)
  • 用 DeepSeek + Kimi 自动做 PPT,效率起飞
  • C++ STL容器之vector的使用及复现
  • C语言中的共用体(Union):嵌入式开发中的节省内存利器
  • Jenkins数据备份到windows FTP服务器
  • v-for的数据返回和接口返回不一致
  • LeetCode刷题---数组---665
  • SpringSecurity高级用法
  • day4.。。。。。。。。。。。。。。。。。
  • 【学术投稿-第六届新材料与清洁能源国际学术会议(ICAMCE 2025)】组织与结构:HTML中的<fieldset>与<legend>标签解析
  • UE求职Demo开发日志#27 几个交互完善
  • C++学习笔记——类和对象(中)
  • [c语言日记]动态规划入门:杨辉三角
  • 2月10日习题
  • Android多包路由方案: ARouter 路由库
  • java实现Http请求方式的几种常见方式
  • 安装zk的方法
  • 今日AI和商界事件(2025-02-10)
  • 网站的记住我功能与用户登录持久化
  • 【UVM】寄存器模型
  • opencv:基于暗通道先验(DCP)的内窥镜图像去雾
  • fastjson2学习大纲
  • init的service 启动顺序
  • 基于 gitee 的 CI/CD
  • 球弹跳高度的计算(信息学奥赛一本通-1085)