线程池实现
目录
- 1.1线程池概念
- 1.2线程池实现
1.1线程池概念
线程池(Thread Pool)是一种并发编程中常用的技术,用于管理和重用线程。它由线程池管理器、工作队列和线程池线程组成。
他是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
- 线程池的应用场景:
-
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技
术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个
Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技
-
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
-
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
1.2线程池实现
- 创建固定数量线程池,循环从任务队列中获取任务对象,
- 获取到任务对象后,执行任务对象中的任务接口
下面并不是完全的写完的代码,下面只是写了构造函数析构函数和线程池启动等接口还有些功能还没有写,因为前面这些比较简单,所以就直接跳过了,后面文章会把线程池慢慢完善
ThreadPool.hpp 代码
#pragma once
#include<iostream>
#include<queue>
#include<cstdlib>
#include<pthread.h>
#include<assert.h>
#include <memory>
#include <unistd.h>
#include <sys/prctl.h>
using namespace std;
int gThreadNum = 5;
template<class T>
class ThreadPool
{
public:
ThreadPool(int _threadNum = gThreadNum)
:_isStart(false)
{
assert(_threadNum > 0);
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
public:
//类内成员, 成员函数,都有默认参数this
void *threadRoutine(void*arg)
{
while(1)
{
sleep(1);
cout<< "pthread{" <<pthread_self() <<"}runing.."<< endl;
}
}
void start()
{
assert(!_isStart);
for(int i = 0;i < gThreadNum;i++)
{
pthread_t temp;
pthread_create(&temp,nullptr,threadRoutine,nullptr);//后面的nullptr后面会改的
}
_isStart = true;
}
void push(const T& in)
{
}
private:
bool _isStart;
int _threadNum;
queue<T> _taskQueue;//任务队列
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
ThreadPool.cc
#include"ThreadPool.hpp"
int main()
{
unique_ptr<ThreadPool<int>> tp(new ThreadPool<int>());
tp->start();
while(true)
{
sleep(1);//先不让主函数工作先
}
return 0;
}
这个时候让我们来运行一下代码
可以看到报错了 下面报错的意思是类型转化失败了
因为threadRoutine这个是类内成员, 成员函数,都有默认参数this
所以那个函数看起来是传了一个参数其实是传了二个参数,所以我们要定义为static,而又因为static的成员函数是没办法直接访问类内的所以我们要定义接口来给他访问,所以要改成下面这个样子
start函数里面
pthread_create(&temp,nullptr,threadRoutine,this);
的意思创建一个线程把this传进threadRoutine函数里面的void*arg里面,所以我们
ThreadPool p = static_cast<ThreadPool>(arg);
就能得到线程池里面的对象
//类内成员, 成员函数,都有默认参数this
static void *threadRoutine(void*arg)
{
ThreadPool<T> *p = static_cast<ThreadPool<T>*>(arg);
while(1)
{
sleep(1);
cout<< "pthread{" <<pthread_self() <<"}runing.."<< endl;
}
}
void start()
{
assert(!_isStart);
for(int i = 0;i < gThreadNum;i++)
{
pthread_t temp;
pthread_create(&temp,nullptr,threadRoutine,this);
}
_isStart = true;
}
再运行就可以看到没有问题了
因为要封装,所以接口私有化
push函数 //负责插入任务
void push(const T& in)
{
lockQueue();
_taskQueue.push(in);
choiceThreadForHandler();
unlockQueue();
}
private:
void lockQueue() { pthread_mutex_lock(&_mutex); }
void unlockQueue() { pthread_mutex_unlock(&_mutex); }
void choiceThreadForHandler() { pthread_cond_signal(&_cond); }
补全剩下的函数实现因为比较简单就直接全部放上来了
因为我是打算ThreadPool.hpp这个线程池负责让线程抢任务,主函数负责给线程发任务,我们简单点就搞个Task.hpp负责生产算术题来生产任务,Log.hpp负责打印
ThreadPool.hpp
#pragma once
#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include"Log.hpp"
using namespace std;
int gThreadNum = 5;
template<class T>
class ThreadPool
{
public:
ThreadPool(int _threadNum = gThreadNum)
:_isStart(false)
{
assert(_threadNum > 0);
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
public:
//类内成员, 成员函数,都有默认参数this
static void *threadRoutine(void*args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T>*>(args);
while (1)
{
tp->lockQueue();
while (!tp->haveTask())
{
tp->waitForTask();
}
//这个任务就被拿到了线程的上下文中
T t = tp->pop();
tp->unlockQueue();
// for debug
int one, two;
char oper;
t.get(&one, &two, &oper);
//规定,所有的任务都必须有一个run方法
Log() << "新线程完成计算任务: " << one << oper << two << "=" << t.run() << "\n";
}
}
void start()
{
assert(!_isStart);
for (int i = 0; i < gThreadNum; i++)
{
pthread_t temp;
pthread_create(&temp, nullptr, threadRoutine, this);
}
_isStart = true;
}
void push(const T& in)
{
lockQueue();
_taskQueue.push(in);
choiceThreadForHandler();
unlockQueue();
}
private:
void lockQueue() { pthread_mutex_lock(&_mutex); }
void unlockQueue() { pthread_mutex_unlock(&_mutex); }
bool haveTask() { return (!_taskQueue.empty()); }
void waitForTask() { pthread_cond_wait(&_cond, &_mutex); }
void choiceThreadForHandler() { pthread_cond_signal(&_cond); }
T pop()
{
T temp = _taskQueue.front();
_taskQueue.pop();
return temp;
}
private:
bool _isStart;
int _threadNum;
queue<T> _taskQueue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
Log.hpp
#pragma once
#include <iostream>
#include <ctime>
#include <pthread.h>
std::ostream &Log()
{
std::cout << "Fot Debug |" << " timestamp: " << (uint64_t)time(nullptr) << " | " << " Thread[" << pthread_self() << "] | ";
return std::cout;
}
Task.hpp
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task() : elemOne_(0), elemTwo_(0), operator_('0')
{
}
Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
{
}
int operator() ()
{
return run();
}
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0)
{
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0)
{
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ % elemTwo_;
}
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
主函数
#include"ThreadPool.hpp"
#include"Task.hpp"
#include<ctime>
int main()
{
const string operators = "+/*/%";
unique_ptr<ThreadPool<Task> > tp(new ThreadPool<Task>());
tp->start();
srand((unsigned long)time(nullptr) ^ getpid() ^ pthread_self());
// 派发任务的线程
while(true)
{
int one = rand()%50;
int two = rand()%10;
char oper = operators[rand()%operators.size()];
Log() << "主线程派发计算任务: " << one << oper << two << "=?" << "\n";
Task t(one, two, oper);
tp->push(t);
sleep(1);
}
return 0;
}
程序运行结果
可以看到和我们想的思路一样,主线程发任务,新线程抢任务并完成任务