Linux:线程池
什么是线程池
线程池就是一个容纳多个线程的容器,对于一线线程我们可以多次对此线程进行重复使用,从而省去频繁创建线程对象的操作。
妈的写死我了。。回来再说,金工实习去了
#include<stdio.h>
#include<pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
//来写一个线程池,线程池需要一个存储任务的队列,线程池的容器,线程执行的任务
//任务队列创造
typedef struct task{
void* arg;//指向指向函数的参数
void(*run_task)(void*arg);//指向任务函数的指针
struct task *next; //next指针,指向下一个任务节点
}task;
//创建线程池
typedef struct thread_pool{
task* first;//队列的开头
task* end;//队列结尾
int threadNum;//线程池数量
int tasksize;//任务队列的大小
pthread_mutex_t mutex_pool;//线程池的锁
pthread_cond_t notempty;//确认队列是不是空
pthread_t*threads;
int shutdowm;//是否销毁线程池
}pthread_pool;
//创建接口
//执行任务
void* thread_work(void*arg){
pthread_pool*pool=(pthread_pool*)arg;
while (1) {
pthread_mutex_lock(&pool->mutex_pool);
// 如果没有任务且线程池未关闭,等待任务
while (pool->tasksize == 0 && !pool->shutdowm) {
pthread_cond_wait(&pool->notempty, &pool->mutex_pool);
}
// 如果线程池关闭且任务为空,退出线程
if (pool->shutdowm && pool->tasksize == 0) {
pthread_mutex_unlock(&pool->mutex_pool);
pthread_exit(NULL);
}
// 获取任务
task* t = pool->first;
pool->first = t->next;
if (pool->first == NULL) {
pool->end = NULL;
}
pool->tasksize--;
pthread_mutex_unlock(&pool->mutex_pool);
// 执行任务
t->run_task(t->arg);
free(t);
}
}
//给线程池添加任务
void addWork(pthread_pool* pool, void* arg, void(*run_task)(void*arg)){
if(pool->shutdowm==1){//如果线程池已经关闭
return;
}
//初始化
task* task_pool=(task*)malloc(sizeof(task));
task_pool->arg=arg;
task_pool->run_task=run_task;
task_pool->next=NULL;
//初始化完毕,开锁!
pthread_mutex_lock(&pool->mutex_pool);
if(pool->first==NULL){//如果是第一个线程
pool->first=task_pool;
pool->end=task_pool;
}else{
pool->end->next=task_pool;//追加新节点
pool->end=task_pool;//更新链表
}
pool->tasksize++;
pthread_cond_signal(&pool->notempty);//唤醒阻塞的线程
pthread_mutex_unlock(&pool->mutex_pool);//解锁
}
//初始化线程
pthread_pool* pthread_init(int number){
pthread_pool *pool=(pthread_pool*)malloc(sizeof(pthread_pool));
if(pool==NULL){
perror("malloc error");
return NULL;
}
//初始化线程池信息
pool->threadNum=number;
pool->first=NULL;
pool->end=NULL;
pool->tasksize=0;
//初始化条件变量和互斥锁
pthread_mutex_init(&pool->mutex_pool,NULL);
pthread_cond_init(&pool->notempty,NULL);
//初始化线程表
pool->threads=(pthread_t*)malloc(number*sizeof(pthread_t));
//生产线程
for(int i=0;i<number;i++){
pthread_t tid;
pthread_create(&tid,NULL,thread_work,pool);//传入你的线程池结构体
}
return pool;
}
//写个模拟任务吧
void run_task(void* arg){
char* task_name = (char*)arg;
printf("任务 %s 正在执行...\n", task_name);
usleep(10);
}
//销毁线程
int pthread_destory(pthread_pool* pool){
if(pool==NULL){
return 0;
}
pool->shutdowm=1;
for(int i=0;i<pool->threadNum;i++){
pthread_cond_broadcast(&pool->notempty);//唤醒阻塞线程
pthread_join(pool->threads[i], NULL);//分离线程
printf("线程-%d正在销毁。。。\n",i);
}
pthread_mutex_destroy(&pool->mutex_pool);
pthread_cond_destroy(&pool->notempty);
//释放线程池
free(pool);
pool=NULL;
return 0;
}
int main(){
pthread_pool* pool=pthread_init(5);//创建5个线程
// 添加任务到线程池
for (int i = 0; i < 10; i++) {
char* task_name = malloc(20);
snprintf(task_name, 20, "任务-%d", i + 1);
addWork(pool, task_name, run_task);
}
usleep(100);
pthread_destory(pool);
return 0;
}
我回来了,恨,因为没戴帽子被扣分了
这个代码的思路是这样的:
首先我们需要一个队列来维护我们的任务,线程池里的线程每次只要从队列里拿任务就可以了
然后创造一个结构体,存储线程池内部的信息,包括:线程的id,线程池中线程的数量,线程队列的队头、队尾,任务队列的大小,互斥锁,条件变量,线程池是否销毁的标志量
一个线程池中的线程被重复的执行任务,对于线程的操作有:初始化线程、添加任务、执行任务、销毁线程池
初始化线程:创建线程,初始化线程池相关的量
添加任务:整理线程池的队列,锁上,初始化相关内容,解锁
执行任务:整理队列,获取任务、执行,任务size--
销毁线程池:设置线程池的标志量=1,唤醒线程、分离线程、销毁锁,释放内存
再写一个模拟任务执行的函数
忘了改哪了。。好像就会看起来会舒服一点:
#include<stdio.h>
#include<pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
//来写一个线程池,线程池需要一个存储任务的队列,线程池的容器,线程执行的任务
//任务队列创造
typedef struct task{
void* arg;//指向指向函数的参数
void(*run_task)(void*arg);//指向任务函数的指针
struct task *next; //next指针,指向下一个任务节点
}task;
//创建线程池
typedef struct thread_pool{
task* first;//队列的开头
task* end;//队列结尾
int threadNum;//线程池数量
int tasksize;//任务队列的大小
pthread_mutex_t mutex_pool;//线程池的锁
pthread_cond_t notempty;//确认队列是不是空
pthread_t*threads;
int shutdowm;//是否销毁线程池
}pthread_pool;
//创建接口
//执行任务
void* thread_work(void*arg){
pthread_pool*pool=(pthread_pool*)arg;
while (1) {
pthread_mutex_lock(&pool->mutex_pool);
// 如果没有任务且线程池未关闭,等待任务
while (pool->tasksize == 0 && !pool->shutdowm) {
pthread_cond_wait(&pool->notempty, &pool->mutex_pool);
}
// 如果线程池关闭且任务为空,退出线程
if (pool->shutdowm && pool->tasksize == 0) {
pthread_mutex_unlock(&pool->mutex_pool);
pthread_exit(NULL);
}
// 获取任务
task* t = pool->first;
pool->first = t->next;
if (pool->first == NULL) {
pool->end = NULL;
}
pool->tasksize--;
pthread_mutex_unlock(&pool->mutex_pool);
// 执行任务
t->run_task(t->arg);
free(t);
}
}
//给线程池添加任务
void addWork(pthread_pool* pool, void* arg, void(*run_task)(void*arg)){
if(pool->shutdowm==1){//如果线程池已经关闭
return;
}
//初始化
task* task_pool=(task*)malloc(sizeof(task));
task_pool->arg=arg;
task_pool->run_task=run_task;
task_pool->next=NULL;
//初始化完毕,锁!
pthread_mutex_lock(&pool->mutex_pool);
if(pool->first==NULL){//如果是第一个线程
pool->first=task_pool;
pool->end=task_pool;
}else{
pool->end->next=task_pool;//追加新节点
pool->end=task_pool;//更新链表
}
pool->tasksize++;
printf("添加任务中。。。\n");
pthread_cond_signal(&pool->notempty);//唤醒阻塞的线程
pthread_mutex_unlock(&pool->mutex_pool);//解锁
}
//初始化线程
pthread_pool* pthread_init(int number){
pthread_pool *pool=(pthread_pool*)malloc(sizeof(pthread_pool));
if(pool==NULL){
perror("malloc error");
return NULL;
}
//初始化线程池信息
pool->threadNum=number;
pool->first=NULL;
pool->end=NULL;
pool->tasksize=0;
//初始化条件变量和互斥锁
pthread_mutex_init(&pool->mutex_pool,NULL);
pthread_cond_init(&pool->notempty,NULL);
//初始化线程表
pool->threads=(pthread_t*)malloc(number*sizeof(pthread_t));
//生产线程
for(int i=0;i<number;i++){
printf("线程%d初始化中。。。\n",i+1);
pthread_create(&pool->threads[i],NULL,thread_work,pool);//传入你的线程池结构体
}
return pool;
}
//写个模拟任务吧
void run_task(void* arg){
char* task_name = (char*)arg;
printf("任务 %s 正在执行...\n", task_name);
usleep(10);
}
//销毁线程
int pthread_destory(pthread_pool* pool){
if(pool==NULL){
return 0;
}
pool->shutdowm=1;
for(int i=0;i<pool->threadNum;i++){
pthread_cond_broadcast(&pool->notempty);//唤醒阻塞线程
pthread_join(pool->threads[i], NULL);//分离线程
printf("线程-%d正在销毁。。。\n",i+1);
}
pthread_mutex_destroy(&pool->mutex_pool);
pthread_cond_destroy(&pool->notempty);
free(pool->threads);
//释放线程池
free(pool);
pool=NULL;
return 0;
}
int main(){
pthread_pool* pool=pthread_init(5);//创建5个线程
// 添加任务到线程池
for (int i = 0; i < 10; i++) {
char* task_name = malloc(20);
snprintf(task_name, 20, "任务-%d", i + 1);
addWork(pool, task_name, run_task);
}
usleep(1000);
pthread_destory(pool);
return 0;
}
为什么要使用线程池
频繁的进行进程的创建与销毁将带来很多开销。不但如此,进程间频繁的切换也将减低 CPU 的利用率。
如果能复用之前创建的进程,而不是为每个并发任务创建一个进程,能有效降低进程创建与销毁的开销并减少进程间切换,从而减少对 CPU 资源的浪费。
虽然线程创建与销毁的代价小于进程创建与销毁,隶属同一进程的线程间切换的代价也小于进程间切换,但复用之前创建的线程,也能有效降低线程创建与销毁的开销并减少线程间切换,从而减少对 CPU 资源的浪费。
当任务很多的时,我们就可以调用线程池,从而有效的的对CPU资源的浪费。
线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
什么时候会用到线程池技术:
单个任务小而任务量巨大,例如访问一个web网页、一个点击量大的网站;而对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了
总体的设计流程就是:
创建固定数量的线程池,从任务队列中获取任务对象
找到任务对象并执行
维护任务队列