当前位置: 首页 > article >正文

Linux从入门到开发实战(C/C++)Day13-线程池

线程池是什么:池化技术之一
        面向对象思想:封装
        一个程序要用到许多线程,线程池是管理线程的一种方式
        一般说三线程两池:
            任务线程:生成任务对象并把任务对象添加到 任务池
            管理线程:从任务池中取出任务对象,交给 线程池 中的某个线程处理
            监督线程:监督线程池中的线程,如果空闲线程数量过少就增加一些,过多就减少一些

        简易线程池:
            任务池对象
            线程池对象

            创建并初始化线程池
            线程函数:包含监督线程的功能
            添加任务到线程池中:任务线程的功能
            工作函数
            销毁函数


简易线程池:

// 任务类(也是任务池类)
struct job
{
	void *(*callback_func)(void *arg); // 任务函数
	void *arg;						   // 数据(函数参数)
	struct job *pNext;
};

// 线程池类
struct pthreadPool
{
	pthread_t *pthreads; // 数组方式存储所有线程id
	struct job *phead;	 // 任务池头节点
	struct job *ptail;	 // 任务池尾节点

	int curNum; // 当前线程数
	int maxNum; // 最大线程数限制

	// 线程同步
	pthread_mutex_t mutex;		   // 互斥量
	pthread_cond_t pool_empty;	   // 当前池空了
	pthread_cond_t pool_not_empty; // 当前池不空
	pthread_cond_t pool_not_full;  // 当前池不满
	pthread_cond_t pool_full;	   // 当前池满

	int job_nums;	 // 当前任务数
	int job_closed;	 // 是否关闭任务池
	int pool_closed; // 是否关闭线程池
};

// 任务函数
void *work(void *arg)
{
	char *str = (char *)arg;
	printf("``````````任务函数:%s开始\n", str);
	sleep(5); // 模拟任务
	printf("``````````任务函数:%s结束\n", str);
}

// 线程函数
void *pthread_func(void *arg)
{
	// 拿到线程池
	struct pthreadPool *pool = (struct pthreadPool *)arg;
	if (NULL == pool)
	{
		return NULL;
	}
	// 获取具体的任务
	struct job *pjob = NULL;

	while (1)
	{
		// 核心就是锁的操作
		// 加锁
		pthread_mutex_lock(&(pool->mutex));

		// 这里进行监督线程工作
		// 如果队列为空就阻塞,非空就继续执行
		while ((0 == pool->job_nums) && (0 == pool->job_closed))
		{
			printf("队列为空,阻塞\n");
			pthread_cond_wait(&(pool->pool_not_empty), &(pool->mutex));
		}
		// 如果线程池关闭了就结束线程
		if (pool->pool_closed)
		{
			printf("线程池关闭了,结束线程\n");
			pthread_mutex_unlock(&(pool->mutex));
			pthread_exit(NULL);
		}
		// 如果队列不为空就执行任务,当前任务数-1
		pool->job_nums--;
		pjob = pool->phead;
		if (0 == pool->job_nums)
		{
			pool->phead = pool->ptail = NULL;
			printf("任务队列为空,销毁线程函数\n");
			pthread_cond_signal(&(pool->pool_empty));
		}
		else
		{
			pool->phead = pjob->pNext;
		}
		// 如果任务队列不满,就添加新任务
		if (pool->job_nums < pool->maxNum)
		{
			printf("任务队列不满,添加新任务\n");
			pthread_cond_broadcast(&(pool->pool_not_full));
		}

		// 解锁
		pthread_mutex_unlock(&(pool->mutex));

		// 执行线程函数
		(*(pjob->callback_func))(pjob->arg);
		// 释放任务对象
		free(pjob);
		pjob = NULL;
	}
}

// 创建并初始化线程池
struct pthreadPool *pthreadPool_init(int pthread_nums, int max_nums)
{
	// 开内存
	struct pthreadPool *pool = (struct pthreadPool *)calloc(1, sizeof(struct pthreadPool));
	if (NULL == pool)
	{
		printf("pthreadPool_init:内存分配失败:%m\n");
		return NULL;
	}

	// 成员赋值
	pool->curNum = pthread_nums;
	pool->maxNum = max_nums;
	pool->job_nums = 0;						 // 当前0任务
	pool->job_closed = pool->job_closed = 0; // 非关闭状态
	pool->pthreads = (pthread_t *)calloc(pool->curNum, sizeof(pthread_t));
	pthread_mutex_init(&(pool->mutex), NULL);
	pthread_cond_init(&(pool->pool_empty), NULL);
	pthread_cond_init(&(pool->pool_not_empty), NULL);
	pthread_cond_init(&(pool->pool_not_full), NULL);
	pthread_cond_init(&(pool->pool_full), NULL);

	pool->phead = pool->ptail = NULL;

	// 创建线程,阻塞在这里
	for (int i = 0; i < pool->curNum; i++)
	{
		pthread_create(&(pool->pthreads[i]), NULL, pthread_func, (void *)pool);
	}

	// 返回
	return pool;
}

// 添加任务到线程池中
int add_job(struct pthreadPool *pool, void *(*callback_func)(void *arg), void *arg)
{
	if (NULL == pool || NULL == callback_func || NULL == arg)
	{
		printf("pthread_pool:参数错误\n");
		return -1;
	}
	pthread_mutex_lock(&(pool->mutex));
	// 阻塞
	while ((pool->job_nums >= pool->maxNum) && !(pool->job_closed || pool->pool_closed))
	{
		printf("任务队列已满,阻塞\n");
		pthread_cond_wait(&(pool->pool_not_full), &(pool->mutex));
	}
	// 关闭
	if (pool->job_closed || pool->pool_closed)
	{
		pthread_mutex_unlock(&(pool->mutex));
		return -1;
	}

	// 往任务池中添加任务
	struct job *pjob = (struct job *)calloc(1, sizeof(struct job));
	if (NULL == pjob)
	{
		printf("pthread_pool:内存分配失败:%m\n");
		pthread_mutex_unlock(&(pool->mutex));
		return -1;
	}
	pjob->callback_func = callback_func;
	pjob->arg = arg;
	pjob->pNext = NULL;
	if (NULL == pool->phead)
	{
		pool->phead = pool->ptail = pjob;
		// 发信号 队列非空
		pthread_cond_signal(&(pool->pool_not_full));
	}
	else
	{
		pool->ptail->pNext = pjob;
		pool->ptail = pjob;
	}
	pool->job_nums++;

	pthread_mutex_unlock(&(pool->mutex));
}

// 销毁线程池
int destory_pthreadPool(struct pthreadPool *pool)
{
	if (NULL == pool)
	{
		return -1;
	}
	printf("销毁线程池\n");
	// 释放id内存
	free(pool->pthreads);
	// 如果还有任务,也释放掉
	struct job *pjob = pool->phead;
	while (pjob)
	{
		pool->phead = pjob->pNext;

		free(pjob);

		pjob = pool->phead;
	}
	// 释放本身
	free(pool);
	return 0;
}

void pthread_pool()
{
	// 创建线程池
	struct pthreadPool *pool = pthreadPool_init(5, 20);
	// 添加任务
	add_job(pool, work, "第1个任务");
	add_job(pool, work, "第2个任务");
	add_job(pool, work, "第3个任务");
	add_job(pool, work, "第4个任务");
	add_job(pool, work, "第5个任务");
	add_job(pool, work, "第6个任务");
	add_job(pool, work, "第7个任务");
	add_job(pool, work, "第8个任务");
	add_job(pool, work, "第9个任务");
	add_job(pool, work, "第10个任务");
	add_job(pool, work, "第11个任务");
	add_job(pool, work, "第12个任务");
	add_job(pool, work, "第13个任务");
	add_job(pool, work, "第14个任务");
	add_job(pool, work, "第15个任务");
	add_job(pool, work, "第16个任务");
	add_job(pool, work, "第17个任务");
	add_job(pool, work, "第18个任务");
	add_job(pool, work, "第19个任务");
	add_job(pool, work, "第20个任务");

	sleep(20);
	destory_pthreadPool(pool);
}

复杂一点的线程池(封装严密一些的)

// 封装线程函数参数
struct tp_work_arg_s
{
	void *arg;
};
// 封装线程的工作函数
struct tp_work_s
{
	void *(*process_job)(void *arg);
};
// 为了方便使用起别名
typedef struct tp_work_arg_s tp_work_arg;
typedef struct tp_work_s tp_work;
// 线程类
struct tp_thread_info_s
{
	pthread_t id;  // 线程id
	int status;	   // 线程状态 0-空闲 1-工作中
	int need_exit; // 线程是否需要退出 1-需要退出 0-不需要退出

	pthread_mutex_t thread_lock; // 互斥线程锁
	pthread_cond_t thread_cond;	 // 条件变量

	tp_work *work;
	tp_work_arg *work_arg;

	struct tp_thread_info_s *next;
	struct tp_thread_info_s *prev;
};
// 线程池类
struct tp_thread_pool_s
{
	int max_thread_num;	 // 最大线程数
	int cur_thread_num;	 // 当前线程数
	int min_thread_num;	 // 最小线程数
	int idle_thread_num; // 空闲线程数

	pthread_mutex_t pool_lock;			  // 整个线程池的互斥锁
	pthread_t manager_id;				  // 线程池管理线程id
	struct tp_thread_info_s *thread_head; // 线程链表头
};
// 起别名
typedef struct tp_thread_info_s tp_thread_info;
typedef struct tp_thread_pool_s tp_thread_pool;
// 创建线程池中的线程
tp_thread_info *create_thread(int num)
{
	if (num <= 0)
		return NULL;
	// 双链表
	tp_thread_info *head;
	tp_thread_info *cur;
	tp_thread_info *prev;
	// 头节点
	head = (tp_thread_info *)calloc(1, sizeof(tp_thread_info));
	if (NULL == head)
	{
		printf("create_thread中calloc创建head线程失败:%m\n");
		return NULL;
	}
	num--;
	// 头节点之后的节点
	cur = head;
	prev = NULL;
	while (num--)
	{
		cur->prev = prev;
		cur->next = (tp_thread_info *)calloc(1, sizeof(tp_thread_info));
		prev = cur;
		cur = cur->next;
	}
	cur->next = NULL;
	return head;
}
// 管理线程的函数
void *tp_manager_thread(void *arg)
{
	// 空闲线程数过多,就删掉一些
	// 空闲线程过少,就创建一些
	int cur;	  // 当前线程数
	int idle;	  // 空闲线程数
	int min;	  // 最小线程数
	int exit_num; // 退出线程数
	tp_thread_pool *pool = (tp_thread_pool *)arg;
	// 统计三次
	int idle_arg[3] = {0};
	int averIdle = 0;
	while (1)
	{
		cur = pool->cur_thread_num;
		idle = pool->idle_thread_num;
		min = pool->min_thread_num;
		exit_num = 0;
		idle_arg[2] = idle;
		averIdle = (idle_arg[0] + idle_arg[1] + idle_arg[2]) / 3;
		printf("==========服务器当前状态==========");
		printf("总线程数:%d\n", cur);
		printf("空闲线程数:%d\n", idle);
		printf("平均空闲线程数:%d\n", averIdle);

		// 空闲线程过多
		if (averIdle > (cur / 2))
		{
			exit_num = cur / 5;
		}
		// 修正
		if (cur - exit_num < min)
		{
			exit_num = cur - min;
		}
		printf("退出线程数:%d\n", exit_num);
		// 循环销毁(跳过第一个,第一个不好操作)
		tp_thread_info *phead;
		for (phead = pool->thread_head->next; phead != NULL && exit_num > 1; phead = phead->next)
		{
			if (0 != phead->status)
			{
				pthread_mutex_unlock(&(phead->thread_lock));
				continue;
			}
			phead->need_exit = 1; // 标记为退出线程
			pthread_mutex_unlock(&(phead->thread_lock));
			exit_num--;
			pthread_cond_signal(&(phead->thread_cond));

			idle_arg[0] = idle_arg[1];
			idle_arg[1] = idle_arg[2];
		}
		sleep(5); // 每五秒检测一次
	} // end of while (1)
}
// 工作函数
void *myWork(void *arg)
{
	// tp_work_arg *a = (tp_work_arg *)arg;
	// int val = *(int *)a->arg;
	// printf("```````线程%ld开始执行任务%d\n", pthread_self(), val);
	printf("=========start of work==========\n");
	sleep(5); // 模拟耗时操作
	printf("=========end of work==========\n");
	// printf("```````线程%ld执行任务%d完成\n", pthread_self(), val);
}
// 初始化线程 返回0表示失败
int init_thread(tp_thread_pool *pool, tp_thread_info *thread)
{
	// 锁
	pthread_mutex_init(&(thread->thread_lock), NULL);
	pthread_cond_init(&(thread->thread_cond), NULL);

	int r = pthread_create(&(thread->id), NULL, myWork, pool);
	if (-1 == r)
	{
		printf("init_thread中创建线程失败:%m\n");
		return 0;
	}
	// 设置线程分离(自动释放)
	pthread_detach(thread->id);
	thread->status = 0;
	return 1;
}
// 初始化线程池
int init_thread_pool(tp_thread_pool *pool)
{
	tp_thread_info *phead = pool->thread_head;
	// 创建并初始化线程池中的线程对象
	while (phead != NULL)
	{
		if (init_thread(pool, phead))
		{
			printf("初始化线程成功:%lu\n", phead->id);
		}
		else
		{
			printf("初始化线程失败:%m\n");
			return 0;
		}
		phead = phead->next;
	}
	// 创建管理线程
	int ret = pthread_create(&(pool->manager_id), NULL, tp_manager_thread, pool);
	if (-1 == ret)
	{
		printf("创建管理线程失败:%m\n");
		return 0;
	}
	else
	{
		printf("创建管理线程成功:%lu\n", pool->manager_id);
	}
}
// 创建线程池
tp_thread_pool *creatd_thread_pool(int min, int max)
{
	tp_thread_pool *pool = (tp_thread_pool *)calloc(1, sizeof(tp_thread_pool));
	if (NULL == pool)
	{
		printf("calloc创建线程池失败:%m\n");
		return NULL;
	}
	pool->max_thread_num = max;
	pool->min_thread_num = min;
	pool->idle_thread_num = min;

	pthread_mutex_init(&(pool->pool_lock), NULL);

	if (pool->thread_head)
		free(pool->thread_head);
	// 开内存
	pool->thread_head = create_thread(pool->min_thread_num);
	if (NULL == pool->thread_head)
	{
		printf("create_thread创建线程失败:%m\n");
		free(pool);
		return NULL;
	}
	// 初始化线程池
	init_thread_pool(pool);

	return pool;
}
// 通知管理线程增加一个线程
tp_thread_info *tp_add_pthread(tp_thread_pool *pool)
{
	// 加一个线程到线程链表的末尾
	if (pool->cur_thread_num >= pool->max_thread_num)
	{
		printf("线程池中线程数达到最大值,无法再增加线程\n");
		return NULL;
	}
	tp_thread_info *pnew = (tp_thread_info *)calloc(1, sizeof(tp_thread_info));
	tp_thread_info *phead = pool->thread_head;
	while (phead->next != NULL)
	{
		phead = phead->next;
	}
	phead->next = pnew;
	pnew->prev = phead;
	pnew->next = NULL;
	pnew->status = 0;
	pnew->need_exit = 0;
	pnew->work = NULL;
	pnew->work_arg = NULL;
	pthread_mutex_init(&(pnew->thread_lock), NULL);
	pthread_cond_init(&(pnew->thread_cond), NULL);
	pool->cur_thread_num++;
	int r = pthread_create(&(pnew->id), NULL, myWork, pool);
	if (-1 == r)
	{
		printf("tp_add_pthread创建线程失败:%m\n");
		free(pnew);
		return NULL;
	}
	pthread_detach(pnew->id);
	pool->idle_thread_num++;
	return pnew;
}
// 向线程池中添加任务
int tp_add_job(tp_thread_pool *pool, tp_work *worker, tp_work_arg *arg)
{
	// 去线程池中找空闲线程,把任务分配给空闲线程并执行任务
	tp_thread_info *phead;
	tp_thread_info *pnew;
	while (1)
	{
		for (phead = pool->thread_head; phead != NULL; phead = phead->next)
		{
			if (0 == phead->status)
			{
				printf("找到空闲线程%lu\n", phead->id);
				// 设置为非空闲状态
				phead->status = 1;
				pool->idle_thread_num--;
				pthread_mutex_unlock(&(phead->thread_lock));
				phead->work = worker;
				phead->work_arg = arg;
				// 唤醒该线程
				pthread_cond_signal(&(phead->thread_cond));
			}
		} // end of for(phead=pool->thread_head;phead!=NULL;phead=phead->next)

		// 到这里说明没有空闲线程,就需要新增线程
		// 这里创建新线程并开内存,在监督线程启动
		pthread_mutex_lock(&(pool->pool_lock));
		pnew = tp_add_pthread(pool);
		if (pnew)
		{
			pnew->work = worker;
			pnew->work_arg = arg;
			pthread_cond_signal(&(pnew->thread_cond));
			pthread_mutex_unlock(&(pool->pool_lock));
			return 1;
		}
		// 没创建成功就解锁,进行新一轮循环
		pthread_mutex_unlock(&(pool->pool_lock));
	}
	return 1;
}

void pthread_pool_plus()
{
	int working[10] = {0};
	// 1.创建线程池
	tp_thread_pool *pool = creatd_thread_pool(3, 20);
	// 2.创建任务池
	tp_work *worker = (tp_work *)calloc(1, sizeof(tp_work));
	worker->process_job = myWork;
	for (int i = 0; i < 10; i++)
	{
		tp_work_arg *arg = (tp_work_arg *)calloc(1, sizeof(tp_work_arg));
		working[i] = i + 60;
		arg->arg = (void *)&(working[i]);
		tp_add_job(pool, worker, arg);
	}
	sleep(100);
}

 

 


http://www.kler.cn/a/302203.html

相关文章:

  • 【ROS2】坐标TF变换工具-tf2_ros
  • C/C++圣诞树
  • WPF+MVVM案例实战与特效(四十五)- 打造优雅交互:ListBox 的高级定制与行为触发(侧边菜单交互面板)
  • 一些elasticsearch重要概念与配置参数
  • Cline 3.0发布:从AI编程助手到通用智能体平台的进化
  • Android 蓝牙Bluedroid线程池设计思路介绍
  • 滚雪球学SpringCloud[1.1]:Spring Cloud概述与环境搭建(入门章节)
  • QT中使用UTF-8编码
  • Linux echo命令讲解及与重定向符搭配使用方法,tail命令及日志监听方式详解
  • 从戴尔公司中国大饭店DTF大会,看科技外企如何在中国市场发展
  • Docker快速部署Apache Guacamole
  • 前端三件套(HTML,CSS,JS)查漏补缺
  • 交换两实数的整数部分
  • 【数据结构】选择题错题集
  • log4j 的参数配置
  • CUDA-中值滤波算法
  • git标签、repo如何打tag
  • 828华为云征文|基于华为云Flexus云服务器X部署Minio服务
  • 领夹麦克风哪个品牌好?大疆、西圣、博雅无线麦克风在线测评
  • 关于 Embedding 的个人粗略见解
  • cross-plateform 跨平台应用程序-05-Flutter 介绍
  • 【2024 版】最新 kali linux 入门及常用简单工具介绍(非常详细)
  • Unet改进30:添加CAA(2024最新改进方法)|上下文锚定注意模块来捕获远程上下文信息。
  • UE5 性能分析 UnrealInsights
  • MATLAB下载详细教程及下载链接
  • 如何取消密码?打印加密的PDF文件?