当前位置: 首页 > article >正文

Linux基础 -- pthread之线程池任务调度

线程池任务依赖设计方案

1. 设计目标

为了在多线程环境中支持任务间的依赖关系,我们设计了一个基于 pthread_create 封装的线程池,任务之间可以设置依赖,只有在依赖的任务完成后,依赖任务才会被执行。该设计目标是简化任务调度的逻辑,让开发者可以专注于任务的编写,而不必关注复杂的线程管理和任务依赖的执行顺序。

2. 核心概念

2.1 任务(Task)

任务是线程池中执行的最小单位。每个任务包含以下信息:

  • 任务函数:待执行的工作函数。
  • 参数:传递给任务函数的参数。
  • 依赖任务:任务可以依赖其他任务,只有依赖的任务执行完成,当前任务才能执行。
  • 依赖计数:记录任务依赖的其他任务数量,每当一个依赖任务完成,该计数器减 1,直到依赖计数为 0 时,任务才会被执行。

2.2 任务队列

任务队列是一个先进先出的队列,用于存储待执行的任务。线程池中的线程会从任务队列中取出任务并执行。为了支持任务依赖,只有当任务的依赖任务全部完成后,任务才会被放入任务队列中。

2.3 线程池(Thread Pool)

线程池是线程管理的核心组件,它包含以下部分:

  • 线程数组:线程池中的线程,负责从任务队列中获取任务并执行。
  • 任务队列:用于存储待执行任务的队列。
  • 互斥锁和条件变量:保证线程安全地访问任务队列,并通过条件变量实现任务的调度。

3. 线程池工作原理

3.1 任务的添加与依赖处理

当开发者向线程池提交一个任务时,可以同时提交该任务所依赖的其他任务。如果任务没有依赖任务,任务会被立即加入任务队列中,等待线程执行。如果任务有依赖任务,任务会暂时挂起,直到其所有依赖任务完成。

每当一个任务执行完成后,线程池会检查是否有其他任务依赖该任务。如果有,线程池会减少依赖任务的依赖计数器,当计数器为 0 时,将该依赖任务放入任务队列中。

3.2 线程的工作流程

线程池中的每个线程都会执行以下操作:

  1. 从任务队列中取出一个任务。
  2. 执行任务函数。
  3. 任务执行完成后,通知所有依赖该任务的其他任务,减少这些任务的依赖计数。
  4. 如果依赖计数为 0,则将依赖任务放入任务队列中。

3.3 线程池的关闭与销毁

在销毁线程池时,线程池会通知所有线程停止工作,等待所有线程结束后,释放所有分配的内存资源,确保程序无资源泄漏。

4. 主要流程

4.1 任务提交流程

  1. 开发者调用线程池的 add_task 接口提交任务。
  2. 如果任务没有依赖,立即将任务加入任务队列中。
  3. 如果任务有依赖,记录依赖关系,并等待依赖任务完成。

4.2 任务执行流程

  1. 线程从任务队列中获取一个任务。
  2. 执行任务。
  3. 检查是否有其他任务依赖于该任务。
  4. 如果有依赖任务,减少依赖任务的计数器。
  5. 如果依赖计数器为 0,将依赖任务放入任务队列。

4.3 任务依赖完成通知

  1. 任务完成后,通知所有依赖它的任务。
  2. 对每个依赖任务,减少依赖计数器。
  3. 当依赖计数器为 0 时,将该任务加入任务队列,等待执行。

5. 锁与同步机制

为了保证多线程环境下任务队列和任务依赖关系的安全性,线程池使用互斥锁和条件变量:

  • 互斥锁:保护任务队列和任务依赖数据的并发访问。
  • 条件变量:用于通知线程任务队列中有新任务需要处理,避免线程一直忙等待。

6. 优势与适用场景

6.1 优势

  • 简化任务依赖管理:通过自动管理任务依赖,减少开发者手动处理依赖逻辑的负担。
  • 高效的线程复用:线程池中的线程复用,避免频繁创建和销毁线程带来的性能开销。
  • 支持任务间复杂依赖:通过依赖计数和依赖队列,可以支持复杂的任务依赖关系,比如任务链和任务图。

6.2 适用场景

  • 多任务的复杂调度:多个任务之间存在依赖关系的场景,例如图像处理中的多个滤波器操作、数据处理管道等。
  • 任务并行化:需要将一系列任务并行化执行,且任务之间存在顺序依赖。
  • 性能优化:通过复用线程池减少频繁线程创建和销毁的开销,适用于对性能要求较高的应用场景。

7. 扩展功能

  • 动态调整线程池大小:根据系统的负载情况,动态调整线程池中的线程数量,以适应不同的任务需求。
  • 任务优先级支持:为任务设置优先级,高优先级的任务可以优先执行。
  • 任务超时处理:增加任务超时功能,确保长时间未完成的任务可以被正确处理,避免任务卡住。

8. 线程池的设计细节

8.1 任务结构设计

每个任务封装了任务函数及其参数,并且通过依赖计数器来追踪当前任务的依赖任务。任务的主要结构包括以下字段:

  • func:任务函数指针,表示具体的工作逻辑。
  • arg:任务函数的参数。
  • dependencies:一个指向依赖任务数组的指针,用于保存该任务所依赖的其他任务。
  • dep_count:依赖任务的总数量,用于初始化 remaining_deps 计数器。
  • remaining_deps:当前任务未完成的依赖任务计数器。当其值为 0 时,任务将可以被执行。

8.2 线程池结构设计

线程池通过维护一个任务队列来管理任务执行,同时通过线程数组来管理线程的执行。主要结构包括:

  • threads:线程数组,用于保存所有创建的线程。
  • task_queue:任务队列,采用链表的形式,存储待执行的任务。
  • lock:互斥锁,保证任务队列在多线程环境下的安全访问。
  • notify:条件变量,用于在线程空闲时唤醒它们执行新的任务。
  • thread_count:线程池中线程的数量。
  • stop:线程池是否已经停止的标志,线程池销毁时设为 true,通知所有线程退出。

8.3 锁与条件变量的使用

  • 锁(Mutex):任务队列的操作需要线程安全,因此我们在对任务队列进行增删操作时,必须加锁保护,确保只有一个线程可以修改任务队列。
  • 条件变量(Condition Variable):当任务队列为空时,线程会进入等待状态,避免忙等待浪费 CPU 资源。当有新任务加入队列时,条件变量会通知空闲线程执行任务。

9. 线程池中的任务调度

9.1 任务提交与依赖关系处理

  1. 无依赖任务:如果任务没有依赖任务,则直接将其加入任务队列,等待线程执行。
  2. 有依赖任务:如果任务有依赖任务,则该任务的 remaining_deps 被设置为依赖任务的数量。当依赖任务完成时,remaining_deps 计数器减少,直到所有依赖任务完成,remaining_deps 为 0,该任务才会被放入任务队列。

9.2 任务执行与依赖任务通知

  1. 当线程从任务队列中取出任务并执行时,首先完成当前任务的工作。
  2. 任务执行完成后,线程检查是否有其他任务依赖于该任务。对于每个依赖任务,减少其 remaining_deps 计数。
  3. 如果依赖任务的 remaining_deps 变为 0,则该任务被放入任务队列,准备执行。

9.3 任务的优先执行

默认情况下,任务是按提交顺序被执行的。如果需要支持优先级,可以为每个任务增加优先级字段,并修改任务队列的插入逻辑,使高优先级任务排在前面。这样可以确保高优先级任务优先执行。

10. 销毁线程池的安全性

10.1 停止标志

当用户请求销毁线程池时,线程池需要安全地通知所有工作线程退出。我们使用 stop 标志来表示线程池是否处于停止状态。当该标志被设置为 true 时,所有线程会在任务执行完毕后退出。

10.2 资源清理

线程池销毁时,需要确保以下资源被正确释放:

  • 线程数组:所有线程在退出前需要 pthread_join,以确保线程正常结束。
  • 任务队列:在销毁线程池前,所有未执行的任务需要被清理,以避免内存泄漏。
  • 锁与条件变量:在释放线程池资源时,需要销毁互斥锁和条件变量。

11. 性能优化建议

11.1 任务批处理

如果任务依赖性较小,线程池可以批量处理任务,减少频繁的锁开销。例如,多个无依赖的任务可以一次性加入任务队列,线程从队列中批量取出任务执行,减少锁的竞争。

11.2 线程池动态扩展

根据系统负载情况,线程池的线程数量可以动态调整。例如,在任务过多时,临时增加线程处理任务;当任务量减少时,减少线程的数量,以避免线程资源浪费。

11.3 任务超时处理

为每个任务设定超时时间,避免任务长时间占用线程。超时未完成的任务可以被取消或重新调度,这对于高实时性要求的系统尤其重要。

12. 常见问题与处理

12.1 任务依赖死锁

在设计任务依赖时,必须确保任务的依赖关系是无环的(即不能出现循环依赖)。如果任务形成了环形依赖,所有任务都将等待依赖任务完成,导致死锁。因此,在任务依赖设计中,必须保证任务依赖关系构成一个有向无环图(DAG)。

12.2 任务队列溢出

如果任务提交过于频繁,任务队列可能会堆积大量任务,导致内存消耗过大。解决方法包括限制任务队列的大小,或动态调整队列大小,并且在队列满时,可以选择拒绝新任务或阻塞提交线程。

12.3 线程饥饿

如果某些任务始终得不到执行(比如低优先级任务),会导致线程饥饿问题。可以通过任务优先级策略和公平调度算法,保证每个任务都能在合理时间内得到执行。

13. 未来扩展

13.1 支持分布式任务调度

线程池的任务调度机制可以扩展到分布式系统中,在多台服务器之间共享任务队列,实现跨节点的任务分配与执行。

13.2 支持更多的任务调度策略

当前设计仅支持简单的任务依赖和优先级调度。未来可以支持更多复杂的调度策略,例如基于任务执行时间的调度、负载均衡调度等。

13.3 与异步I/O的结合

将线程池与异步 I/O 机制结合,允许线程池中的线程在等待 I/O 操作时不阻塞,从而提高任务执行效率,特别适合高并发的网络应用。

代码

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdbool.h>

// 任务函数类型
typedef void (*task_func)(void*);

// 任务结构
typedef struct task {
    task_func func;            // 任务函数
    void* arg;                 // 任务参数
    struct task** dependencies; // 依赖的任务
    int dep_count;             // 依赖任务的数量
    int remaining_deps;        // 剩余未完成的依赖任务数量
    struct task* next;         // 下一个任务
} task_t;

// 线程池结构
typedef struct thread_pool {
    pthread_mutex_t lock;      // 互斥锁保护任务队列
    pthread_cond_t notify;     // 条件变量,用于通知线程
    pthread_t* threads;        // 线程数组
    task_t* task_queue;        // 任务队列
    int thread_count;          // 线程数量
    bool stop;                 // 标志是否停止线程池
} thread_pool_t;

// 创建线程池
thread_pool_t* thread_pool_create(int thread_count);

// 添加任务到线程池
int thread_pool_add(thread_pool_t* pool, task_func func, void* arg, task_t** dependencies, int dep_count);

// 销毁线程池
void thread_pool_destroy(thread_pool_t* pool);

// 线程的工作函数
void* thread_work(void* arg);

// 初始化线程池
thread_pool_t* thread_pool_create(int thread_count) {
    thread_pool_t* pool = (thread_pool_t*)malloc(sizeof(thread_pool_t));
    if (pool == NULL) {
        perror("创建线程池失败");
        return NULL;
    }

    pool->thread_count = thread_count;
    pool->task_queue = NULL;
    pool->stop = false;

    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->notify, NULL);
    pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thread_count);

    for (int i = 0; i < thread_count; i++) {
        if (pthread_create(&pool->threads[i], NULL, thread_work, (void*)pool) != 0) {
            perror("线程创建失败");
            thread_pool_destroy(pool);
            return NULL;
        }
    }
    return pool;
}

// 线程的工作函数,不断从任务队列中取任务并执行
void* thread_work(void* arg) {
    thread_pool_t* pool = (thread_pool_t*)arg;
    while (true) {
        pthread_mutex_lock(&pool->lock);

        while (pool->task_queue == NULL && !pool->stop) {
            pthread_cond_wait(&pool->notify, &pool->lock);
        }

        if (pool->stop) {
            pthread_mutex_unlock(&pool->lock);
            break;
        }

        // 获取任务
        task_t* task = pool->task_queue;
        if (task != NULL && task->remaining_deps == 0) {
            pool->task_queue = task->next;
        } else {
            task = NULL;
        }

        pthread_mutex_unlock(&pool->lock);

        // 执行任务
        if (task != NULL) {
            task->func(task->arg);

            // 依赖的任务完成后,通知依赖它的任务
            pthread_mutex_lock(&pool->lock);
            for (int i = 0; i < task->dep_count; i++) {
                task->dependencies[i]->remaining_deps--;
                if (task->dependencies[i]->remaining_deps == 0) {
                    // 将依赖任务加入任务队列中
                    task->dependencies[i]->next = pool->task_queue;
                    pool->task_queue = task->dependencies[i];
                    pthread_cond_signal(&pool->notify);
                }
            }
            pthread_mutex_unlock(&pool->lock);

            free(task);
        }
    }
    return NULL;
}

// 添加任务到线程池
int thread_pool_add(thread_pool_t* pool, task_func func, void* arg, task_t** dependencies, int dep_count) {
    task_t* task = (task_t*)malloc(sizeof(task_t));
    if (task == NULL) {
        perror("任务分配失败");
        return -1;
    }

    task->func = func;
    task->arg = arg;
    task->dependencies = dependencies;
    task->dep_count = dep_count;
    task->remaining_deps = dep_count;
    task->next = NULL;

    pthread_mutex_lock(&pool->lock);

    // 如果没有依赖任务,立即执行
    if (task->remaining_deps == 0) {
        task->next = pool->task_queue;
        pool->task_queue = task;
        pthread_cond_signal(&pool->notify);
    } else {
        // 否则等待依赖任务完成
        for (int i = 0; i < dep_count; i++) {
            if (dependencies[i] == NULL) {
                perror("依赖任务无效");
                free(task);
                pthread_mutex_unlock(&pool->lock);
                return -1;
            }
        }
    }

    pthread_mutex_unlock(&pool->lock);
    return 0;
}

// 销毁线程池
void thread_pool_destroy(thread_pool_t* pool) {
    if (pool == NULL) return;

    pthread_mutex_lock(&pool->lock);
    pool->stop = true;
    pthread_cond_broadcast(&pool->notify);
    pthread_mutex_unlock(&pool->lock);

    for (int i = 0; i < pool->thread_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }

    free(pool->threads);

    task_t* current = pool->task_queue;
    while (current != NULL) {
        task_t* temp = current;
        current = current->next;
        free(temp);
    }

    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->notify);
    free(pool);
}

// 测试任务函数
void print_message(void* arg) {
    char* message = (char*)arg;
    printf("执行任务: %s\n", message);
    sleep(1);
}

// 测试代码
int main() {
    // 创建线程池,包含 4 个线程
    thread_pool_t* pool = thread_pool_create(4);

    // 创建任务结构体
    task_t* task1 = (task_t*)malloc(sizeof(task_t));
    task_t* task2 = (task_t*)malloc(sizeof(task_t));
    task_t* task3 = (task_t*)malloc(sizeof(task_t));

    // 提交任务1
    thread_pool_add(pool, print_message, "任务1", NULL, 0);

    // 任务2依赖任务1
    task_t* dependencies1[] = {task1};
    thread_pool_add(pool, print_message, "任务2", dependencies1, 1);

    // 任务3依赖任务2
    task_t* dependencies2[] = {task2};
    thread_pool_add(pool, print_message, "任务3", dependencies2, 1);

    // 模拟主线程等待任务完成
    sleep(5);

    // 销毁线程池
    thread_pool_destroy(pool);

    return 0;
}


http://www.kler.cn/a/290276.html

相关文章:

  • 将大型语言模型(如GPT-4)微调用于文本续写任务
  • https网站 请求http图片报错:net::ERR_SSL_PROTOCOL_ERROR
  • 25浙江省考-专项刷题(资料分析)-错题本
  • 【设计模式】结构型模式(四):组合模式、享元模式
  • SpringBoot(十三)SpringBoot配置webSocket
  • Local Dimming和Mini LED简介
  • Windows编程系列:PE文件结构
  • 【图论】Dijkstra算法求最短路
  • 【源码】Sharding-JDBC源码分析之ContextManager创建中ShardingSphereDatabase的创建原理
  • 注册安全分析报告:熊猫频道
  • centos 安装使用aria2
  • 数据分析处理库(pandas)
  • 802.11 中 scrambler的matlab仿真
  • Oracle中的临时表Temporary Table
  • [数据集][目标检测]课堂行行为检测数据集VOC+YOLO格式4065张12类别
  • 【2024最新】Adobe Lightroom Classic安装教程(直接使用)
  • 【算法每日一练及解题思路】判断字符串是否包含数字
  • K8S CronJob
  • 跨域问题及解决方案
  • 鸿萌数据恢复服务:VMWare 虚拟机无法访问,该怎样解决?
  • C++中(Qt)类与命名空间
  • 数据结构07
  • idea2021安装教程与常见配置(可激活至2099年)
  • el-select在火狐浏览器中 点击搜索框聚焦时会有一个蓝色的框
  • 新电脑Win11系统想要降级为Win10怎么操作?
  • torchvision库学习之transforms.Compose(模块)