当前位置: 首页 > article >正文

Linux应用——线程池

1. 线程池要求

我们创建线程池的目的本质上是用空间换取时间,而我们选择于 C++ 的类内包装原生线程库的形式来创建,其具体实行逻辑如图

可以看到,整个线程池其实就是一个大型的 CP 模型,接下来我们来完成它

2. 整体模板

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>

struct ThreadInfo
{
    pthread_t tid;
    std::string name;
};

static const int defalutnum = 5;

template<class T>
class ThreadPool
{
public:
    ThreadPoo1(int num = defalutnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    
    ~ThreadPoo1()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_mutex_destroy(&cond_);
    }
private:
    std::vector<ThreadInfo> threads_;
    std::queue<T>tasks_;

    pthread_mutex_t mutex_;
    pthread_cond_t cond_;
};

3. 具体实现

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>

// 存储线程池中各个线程信息
struct ThreadInfo
{
    pthread_t tid;
    std::string name;
};

// 默认线程池容量
static const int defalutnum = 5;

template<class T>
class ThreadPool
{
private:
    // 加锁
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    // 释放锁
    void Unlock()
    {
        pthread_mutex_unlock (&mutex_);
    }

    // 唤醒线程
    void Wakeup()
    {
        pthread_cond_signal(&cond_);
    }

    // 资源不就绪, 线程同步
    void ThreadSleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }

    // 对当前任务列表判空
    bool IsQueueEmpty()
    {
        return tasks_.size() == 0 ? true : false;
    }

    // 获取线程 name
    std::string GetThreadName(pthread_t tid)
    {
        for (const auto &ti : threads_)
        {
            if (ti.tid == tid)
                return ti.name;
        }
        return "None";
    }
public:
    ThreadPool(int num = defalutnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    
    // 由于 pthread_create 函数的特殊性
    // 只能将 HandlerTask 设置为静态函数
    // 同时将 this 指针以参数的形式传入
    static void *HandlerTask(void *args)
    {
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);

        while (true)
        {
            // 确保同一时刻只有一个线程在进行消费
            tp->Lock();

            // 如果当前任务列表为空就让线程等待资源
            // 为防止伪唤醒的情况, 使用 while
            while (tp->IsQueueEmpty())
            {
                tp->ThreadSleep();
            }
            T t = tp->Pop();

            tp->Unlock();

            // 执行任务
            // 注: 需要任务中重载 operator()
            t();
        }
    }

    // 启动线程池
    void Start()
    {
        int num = threads_.size();
        for (int i = 0; i < num; i++)
        {
            threads_[i].name = "thread-" + std::to_string(i+1);
            pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
        }
    }

    // Pop 函数在锁内执行因此不需要单独加锁
    T Pop()
    {
        T t = tasks_.front();
        tasks_.pop();
        return t;
    }

    // 将外界资源投入线程池
    void Push(const T &t)
    {
        Lock();

        tasks_.push(t);
        Wakeup(); // 投入资源成功后唤醒线程 

        Unlock();
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }
private:
    std::vector<ThreadInfo> threads_; // 线程池中所有线程的信息
    std::queue<T> tasks_;             // 任务队列

    pthread_mutex_t mutex_;           // 互斥锁
    pthread_cond_t cond_;             // 条件变量
};

4. 使用测试

在这里我们引用一个 Task.hpp 的任务工具,如下

#pragma once

#include "Log.hpp"
#include <iostream>
#include <string>

std::string opers = "+-*/%";

enum ErrorCode
{
    DevideZero,
    ModZero,
    Unknown,
    Normal
};

class Task
{
public:
    Task(int x, int y, char op)
        :a(x), b(y), op_(op) 
    {}

    void operator()()
    {
        run();
        lg(Info, "run a task: %s", GetResult().c_str());
    }

    void run()
    {
        switch(op_)
        {
            case '+':
                answer = a + b;
                break;
            case '-':
                answer = a - b;
                break;
            case '*':
                answer = a * b;
                break;
            case '/':
                if (b != 0) answer = a / b;
                else exitcode = DevideZero;
                break;
            case '%':
                if (b != 0) answer = a % b;
                else exitcode = ModZero;
                break;
            default:
                lg(Error, "Using correct operation: + - * / %");
                exitcode = Unknown;
                break;
        }
        
    }

    std::string GetTask()
    {
        std::string ret;
        ret += "Task: ";
        ret += std::to_string(a);
        ret += " ";
        ret += op_;
        ret += " ";
        ret += std::to_string(b);
        ret += " ";
        ret += "= ?";

        return ret;
    }

    std::string GetResult()
    {
        std::string ret;
        
        if (exitcode <= Unknown)
        {
            ret += "run the task fail, reason: ";
            switch (exitcode)
            {
                case DevideZero:
                    ret += "DevideZero";
                    break;
                case ModZero:
                    ret += "ModZero";
                    break;
                case Unknown:
                    ret += "Unknown";
                    break;
                default:
                    break;
            }
        }
        else
        {
            ret += "run the task success, result: ";
            ret += std::to_string(a);
            ret += " ";
            ret += op_;
            ret += " ";
            ret += std::to_string(b);
            ret += " ";
            ret += "= ";
            ret += std::to_string(answer);
        }

        return ret;
    }

    ~Task()
    {}

private:
    int a;
    int b;
    char op_;

    int answer = 0;
    ErrorCode exitcode = Normal;
};

main 函数如下

#include <iostream>
#include <time.h>
#include "ThreadPool.hpp"
#include "Task.hpp"

extern std::string opers;

int main()
{
    ThreadPool<Task>* tp = new ThreadPool<Task>(5);
    
    tp->Start();
    
    srand(time(nullptr)^ getpid());
    while(true)
    {
        //1.构建任务
        int x = rand() % 10 + 1;
        usleep(10);
        int y =rand() % 5;
        char op = opers[rand()%opers.size()];

        Task t(x, y, op);
        tp->Push(t);
    
        // 2.交给线程池处理
        lg(Info, "main thread make task: %s", t.GetTask().c_str());
        sleep(1);
    }

    return 0;
}

运行效果如下

也就是说我们想要使用这个线程池,只需要

1. 创建一个固定容量的进程池

2. 调用 Start() 函数启动进程池

3. 调用 Push() 函数向进程池中添加任务

即可! 


http://www.kler.cn/a/392546.html

相关文章:

  • 1.1.1 认识时间复杂度
  • (二)当人工智能是一个函数,函数形式怎么选择?ChatGPT的函数又是什么?
  • node.js之---事件循环机制
  • 出现 Error during query execution: StatementCallback; bad SQL grammar 解决方法
  • 【51项目】51单片机自制小霸王游戏机
  • vue v-for 数据增加页面不刷新
  • Spring Boot框架:电商解决方案的构建
  • 2.操作系统常见面试问题2
  • MySQL数据库常用命令大全(完整版——表格形式)
  • 「漏洞复现」某赛通电子文档安全管理系统 HookService SQL注入漏洞复现(CVE-2024-10660)
  • C++(Qt)软件调试---符号转换工具cv2pdb (24)
  • 【c++丨STL】list的使用
  • 【目标检测】【Ultralytics-YOLO系列】Windows11下YOLOV5人脸目标检测
  • 【ACM出版】第四届信号处理与通信技术国际学术会议(SPCT 2024)
  • 软间隔支持向量机
  • 表格理解专题(五)表头和数据项定义
  • spark 设置hive.exec.max.dynamic.partition不生效
  • 01-Ajax入门与axios使用、URL知识
  • 深入理解指针
  • 搜索引擎算法解析提升搜索效率的关键要素
  • 【Linux】Ubuntu中muduo库的编译环境安装
  • C# (定时器、线程)
  • Flutter【04】高性能表单架构设计
  • 【深度学习】线性与非线性
  • 工程认证标准下的Spring Boot课程管理平台
  • Scala中set和case class的特点和例题