[多线程]基于单例懒汉模式的线程池的实现
标题:[多线程]基于单例懒汉模式的线程池的实现
@水墨不写bug
文章目录
- 一、锁的RAII设计
- 二、任务类举例
- 三、日志系统的设计
- 四、基于POSIX线程封装自己的线程模块
- 设计要点
- 主要功能
- 使用示例
- 五、基于懒汉模式的线程池主体逻辑设计
- 类 `ThreadPool` 的成员函数详解
- 静态成员初始化
- 六、总结
本文我们将要实现一个线程池,其中用到的一些模块如LockGuard,Log等在之前的文章中有所讲述,所以本文就不再赘述。
一、锁的RAII设计
#pragma once
#include <pthread.h>
// 使用锁需要频繁调用系统调用,十分麻烦
// 于是实现锁的RAII设计
class LockGuard
{
private:
pthread_mutex_t *GetMutex() { return _mutex; }
public:
LockGuard(pthread_mutex_t *mutex)
: _mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
/*
*之前的理解错误了:不需要init和destroy,因为锁本身就是存在的!
*锁在一般情况下会内置在类的内部,需要使用(加锁)的时候,把锁的地址传进来就行了
*在构造函数内加锁,析构函数内解锁。
*e.g.
while(ture)
{
LockGuard lockguard(td->_mutex);
if(tickets > 0)
{
//抢票
}
else
{
break;
}
}
*while内每一次进行循环,都需要创建一个新的锁,创建即加锁,if代码块结束即为解锁
*/
/*
void UnLock() { pthread_mutex_unlock(&_mutex); }
void Lock() { pthread_mutex_lock(&_mutex); }
*/
以上代码是基于RAII(Resource Acquisition Is Initialization)
设计模式的锁管理类LockGuard
。
该类用于简化对pthread库
的互斥锁(pthread_mutex_t)
的使用,确保在加锁和解锁过程中的安全性和简便性。
二、任务类举例
#pragma once
#include <functional>
#include <iostream>
using std::cout;
using std::endl;
// void DownLoad()
// {
// cout<<"I am a DownTask"<<endl;
// }
// 任务类,包含各种任务
class Task
{
private:
using task_t = std::function<void()>;
void Excute()
{
_result = _x + _y;
}
public:
Task()
: _x(0), _y(0)
{
}
Task(int a, int b)
: _x(a), _y(b)
{
}
void operator()()
{
Excute();
}
void Debug()
{
cout<<"Debug : "<<_x<<" + "<<_y<<" = "<<_result<<endl;
}
private:
int _x;
int _y;
int _result;
};
这个类封装了一个具体的任务:两数相加。
尽管这是一项简单的任务(这样设计是为了便于理解线程池的部分),但是其实任务类可以根据用户的需要来自行设计,可以是下载资源,请求网页html
等。
任务类的设计目的就是一个演示作用。
三、日志系统的设计
#pragma once
#include <string>
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include "LockGuard.hpp"
namespace log_ddsm
{
using std::cout;
using std::endl;
// 日志信息管理
enum LEVEL
{
DEBUG = 1, // 调试信息
INFO = 2, // 提示信息
WARNING = 3, // 警告
ERROR = 4, // 错误,但是不影响服务正常运行
FATAL = 5, // 致命错误,服务无法正常运行
};
enum SIZE
{
TIMESIZE = 128,
LOGSIZE = 1024,
FILE_TYPE_LOG_SIZE = 2048
};
enum TYPE
{
SCREEN_TYPE = 8,
FILE_TYPE = 16
};
// 默认日志文件名称
const char *DEFAULT_LOG_NAME = "./log.txt";
// 全局锁,保护打印日志
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
struct logMessage
{
std::string _level; // 日志等级
int _level_num; // 日志等级的int格式
pid_t _pid; // 这条日志的进程id
std::string _file_name; // 文件名
int _file_number; // 行号
std::string _cur_time; // 当时的时间
std::string _log_info; // 日志正文
};
// 通过int获取日志等级
std::string getLevel(int level)
{
switch (level)
{
case 1:
return "DEBUG";
case 2:
return "INFO";
case 3:
return "WARNING";
case 4:
return "ERROR";
case 5:
return "FATAL";
default:
return "UNKNOWN";
}
}
std::string getCurTime()
{
time_t cur = time(nullptr);
struct tm *curtime = localtime(&cur);
char buf[TIMESIZE] = {0};
snprintf(buf, sizeof(buf), "%d-%d-%d %d:%d:%d",
curtime->tm_year + 1900,
curtime->tm_mon + 1,
curtime->tm_mday,
curtime->tm_hour,
curtime->tm_min,
curtime->tm_sec);
return buf;
}
class Log
{
private:
void FlushMessage(const logMessage &lg)
{
// 互斥锁,保护Print过程
LockGuard lockguard(&gmutex);
// 过滤逻辑
// 致命错误到文件逻辑
if (lg._level_num >= _ignore_level)
{
if (_print_type == SCREEN_TYPE)
PrintToScreen(lg);
else if (_print_type == FILE_TYPE)
PrintToFile(lg);
else
std::cerr << __FILE__ << " " << __LINE__ << ":" << " UNKNOWN_TYPE " << std::endl;
}
}
void PrintToScreen(const logMessage &lg)
{
printf("[%s][%d][%s][%d][%s] %s", lg._level.c_str(),
lg._pid,
lg._file_name.c_str(),
lg._file_number,
lg._cur_time.c_str(),
lg._log_info.c_str());
}
void PrintToFile(const logMessage &lg)
{
std::ofstream out(_log_file_name, std::ios::app); // 追加打印
if (!out.is_open())
{
std::cerr << __FILE__ << " " << __LINE__ << ":" << " LOG_FILE_OPEN fail " << std::endl;
return;
}
char log_txt[FILE_TYPE_LOG_SIZE] = {0}; // 缓冲区
snprintf(log_txt, sizeof(log_txt), "[%s][%d][%s][%d][%s] %s", lg._level.c_str(),
lg._pid,
lg._file_name.c_str(),
lg._file_number,
lg._cur_time.c_str(),
lg._log_info.c_str());
out.write(log_txt, strlen(log_txt)); // 写入文件
}
public:
// 打印方式——默认向显示器打印
// 如果选择文件类型打印,默认的文件名称是当前目录的log.txt
//
Log(int print_type = SCREEN_TYPE)
: _print_type(print_type), _log_file_name(DEFAULT_LOG_NAME), _ignore_level(DEBUG)
{
}
// 只有一个全局对象,多个构造函数多余,不如设计一个构造,添加一个设置log文件名称的接口
// Log(int print_type, const char *logname = DEFAULT_LOG_NAME)
// : _print_type(print_type), _log_file_name(logname), _ignore_level(1)
// {
// }
void Enable(int type)
{
_print_type = type;
}
/// @brief 加载日志信息
/// @param format 格式化输出
/// @param 可变参数
/*区分了本层和商城之后,就容易设置参数了*/
void load_message(int level, std::string filename, int filenumber, const char *format, ...)
{
logMessage lg;
lg._level = getLevel(level);
lg._level_num = level;
lg._pid = getpid();
lg._file_name = filename;
lg._file_number = filenumber;
lg._cur_time = getCurTime();
// valist + vsnprintf 处理可变参数
va_list ap;
va_start(ap, format);
char log_info[LOGSIZE] = {0};
vsnprintf(log_info, sizeof(log_info), format, ap);
va_end(ap);
lg._log_info = log_info;
// 打印逻辑
FlushMessage(lg);
}
/// @brief
/// @param ignorelevel
/* DEBUG = 1, 调试信息
*INOF = 2, 提示信息
*WARNING = 3, 警告
*ERROR = 4, 错误,但是不影响服务正常运行
*FATAL = 5, 致命错误,服务无法正常运行 */
void SetIgnoreLevel(int ignorelevel)
{
_ignore_level = ignorelevel;
}
void SetLogFileName(const char *newlogfilename)
{
_log_file_name = newlogfilename;
}
~Log() {}
private:
int _print_type;
std::string _log_file_name;
int _ignore_level;
};
// 全局的Log对象
Log lg;
// 打印日志信息
// e.g.
// LOG(DEBUG,"this is a test message %d %f %c", 13 , 9.81 , 'A' );
//
// 使用日志的一般格式
#define LOG(Level, Format, ...) \
do \
{ \
lg.load_message(Level, __FILE__, __LINE__, Format, ##__VA_ARGS__); \
} while (0)
/// 无法设计为inlne——__VA_ARGS只能出现在宏替换中
// inline void LOG(int level,const char* format, ...)
// {
// lg.load_message(level,__FILE__,__LINE__,format,__VA_ARGS__);
// }
// 往文件打印
#define ENABLE_FILE() \
do \
{ \
lg.Enable(FILE_TYPE); \
} while (0)
// 往显示器打印
#define ENABLE_SCREEN() \
do \
{ \
lg.Enable(SCREEN_TYPE); \
} while (0)
//设置日志忽略等级
#define SET_IGNORE_LEVEL(Level) \
do \
{ \
lg.SetIgnoreLevel(Level) \
} while (0)
//设置日志文件名称
#define SET_LOG_FILENAME(Name) \
do \
{ \
lg.SetLogFileName(Name) \
} while (0)
}; // namespace log_ddsm
以上是日志系统的设计,具体的讲述请参考我之前的这篇文章:
《[HTTP协议]应用层协议HTTP从入门到深刻理解并落地部署自己的云服务(2)实操部署》:
https://blog.csdn.net/2301_79465388/article/details/146114051?spm=1001.2014.3001.5501
四、基于POSIX线程封装自己的线程模块
#pragma once
#include <pthread.h>
#include <vector>
#include <functional>
#include <iostream>
#include <cassert>
#include <cstring>
#include "Log.hpp"
using namespace log_ddsm;
// 理解C++11做的工作,封装自己的原生线程
// 创建线程,执行任务
namespace ThreadMoudle
{
// 支持模板
using func_t = std::function<void(const std::string &name)>;
// 线程控制-自主管理原生线程
class Thread
{
private:
// 线程创建后例行执行的函数
static void *Routine(void *args) // 设置为static,防止this指针干扰类型匹配
{
Thread *self = static_cast<Thread *>(args);
self->Excute();
return nullptr;
}
void Excute()
{
_isrunning = true;
_func(_name);
_isrunning = false;
}
public:
/// @brief 构造线程
/// @param name 线程名
/// @param func 线程要执行的任务
Thread(const std::string &name, func_t func)
: _name(name), _tid(), _isrunning(false), _func(func)
{
}
bool Start()
{
int res = ::pthread_create(&_tid, nullptr, Routine, this); // 要求类型:void*(void*)——有this指针导致类型不匹配
if (res != 0)
{
LOG(ERROR, "pthread_create failed with error: %s\n", strerror(res));
// std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_creat: error!" << std::endl;
// printf("pthread_create failed with error: %s\n", strerror(res));
return false;
}
else
{
return true;
}
}
bool Stop()
{
if (_isrunning)
{
int res = ::pthread_cancel(_tid);
if (res != 0)
{
// std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_cancel: error!" << std::endl;
// printf("pthread_cancel failed with error: %s\n", strerror(res));
LOG(ERROR, "pthread_cancel failed with error: %s\n", strerror(res));
return false;
}
else
{
return true;
}
}
//走到这里,说明线程已经stop了
return true;
}
// Stop之后join
bool Join()
{
if (!_isrunning)
{
int res = ::pthread_join(_tid, nullptr);
if (res != 0)
{
// std::cout<<"Debug res:"<<res<<std::endl;
// std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_join: error!" << std::endl;
// printf("pthread_join failed with error: %s\n", strerror(res));
LOG(ERROR, "pthread_join failed with error: %s\n", strerror(res));
return false;
}
else
{
return true;
}
}
else
{
// Running时join
LOG(INFO, "join while Running\n");
// std::cerr << "join while Running" << std::endl;
return false;
}
}
~Thread() {}
bool Status() { return _isrunning; }
std::string Name() { return _name; }
private:
std::string _name; // 线程的名字
pthread_t _tid; // 线程的类型
bool _isrunning; // 线程是否在运行
func_t _func; // 线程将要执行的回调函数
};
}; // namespace ThreadMoudle
以上代码实现了一个线程管理模块 ThreadMoudle
,其中包含一个线程类 Thread
。它封装了POSIX线程(pthread
)的创建、启动、停止和加入(join
)操作,并提供了一些辅助功能。
设计要点
-
线程管理:
- 该模块封装了
POSIX线程
的基本操作,提供了一个易于使用的接口来管理线程的创建、启动、停止和加入操作。
- 该模块封装了
-
可重用性和可扩展性:
- 通过使用
std::function<void(const std::string &name)>
,可以将任意符合该签名的回调函数传递给线程对象,从而实现任务的多样性和灵活性。
- 通过使用
-
线程安全性:
- 通过内部状态标志(
_isrunning
)和适当的日志记录,确保线程的状态和操作的一致性,避免了多次启动或停止同一线程的问题。
- 通过内部状态标志(
主要功能
-
线程创建和启动:
Thread::Start()
:调用pthread_create
创建一个新线程,并启动执行Routine
函数。Routine
函数将调用实例的Excute
方法执行具体任务。Routine
函数被声明为静态函数,以避免this
指针干扰类型匹配。
-
线程执行具体任务:
Thread::Excute()
:设置线程状态为运行中,并调用回调函数_func
执行具体任务。
-
线程停止:
Thread::Stop()
:调用pthread_cancel
取消运行中的线程。对于不符合逻辑的情况:线程未运行,则记录错误日志。
-
线程加入:
Thread::Join()
:调用pthread_join
等待线程终止。对于不符合逻辑的情况:线程仍在运行,则记录错误日志。
-
日志记录:
- 使用日志模块(
log_ddsm
)记录线程操作中的错误信息,便于调试和问题排查。
- 使用日志模块(
使用示例
#include "Thread.hpp"
void MyTask(const std::string &name)
{
std::cout << "Thread " << name << " is running." << std::endl;
int count = 5;
while(count--)
{
std::cout<< count<<std::endl;
}
}
int main()
{
ThreadMoudle::Thread myThread("TestThread", MyTask);
if (myThread.Start())
{
std::cout << "Thread started successfully." << std::endl;
}
// 等待线程完成任务
myThread.Join();
return 0;
}
结果:
在这个示例中:
- 创建一个名为
"TestThread"
的线程对象,并传递任务函数MyTask
。 - 调用
myThread.Start()
启动线程。 - 等待线程完成任务后,调用
myThread.Join()
等待线程终止。
五、基于懒汉模式的线程池主体逻辑设计
#pragma once
#include <queue>
#include <unistd.h>
#include "Log.hpp"
#include "Thread.hpp" //using pthread lib
using std::cout;
using std::endl;
using ThreadMoudle::func_t;
using namespace log_ddsm;
enum NUM
{
DEFAULTNUM = 10
};
/// @brief
/// @tparam T:任务类型
//懒汉模式设计单例ThreadPool
template <typename T>
class ThreadPool
{
private:
void LockQueue() // 加锁
{
pthread_mutex_lock(&_taskqueue_mutex);
}
void UnLockQueue() // 解锁
{
pthread_mutex_unlock(&_taskqueue_mutex);
}
void Sleep() // 在条件变量_taskqueue_cond下等待
{
pthread_cond_wait(&_taskqueue_cond, &_taskqueue_mutex);
}
void WakeUp() // 叫醒一个进程
{
pthread_cond_signal(&_taskqueue_cond);
}
void WakeUpAll() // 叫醒所有线程
{
pthread_cond_broadcast(&_taskqueue_cond);
}
bool IsEmpty()
{
return _taskqueue.empty();
}
// thread将要执行的函数逻辑
void HandlerTask(const std::string &name)
{
// 一直循环:等待任务——执行任务——等待任务,直到运行到break逻辑
while (true)
{
LockQueue();
/* 优先级上,先考虑任务队列不为空,只有任务队列为空,再考虑是否要线程退出*/
while (IsEmpty() && _isrunning) // 没有任务则等待,while——在唤醒之后先判断是否队列仍然有任务,否则再次等待
{
_sleep_num++;
Sleep();
_sleep_num--;
}
if (IsEmpty() && !_isrunning)
{
UnLockQueue();
LOG(DEBUG, "thread quit\n");
// cout << "thread quit" << endl;
break;
}
T t; // 从taskqueue中获取任务
t = _taskqueue.front();
_taskqueue.pop();
UnLockQueue();
// 执行任务,在解锁之后进行,执行任务要与从队列中获取任务并发
// 不能在临界区中进行
t();
LOG(DEBUG, "%s", name.c_str());
// cout << name << " : ";
t.Debug();
}
}
// 缺省个数——10
ThreadPool(size_t threadnum)
: _threadnum(threadnum), _taskqueue(), _isrunning(false), _sleep_num(0) // 初始化列中调用has a的构造??
{
// 初始化
pthread_mutex_init(&_taskqueue_mutex, nullptr);
pthread_cond_init(&_taskqueue_cond, nullptr);
}
// 初始化线程池
void Init()
{
func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
for (int i = 0; i < _threadnum; ++i)
{
std::string name = "thread - " + std::to_string(i + 1);
_threads.emplace_back(name, func); // 减少一次拷贝,直接在vector内构造
// usleep(10000);
// cout << "emplaced i+1 :" << i + 1 << endl;
}
}
// 开始运行
void Start()
{
_isrunning = true; // 单进程访问不需要保护
for (int i = 0; i < _threadnum; ++i)
{
_threads[i].Start();
LOG(DEBUG, "thread - %d started \n", i + 1);
}
}
public:
static ThreadPool<T> *GetInstance()
{
if (_ptp == nullptr)
{
LockGuard lockguard(&_sig_mutex);
if (_ptp == nullptr)
{
LOG(INFO, "create threadspool\n");
_ptp = new ThreadPool<T>(DEFAULTNUM);//线程安全问题
_ptp->Init();
_ptp->Start();
}
}
else
{
LOG(INFO, "get threadspool\n");
}
return _ptp;
}
// 停止运行
void Stop()
{
LockQueue();
_isrunning = false;
WakeUpAll();
UnLockQueue();
}
// 向任务队列加入任务
void PushTask(const T &in)
{
LockQueue();
if (_isrunning == true) // 防止出现线程池想要退出但是 却被一直分配任务,导致无法退出
{
_taskqueue.push(in); // 加入任务
if (_sleep_num > 0)
WakeUp();
}
UnLockQueue();
}
~ThreadPool()
{
for (int i = 0; i < _threadnum; ++i)
{
LOG(INFO, "thread-%d joined\n", i + 1);
_threads[i].Join();
}
pthread_mutex_destroy(&_taskqueue_mutex);
pthread_cond_destroy(&_taskqueue_cond);
}
private:
size_t _threadnum; // 线程池中线程的个数
std::vector<ThreadMoudle::Thread> _threads; // vector维护线程池
std::queue<T> _taskqueue; // 任务队列
bool _isrunning; // 标识线程池是否在运行
/*多线程对taskqueue访问需要整体使用——加锁*/
pthread_mutex_t _taskqueue_mutex; // 用于维护taskqueue的mutex cond
pthread_cond_t _taskqueue_cond;
int _sleep_num; // 休眠的线程个数
static ThreadPool<T> *_ptp; // 单例对象
static pthread_mutex_t _sig_mutex;
};
// 静态成员在类外初始化
template <class T>
ThreadPool<T> *ThreadPool<T>::_ptp = nullptr;
// 第一次保护获取单例的mutex
template <class T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
类 ThreadPool
的成员函数详解
-
LockQueue()
和UnLockQueue()
:- 这两个函数分别用来加锁和解锁
_taskqueue_mutex
互斥锁。 - 目的是保护任务队列
_taskqueue
的访问,确保在多线程环境下对任务队列的操作是线程安全的。
void LockQueue() // 加锁 { pthread_mutex_lock(&_taskqueue_mutex); } void UnLockQueue() // 解锁 { pthread_mutex_unlock(&_taskqueue_mutex); }
- 这两个函数分别用来加锁和解锁
-
Sleep()
和WakeUp()
、WakeUpAll()
:Sleep()
函数使线程在条件变量_taskqueue_cond
上等待,等待条件满足时被唤醒。WakeUp()
函数唤醒一个在_taskqueue_cond
上等待的线程。WakeUpAll()
函数唤醒所有在_taskqueue_cond
上等待的线程。- 这些函数用于实现线程休眠和唤醒机制,当任务队列为空时,线程进入休眠状态;当有新任务加入时,唤醒线程执行任务。
void Sleep() // 在条件变量_taskqueue_cond下等待 { pthread_cond_wait(&_taskqueue_cond, &_taskqueue_mutex); } void WakeUp() // 叫醒一个进程 { pthread_cond_signal(&_taskqueue_cond); } void WakeUpAll() // 叫醒所有线程 { pthread_cond_broadcast(&_taskqueue_cond); }
-
IsEmpty()
:- 检查任务队列是否为空。
- 返回布尔值,表示任务队列是否为空。
bool IsEmpty() { return _taskqueue.empty(); }
-
HandlerTask(const std::string &name)
:- 线程执行的主要函数逻辑。
- 线程在循环中等待任务队列中的任务,当有任务时,从任务队列中取出任务并执行。
- 当任务队列为空且线程池不再运行时,线程退出循环,结束执行。
// thread将要执行的函数逻辑 void HandlerTask(const std::string &name) { // 一直循环:等待任务——执行任务——等待任务,直到运行到break逻辑 while (true) { LockQueue(); /* 优先级上,先考虑任务队列不为空,只有任务队列为空,再考虑是否要线程退出*/ while (IsEmpty() && _isrunning) // 没有任务则等待,while——在唤醒之后先判断是否队列仍然有任务,否则再次等待 { _sleep_num++; Sleep(); _sleep_num--; } if (IsEmpty() && !_isrunning) { UnLockQueue(); LOG(DEBUG, "thread quit\n"); break; } T t; // 从taskqueue中获取任务 t = _taskqueue.front(); _taskqueue.pop(); UnLockQueue(); // 执行任务,在解锁之后进行,执行任务要与从队列中获取任务并发 // 不能在临界区中进行 t(); LOG(DEBUG, "%s", name.c_str()); // cout << name << " : "; t.Debug(); } }
-
构造函数
ThreadPool(size_t threadnum)
:- 初始化线程池,包括线程数量、任务队列、互斥锁和条件变量。
- 设置
_threadnum
为线程池中的线程数量,初始化其他成员变量。
ThreadPool(size_t threadnum) : _threadnum(threadnum), _taskqueue(), _isrunning(false), _sleep_num(0) { pthread_mutex_init(&_taskqueue_mutex, nullptr); pthread_cond_init(&_taskqueue_cond, nullptr); }
-
Init()
:- 初始化线程池中的线程。
- 创建
thread
对象,并将HandlerTask
绑定为线程执行的任务函数。这一操作的目的是为了使emplace_back
传入Thread构造函数
的类型匹配。
void Init() { func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1); for (int i = 0; i < _threadnum; ++i) { std::string name = "thread - " + std::to_string(i + 1); _threads.emplace_back(name, func); } }
-
Start()
:- 启动线程池中的线程。
- 设置
_isrunning
为true
,然后启动线程池中每个线程。
void Start() { _isrunning = true; for (int i = 0; i < _threadnum; ++i) { _threads[i].Start(); LOG(DEBUG, "thread - %d started \n", i + 1); } }
-
GetInstance()
:- 获取线程池的单例实例。
- 使用双重检查锁定机制确保线程安全。
static ThreadPool<T> *GetInstance() { if (_ptp == nullptr) { LockGuard lockguard(&_sig_mutex); if (_ptp == nullptr) { LOG(INFO, "create threadspool\n"); _ptp = new ThreadPool<T>(DEFAULTNUM); _ptp->Init(); _ptp->Start(); } } else { LOG(INFO, "get threadspool\n"); } return _ptp; }
-
Stop()
:- 停止线程池的运行。
- 设置
_isrunning
为false
,并唤醒所有休眠的线程。
void Stop() { LockQueue(); _isrunning = false; WakeUpAll(); UnLockQueue(); }
-
PushTask(const T &in)
:- 向任务队列中添加任务。
- 如果线程池正在运行,添加任务到任务队列,并唤醒一个休眠的线程。
void PushTask(const T &in) { LockQueue(); if (_isrunning == true) { _taskqueue.push(in); if (_sleep_num > 0) WakeUp(); } UnLockQueue(); }
-
析构函数
~ThreadPool()
:- 销毁线程池,等待所有线程结束并释放资源。
- 销毁互斥锁和条件变量。
~ThreadPool() { for (int i = 0; i < _threadnum; ++i) { LOG(INFO, "thread-%d joined\n", i + 1); _threads[i].Join(); } pthread_mutex_destroy(&_taskqueue_mutex); pthread_cond_destroy(&_taskqueue_cond); }
静态成员初始化
- 静态成员变量
_ptp
和_sig_mutex
在类外进行初始化。
template <class T>
ThreadPool<T> *ThreadPool<T>::_ptp = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
六、总结
ThreadPool
类实现了一个线程池的基本功能,当然你也可以在这个线程池的基础上扩充实现自己需要的功能。
设计中使用单例模式确保只有一个线程池实例,使用互斥锁和条件变量保证多线程环境下的线程安全,使用日志记录监控线程池的运行状态。
完~
转载请注明出处