【线程】线程池
线程池通过一个线程安全的阻塞任务队列加上一个或一个以上的线程实现,线程池中的线程可以从阻塞队列中获取任务进行任务处理,当线程都处于繁忙状态时可以将任务加入阻塞队列中,等到其它的线程空闲后进行处理。
线程池作用:
1.降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗
2.提高线程的可管理性:线程池可以统一管理、分配、调优和监控
3.降低程序的耦合程度: 提高程序的运行效率
4.多线程程序的运行效率, 是一个正态分布的结果, 线程数量从1开始增加, 随着线程数量的增加, 程序的运行效率逐渐变高, 直到线程数量达到一个临界值, 当在增加线程数量时, 程序的运行效率会减小(主要是由于频繁线程切换影响线程运行效率),所以并不是创建的线程越多性能越高
下面利用原生线程库来实现线程池
大家可以拷贝到VS Code下来看
代码中的一个解释
main.cc
#include<iostream>
#include<unistd.h>
#include<time.h>
#include"threadpool.hpp"
using namespace std;
int main()
{
srand(time(nullptr) ^ getpid());
ThreadPool<Task>* tp=new ThreadPool<Task>;
tp->Start();
while(true)
{
//1. 构建任务
int x = rand() % 10 + 1;
usleep(10);
int y = rand() % 5;
char op = opers[rand()%opers.size()];
Task t(x, y, op);
tp->Push(t);
//2. 交给线程池处理
std::cout << "main thread make task: " << t.GetTask() << std::endl;
sleep(1);
}
return 0;
}
threadpool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include "task.hpp"
using namespace std;
struct ThreadInfo
{
pthread_t tid;
string name;
};
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Threadsleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
string GetThreadName(pthread_t tid)
{
for (const auto &ti : threads_)
{
if (ti.tid == tid)
return ti.name;
}
return "None";
}
public:
ThreadPool(int num = 5) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
static void *HandlerTask(void *args) // 必须定义为静态函数,这样才不会传this指针过来,才不会导致函数不匹配问题
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
string name=tp->GetThreadName(pthread_self());
while(true)
{
tp->Lock();
while(tp->IsQueueEmpty())
{
tp->Threadsleep();
}
//消费任务
T t=tp->Pop();
tp->Unlock();
//处理任务
t();
cout <<name << " run, "<< "result: " << t.GetResult() <<endl;
}
return nullptr;
}
void Start()
{
int size = threads_.size();
for (int i = 0; i < size; i++)
{
threads_[i].name= "thread-" + to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this); // 传入this指针,使静态函数可以访问类内成员和函数
}
}
T Pop()
{
T out=tasks_.front();
tasks_.pop();
return out;
}
void Push(const T& in)
{
//需要加锁
Lock();
tasks_.push(in);
Wakeup();
Unlock();
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
vector<ThreadInfo> threads_; // 用数组管理创建出来的线程
queue<T> tasks_; // 用队列来管理任务
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
using namespace std;
string opers="+-*/%";
enum
{
Divzero = 1,
Modzero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int data1, int data2, char op) : _data1(data1), _data2(data2), _op(op), _result(0), _exitcode(0)
{}
void run()
{
switch (_op)
{
case '+':
{
_result = _data1+_data2;
break;
}
case '-':
{
_result = _data1-_data2;
break;
}
case '*':
{
_result = _data1*_data2;
break;
}
case '/':
{
if (_data2 == 0) _exitcode = Divzero;
else _result = _data1/_data2;
break;
}
case '%':
{
if (_data2 == 0) _exitcode = Modzero;
else _result = _data1%_data2;
break;
}
default:
{
_exitcode=Unknown;
break;
}
}
}
void operator()()
{
run();
}
string GetResult()
{
string r=to_string(_data1);
r+=_op;
r+=to_string(_data2);
r+='=';
r+=to_string(_result);
r+='[';
r+=to_string(_exitcode);
r+=']';
return r;
}
string GetTask()
{
string r=to_string(_data1);
r+=_op;
r+=to_string(_data2);
r+="=?";
return r;
}
private:
int _data1;
int _data2;
char _op;
int _result;
int _exitcode;
};