Linux基础 -- pthread之线程池任务调度
线程池任务依赖设计方案
1. 设计目标
为了在多线程环境中支持任务间的依赖关系,我们设计了一个基于 pthread_create
封装的线程池,任务之间可以设置依赖,只有在依赖的任务完成后,依赖任务才会被执行。该设计目标是简化任务调度的逻辑,让开发者可以专注于任务的编写,而不必关注复杂的线程管理和任务依赖的执行顺序。
2. 核心概念
2.1 任务(Task)
任务是线程池中执行的最小单位。每个任务包含以下信息:
- 任务函数:待执行的工作函数。
- 参数:传递给任务函数的参数。
- 依赖任务:任务可以依赖其他任务,只有依赖的任务执行完成,当前任务才能执行。
- 依赖计数:记录任务依赖的其他任务数量,每当一个依赖任务完成,该计数器减 1,直到依赖计数为 0 时,任务才会被执行。
2.2 任务队列
任务队列是一个先进先出的队列,用于存储待执行的任务。线程池中的线程会从任务队列中取出任务并执行。为了支持任务依赖,只有当任务的依赖任务全部完成后,任务才会被放入任务队列中。
2.3 线程池(Thread Pool)
线程池是线程管理的核心组件,它包含以下部分:
- 线程数组:线程池中的线程,负责从任务队列中获取任务并执行。
- 任务队列:用于存储待执行任务的队列。
- 互斥锁和条件变量:保证线程安全地访问任务队列,并通过条件变量实现任务的调度。
3. 线程池工作原理
3.1 任务的添加与依赖处理
当开发者向线程池提交一个任务时,可以同时提交该任务所依赖的其他任务。如果任务没有依赖任务,任务会被立即加入任务队列中,等待线程执行。如果任务有依赖任务,任务会暂时挂起,直到其所有依赖任务完成。
每当一个任务执行完成后,线程池会检查是否有其他任务依赖该任务。如果有,线程池会减少依赖任务的依赖计数器,当计数器为 0 时,将该依赖任务放入任务队列中。
3.2 线程的工作流程
线程池中的每个线程都会执行以下操作:
- 从任务队列中取出一个任务。
- 执行任务函数。
- 任务执行完成后,通知所有依赖该任务的其他任务,减少这些任务的依赖计数。
- 如果依赖计数为 0,则将依赖任务放入任务队列中。
3.3 线程池的关闭与销毁
在销毁线程池时,线程池会通知所有线程停止工作,等待所有线程结束后,释放所有分配的内存资源,确保程序无资源泄漏。
4. 主要流程
4.1 任务提交流程
- 开发者调用线程池的
add_task
接口提交任务。 - 如果任务没有依赖,立即将任务加入任务队列中。
- 如果任务有依赖,记录依赖关系,并等待依赖任务完成。
4.2 任务执行流程
- 线程从任务队列中获取一个任务。
- 执行任务。
- 检查是否有其他任务依赖于该任务。
- 如果有依赖任务,减少依赖任务的计数器。
- 如果依赖计数器为 0,将依赖任务放入任务队列。
4.3 任务依赖完成通知
- 任务完成后,通知所有依赖它的任务。
- 对每个依赖任务,减少依赖计数器。
- 当依赖计数器为 0 时,将该任务加入任务队列,等待执行。
5. 锁与同步机制
为了保证多线程环境下任务队列和任务依赖关系的安全性,线程池使用互斥锁和条件变量:
- 互斥锁:保护任务队列和任务依赖数据的并发访问。
- 条件变量:用于通知线程任务队列中有新任务需要处理,避免线程一直忙等待。
6. 优势与适用场景
6.1 优势
- 简化任务依赖管理:通过自动管理任务依赖,减少开发者手动处理依赖逻辑的负担。
- 高效的线程复用:线程池中的线程复用,避免频繁创建和销毁线程带来的性能开销。
- 支持任务间复杂依赖:通过依赖计数和依赖队列,可以支持复杂的任务依赖关系,比如任务链和任务图。
6.2 适用场景
- 多任务的复杂调度:多个任务之间存在依赖关系的场景,例如图像处理中的多个滤波器操作、数据处理管道等。
- 任务并行化:需要将一系列任务并行化执行,且任务之间存在顺序依赖。
- 性能优化:通过复用线程池减少频繁线程创建和销毁的开销,适用于对性能要求较高的应用场景。
7. 扩展功能
- 动态调整线程池大小:根据系统的负载情况,动态调整线程池中的线程数量,以适应不同的任务需求。
- 任务优先级支持:为任务设置优先级,高优先级的任务可以优先执行。
- 任务超时处理:增加任务超时功能,确保长时间未完成的任务可以被正确处理,避免任务卡住。
8. 线程池的设计细节
8.1 任务结构设计
每个任务封装了任务函数及其参数,并且通过依赖计数器来追踪当前任务的依赖任务。任务的主要结构包括以下字段:
func
:任务函数指针,表示具体的工作逻辑。arg
:任务函数的参数。dependencies
:一个指向依赖任务数组的指针,用于保存该任务所依赖的其他任务。dep_count
:依赖任务的总数量,用于初始化remaining_deps
计数器。remaining_deps
:当前任务未完成的依赖任务计数器。当其值为 0 时,任务将可以被执行。
8.2 线程池结构设计
线程池通过维护一个任务队列来管理任务执行,同时通过线程数组来管理线程的执行。主要结构包括:
threads
:线程数组,用于保存所有创建的线程。task_queue
:任务队列,采用链表的形式,存储待执行的任务。lock
:互斥锁,保证任务队列在多线程环境下的安全访问。notify
:条件变量,用于在线程空闲时唤醒它们执行新的任务。thread_count
:线程池中线程的数量。stop
:线程池是否已经停止的标志,线程池销毁时设为true
,通知所有线程退出。
8.3 锁与条件变量的使用
- 锁(Mutex):任务队列的操作需要线程安全,因此我们在对任务队列进行增删操作时,必须加锁保护,确保只有一个线程可以修改任务队列。
- 条件变量(Condition Variable):当任务队列为空时,线程会进入等待状态,避免忙等待浪费 CPU 资源。当有新任务加入队列时,条件变量会通知空闲线程执行任务。
9. 线程池中的任务调度
9.1 任务提交与依赖关系处理
- 无依赖任务:如果任务没有依赖任务,则直接将其加入任务队列,等待线程执行。
- 有依赖任务:如果任务有依赖任务,则该任务的
remaining_deps
被设置为依赖任务的数量。当依赖任务完成时,remaining_deps
计数器减少,直到所有依赖任务完成,remaining_deps
为 0,该任务才会被放入任务队列。
9.2 任务执行与依赖任务通知
- 当线程从任务队列中取出任务并执行时,首先完成当前任务的工作。
- 任务执行完成后,线程检查是否有其他任务依赖于该任务。对于每个依赖任务,减少其
remaining_deps
计数。 - 如果依赖任务的
remaining_deps
变为 0,则该任务被放入任务队列,准备执行。
9.3 任务的优先执行
默认情况下,任务是按提交顺序被执行的。如果需要支持优先级,可以为每个任务增加优先级字段,并修改任务队列的插入逻辑,使高优先级任务排在前面。这样可以确保高优先级任务优先执行。
10. 销毁线程池的安全性
10.1 停止标志
当用户请求销毁线程池时,线程池需要安全地通知所有工作线程退出。我们使用 stop
标志来表示线程池是否处于停止状态。当该标志被设置为 true
时,所有线程会在任务执行完毕后退出。
10.2 资源清理
线程池销毁时,需要确保以下资源被正确释放:
- 线程数组:所有线程在退出前需要
pthread_join
,以确保线程正常结束。 - 任务队列:在销毁线程池前,所有未执行的任务需要被清理,以避免内存泄漏。
- 锁与条件变量:在释放线程池资源时,需要销毁互斥锁和条件变量。
11. 性能优化建议
11.1 任务批处理
如果任务依赖性较小,线程池可以批量处理任务,减少频繁的锁开销。例如,多个无依赖的任务可以一次性加入任务队列,线程从队列中批量取出任务执行,减少锁的竞争。
11.2 线程池动态扩展
根据系统负载情况,线程池的线程数量可以动态调整。例如,在任务过多时,临时增加线程处理任务;当任务量减少时,减少线程的数量,以避免线程资源浪费。
11.3 任务超时处理
为每个任务设定超时时间,避免任务长时间占用线程。超时未完成的任务可以被取消或重新调度,这对于高实时性要求的系统尤其重要。
12. 常见问题与处理
12.1 任务依赖死锁
在设计任务依赖时,必须确保任务的依赖关系是无环的(即不能出现循环依赖)。如果任务形成了环形依赖,所有任务都将等待依赖任务完成,导致死锁。因此,在任务依赖设计中,必须保证任务依赖关系构成一个有向无环图(DAG)。
12.2 任务队列溢出
如果任务提交过于频繁,任务队列可能会堆积大量任务,导致内存消耗过大。解决方法包括限制任务队列的大小,或动态调整队列大小,并且在队列满时,可以选择拒绝新任务或阻塞提交线程。
12.3 线程饥饿
如果某些任务始终得不到执行(比如低优先级任务),会导致线程饥饿问题。可以通过任务优先级策略和公平调度算法,保证每个任务都能在合理时间内得到执行。
13. 未来扩展
13.1 支持分布式任务调度
线程池的任务调度机制可以扩展到分布式系统中,在多台服务器之间共享任务队列,实现跨节点的任务分配与执行。
13.2 支持更多的任务调度策略
当前设计仅支持简单的任务依赖和优先级调度。未来可以支持更多复杂的调度策略,例如基于任务执行时间的调度、负载均衡调度等。
13.3 与异步I/O的结合
将线程池与异步 I/O 机制结合,允许线程池中的线程在等待 I/O 操作时不阻塞,从而提高任务执行效率,特别适合高并发的网络应用。
代码
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdbool.h>
// 任务函数类型
typedef void (*task_func)(void*);
// 任务结构
typedef struct task {
task_func func; // 任务函数
void* arg; // 任务参数
struct task** dependencies; // 依赖的任务
int dep_count; // 依赖任务的数量
int remaining_deps; // 剩余未完成的依赖任务数量
struct task* next; // 下一个任务
} task_t;
// 线程池结构
typedef struct thread_pool {
pthread_mutex_t lock; // 互斥锁保护任务队列
pthread_cond_t notify; // 条件变量,用于通知线程
pthread_t* threads; // 线程数组
task_t* task_queue; // 任务队列
int thread_count; // 线程数量
bool stop; // 标志是否停止线程池
} thread_pool_t;
// 创建线程池
thread_pool_t* thread_pool_create(int thread_count);
// 添加任务到线程池
int thread_pool_add(thread_pool_t* pool, task_func func, void* arg, task_t** dependencies, int dep_count);
// 销毁线程池
void thread_pool_destroy(thread_pool_t* pool);
// 线程的工作函数
void* thread_work(void* arg);
// 初始化线程池
thread_pool_t* thread_pool_create(int thread_count) {
thread_pool_t* pool = (thread_pool_t*)malloc(sizeof(thread_pool_t));
if (pool == NULL) {
perror("创建线程池失败");
return NULL;
}
pool->thread_count = thread_count;
pool->task_queue = NULL;
pool->stop = false;
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->notify, NULL);
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thread_count);
for (int i = 0; i < thread_count; i++) {
if (pthread_create(&pool->threads[i], NULL, thread_work, (void*)pool) != 0) {
perror("线程创建失败");
thread_pool_destroy(pool);
return NULL;
}
}
return pool;
}
// 线程的工作函数,不断从任务队列中取任务并执行
void* thread_work(void* arg) {
thread_pool_t* pool = (thread_pool_t*)arg;
while (true) {
pthread_mutex_lock(&pool->lock);
while (pool->task_queue == NULL && !pool->stop) {
pthread_cond_wait(&pool->notify, &pool->lock);
}
if (pool->stop) {
pthread_mutex_unlock(&pool->lock);
break;
}
// 获取任务
task_t* task = pool->task_queue;
if (task != NULL && task->remaining_deps == 0) {
pool->task_queue = task->next;
} else {
task = NULL;
}
pthread_mutex_unlock(&pool->lock);
// 执行任务
if (task != NULL) {
task->func(task->arg);
// 依赖的任务完成后,通知依赖它的任务
pthread_mutex_lock(&pool->lock);
for (int i = 0; i < task->dep_count; i++) {
task->dependencies[i]->remaining_deps--;
if (task->dependencies[i]->remaining_deps == 0) {
// 将依赖任务加入任务队列中
task->dependencies[i]->next = pool->task_queue;
pool->task_queue = task->dependencies[i];
pthread_cond_signal(&pool->notify);
}
}
pthread_mutex_unlock(&pool->lock);
free(task);
}
}
return NULL;
}
// 添加任务到线程池
int thread_pool_add(thread_pool_t* pool, task_func func, void* arg, task_t** dependencies, int dep_count) {
task_t* task = (task_t*)malloc(sizeof(task_t));
if (task == NULL) {
perror("任务分配失败");
return -1;
}
task->func = func;
task->arg = arg;
task->dependencies = dependencies;
task->dep_count = dep_count;
task->remaining_deps = dep_count;
task->next = NULL;
pthread_mutex_lock(&pool->lock);
// 如果没有依赖任务,立即执行
if (task->remaining_deps == 0) {
task->next = pool->task_queue;
pool->task_queue = task;
pthread_cond_signal(&pool->notify);
} else {
// 否则等待依赖任务完成
for (int i = 0; i < dep_count; i++) {
if (dependencies[i] == NULL) {
perror("依赖任务无效");
free(task);
pthread_mutex_unlock(&pool->lock);
return -1;
}
}
}
pthread_mutex_unlock(&pool->lock);
return 0;
}
// 销毁线程池
void thread_pool_destroy(thread_pool_t* pool) {
if (pool == NULL) return;
pthread_mutex_lock(&pool->lock);
pool->stop = true;
pthread_cond_broadcast(&pool->notify);
pthread_mutex_unlock(&pool->lock);
for (int i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL);
}
free(pool->threads);
task_t* current = pool->task_queue;
while (current != NULL) {
task_t* temp = current;
current = current->next;
free(temp);
}
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->notify);
free(pool);
}
// 测试任务函数
void print_message(void* arg) {
char* message = (char*)arg;
printf("执行任务: %s\n", message);
sleep(1);
}
// 测试代码
int main() {
// 创建线程池,包含 4 个线程
thread_pool_t* pool = thread_pool_create(4);
// 创建任务结构体
task_t* task1 = (task_t*)malloc(sizeof(task_t));
task_t* task2 = (task_t*)malloc(sizeof(task_t));
task_t* task3 = (task_t*)malloc(sizeof(task_t));
// 提交任务1
thread_pool_add(pool, print_message, "任务1", NULL, 0);
// 任务2依赖任务1
task_t* dependencies1[] = {task1};
thread_pool_add(pool, print_message, "任务2", dependencies1, 1);
// 任务3依赖任务2
task_t* dependencies2[] = {task2};
thread_pool_add(pool, print_message, "任务3", dependencies2, 1);
// 模拟主线程等待任务完成
sleep(5);
// 销毁线程池
thread_pool_destroy(pool);
return 0;
}