Linux学习之路 -- 线程 -- 线程池
前面介绍了条件变量的生产消费模型,下面介绍一下条件变量的另一个用法,那就是线程池。线程池的用法其实就是先创建一批线程,然后让这些线程从任务队列中取数据。具体就是生产消费者模型,(我的代码中生产线程只有一个并且生产的任务比较简单,如果有需要可以自行设计添加)
目录
1、示例代码
<1>ThreadModule.hpp
<2>Threadpool.hpp
<3>Log.hpp
<4>Task.hpp
<5>Main.cc
2、日志
<1>日志等级
<2>日志时间
<3>日志内容
<4>日志宏
<5>日志打印
1、示例代码
<1>ThreadModule.hpp
#pragma once
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
using func_t = std::function<void(std::string name)>;
// typedef std::function<void(const T&)> func_t;
class Thread
{
public:
void Excute()
{
_func(_threadname);
}
public:
Thread(func_t func, const std::string name="none-name")//右值
: _func(func), _threadname(name), _stop(true)
{}
static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
{
Thread *self = static_cast<Thread *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
func_t _func;
bool _stop;
};
}
#endif
<2>Threadpool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include "pthread.h"
#include "Log.hpp"
#include "unistd.h"
#include "ThreadMode.hpp"
using namespace ThreadModule;
static int pthread_num = 4;
template <class T>
class Threadpool
{
private:
void Lock()
{
pthread_mutex_lock(&glock);
}
void Unlock()
{
pthread_mutex_unlock(&glock);
}
void Thread_Sleep()
{
pthread_cond_wait(&gcond, &glock);
}
void Thread_wakeup()
{
pthread_cond_signal(&gcond);
}
void Thread_wakeAll()
{
pthread_cond_broadcast(&gcond);
}
public:
Threadpool(int cap = pthread_num) : _cap(cap), _isrunning(false)
{
pthread_mutex_init(&glock, nullptr);
pthread_cond_init(&gcond, nullptr);
}
void Handler_Task(std::string threadname)
{
while (true)
{
Lock();
// 任务队列为空,并且在运行
while (_task_pool.empty() && _isrunning)
{
_wait_num++;
Thread_Sleep();
LOG(INFO, "%s wake up",threadname.c_str())
_wait_num--;
}
// 任务队列为空,并且不运行了
if (_task_pool.empty() && !_isrunning)
{
Unlock();
break;
}
// 任务队列不为空
LOG(DEBUG,"%s gain task", threadname.c_str())
T t = _task_pool.front();
_task_pool.pop();
Unlock();
t();
LOG(INFO, "task done,result is %s", t.Result().c_str())
sleep(1);
// 执行自己的任务
}
}
void Stop()
{
Lock();
_isrunning = false;
Thread_wakeAll();
Unlock();
}
void Enqueue(const T &task)
{
Lock();
_task_pool.push(task);
if(_wait_num > 0)//如果有线程在等,直接唤醒线程
{
Thread_wakeup();
}
Unlock();
}
void Init_Threadpool()
{
for (int i = 0; i < _cap; i++)
{
std::string name = "thread - " + std::to_string(i + 1);
_thread_pool.emplace_back(Thread(std::bind(&Threadpool::Handler_Task, this, std::placeholders::_1), name));
LOG(INFO, "%s init success..." , name.c_str())
}
_isrunning = true;
}
void Start()
{
for (auto &e : _thread_pool)
{
LOG(DEBUG,"%s is running..." , e.name().c_str())
e.Start();
}
}
void Join()
{
for (auto &e : _thread_pool)
{
usleep(1500);
LOG(DEBUG, "%s is quiting...", e.name().c_str())
e.Join();
}
}
~Threadpool()
{
pthread_mutex_destroy(&glock);
pthread_cond_destroy(&gcond);
}
private:
std::vector<Thread> _thread_pool;
std::queue<T> _task_pool; // 任务队列
int _cap; // 线程数量
pthread_mutex_t glock;
pthread_cond_t gcond;
// 等待线程数
int _wait_num;
bool _isrunning; // 线程池是否在跑
};
<3>Log.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <unistd.h>
#include <stdarg.h>
#include <time.h>
#include <pthread.h>
#include <fstream>
enum Level
{
INFO = 0,
DEBUG,
WARNING,
ERROR,
FATAL
};
std::string Level_tostring(int level)
{
switch (level)
{
case INFO:
return "INFO";
case DEBUG:
return "DEBUG";
case WARNING:
return "ERROR";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "Unkown";
}
}
pthread_mutex_t _glock = PTHREAD_MUTEX_INITIALIZER;
bool _is_save = false;
const std::string filename = "log.txt";
void SaveLog(const std::string context)
{
std::ofstream infile;
infile.open(filename,std::ios::app);
if(!infile.is_open())
{
std::cout << "open file failed" << std::endl;
}
else
{
infile << context;
}
infile.close();
}
std::string Gettime()
{
time_t cur_time = time(NULL);
struct tm *time_data = localtime(&cur_time);
if (time_data == nullptr)
{
return "None";
}
char buffer[1024];
snprintf(buffer, sizeof(buffer), "%d-%d-%d %d:%d:%d",
time_data->tm_year + 1900,
time_data->tm_mon + 1,
time_data->tm_mday,
time_data->tm_hour,
time_data->tm_min,
time_data->tm_sec);
return buffer;
}
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{
std::string levelstr = Level_tostring(level);
std::string time = Gettime();
// 可变参数
char buffer[1024];
va_list args;
va_start(args, format);
vsnprintf(buffer, sizeof(buffer), format, args);
va_end(args);
std::string context = "[" + levelstr + "]" + "[" + time + "]" + "[" + "line : " + std::to_string(line) + "]" + "[" + filename + "]" + ": " + buffer;
pthread_mutex_lock(&_glock);
if(!issave)
{
std::cout << context << std::endl;
}
else{
SaveLog(context);
}
pthread_mutex_unlock(&_glock);
}
#define LOG(level, format, ...) \
do \
{ \
LogMessage(__FILE__, __LINE__, _is_save, level, format, ##__VA_ARGS__); \
} while (0);
#define EnableFile() \
do \
{ \
_is_save = true; \
} while (0);
#define EnableScreen() \
do \
{ \
_is_save = false;\
} while (0);
<4>Task.hpp
#pragma once
#include <iostream>
using namespace std;
class task
{
public:
task(int a, int b) : _a(a), _b(b)
{
}
std::string Result()
{
return to_string(_a) + "+" + to_string(_b) + "=" + to_string(_result);
}
void Excute()
{
_result = _a + _b;
}
void operator()()
{
Excute();
}
~task()
{}
private:
int _a;
int _b;
int _result;
};
<5>Main.cc
#include <iostream>
#include "Threadpool.hpp"
#include "Log.hpp"
#include <memory>
#include <time.h>
#include <stdlib.h>
#include "Task.hpp"
int main()
{
EnableScreen();
std::unique_ptr<Threadpool<task>> pool = std::make_unique<Threadpool<task>>(5);//c++14语法
pool->Init_Threadpool();
pool->Start();
srand(time(NULL) ^ getpid() ^ pthread_self());
int tasknum = 10;
while (tasknum--)
{
int a = rand() % 4 + 2;
int b = rand() % 3 + 5;
usleep(1200);
pool->Enqueue(task(a, b));
LOG(INFO, "task has pushed into queue,a is %d, b is %d", a,b)
}
pool->Stop();
pool->Join();
return 0;
}
2、日志
下面着重介绍一下日志的编写,线程池的其余部分都是生产消费模型的代码基础之上改进而来,代码量也不大,所以不详细介绍。日志是较为特殊,所以这里单独介绍一下。
<1>日志等级
日志实际上是需要分等级的,用于标明当前代码的执行情况。这里我设置五个等级
从上到下依次,正常,测试,警告,错误,致命。这几个等级表明了日志的级别,在打印日志时,我们需要手动设定(在示例代码中)
<2>日志时间
日志的时间也是日志的一项重要数据,所以这里就涉及了当前时间的获取。我们可以先设置当前时间的时间戳类型time_t变量,然后通过localtime函数转成当地时间(不用担心时区问题,这里会自行设置),头文件均是time.h
我们将设定的time_t的变量取地址传入time函数中,由于该函数的参数是输出型参数,所以time_t变量的值就变成了当前时间戳,再将该变量传入localtime函数中,此时函数就会返回一个struct tm* 类型的指针,该结构体的成员如下图。
该结构体会自行将时间戳转换成当前时间。需要注意的是,这里换算的月份会少一个月,年数会少1900年,所以我们在最后输出时需要注意加上去。
<3>日志内容
由于日志内容是不确定,所以这里我们需要使用可变参数进行调整。下面介绍一下如何提取可变参数中的内容。
首先我们需要先定义一个va_list 类型的变量,然后再用va_start对该变量进行初始化,初始化完毕后再一个一个可变参数中的内容。提取完成后,再用va_end对va_list类型的变量进行清除。
va_start 有两个参数,第一个是我们定义的va_list变量,第二个就是在可变参数前的最后一个参数,也就是"..."前面一个参数。
然后我们可以使用va_arg对va_list列表中的内容进行提取,第一个参数还是va_list变量,第二个参数是需要提取的数据类型(假设要提取的是int,就填int)。提取一个该类型的数据后,该函数就会返回提取的数据(带类型的)。
va_end只有一个参数,我们传入开始定义的va_list变量,对其进行清空即可。
虽然上述的方法能够提取可变参数中的数据,但是我们最终的目的就是将可变参数的数据变成字符串打印到显示器上。所以我们这里还可以使用别的方式,也就是vsnprintf。
这几个函数的功能都差不过,这里不详细介绍,我们介绍完vsnprintf,其他的也就好理解了。这个vsprintf其实就是将va_list变量中的可变参数数据按照特定格式写入到大小为size的str缓冲区中。这里的format是可变参数前的最后一个数据(也就是...前的最后一个参数)。
<4>日志宏
如果我们打日志时,都需要在不断调用函数,那样太累了。所以这里我们可以使用宏来对函数进行替换。
这里我们用LOG替换LogMessage函数,其中__FILE__会自动显示当前日志函数所在的文件名称,__LINE__会自动显示当前日志函数所在的行数(_is_save稍后介绍),我们在使用宏时,只需传入level,和可变参数部分即可。然后需要注意的是,宏替换的LogMessage的可变参数部分不能也是...,我们必须使用__VA_ARGS__来表示,其中在该宏之前,我们还要加上##,否则在可变参数部分为零时,会出现问题。最后建议这里用do{} while(0)结构对函数进行包装,这样我们在使用宏替换时,替换的是一整个代码块,不容易出现问题。
<5>日志打印
日志不仅要能向显示器中打印,还得能向文件中打印。其中,上图的_is_save参数就是用于表示向哪里打印。该变量我设置成了全局变量,通过对其修改,就可以实现向不同的地方打印。(这里还有别的方式,如有需要,自行补充)
在这里,我设置了两个宏,用于调整_is_save的值,如果_is_save为false就是向显示器打印,为true就向文件中打印。在LogMessage函数内部会对该变量进行判断后决定向哪里打印。
注:打印部分最好加锁,显示器和文件在线程池这里也是临界资源
以上就是所有内容,文中如有不对之处,还望指出,谢谢!!!