当前位置: 首页 > article >正文

【C】一文速学----线程池原理与实战

文章目录

  • 线程池简介
  • 线程池原理
  • 线程池实现
    • 数据结构设计
    • 接口设计
    • 线程池实现

线程池简介

线程池是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。之所以采用线程池,因为如果每次接收请求就创建一个线程,不断地对线程进行创建和销毁对服务器来说是一种资源的浪费,尤其当接入数量持续增加,系统资源是有限的,这会成为系统的负担。

为了规避上述问题,给系统固定数量的线程来处理请求,这就是线程池。对于如何确定线程池中应该有多少线程,应该依据任务类型来看,如果是CPU密集型,由系统cpu核心数量确定,如果是IO密集型则需要至少cpu核心数量两倍的线程数量。

线程池原理

线程池工作原理如下:
在这里插入图片描述

线程池实现

数据结构设计


typedef struct thrdpool_s thrdpool_t;
// 任务执行的规范 ctx 上下文
typedef void (*handler_pt)(void * /* ctx */);

//任务
typedef struct task_s {
    void *next;  //下一个任务
    handler_pt func; //回调函数
    void *arg;  //回调函数参数
} task_t;

//任务队列
typedef struct task_queue_s {
    void *head; //队列头指针
    void **tail; //队列尾指针
    int block;  //标识队列是否为空
    //为了学习不同的锁而在这里设置
    spinlock_t lock; //队列锁  原子锁适合轻量级、细粒度的操作,更关注性能,主要用于低竞争场景。
    pthread_mutex_t mutex; //互斥锁 适合复杂的同步需求,更关注功能,主要用于高竞争或复杂的资源保护。
    pthread_cond_t cond; //条件变量
} task_queue_t;

//线程池
struct thrdpool_s {
    task_queue_t *task_queue; //任务队列
    atomic_int quit; //线程池是否关闭
    int thrd_count; //线程数量
    pthread_t *threads; //保存线程ID
};

接口设计

只向用户暴露关于线程池的创建,以及向线程池中输入任务和任务执行的函数。对于线程池内部关于任务的添加,任务的消费全部封装。

thrdpool_t *thrdpool_create(int thrd_count);
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
void thrdpool_waitdone(thrdpool_t *pool);

线程池实现

基于上面的设定,对线程池给予实现

//创建任务队列
static task_queue_t *_taskqueue_create()
{
    int ret;
    task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
    if (queue)
    {
        ret = pthread_mutex_init(&queue->mutex, NULL);
        if (ret == 0)
        {
            ret = pthread_cond_init(&queue->cond, NULL);
            if (ret == 0)
            {
                queue->head = NULL;
                //二级指针,指向head指针的地址
                queue->tail = &queue->head;
                //设置队列是锁住的,无任务
                queue->block = 1;
                return queue;
            }
            pthread_mutex_destroy(&queue->mutex);
        }
        free(queue);
    }
    return NULL;
}
//从队列弹出任务
static void * _pop_task(task_queue_t *queue){
	//原子锁
    spinlock_lock(&queue->lock);
    if(queue->head==NULL){
        spinlock_unlock(&queue->lock);
        return NULL;
    }

    task_t *task;
    task=queue->head;

    //强制转换task为二级指针,因为task结构体第一个是next指针,其实link指向的是next
    void **link = (void**)task;
    //*link的值就是next指向的节点
    queue->head=*link;
    
    if(queue->head==NULL)
    {
        queue->tail=&queue->head;
    }

    spinlock_unlock(&queue->lock);
    return task;
}
//获得任务
static void * _get_task(task_queue_t *queue){
    task_t *task;
    //当无法再弹出任务是进入循环
    while((task=_pop_task(queue))==NULL){
        pthread_mutex_lock(&queue->mutex);
        //当任务队列锁住时退出互斥锁
        if (queue->block == 0) {
            pthread_mutex_unlock(&queue->mutex);
            return NULL;
        }
        // 1. 先 unlock(&mtx)
        // 2. 在 cond 休眠
        // --- __add_task 时唤醒
        // 3. 在 cond 唤醒
        // 4. 加上 lock(&mtx);
        
        //释放锁,等待信号量变化;当信号量变化,再次重新获得锁
        pthread_cond_wait(&queue->cond, &queue->mutex);
        pthread_mutex_unlock(&queue->mutex);
    }

    return task;
}
//线程执行函数
static void *_thrdpool_worker(void *arg){
    thrdpool_t *pool=(thrdpool_t *)arg;

    task_t* task;
    void *ctx;
    //检测线程池是否在运行状态
    while(atomic_load(&pool->quit)==0){
        task=(task_t*)_get_task(pool->task_queue);
        if(!task) break;
        handler_pt func=task->func;
        ctx=task->arg;
        free(task);
        //执行任务函数
        func(ctx);
    }
}
//任务队列添加任务
static void _add_task(task_queue_t *queue,void *task){
	//强制转换二级指针,link指向next
    void **link = (void **)task;
    //next指向为NULL
    *link=NULL;

    spinlock_lock(&queue->lock);
    //*queue->tail表示next的值,link即是指向next,同时那个地址也是task的首地址
    *queue->tail=link;
    
    queue->tail=link;
    spinlock_unlock(&queue->lock);
}
//往线程池中放入任务
int thrdpool_post(thrdpool_t* pool,handler_pt func,void *arg){
    if(atomic_load(&pool->quit)==1) return -1;

    task_t * task =(task_t*)malloc(sizeof(task_t));
    if(!task) return -1;
    task->func=func;
    task->arg=arg;
    _add_task(pool->task_queue,task);
    return 0;
}
//创建线程
static int _threads_create(thrdpool_t *pool, int count)
{
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);

    if (ret == 0)
    {
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * count);
        if (pool->threads)
        {
            int i = 0;
            for (i = 0; i < count; i++)
            {
                if ((pthread_create(&pool->threads[i], &attr, _thrdpool_worker, pool)) != 0)
                {
                    break;
                }
            }

            pool->thrd_count = i;
            pthread_attr_destroy(&attr);

        }
        ret = -1;
    }

    return ret;
}
//线程池创建
thrdpool_t *thrdpool_create(int thrd_count)
{
    thrdpool_t *pool;

    pool = (thrdpool_t *)malloc(sizeof(thrdpool_t));
    if (pool)
    {
    	//创建任务队列
        task_queue_t *queue = _taskqueue_create();
        if (queue)
        {
            pool->task_queue = queue;
            //初始化线程池标志,0表示线程池在运行状态,1表示停止运行状态
            atomic_init(&pool->quit,0);
            if (__threads_create(pool, thrd_count) == 0)
                return pool;
            _taskqueue_destroy(queue);
        }
        free(pool);
    }
    return NULL;
}
//销毁任务队列
static void _taskqueue_destroy(task_queue_t *queue) {
    task_t *task;
    while ((task = __pop_task(queue))) {
        free(task);
    }
    spinlock_destroy(&queue->lock);
    pthread_cond_destroy(&queue->cond);
    pthread_mutex_destroy(&queue->mutex);
    free(queue);
}
//关闭线程池
void thrdpool_waitdone(thrdpool_t *pool) {
    int i;
    for (i=0; i<pool->thrd_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    _taskqueue_destroy(pool->task_queue);
    free(pool->threads);
    free(pool);
}

在上面的代码中涉及到强制转换二级指针的操作,如果不能很好的理解,可以参考【C】二级指针的原理与应用


http://www.kler.cn/a/389475.html

相关文章:

  • Systemd: disable和mask的区别
  • 【Playwright + Python】系列(十)利用 Playwright 完美处理 Dialogs 对话框
  • 2-UML概念模型测试
  • PaaS云原生:分布式集群中如何构建自动化压测工具
  • 【LeetCode】【算法】23. 合并K个升序链表
  • 03WIFI与蓝牙1——基于全志V3S的Linux开发板教程笔记
  • 18. 友元
  • 分享三个python爬虫案例
  • ServletContext介绍
  • 别再为视频转文字烦恼啦!这10个转换工具帮你一键搞定。
  • UE5 随机生成地牢关卡
  • Python酷库之旅-第三方库Pandas(206)
  • 信息安全数学基础(47)域的结构
  • 浔川 AI 翻译 v5.0 上线时间相关公告
  • canal配置之一:admin配置
  • 手边酒店多商户版V2源码独立部署_博纳软云
  • 多智能体系统的构建
  • C++线程
  • 【大数据学习 | kafka高级部分】kafka的快速读写
  • 道品科技水肥一体化在农业生产中的必要性与应用领域探讨
  • 微服务架构面试内容整理-消息驱动-RocketMQ
  • redis RDB持久化技术
  • mysql第二次作业---单表和多表查询
  • Rust性能优化与调试之性能基准测试
  • 如何使用SparkSQL在hive中使用Spark的引擎计算
  • 全网最详细的自动化测试(Jenkins 篇)