Linux从入门到开发实战(C/C++)Day13-线程池
线程池是什么:池化技术之一
面向对象思想:封装
一个程序要用到许多线程,线程池是管理线程的一种方式
一般说三线程两池:
任务线程:生成任务对象并把任务对象添加到 任务池
管理线程:从任务池中取出任务对象,交给 线程池 中的某个线程处理
监督线程:监督线程池中的线程,如果空闲线程数量过少就增加一些,过多就减少一些
简易线程池:
任务池对象
线程池对象
创建并初始化线程池
线程函数:包含监督线程的功能
添加任务到线程池中:任务线程的功能
工作函数
销毁函数
简易线程池:
// 任务类(也是任务池类)
struct job
{
void *(*callback_func)(void *arg); // 任务函数
void *arg; // 数据(函数参数)
struct job *pNext;
};
// 线程池类
struct pthreadPool
{
pthread_t *pthreads; // 数组方式存储所有线程id
struct job *phead; // 任务池头节点
struct job *ptail; // 任务池尾节点
int curNum; // 当前线程数
int maxNum; // 最大线程数限制
// 线程同步
pthread_mutex_t mutex; // 互斥量
pthread_cond_t pool_empty; // 当前池空了
pthread_cond_t pool_not_empty; // 当前池不空
pthread_cond_t pool_not_full; // 当前池不满
pthread_cond_t pool_full; // 当前池满
int job_nums; // 当前任务数
int job_closed; // 是否关闭任务池
int pool_closed; // 是否关闭线程池
};
// 任务函数
void *work(void *arg)
{
char *str = (char *)arg;
printf("``````````任务函数:%s开始\n", str);
sleep(5); // 模拟任务
printf("``````````任务函数:%s结束\n", str);
}
// 线程函数
void *pthread_func(void *arg)
{
// 拿到线程池
struct pthreadPool *pool = (struct pthreadPool *)arg;
if (NULL == pool)
{
return NULL;
}
// 获取具体的任务
struct job *pjob = NULL;
while (1)
{
// 核心就是锁的操作
// 加锁
pthread_mutex_lock(&(pool->mutex));
// 这里进行监督线程工作
// 如果队列为空就阻塞,非空就继续执行
while ((0 == pool->job_nums) && (0 == pool->job_closed))
{
printf("队列为空,阻塞\n");
pthread_cond_wait(&(pool->pool_not_empty), &(pool->mutex));
}
// 如果线程池关闭了就结束线程
if (pool->pool_closed)
{
printf("线程池关闭了,结束线程\n");
pthread_mutex_unlock(&(pool->mutex));
pthread_exit(NULL);
}
// 如果队列不为空就执行任务,当前任务数-1
pool->job_nums--;
pjob = pool->phead;
if (0 == pool->job_nums)
{
pool->phead = pool->ptail = NULL;
printf("任务队列为空,销毁线程函数\n");
pthread_cond_signal(&(pool->pool_empty));
}
else
{
pool->phead = pjob->pNext;
}
// 如果任务队列不满,就添加新任务
if (pool->job_nums < pool->maxNum)
{
printf("任务队列不满,添加新任务\n");
pthread_cond_broadcast(&(pool->pool_not_full));
}
// 解锁
pthread_mutex_unlock(&(pool->mutex));
// 执行线程函数
(*(pjob->callback_func))(pjob->arg);
// 释放任务对象
free(pjob);
pjob = NULL;
}
}
// 创建并初始化线程池
struct pthreadPool *pthreadPool_init(int pthread_nums, int max_nums)
{
// 开内存
struct pthreadPool *pool = (struct pthreadPool *)calloc(1, sizeof(struct pthreadPool));
if (NULL == pool)
{
printf("pthreadPool_init:内存分配失败:%m\n");
return NULL;
}
// 成员赋值
pool->curNum = pthread_nums;
pool->maxNum = max_nums;
pool->job_nums = 0; // 当前0任务
pool->job_closed = pool->job_closed = 0; // 非关闭状态
pool->pthreads = (pthread_t *)calloc(pool->curNum, sizeof(pthread_t));
pthread_mutex_init(&(pool->mutex), NULL);
pthread_cond_init(&(pool->pool_empty), NULL);
pthread_cond_init(&(pool->pool_not_empty), NULL);
pthread_cond_init(&(pool->pool_not_full), NULL);
pthread_cond_init(&(pool->pool_full), NULL);
pool->phead = pool->ptail = NULL;
// 创建线程,阻塞在这里
for (int i = 0; i < pool->curNum; i++)
{
pthread_create(&(pool->pthreads[i]), NULL, pthread_func, (void *)pool);
}
// 返回
return pool;
}
// 添加任务到线程池中
int add_job(struct pthreadPool *pool, void *(*callback_func)(void *arg), void *arg)
{
if (NULL == pool || NULL == callback_func || NULL == arg)
{
printf("pthread_pool:参数错误\n");
return -1;
}
pthread_mutex_lock(&(pool->mutex));
// 阻塞
while ((pool->job_nums >= pool->maxNum) && !(pool->job_closed || pool->pool_closed))
{
printf("任务队列已满,阻塞\n");
pthread_cond_wait(&(pool->pool_not_full), &(pool->mutex));
}
// 关闭
if (pool->job_closed || pool->pool_closed)
{
pthread_mutex_unlock(&(pool->mutex));
return -1;
}
// 往任务池中添加任务
struct job *pjob = (struct job *)calloc(1, sizeof(struct job));
if (NULL == pjob)
{
printf("pthread_pool:内存分配失败:%m\n");
pthread_mutex_unlock(&(pool->mutex));
return -1;
}
pjob->callback_func = callback_func;
pjob->arg = arg;
pjob->pNext = NULL;
if (NULL == pool->phead)
{
pool->phead = pool->ptail = pjob;
// 发信号 队列非空
pthread_cond_signal(&(pool->pool_not_full));
}
else
{
pool->ptail->pNext = pjob;
pool->ptail = pjob;
}
pool->job_nums++;
pthread_mutex_unlock(&(pool->mutex));
}
// 销毁线程池
int destory_pthreadPool(struct pthreadPool *pool)
{
if (NULL == pool)
{
return -1;
}
printf("销毁线程池\n");
// 释放id内存
free(pool->pthreads);
// 如果还有任务,也释放掉
struct job *pjob = pool->phead;
while (pjob)
{
pool->phead = pjob->pNext;
free(pjob);
pjob = pool->phead;
}
// 释放本身
free(pool);
return 0;
}
void pthread_pool()
{
// 创建线程池
struct pthreadPool *pool = pthreadPool_init(5, 20);
// 添加任务
add_job(pool, work, "第1个任务");
add_job(pool, work, "第2个任务");
add_job(pool, work, "第3个任务");
add_job(pool, work, "第4个任务");
add_job(pool, work, "第5个任务");
add_job(pool, work, "第6个任务");
add_job(pool, work, "第7个任务");
add_job(pool, work, "第8个任务");
add_job(pool, work, "第9个任务");
add_job(pool, work, "第10个任务");
add_job(pool, work, "第11个任务");
add_job(pool, work, "第12个任务");
add_job(pool, work, "第13个任务");
add_job(pool, work, "第14个任务");
add_job(pool, work, "第15个任务");
add_job(pool, work, "第16个任务");
add_job(pool, work, "第17个任务");
add_job(pool, work, "第18个任务");
add_job(pool, work, "第19个任务");
add_job(pool, work, "第20个任务");
sleep(20);
destory_pthreadPool(pool);
}
复杂一点的线程池(封装严密一些的)
// 封装线程函数参数
struct tp_work_arg_s
{
void *arg;
};
// 封装线程的工作函数
struct tp_work_s
{
void *(*process_job)(void *arg);
};
// 为了方便使用起别名
typedef struct tp_work_arg_s tp_work_arg;
typedef struct tp_work_s tp_work;
// 线程类
struct tp_thread_info_s
{
pthread_t id; // 线程id
int status; // 线程状态 0-空闲 1-工作中
int need_exit; // 线程是否需要退出 1-需要退出 0-不需要退出
pthread_mutex_t thread_lock; // 互斥线程锁
pthread_cond_t thread_cond; // 条件变量
tp_work *work;
tp_work_arg *work_arg;
struct tp_thread_info_s *next;
struct tp_thread_info_s *prev;
};
// 线程池类
struct tp_thread_pool_s
{
int max_thread_num; // 最大线程数
int cur_thread_num; // 当前线程数
int min_thread_num; // 最小线程数
int idle_thread_num; // 空闲线程数
pthread_mutex_t pool_lock; // 整个线程池的互斥锁
pthread_t manager_id; // 线程池管理线程id
struct tp_thread_info_s *thread_head; // 线程链表头
};
// 起别名
typedef struct tp_thread_info_s tp_thread_info;
typedef struct tp_thread_pool_s tp_thread_pool;
// 创建线程池中的线程
tp_thread_info *create_thread(int num)
{
if (num <= 0)
return NULL;
// 双链表
tp_thread_info *head;
tp_thread_info *cur;
tp_thread_info *prev;
// 头节点
head = (tp_thread_info *)calloc(1, sizeof(tp_thread_info));
if (NULL == head)
{
printf("create_thread中calloc创建head线程失败:%m\n");
return NULL;
}
num--;
// 头节点之后的节点
cur = head;
prev = NULL;
while (num--)
{
cur->prev = prev;
cur->next = (tp_thread_info *)calloc(1, sizeof(tp_thread_info));
prev = cur;
cur = cur->next;
}
cur->next = NULL;
return head;
}
// 管理线程的函数
void *tp_manager_thread(void *arg)
{
// 空闲线程数过多,就删掉一些
// 空闲线程过少,就创建一些
int cur; // 当前线程数
int idle; // 空闲线程数
int min; // 最小线程数
int exit_num; // 退出线程数
tp_thread_pool *pool = (tp_thread_pool *)arg;
// 统计三次
int idle_arg[3] = {0};
int averIdle = 0;
while (1)
{
cur = pool->cur_thread_num;
idle = pool->idle_thread_num;
min = pool->min_thread_num;
exit_num = 0;
idle_arg[2] = idle;
averIdle = (idle_arg[0] + idle_arg[1] + idle_arg[2]) / 3;
printf("==========服务器当前状态==========");
printf("总线程数:%d\n", cur);
printf("空闲线程数:%d\n", idle);
printf("平均空闲线程数:%d\n", averIdle);
// 空闲线程过多
if (averIdle > (cur / 2))
{
exit_num = cur / 5;
}
// 修正
if (cur - exit_num < min)
{
exit_num = cur - min;
}
printf("退出线程数:%d\n", exit_num);
// 循环销毁(跳过第一个,第一个不好操作)
tp_thread_info *phead;
for (phead = pool->thread_head->next; phead != NULL && exit_num > 1; phead = phead->next)
{
if (0 != phead->status)
{
pthread_mutex_unlock(&(phead->thread_lock));
continue;
}
phead->need_exit = 1; // 标记为退出线程
pthread_mutex_unlock(&(phead->thread_lock));
exit_num--;
pthread_cond_signal(&(phead->thread_cond));
idle_arg[0] = idle_arg[1];
idle_arg[1] = idle_arg[2];
}
sleep(5); // 每五秒检测一次
} // end of while (1)
}
// 工作函数
void *myWork(void *arg)
{
// tp_work_arg *a = (tp_work_arg *)arg;
// int val = *(int *)a->arg;
// printf("```````线程%ld开始执行任务%d\n", pthread_self(), val);
printf("=========start of work==========\n");
sleep(5); // 模拟耗时操作
printf("=========end of work==========\n");
// printf("```````线程%ld执行任务%d完成\n", pthread_self(), val);
}
// 初始化线程 返回0表示失败
int init_thread(tp_thread_pool *pool, tp_thread_info *thread)
{
// 锁
pthread_mutex_init(&(thread->thread_lock), NULL);
pthread_cond_init(&(thread->thread_cond), NULL);
int r = pthread_create(&(thread->id), NULL, myWork, pool);
if (-1 == r)
{
printf("init_thread中创建线程失败:%m\n");
return 0;
}
// 设置线程分离(自动释放)
pthread_detach(thread->id);
thread->status = 0;
return 1;
}
// 初始化线程池
int init_thread_pool(tp_thread_pool *pool)
{
tp_thread_info *phead = pool->thread_head;
// 创建并初始化线程池中的线程对象
while (phead != NULL)
{
if (init_thread(pool, phead))
{
printf("初始化线程成功:%lu\n", phead->id);
}
else
{
printf("初始化线程失败:%m\n");
return 0;
}
phead = phead->next;
}
// 创建管理线程
int ret = pthread_create(&(pool->manager_id), NULL, tp_manager_thread, pool);
if (-1 == ret)
{
printf("创建管理线程失败:%m\n");
return 0;
}
else
{
printf("创建管理线程成功:%lu\n", pool->manager_id);
}
}
// 创建线程池
tp_thread_pool *creatd_thread_pool(int min, int max)
{
tp_thread_pool *pool = (tp_thread_pool *)calloc(1, sizeof(tp_thread_pool));
if (NULL == pool)
{
printf("calloc创建线程池失败:%m\n");
return NULL;
}
pool->max_thread_num = max;
pool->min_thread_num = min;
pool->idle_thread_num = min;
pthread_mutex_init(&(pool->pool_lock), NULL);
if (pool->thread_head)
free(pool->thread_head);
// 开内存
pool->thread_head = create_thread(pool->min_thread_num);
if (NULL == pool->thread_head)
{
printf("create_thread创建线程失败:%m\n");
free(pool);
return NULL;
}
// 初始化线程池
init_thread_pool(pool);
return pool;
}
// 通知管理线程增加一个线程
tp_thread_info *tp_add_pthread(tp_thread_pool *pool)
{
// 加一个线程到线程链表的末尾
if (pool->cur_thread_num >= pool->max_thread_num)
{
printf("线程池中线程数达到最大值,无法再增加线程\n");
return NULL;
}
tp_thread_info *pnew = (tp_thread_info *)calloc(1, sizeof(tp_thread_info));
tp_thread_info *phead = pool->thread_head;
while (phead->next != NULL)
{
phead = phead->next;
}
phead->next = pnew;
pnew->prev = phead;
pnew->next = NULL;
pnew->status = 0;
pnew->need_exit = 0;
pnew->work = NULL;
pnew->work_arg = NULL;
pthread_mutex_init(&(pnew->thread_lock), NULL);
pthread_cond_init(&(pnew->thread_cond), NULL);
pool->cur_thread_num++;
int r = pthread_create(&(pnew->id), NULL, myWork, pool);
if (-1 == r)
{
printf("tp_add_pthread创建线程失败:%m\n");
free(pnew);
return NULL;
}
pthread_detach(pnew->id);
pool->idle_thread_num++;
return pnew;
}
// 向线程池中添加任务
int tp_add_job(tp_thread_pool *pool, tp_work *worker, tp_work_arg *arg)
{
// 去线程池中找空闲线程,把任务分配给空闲线程并执行任务
tp_thread_info *phead;
tp_thread_info *pnew;
while (1)
{
for (phead = pool->thread_head; phead != NULL; phead = phead->next)
{
if (0 == phead->status)
{
printf("找到空闲线程%lu\n", phead->id);
// 设置为非空闲状态
phead->status = 1;
pool->idle_thread_num--;
pthread_mutex_unlock(&(phead->thread_lock));
phead->work = worker;
phead->work_arg = arg;
// 唤醒该线程
pthread_cond_signal(&(phead->thread_cond));
}
} // end of for(phead=pool->thread_head;phead!=NULL;phead=phead->next)
// 到这里说明没有空闲线程,就需要新增线程
// 这里创建新线程并开内存,在监督线程启动
pthread_mutex_lock(&(pool->pool_lock));
pnew = tp_add_pthread(pool);
if (pnew)
{
pnew->work = worker;
pnew->work_arg = arg;
pthread_cond_signal(&(pnew->thread_cond));
pthread_mutex_unlock(&(pool->pool_lock));
return 1;
}
// 没创建成功就解锁,进行新一轮循环
pthread_mutex_unlock(&(pool->pool_lock));
}
return 1;
}
void pthread_pool_plus()
{
int working[10] = {0};
// 1.创建线程池
tp_thread_pool *pool = creatd_thread_pool(3, 20);
// 2.创建任务池
tp_work *worker = (tp_work *)calloc(1, sizeof(tp_work));
worker->process_job = myWork;
for (int i = 0; i < 10; i++)
{
tp_work_arg *arg = (tp_work_arg *)calloc(1, sizeof(tp_work_arg));
working[i] = i + 60;
arg->arg = (void *)&(working[i]);
tp_add_job(pool, worker, arg);
}
sleep(100);
}