【C】一文速学----线程池原理与实战
文章目录
- 线程池简介
- 线程池原理
- 线程池实现
- 数据结构设计
- 接口设计
- 线程池实现
线程池简介
线程池是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。之所以采用线程池,因为如果每次接收请求就创建一个线程,不断地对线程进行创建和销毁对服务器来说是一种资源的浪费,尤其当接入数量持续增加,系统资源是有限的,这会成为系统的负担。
为了规避上述问题,给系统固定数量的线程来处理请求,这就是线程池。对于如何确定线程池中应该有多少线程,应该依据任务类型来看,如果是CPU密集型,由系统cpu核心数量确定,如果是IO密集型则需要至少cpu核心数量两倍的线程数量。
线程池原理
线程池工作原理如下:
线程池实现
数据结构设计
typedef struct thrdpool_s thrdpool_t;
// 任务执行的规范 ctx 上下文
typedef void (*handler_pt)(void * /* ctx */);
//任务
typedef struct task_s {
void *next; //下一个任务
handler_pt func; //回调函数
void *arg; //回调函数参数
} task_t;
//任务队列
typedef struct task_queue_s {
void *head; //队列头指针
void **tail; //队列尾指针
int block; //标识队列是否为空
//为了学习不同的锁而在这里设置
spinlock_t lock; //队列锁 原子锁适合轻量级、细粒度的操作,更关注性能,主要用于低竞争场景。
pthread_mutex_t mutex; //互斥锁 适合复杂的同步需求,更关注功能,主要用于高竞争或复杂的资源保护。
pthread_cond_t cond; //条件变量
} task_queue_t;
//线程池
struct thrdpool_s {
task_queue_t *task_queue; //任务队列
atomic_int quit; //线程池是否关闭
int thrd_count; //线程数量
pthread_t *threads; //保存线程ID
};
接口设计
只向用户暴露关于线程池的创建,以及向线程池中输入任务和任务执行的函数。对于线程池内部关于任务的添加,任务的消费全部封装。
thrdpool_t *thrdpool_create(int thrd_count);
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
void thrdpool_waitdone(thrdpool_t *pool);
线程池实现
基于上面的设定,对线程池给予实现
//创建任务队列
static task_queue_t *_taskqueue_create()
{
int ret;
task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
if (queue)
{
ret = pthread_mutex_init(&queue->mutex, NULL);
if (ret == 0)
{
ret = pthread_cond_init(&queue->cond, NULL);
if (ret == 0)
{
queue->head = NULL;
//二级指针,指向head指针的地址
queue->tail = &queue->head;
//设置队列是锁住的,无任务
queue->block = 1;
return queue;
}
pthread_mutex_destroy(&queue->mutex);
}
free(queue);
}
return NULL;
}
//从队列弹出任务
static void * _pop_task(task_queue_t *queue){
//原子锁
spinlock_lock(&queue->lock);
if(queue->head==NULL){
spinlock_unlock(&queue->lock);
return NULL;
}
task_t *task;
task=queue->head;
//强制转换task为二级指针,因为task结构体第一个是next指针,其实link指向的是next
void **link = (void**)task;
//*link的值就是next指向的节点
queue->head=*link;
if(queue->head==NULL)
{
queue->tail=&queue->head;
}
spinlock_unlock(&queue->lock);
return task;
}
//获得任务
static void * _get_task(task_queue_t *queue){
task_t *task;
//当无法再弹出任务是进入循环
while((task=_pop_task(queue))==NULL){
pthread_mutex_lock(&queue->mutex);
//当任务队列锁住时退出互斥锁
if (queue->block == 0) {
pthread_mutex_unlock(&queue->mutex);
return NULL;
}
// 1. 先 unlock(&mtx)
// 2. 在 cond 休眠
// --- __add_task 时唤醒
// 3. 在 cond 唤醒
// 4. 加上 lock(&mtx);
//释放锁,等待信号量变化;当信号量变化,再次重新获得锁
pthread_cond_wait(&queue->cond, &queue->mutex);
pthread_mutex_unlock(&queue->mutex);
}
return task;
}
//线程执行函数
static void *_thrdpool_worker(void *arg){
thrdpool_t *pool=(thrdpool_t *)arg;
task_t* task;
void *ctx;
//检测线程池是否在运行状态
while(atomic_load(&pool->quit)==0){
task=(task_t*)_get_task(pool->task_queue);
if(!task) break;
handler_pt func=task->func;
ctx=task->arg;
free(task);
//执行任务函数
func(ctx);
}
}
//任务队列添加任务
static void _add_task(task_queue_t *queue,void *task){
//强制转换二级指针,link指向next
void **link = (void **)task;
//next指向为NULL
*link=NULL;
spinlock_lock(&queue->lock);
//*queue->tail表示next的值,link即是指向next,同时那个地址也是task的首地址
*queue->tail=link;
queue->tail=link;
spinlock_unlock(&queue->lock);
}
//往线程池中放入任务
int thrdpool_post(thrdpool_t* pool,handler_pt func,void *arg){
if(atomic_load(&pool->quit)==1) return -1;
task_t * task =(task_t*)malloc(sizeof(task_t));
if(!task) return -1;
task->func=func;
task->arg=arg;
_add_task(pool->task_queue,task);
return 0;
}
//创建线程
static int _threads_create(thrdpool_t *pool, int count)
{
int ret;
pthread_attr_t attr;
ret = pthread_attr_init(&attr);
if (ret == 0)
{
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * count);
if (pool->threads)
{
int i = 0;
for (i = 0; i < count; i++)
{
if ((pthread_create(&pool->threads[i], &attr, _thrdpool_worker, pool)) != 0)
{
break;
}
}
pool->thrd_count = i;
pthread_attr_destroy(&attr);
}
ret = -1;
}
return ret;
}
//线程池创建
thrdpool_t *thrdpool_create(int thrd_count)
{
thrdpool_t *pool;
pool = (thrdpool_t *)malloc(sizeof(thrdpool_t));
if (pool)
{
//创建任务队列
task_queue_t *queue = _taskqueue_create();
if (queue)
{
pool->task_queue = queue;
//初始化线程池标志,0表示线程池在运行状态,1表示停止运行状态
atomic_init(&pool->quit,0);
if (__threads_create(pool, thrd_count) == 0)
return pool;
_taskqueue_destroy(queue);
}
free(pool);
}
return NULL;
}
//销毁任务队列
static void _taskqueue_destroy(task_queue_t *queue) {
task_t *task;
while ((task = __pop_task(queue))) {
free(task);
}
spinlock_destroy(&queue->lock);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->mutex);
free(queue);
}
//关闭线程池
void thrdpool_waitdone(thrdpool_t *pool) {
int i;
for (i=0; i<pool->thrd_count; i++) {
pthread_join(pool->threads[i], NULL);
}
_taskqueue_destroy(pool->task_queue);
free(pool->threads);
free(pool);
}
在上面的代码中涉及到强制转换二级指针的操作,如果不能很好的理解,可以参考【C】二级指针的原理与应用