当前位置: 首页 > article >正文

手写线程池

一 项目概念

1.1  并发与并行

并发:在同一时间上有多个任务进行,每个任务都分配了时间片,cpu切换速度较快,从宏观上可以看作是一起执行的,实际上一段时间,只有一个任务

并行:指在同一时刻,有多个任务在同时执行,这需要有多个处理单元(如多个 CPU 核心、多个处理器或多台计算机)同时工作,每个任务可以分配到不同的处理单元上,真正地同时进行处理。

1.2  IO密集型和CPU密集型

IO密集型: 程序里面的指令,涉及了一些IO操作,比如设备,网络,文件操作,对于计算操作较少

CPU密集型:CPU 密集型任务或应用程序是指在其执行过程中,主要的时间和资源消耗在 CPU 的计算处理上

那对于这两个来说,单核和多核,使用多线程有什么不同呢?

对于IO密集型来说,无论是单核还是多核使用多线程都是可以的,IO密集型更加时候多线程的模式,因为会有等待外部输入的情况的

对于CPU密集型来说如果是单核多线程,那么就会出现一个线程算一段,算完,再组合起来,其实线程的调度是有额外消费的(线程的上下文切换)

所以并不是所有的情况都适合用多线程

1.3  创建很多线程可以吗?线程真的是越多越好?

1.线程的创建和销毁都是非常“重”的操作

2.线程本身占用大量内存

3.线程的上下文的切换需要占用大量时间

4.线程的大量唤醒会导致系统负载很大,可能导致宕机

每个线程都有线程函数,而我们的用户空间在虚拟地址上只有3G,那么每个函数都会开一个栈帧,那么空间被大量占用,运行的空间就会少

1.4  线程同步

线程通信包括线程互斥和线程通信

线程互斥包括了互斥锁和atomic原子类型

线程通信包括条件变量和信号量

二  项目整体架构

这张图片就是整个项目的整体架构了,这个项目的使用就是外部写出需求,把需求扔进线程池里面,里面的线程会去拿取任务,然后通过返回值得到结果

所以ThreadPool里面有两个队列,一个是任务队列,一个是线程队列

这里我们需要注意的是任务队列会被多个线程进入,所以要保证是线程安全的,线程队列不用,因为没有线程竞争的情况

2.1 ThreadPool类

这个类需要包含两个队列,一个是任务队列,一个是线程队列

线程用的map,任务用的queue,对于线程容器来说,在后面细节的设计中涉及了进程id的使用,所以我们使用map这样操作比较方便,对于任务队列来说,先进先出的特性,很快就想到了队列

我们使用的时候需要有开启线程池的接口,同时我们还需要设计一下线程池的启动模式,比如是动态增长的,还是固定不变的

关于队列的大小,不是固定不变的

对于 Result 我们后面再做解析

对于start函数的详解

void ThreadPool::start(int initThreadSize)
{
	isPoolRunning_ = true;
	curThreadSize_ = initThreadSize;
	this->initThreadSize_ = initThreadSize;

	for (int i = 0; i < initThreadSize_; i++)
	{
		auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));
		int threadId = ptr->getThreadId();
		threads_.emplace(threadId, std::move(ptr));
	/*	this->threads_.emplace_back(std::move(ptr));*/

	}

	for (int i = 0; i < initThreadSize_; i++)
	{

		this->threads_[i]->start();
		idleThreadSize_++;
	}
}

对于线程池的开始,我们应该先把线程队列创建好,也就是创建一个一个的线程,放入到容器中去,其中线程对象使用的智能指针,传入了一个函数对象,使用了bind绑定了一个函数,然后加入到容器中,其中需要注意的是,因为是unique_ptr是不能拷贝和赋值的,所以我们得使用move(ptr)

然后每个线程开始启动,这里的启动还没拿取任务,所以是空闲的,设置了一个变量去记录它

对于 submitTask函数详解

Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
	//获取锁
	std::unique_lock<std::mutex> lock(taskQueMtx_);
	//线程通信
	if (!notFull_.wait_for(lock, std::chrono::seconds(1),
		[&]()->bool {return taskQue_.size() < taskQueMaxThreadHold_; }))
	{
		std::cout << "提交超时" << std::endl;
		return Result(sp, false);
	}
	//如果有空余把任务放入
	taskQue_.emplace(sp);
	taskSize_++;

	//通知消费者消费
	notNull_.notify_all();

	//根据任务的数量和线程的数量,判断是否需要创建新的线程


	if (poolMode_ == PoolMode::MODE_CACHED
		&&taskSize_>idleThreadSize_
		&&curThreadSize_<threadSizeThresHold_)
	{
		//创建新线程
		auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));
		int threadId = ptr->getThreadId();
		threads_.emplace(threadId, std::move(ptr));
		threads_[threadId]->start();
		curThreadSize_++;
	}

	return Result(sp);
}

这里的参数设置成了智能指针,因为我们没办法判断用户提交的任务是不是已经销毁了,如果是单纯的指针,那么就会出现野指针的情况

提交任务涉及了任务队列,所以我们需要保持该队列的线程安全,这里我们用互斥锁和条件变量进行操作

这里线程会拿到锁然后判断队列是不是满的,如果是满的那么该线程会进入等待,同时放出锁,让其他线程去抢,也就是每个线程,如果不是满的,那么就会往下走,放入用户提交的任务,同时通知消费者消费

然后就是模式的选择,如果是动态增长的就需要考虑线程数量的大小了

对于 线程函数的详解

ThreadPool::threadFunc 是线程池中的工作线程执行的函数,它的主要职责是从任务队列中取出任务并执行。同时,该函数还处理了线程的超时回收、线程池关闭等情况。

void ThreadPool::threadFunc(int threadid)
{
	auto lastTime = std::chrono::high_resolution_clock().now();

	for (;;)
	{
		std::shared_ptr<Task> task;
		{//获取锁
			std::unique_lock<std::mutex>lock(taskQueMtx_);

		
			
				//判断是不是超时返回,还是有任务阻塞
				while (taskQue_.size() == 0)
				{
					if (poolMode_ == PoolMode::MODE_CACHED)
					{
						if (std::cv_status::timeout ==
							notFull_.wait_for(lock, std::chrono::seconds(1)))
						{
							auto now = std::chrono::high_resolution_clock().now();
							auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
							if (dur.count() >= 60 &&
								curThreadSize_ > initThreadSize_)
							{
								//开始回收当前线程 
								threads_.erase(threadid);
								curThreadSize_--;
								idleThreadSize_--;
								return;
							}
						}
					}
					else
					{
						notNull_.wait(lock);
					}
					//线程池要结束
					if (!isPoolRunning_)
					{
						threads_.erase(threadid);
						curThreadSize_--;
				 		idleThreadSize_--;
						exitCond_.notify_all();
						return;
					}
				}
			idleThreadSize_--;

			//消费任务
			task = taskQue_.front();
			taskQue_.pop();
			taskSize_--;

			//通知
			if (taskQue_.size() > 0)
			{
				notNull_.notify_all();
			}

			notFull_.notify_all();
		}
		if (task != nullptr)
		{
			task->exec(); 
		}
		idleThreadSize_++;
	}
	threads_.erase(threadid);
	curThreadSize_--;
	idleThreadSize_--;


	/*std::cout << "begin threadFunc" << std::endl;
	std::cout << std::this_thread::get_id() << std::endl;
	std:: cout<< "end threadFunc" << std::endl;*/
}

这里就是消费者的模型,同时涉及了两个模式的转换,还有一些小细节

当任务队列中有任务时,将空闲线程数量 idleThreadSize_ 减 1,表示当前线程不再空闲。然后从任务队列的头部取出一个任务,将其存储在 task 变量中,并将任务从队列中移除,同时减少任务数量 taskSize_

  • 如果任务队列中还有剩余任务,使用 notNull_.notify_all() 通知所有等待在 notNull_ 条件变量上的线程,表明任务队列中还有任务可供消费。
  • 使用 notFull_.notify_all() 通知所有等待在 notFull_ 条件变量上的线程,表明任务队列中有空闲空间,可以继续提交任务。

2.2 Thread类

线程类就是一个start函数比较重要


void Thread::start()
{
	std::thread t(func_,threadId_);
	t.detach();
}

这里我们设置了一个线程分离,因为如果不调用 t.detach(),并且 std::thread 对象 t 在其生命周期结束时线程还在运行,就会有问题。std::thread 对象在销毁时,会检查它所关联的线程是否还在运行。如果线程还在运行且没有被分离(即没有调用 detach),也没有调用 join 等待线程结束,那么程序会调用 std::terminate 来终止整个程序。

2.3  返回值的考虑

用户提交的任务千奇百怪,,所以我们需要用一个类型去把他们全部收起来,这里很容易想到继承,我们写一个父类Task,用户提交的任务继承父类,这样就很好解决了

但是这样还不行,因为有的用户需要返回值,那么我们就需要提供一个可以接收任意类型的类,模板?我一开始也想这个,这个显然不想,因为我们子类继承父类肯定要重写run函数,虚函数带模板肯定是不行的,因为模板不能正确实例化

这里给出代码

class Any
{
public:
	Any() = default;
	~Any() = default;
	//这里用了unique_ptr所以要
	Any(const Any&) = delete;
	Any& operator=(const Any&) = delete;
	Any(Any&&) = default;
	Any& operator=(Any&&) = default;


	template <class T>
	Any(T data):base_(std::make_unique<Derive<T>>(data))
	{}
private:
	class Base
	{
	public:
		virtual ~Base() = default;
	private:
	};

	template <class T>
	class Derive :public Base
	{
	public:
		Derive(T data)
		{
			data_ = data;
		}
		T data_;
	};

	template<class T>
	T cast_()
	{
		//从成员变量中提取data
		Derive<T>* pd = dynamic_cast<Derive<T>*>(base_.get());
		if (pd == nullptr)
		{
			throw"type is unmatch!";
		}
		return pd->data_;
	}


private:
	std::unique_ptr<Base> base_;
};

这里在Any类里面放了一个父类Base和子类Derive,这样就可以接收到各式各样的返回值了,同时Derive用模板,这样Derive就可以存储各种值了

模板函数 cast_:用于将存储在 Any 对象中的数据转换回原始类型。使用 dynamic_cast 进行类型转换,如果转换失败(即 pd 为 nullptr),则抛出一个异常,表示类型不匹配。

这里的cast_是为了获取返回值的类型,好返回给外界

Result类

这里为什么需要这个呢?

  • Any 类的局限性Any 类的核心功能是存储任意类型的数据,它本身并不具备对任务执行状态进行管理的能力。当向线程池提交一个任务时,任务可能因为各种原因(如任务队列已满导致提交超时)无法成功提交,或者处于正在执行、执行完成等不同状态,Any 无法表示这些状态信息。
  • Result 类的优势Result 类通过构造函数中的 isValid 参数,能够明确标识任务是否有效提交。在 ThreadPool::submitTask 函数中,如果任务提交超时,会创建一个 Result 对象并将 isValid 设为 false。调用者可以通过检查这个标志来判断任务是否成功提交,从而采取相应的处理措施。

还有就是,当用户在获取返回值的时候,可能线程还没执行完毕或者在阻塞,那么我们就需要根据这个情况去判断

Result 类使用了信号量(sem_)来实现线程同步。当调用 Result::get 方法时,如果任务还没有执行完,调用线程会被阻塞,直到任务执行完毕并设置了结果。这是通过信号量的 wait 操作实现的。而当任务执行完毕,调用 Result::setVal 方法设置结果时,会通过信号量的 post 操作释放资源,唤醒等待的线程,确保调用者能安全地获取任务结果。

这里的一些小细节就是Task类里面有一个Result指针,这样在任务执行完毕的时候,我们可以把返回值放到Result里面去,因为Result里面有一个Any类型,Task::run函数返回值也是any,这样我们就可以拿到返回值了

这里还有个重点,也算是我的心得

Task里面有个Result指针,这个指针的作用主要是用来给Result对象设置返回值的,因为我们submit函数里面有Result返回,在构造Result对象的时候,该对象里面有一个Task的指针指针,构造的时候就会构造Task,在Task的构造中会传入Result的指针,让他们两个建立联系

在 Result 类的构造函数中,会调用 task_->setResult(this) 方法,将当前 Result 对象的指针传递给 Task 对象,从而建立两者之间的关联

三  总结 

整个项目总共二百多行,里面的细节值得细细品味,整个项目的精髓也在里面,特别是Result类的作用,它怎么和Task联系起来的,这点很重要


http://www.kler.cn/a/568093.html

相关文章:

  • 一周一个Unity小游戏2D反弹球游戏 - 移动的弹板(鼠标版)
  • Java 9模块与Maven的深度结合
  • Deepseek 开源周第一天:FlashMLA
  • 2025年证券从业资格考试报名全流程图解✅
  • 期权帮|国内期权交易投资人做卖出期权价差交易收取的保证金是单边的还是双向的?
  • OpenWebUI配置异常的外部模型导致页面无法打开
  • C# 牵手DeepSeek:打造本地AI超能力
  • 杰发科技AC7801——滴答定时器获取时间戳
  • 2025春新生培训数据结构(树,图)
  • HTML 日常开发常用标签
  • RabbitMQ—保障消费者的可靠性和机制与策略
  • 【Vue教程】使用Vite快速搭建前端工程化项目 Vue3 Vite Node.js
  • 怎么写C#命令行参数程序,及控制台带参数案例(程序完整源码)下载
  • 【造个轮子】使用Golang实现简易令牌桶算法
  • 数据库测试
  • 蓝桥杯---归并排序算法题目(leetcode第912题)
  • 【SQL】MySQL中的字符串处理函数:concat 函数拼接字符串,COALESCE函数处理NULL字符串
  • 供应链管理系统--升鲜宝门店收银系统功能解析,登录、主界面、会员 UI 设计图(一)
  • 构建神经网络之常用pandas(补充中 )
  • JEEWMS departController.do存在SQL注入(DVB-2025-8837)