当前位置: 首页 > article >正文

手写线程池c

c语言版线程池

  • 为什么要用线程池
  • 线程池主要的三部分
  • 声明
  • 实现
  • 测试
  • 结果

为什么要用线程池

使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

线程池主要的三部分

任务队列:存储需要处理的任务,由工作的线程来处理这些任务—>存储函数的地址
任务队列就是一块存储空间
生产者线程向任务队列放任务,消费者线程从任务队列中取任务
线程池维护的是消费者和任务队列
谁使用任务队列谁就是生产者,把任务放入到任务队列实际上调用了线程池的api接口
消费者循环消费,如果没有东西进行消费了那么就使用条件变量可以进行阻塞操作(生产者同理)
工作线程(任务队列任务的消费者)–> n个
主要是取任务---->任务都是回调函数
管理者线程 --> 1个
它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
当任务过多的时候, 可以适当的创建一些新的工作线程
当任务过少的时候, 可以适当的销毁一些工作的线程

声明

#pragma once
#include"threadpool.h"
#include<pthread.h>
#include<unistd.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>

typedef struct ThreadPool ThreadPool;

ThreadPool* threadPoolCreate(int min, int max, int queueSize);

// 工作的线程(消费者线程)任务函数
void* worker(void* arg);

//管理者线程
void* manager(void* arg);

//线程退出
void threadExit(ThreadPool* pool);

//线程添加
void threadPoolAdd(ThreadPool* pool, void* function(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);

//销毁线程池
int threadPoolDestroy(ThreadPool* pool);

实现

#include"threadpool.h"
const int NUMBER = 2;

typedef struct Task
{
    void (*function)(void* arg);
    void* arg;
}Task;

struct ThreadPool
{
    // 任务队列
    Task* taskQ;
    int queueCapacity;  // 容量
    int queueSize;      // 当前任务个数
    int queueFront;     // 队头 -> 取数据
    int queueRear;      // 队尾 -> 放数据

    pthread_t managerID;    // 管理者线程ID
    pthread_t* threadIDs;   // 工作的线程ID
    int minNum;             // 最小线程数量
    int maxNum;             // 最大线程数量
    int busyNum;            // 忙的线程的个数
    int liveNum;            // 存活的线程的个数
    int exitNum;            // 要销毁的线程个数
    pthread_mutex_t mutexPool;  // 锁整个的线程池
    pthread_mutex_t mutexBusy;  // 锁busyNum变量
    pthread_cond_t notFull;     // 任务队列是不是满了
    pthread_cond_t notEmpty;    // 任务队列是不是空了

    int shutdown;           // 是不是要销毁线程池, 销毁为1, 不销毁为0
};


ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    do {
        if (pool == NULL)
        {
            printf("malloc threadpool fail...\n");
            //printf("线程池初始化申请内存失败...\n");
            break;
        }
        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
        if (pool->threadIDs == NULL)
        {
            printf("malloc threadIDs fail...\n");
            //printf("工作线程初始化申请内存失败...\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
        pool->busyNum = 0;
        pool->minNum = min;
        pool->maxNum = max;
        pool->liveNum = min;
        pool->exitNum = 0;
        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 || pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL) != 0 || pthread_cond_init(&pool->notFull, NULL) != 0)
        {
            printf("mutex or condition init fail...\n");
            //printf("互斥锁或者条件变量初始化申请内存失败...\n");
            break;
        }
        pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueRear = 0;

        pool->shutdown = 0;

        pthread_create(&pool->managerID, NULL, manager, pool);
        for (int i = 0; i < min; i++)
        {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);//要从任务队列取任务传pool
        }
        return pool;
    } while (0);

    // 释放资源
    if (pool && pool->threadIDs) free(pool->threadIDs);
    if (pool && pool->taskQ) free(pool->taskQ);
    if (pool) free(pool);

    return NULL;
}


void* worker(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;
    while (1)
    {
        //线程的共享资源-->线程池中的任务队列即线程池
        pthread_mutex_lock(&pool->mutexPool);
        //当前队列为空
        while (pool->queueSize == 0 && !pool->shutdown)
        {
            //阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
            // 判断是不是要销毁线程 ---> 自杀
            if (pool->exitNum > 0)
            {
                pool->exitNum--;// >>>>>>>>>>> 放在外面是因为如果下面的if条件不满足的情况这个语句在if里面的话
                //>>>>>>>>>>>当生产者生产通知工作者线程消费的时候你这时候线程自杀显然不符合逻辑的
                //>>>>>>>>>>>应当在这个时候吧要销毁的线程数量清0
                if (pool->liveNum > pool->minNum)
                {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool);
                    threadExit(pool);
                }
            }
        }
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&pool->mutexPool);
            threadExit(pool);
        }

        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg = pool->taskQ[pool->queueFront].arg;

        //移动头节点
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;
        // >>>>>>>> 条件变量通知生产者生产
        pthread_cond_signal(&pool->notFull);
        pthread_mutex_unlock(&pool->mutexPool);

        printf("thread %ld start working...\n", pthread_self());
        //printf("开始工作了...\n");
        pthread_mutex_lock(&pool->mutexBusy);// >>>>> 锁这个变量就不需要锁整个线程池了
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);
        task.function(task.arg);
        free(task.arg);
        task.arg = NULL;

        printf("thread %ld end working...\n", pthread_self());
        //printf("结束工作了...\n");
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

void* manager(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;
    while(!pool->shutdown)
    {   
        sleep(3);
        //工作线程在读的时候 工作线程可能在写所以要加锁
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->queueSize;
        int liveNum = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexPool);

        // 取出忙的线程的数量
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        // 添加线程
       // 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
        if (queueSize > liveNum && liveNum < pool->maxNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            int counter = 0;
            for (int i = 0; i < pool->maxNum && counter < NUMBER
                && pool->liveNum < pool->maxNum; ++i)
            {
                if (pool->threadIDs[i] == 0)// >>>>>>> 如果创建线程的时候这个数组存入后经过一段时间自杀了那么这个数组对应的位置是需要清0的
                    //>>>>>>>>>>因此应该设置在线程退出的时候把数组所在的线程id清0
                {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    counter++;
                    pool->liveNum++;// >>>>>>>> 需要加锁
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }
        // 销毁线程
        // 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
        if (busyNum * 2 < liveNum && liveNum > pool->minNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            // 让工作的线程自杀
            for (int i = 0; i < NUMBER; ++i)
            {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
    return NULL;
}

void threadExit(ThreadPool* pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->maxNum; i++)
    {
        if (pool->threadIDs[i] == tid)
        {
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

//生产
void threadPoolAdd(ThreadPool* pool, void* function(void*), void* arg)
{
    pthread_mutex_lock(&pool->mutexPool);
    while (pool->queueCapacity == pool->queueSize && !pool->shutdown)
    {
        pthread_cond_wait(&pool->notFull,&pool->mutexPool);
    }
    if (pool->shutdown)
    {
        pthread_mutex_unlock(&pool->mutexPool);
        return;
    }
    pool->taskQ[pool->queueRear].function = function;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexPool);
}

int threadPoolBusyNum(ThreadPool* pool)
{
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}

int threadPoolAliveNum(ThreadPool* pool)
{
    pthread_mutex_lock(&pool->mutexPool);
    int aliveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexPool);
    return aliveNum;
}

int threadPoolDestroy(ThreadPool* pool)
{
    if (pool == NULL)
    {
        return -1;
    }

    // 关闭线程池
    pool->shutdown = 1;
    // 阻塞回收管理者线程
    pthread_join(pool->managerID, NULL);
    // 唤醒阻塞的消费者线程
    for (int i = 0; i < pool->liveNum; ++i)
    {
        pthread_cond_signal(&pool->notEmpty);
    }
    // 释放堆内存
    if (pool->taskQ)
    {
        free(pool->taskQ);
    }
    if (pool->threadIDs)
    {
        free(pool->threadIDs);
    }

    pthread_mutex_destroy(&pool->mutexPool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);

    free(pool);
    pool = NULL;

    return 0;
}

测试

#include "threadpool.h"

void taskFunc(void* arg)
{
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d 工作中请勿打扰谢谢\n",
        pthread_self(), num);
    sleep(1);
}

int main()
{
    // 创建线程池
    ThreadPool* pool = threadPoolCreate(3, 10, 100);
    for (int i = 0; i < 100; ++i)
    {
        int* num = (int*)malloc(sizeof(int));
        *num = i + 100;
        threadPoolAdd(pool, taskFunc, num);
    }

    sleep(30);

    threadPoolDestroy(pool);
    return 0;
}

结果

在这里插入图片描述


http://www.kler.cn/a/388046.html

相关文章:

  • C++ 如何将 gRPC集成到机器人系统中
  • C++实现图书管理系统(Qt C++ GUI界面版)
  • eNSP之家----ACL实验入门实例详解(Access Control List访问控制列表)(重要重要重要的事说三遍)
  • C++ 常见面试题(二)
  • Ubuntu网络连接问题(笔记本更换wifi后,虚拟机连不上网络)
  • execl条件比较两个sheet每个单元格的值
  • 前端基础面试题·第四篇——Vue(其三)
  • 【flask web】补充内容,session、全局对象g、装饰器、钩子函数。
  • RandomWords随机生成单词
  • 「QT」几何数据类 之 QLineF 浮点型直线类
  • 前端入门一之DOM、获取元素、DOM核心、事件高级、操作元素、事件基础、节点操作
  • Spring Security @PreAuthorize @PostAuthorize 权限控制
  • 牛客小白月赛104——D.小红开锁
  • 机器人零位、工作空间、坐标系及其变换,以UR5e机器人为例
  • 大数据程序猿不可不看的资料大全
  • [AI] 深度学习的局限性:人工智能为什么仍然无法实现自主学习
  • 营业执照OCR识别API接口如何用PHP调用
  • RabbitMQ设置TTL(消息过期)时间(重要)
  • Linux(CentOS)安装 Nginx
  • Redis9:商户查询缓存3
  • 探索 Python 图像处理的瑞士军刀:Pillow 库
  • AC+AP 基本配置
  • 括号组合。(DFS)
  • 【前端打包秘籍】一文掌握webpack入口与出口的最佳实践 (9)
  • 超详细:三大范式设计+反范式设计+树
  • Java使用aspose导出pdf、字体乱码问题(小方块,不能识别中文)