Linux从0到1——线程池【利用日志Debug】
Linux从0到1——线程池
- 1. 简易日志系统
- 1.1 可变参数列表
- 1.2 设计日志
- 1.3 改进
- 2. 线程池
- 2.1 需要用到的头文件
- 2.2 需求分析
- 2.3 实现细节
- 2.4 完整代码
- 3. 改造单例版本
- 4. 改进思路
1. 简易日志系统
1.1 可变参数列表
1. printf
- 我们其实一直在使用可变参数列表,
printf
函数就使用了它。
- 第一个参数
format
是一个字符串,指定可变参数的数据格式。第二个参数...
就是可变参数。
2. 自己设计一个可变参数的接口,计算任意整数的和
#include <iostream>
#include <cstdarg> // 包含处理可变参数的宏
// 函数声明
int sum(int count, ...);
// 函数定义
int sum(int count, ...)
{
int total = 0;
va_list args; // 定义一个 va_list 对象,这是一个char *类型(或者void *)的指针
// 初始化 va_list 对象
va_start(args, count);
// 遍历所有参数
for (int i = 0; i < count; ++i)
{
total += va_arg(args, int); // 获取下一个参数并累加
}
// 清理 va_list 对象
va_end(args);
return total;
}
int main()
{
// 测试函数
int result = sum(5, 1, 2, 3, 4, 5);
std::cout << "Sum: " << result << std::endl;
result = sum(3, 10, 20, 30);
std::cout << "Sum: " << result << std::endl;
return 0;
}
va_list
:是一个char*
或void*
的指针类型,不同编译器可能有所差别。va_start
:初始化va_list
对象,使其指向第一个可变参数。第二个参数固定传可变参数的前一个参数。va_arg
:从可变参数列表中获取下一个参数。需要指定参数的类型(在这里是int
)。va_end
:清理va_list
对象,确保资源被正确释放(就是将该指针置空)。
1.2 设计日志
1. 需求分析
- 首先,这个日志中的信息要有各种等级,来表明各个信息的重要程度,和他们的意义;
- 日志信息要能向显示器写入,也能向文件写入。向文件写入时,可以向一个文件写入,也可以按信息不同等级,向不同等级文件写入;
- 每一条日志信息的组成:[等级][写入时间][进程id] 用户自定义信息。
2. 大致思路
- 设计两个枚举类型,一个来表示信息等级,一个来表示信息输出的方式。
log
类对外只暴漏一个接口,功能是写入日志信息。
3. 完整的日志类实现Log.hpp
#pragma once
#include<iostream>
#include<string>
#include<cstdarg>
#include<ctime>
#include<fstream>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
enum
{
Debug = 0,
Info, // 正常信息
Warning, // 告警
Error, // 错误
Fatal // 致命错误
};
enum
{
Screen = 0, // 向显示器打印
OneFile, // 向一个文件打印
ClassFile // 向多个文件打印
};
// 将日志等级转换为string
std::string LevelToString(int level)
{
switch(level)
{
case Debug:
return "Debug";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Unknown";
}
}
const int defaultstyle = Screen; // 默认风格是向显示器打印
const std::string default_filename = "log."; // 默认文件名
const std::string logdir = "log"; // 默认日志文件夹
class Log
{
public:
Log()
:_style(defaultstyle)
,_filename(default_filename)
{
mkdir(logdir.c_str(), 0775);
}
void Enable(int style)
{
_style = style;
}
std::string TimeStampExLocalTime()
{
time_t currtime = time(nullptr);
struct tm *curr = localtime(&currtime);
char time_buffer[128];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \
curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\
curr->tm_hour, curr->tm_min, curr->tm_sec);
return time_buffer;
}
void WriteLogToOneFile(const std::string &logname, const std::string &message)
{
umask(0);
int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);
if(fd < 0) return;
write(fd, message.c_str(), message.size());
close(fd);
}
void WriteLogToClassFile(const std::string &levelstr, const std::string &message)
{
std::string logname = logdir;
logname += "/";
logname += _filename;
logname += levelstr;
WriteLogToOneFile(logname, message);
}
void WriteLog(const std::string &levelstr, const std::string &message)
{
switch(_style)
{
case Screen:
std::cout << message;
break;
case OneFile:
WriteLogToClassFile("all", message);
break;
case ClassFile:
WriteLogToClassFile(levelstr, message);
break;
default:
break;
}
}
void LogMessage(int level, const char* format, ...) // 类C的一个日志接口
{
char rightbuffer[1024];
va_list args; // 这是一个char *类型(或者void *)的指针
va_start(args, format); // 让arg指向可变部分
vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 将可变部分按照指定格式写入到content中
va_end(args); // 释放args, args = nullptr
char leftbuffer[1024];
std::string levelstr = LevelToString(level);
std::string currtime = TimeStampExLocalTime(); // 获取当前时间
std::string idstr = std::to_string(getpid());
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \
levelstr.c_str(), currtime.c_str(), idstr.c_str());
std::string loginfo = leftbuffer;
loginfo += rightbuffer;
WriteLog(levelstr, loginfo);
}
~Log()
{}
private:
int _style;
std::string _filename;
};
LogMessage
是对外暴漏的接口,用户可以通过该接口,写入格式化的日志信息。第一个参数是信息等级,第二个是信息格式,第三个是可变参数列表。- 日志在使用时需要
Enable
使能,指定日志的输出方式。 - 日志信息统一放在一个的文件夹中,名字默认是
log
。向文件中写入时,文件名的格式是:log.等级
。其中如果指定向一个文件中写入,文件名是log.all
。
4. 测试代码:
int main()
{
Log log;
log.Enable(OneFile);
log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);
log.LogMessage(Error, "hello %d world %lf\n", 10, 3.14);
log.LogMessage(Warning, "hello %d world %lf\n", 10, 3.14);
log.LogMessage(Info, "hello %d world %lf\n", 10, 3.14);
log.LogMessage(Fatal, "hello %d world %lf\n", 10, 3.14);
log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);
log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14);
return 0;
}
1.3 改进
1. 线程安全问题
- 向文件或显示器写入时,不是线程安全的,可以考虑对
WriteLog
方法上锁。
#include"LockGuard.hpp"
...
class Log
{
public:
Log()
:_style(defaultstyle)
,_filename(default_filename)
{
mkdir(logdir.c_str(), 0775);
pthread_mutex_init(&_mutex, nullptr);
}
...
void LogMessage(int level, const char* format, ...) // 类C的一个日志接口
{
...
{
LockGuard lockguard(&_mutex);
WriteLog(levelstr, loginfo);
}
}
~Log()
{}
private:
int _style;
std::string _filename;
pthread_mutex_t _mutex;
};
LockGuard.hpp
是我们之前实现过的守护锁。
2. 配置log
- 以下代码可以写在
Log.hpp
头文件中,配置log
只需要修改Config
类的构造函数即可。
// 配置log
Log log;
class Config
{
public:
Config()
{
// 在此配置
log.Enable(ClassFile);
}
~Config()
{}
};
Config config;
2. 线程池
2.1 需要用到的头文件
1. Log.hpp
#pragma once
#include<iostream>
#include<string>
#include<cstdarg>
#include<ctime>
#include<fstream>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include"LockGuard.hpp"
enum
{
Debug = 0,
Info, // 正常信息
Warning, // 告警
Error, // 错误
Fatal // 致命错误
};
enum
{
Screen = 0, // 向显示器打印
OneFile, // 向一个文件打印
ClassFile // 向多个文件打印
};
// 将日志等级转换为string
std::string LevelToString(int level)
{
switch(level)
{
case Debug:
return "Debug";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Unknown";
}
}
const int defaultstyle = Screen; // 默认风格是向显示器打印
const std::string default_filename = "log."; // 默认文件名
const std::string logdir = "log"; // 默认日志文件夹
class Log
{
public:
Log()
:_style(defaultstyle)
,_filename(default_filename)
{
mkdir(logdir.c_str(), 0775);
pthread_mutex_init(&_mutex, nullptr);
}
void Enable(int style)
{
_style = style;
}
std::string TimeStampExLocalTime()
{
time_t currtime = time(nullptr);
struct tm *curr = localtime(&currtime);
char time_buffer[128];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \
curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\
curr->tm_hour, curr->tm_min, curr->tm_sec);
return time_buffer;
}
void WriteLogToOneFile(const std::string &logname, const std::string &message)
{
umask(0);
int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);
if(fd < 0) return;
write(fd, message.c_str(), message.size());
close(fd);
}
void WriteLogToClassFile(const std::string &levelstr, const std::string &message)
{
std::string logname = logdir;
logname += "/";
logname += _filename;
logname += levelstr;
WriteLogToOneFile(logname, message);
}
void WriteLog(const std::string &levelstr, const std::string &message)
{
switch(_style)
{
case Screen:
std::cout << message;
break;
case OneFile:
WriteLogToClassFile("all", message);
break;
case ClassFile:
WriteLogToClassFile(levelstr, message);
break;
default:
break;
}
}
void LogMessage(int level, const char* format, ...) // 类C的一个日志接口
{
char rightbuffer[1024];
va_list args; // 这是一个char *类型(或者void *)的指针
va_start(args, format); // 让arg指向可变部分
vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 将可变部分按照指定格式写入到content中
va_end(args); // 释放args, args = nullptr
char leftbuffer[1024];
std::string levelstr = LevelToString(level);
std::string currtime = TimeStampExLocalTime(); // 获取当前时间
std::string idstr = std::to_string(getpid());
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \
levelstr.c_str(), currtime.c_str(), idstr.c_str());
std::string loginfo = leftbuffer;
loginfo += rightbuffer;
{
LockGuard lockguard(&_mutex);
WriteLog(levelstr, loginfo);
}
}
~Log()
{}
private:
int _style;
std::string _filename;
pthread_mutex_t _mutex;
};
// 配置log
Log log;
class Config
{
public:
Config()
{
// 在此配置
log.Enable(ClassFile);
}
~Config()
{}
};
Config config;
2. Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
template<class T>
using func_t = std::function<void(T&)>;
template<class T>
class Thread
{
public:
Thread(const std::string &threadname, func_t<T> func, T &data)
:_tid(0), _threadname(threadname), _isrunning(false), _func(func), _data(data)
{}
static void *ThreadRoutine(void* args)
{
Thread *ts = static_cast<Thread *>(args);
ts->_func(ts->_data);
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);
if(n == 0)
{
_isrunning = true;
return true;
}
else return false;
}
bool Join()
{
if(!_isrunning) return false;
int n = pthread_join(_tid, nullptr);
if(n == 0)
{
_isrunning = false;
return true;
}
return false;
}
bool IsRunning()
{
return _isrunning;
}
std::string ThreadName()
{
return _threadname;
}
~Thread()
{}
private:
pthread_t _tid; // 库级别线程id
std::string _threadname; // 线程名
bool _isrunning; // 运行状态
func_t<T> _func; // 线程执行的回调方法
T _data;
};
3. LockGuard.hpp
#pragma once
#include <pthread.h>
// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
Mutex(pthread_mutex_t *lock):_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock):_mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
4. Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
const int defaultvalue = 0;
enum
{
ok = 0,
div_zero, // 除0错误
mod_zero, // 模0错误
unknow // 未知错误
};
const std::string opers = "+-*/%)(&";
class Task
{
public:
Task()
{
}
Task(int x, int y, char op)
: data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok)
{
}
void Run()
{
switch (oper)
{
case '+':
result = data_x + data_y;
break;
case '-':
result = data_x - data_y;
break;
case '*':
result = data_x * data_y;
break;
case '/':
{
if (data_y == 0)
code = div_zero;
else
result = data_x / data_y;
}
break;
case '%':
{
if (data_y == 0)
code = mod_zero;
else
result = data_x % data_y;
}
break;
default:
code = unknow;
break;
}
}
std::string PrintTask()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=";
s += std::to_string(result);
s += " [";
s += std::to_string(code);
s += "]";
return s;
}
void operator()()
{
Run();
}
~Task()
{
}
private:
int data_x;
int data_y;
char oper; // + - * / %
int result;
int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
};
2.2 需求分析
这个线程池要有一个任务队列queue
,和一个线程数组vector
,任务由外部Push
进线程池内部的任务队列中,多个线程并发的从任务队列拿任务,并执行。
并发访问任务队列时,会有线程安全问题,需要控制同步和互斥:
Push
操作一定是互斥的,Pop
操作一定是互斥的;- 如果任务队列为空,则让线程等待,
Push
进去一个任务,就激活一个线程。
每一个线程需要执行的逻辑是,先从任务队列中拿任务,如果没拿到(说明任务队列为空),就先等待;拿到了就立即执行该任务。
2.3 实现细节
1. 线程池的内部成员变量
- 一个任务队列;
- 一个线程数组;
- 一个整数类型
_thread_num
记录线程池中的线程数量; - 一个互斥量和一个条件变量,用来控制访问任务队列时的同步和互斥。
template <class T>
class ThreadPool
{
...
private:
std::queue<T> _q;
std::vector<Thread<ThreadData>> _threads; // ThreadData是线程信息类
int _thread_num; // 线程个数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
2. 设计一个线程信息类,存储每个线程的名字(可扩展)
class ThreadData
{
public:
ThreadData(const std::string &threadname)
:_threadname(threadname)
{}
~ThreadData()
{}
public:
std::string _threadname;
};
3. 构造函数和析构函数
- 构造函数需要初始化互斥量和条件变量,并且创建指定个数的线程,不启动;
- 析构函数只需要释放互斥量和条件变量即可。
static const int default_num = 5; // 默认线程池中的线程个数
template <class T>
class ThreadPool
{
public:
ThreadPool(int thread_num = default_num)
: _thread_num(thread_num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
// 构建指定个数的线程
for (int i = 0; i < _thread_num; i++)
{
// 待优化
std::string threadname = "thread-";
threadname += std::to_string(i + 1);
ThreadData td(threadname);
Thread<ThreadData> t(threadname, \
std::bind(&ThreadPool<T>::ThreadRun, \
this, std::placeholders::_1), td);
_threads.push_back(t);
log.LogMessage(Info, "%s is created...\n", threadname.c_str());
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
...
private:
std::queue<T> _q;
std::vector<Thread<ThreadData>> _threads;
int _thread_num; // 线程个数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1)
是一个函数模版对象,其中ThreadRun
为每一个线程要执行的方法,这个方法是一个成员函数。
4. ThreadRun方法
- 向队列中拿任务时,需要上锁;队列中没有任务时,需要让线程等待。
template <class T>
class ThreadPool
{
public:
void ThreadWait(const ThreadData &td)
{
log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadRun(ThreadData &td)
{
while (true)
{
// 取任务
T t;
{
LockGuard lockguard(&_mutex);
while (_q.empty())
{
ThreadWait(td);
log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());
}
t = _q.front();
_q.pop();
}
// 处理任务
t();
log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \
td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
}
}
...
private:
std::queue<T> _q;
std::vector<Thread<ThreadData>> _threads;
int _thread_num; // 线程个数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
5. Push方法,向任务队列中放任务
- 访问队列时上锁,
Push
完成后激活一个等待的线程。
template <class T>
class ThreadPool
{
public:
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void Push(T &in)
{
log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());
LockGuard lockgurad(&_mutex);
_q.push(in);
ThreadWakeup();
}
...
private:
std::queue<T> _q;
std::vector<Thread<ThreadData>> _threads;
int _thread_num; // 线程个数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
6. 启动线程池,join线程池中的线程
template <class T>
class ThreadPool
{
public:
bool Start()
{
// 启动
for (auto &thread : _threads)
{
thread.Start();
log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
}
return true;
}
// for debug
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
}
}
...
private:
std::queue<T> _q;
std::vector<Thread<ThreadData>> _threads;
int _thread_num; // 线程个数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
2.4 完整代码
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <vector>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"
static const int default_num = 5; // 默认线程池中的线程个数
class ThreadData
{
public:
ThreadData(const std::string &threadname)
:_threadname(threadname)
{}
~ThreadData()
{}
public:
std::string _threadname;
};
template <class T>
class ThreadPool
{
public:
ThreadPool(int thread_num = default_num)
: _thread_num(thread_num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
// 构建指定个数的线程
for (int i = 0; i < _thread_num; i++)
{
// 待优化
std::string threadname = "thread-";
threadname += std::to_string(i + 1);
ThreadData td(threadname);
Thread<ThreadData> t(threadname, \
std::bind(&ThreadPool<T>::ThreadRun, \
this, std::placeholders::_1), td);
_threads.push_back(t);
log.LogMessage(Info, "%s is created...\n", threadname.c_str());
}
}
bool Start()
{
// 启动
for (auto &thread : _threads)
{
thread.Start();
log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
}
return true;
}
void ThreadWait(const ThreadData &td)
{
log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadRun(ThreadData &td)
{
while (true)
{
// 取任务
T t;
{
LockGuard lockguard(&_mutex);
while (_q.empty())
{
ThreadWait(td);
log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());
}
t = _q.front();
_q.pop();
}
// 处理任务
t();
log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \
td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
}
}
void Push(T &in)
{
log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());
LockGuard lockgurad(&_mutex);
_q.push(in);
ThreadWakeup();
}
// for debug
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
std::queue<T> _q;
std::vector<Thread<ThreadData>> _threads;
int _thread_num; // 线程个数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
测试代码
#include<iostream>
#include<memory>
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"
int main()
{
std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
tp->Start();
srand((uint64_t)time(nullptr) ^ getpid());
while(true)
{
int x = rand() % 100 + 1;
usleep(1234);
int y = rand() % 200;
usleep(1234);
char oper = opers[rand() % opers.size()];
Task t(x, y, oper);
tp->Push(t);
sleep(1);
}
tp->Wait();
return 0;
}
3. 改造单例版本
线程池一般在整个程序中只有唯一一份,适合用单例模式实现,下面实现“懒汉”线程池。
第一步:构造函数私有化,并禁掉拷贝构造和赋值
template <class T>
class ThreadPool
{
private:
ThreadPool(int thread_num = default_num)
: _thread_num(thread_num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
// 构建指定个数的线程
for (int i = 0; i < _thread_num; i++)
{
// 待优化
std::string threadname = "thread-";
threadname += std::to_string(i + 1);
ThreadData td(threadname);
Thread<ThreadData> t(threadname, \
std::bind(&ThreadPool<T>::ThreadRun, \
this, std::placeholders::_1), td);
_threads.push_back(t);
log.LogMessage(Info, "%s is created...\n", threadname.c_str());
}
}
ThreadPool(const ThreadPool<T> &tp) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;
public:
...
private:
...
};
第二步:提供静态指针instance,并提供静态方法GetInstance获取静态指针
template <class T>
class ThreadPool
{
private:
// 构造函数私有化,并禁掉拷贝构造和赋值
...
public:
// 有线程安全问题
static ThreadPool<T> *GetInstance()
{
LockGuard lockguard(&sig_lock);
if(instance == nullptr)
{
log.LogMessage(Info, "创建单例成功...\n");
instance = new ThreadPool<T>();
}
return instance;
}
private:
// 成员变量
...
static ThreadPool<T> *instance;
static pthread_mutex_t sig_lock;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;
特别注意:
GetInstance
函数存在线程安全问题,同时可能有多个线程满足instance == nullptr
,从而错误的调用多次new
方法,所以需要对GetInstance
函数整个上锁;- 这把锁保护的是
instance
,所以需要我们设计一个新的互斥量,并且因为GetInstance
方法是静态的,没有this
指针,所以这个新的互斥量也要是静态的; - 可是如果是上面这种写法,效率是很低的,在
instance
不为空时,任何线程想要调用GetInstance
应该是立即返回的,但是此时却要回回上锁; - 下面是
GetInstance
的改良版,保证了在instance
不为空时,任何线程想要调用GetInstance
会立即返回,不需要上锁。
template <class T>
class ThreadPool
{
private:
...
public:
// 有线程安全问题
static ThreadPool<T> *GetInstance()
{
if(instance == nullptr)
{
LockGuard lockguard(&sig_lock);
if(instance == nullptr)
{
log.LogMessage(Info, "创建单例成功...\n");
instance = new ThreadPool<T>();
}
}
return instance;
}
private:
...
static ThreadPool<T> *instance;
static pthread_mutex_t sig_lock;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;
完整单例线程池代码
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <vector>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"
static const int default_num = 5; // 默认线程池中的线程个数
class ThreadData
{
public:
ThreadData(const std::string &threadname)
:_threadname(threadname)
{}
~ThreadData()
{}
public:
std::string _threadname;
};
template <class T>
class ThreadPool
{
private:
ThreadPool(int thread_num = default_num)
: _thread_num(thread_num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
// 构建指定个数的线程
for (int i = 0; i < _thread_num; i++)
{
// 待优化
std::string threadname = "thread-";
threadname += std::to_string(i + 1);
ThreadData td(threadname);
Thread<ThreadData> t(threadname, \
std::bind(&ThreadPool<T>::ThreadRun, \
this, std::placeholders::_1), td);
_threads.push_back(t);
log.LogMessage(Info, "%s is created...\n", threadname.c_str());
}
}
ThreadPool(const ThreadPool<T> &tp) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;
public:
// 有线程安全问题
static ThreadPool<T> *GetInstance()
{
if(instance == nullptr)
{
LockGuard lockguard(&sig_lock);
if(instance == nullptr)
{
log.LogMessage(Info, "创建单例成功...\n");
instance = new ThreadPool<T>();
}
}
return instance;
}
bool Start()
{
// 启动
for (auto &thread : _threads)
{
thread.Start();
log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
}
return true;
}
void ThreadWait(const ThreadData &td)
{
log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str());
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadRun(ThreadData &td)
{
while (true)
{
// 取任务
T t;
{
LockGuard lockguard(&_mutex);
while (_q.empty())
{
ThreadWait(td);
log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str());
}
t = _q.front();
_q.pop();
}
// 处理任务
t();
log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \
td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
}
}
void Push(T &in)
{
log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str());
{
LockGuard lockgurad(&_mutex);
_q.push(in);
ThreadWakeup();
}
}
// for debug
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
std::queue<T> _q; // 任务队列
std::vector<Thread<ThreadData>> _threads;
int _thread_num; // 线程个数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> *instance;
static pthread_mutex_t sig_lock
};
template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;
测试代码
#include<iostream>
#include<memory>
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"
int main()
{
ThreadPool<Task>::GetInstance()->Start();
srand((uint64_t)time(nullptr) ^ getpid());
while(true)
{
int x = rand() % 100 + 1;
usleep(1234);
int y = rand() % 200;
usleep(1234);
char oper = opers[rand() % opers.size()];
Task t(x, y, oper);
ThreadPool<Task>::GetInstance()->Push(t);
sleep(1);
}
ThreadPool<Task>::GetInstance()->Wait();
return 0;
}
4. 改进思路
设计一个动态线程池,可以根据当前的线程池中的线程数量和任务数量,动态创建或销毁线程。
具体思路以伪代码的形式呈现给大家,感兴趣的小伙伴可以自己实现:
template <class T>
class ThreadPool
{
private:
...
public:
...
void CheckSelf()
{
// 1. _task_num > _task_num_high_water && _thread_num < _thread_num_high_water
// 创建更多线程,并更新_thread_num
// 2. _task_num == _task_num_low_water && _thread_num >= _thread_num_high_water
// 把自己退出了,并且更新_thread_num
}
void ThreadRun(ThreadData &td)
{
while (true)
{
// 取任务
CheckSelf();
...
}
}
private:
...
// 扩展
int _task_num; // 任务数量
// 高低水位线
int _thread_num_low_water;
int _thread_num_high_water;
int _task_num_low_water;
int _task_num_high_water;
};