线程池以及日志、线程总结
一、线程池以及日志
1、基础线程池写法
主线程在main函数中构建一个线程池,初始化(Init)后开始工作(Start)
此时线程池中每个线程都已经工作起来了,只是任务队列中任务为空,所有线程处于休眠状态(通过线程同步中的条件变量实现,即pthread_cond_wait)
当主线程向任务队列中添加任务后,唤醒一个在休眠中的线程,让其执行任务,执行完后再进入休眠状态。
基础的线程池的写法:
ThreadPool.hpp:
#pragma once
#include <vector>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "mythreadlib.hpp"
#define DEFAULT_THREAD_NUMS 5
template <typename T> // 什么类型的任务
class ThreadPool
{
private:
void test(const std::string name)
{
while (1)
{
std::cout << name << " is running" << std::endl;
sleep(1);
}
}
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
bool QueueIsEmpty()
{
return _task_queue.empty();
}
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void Wake()
{
pthread_cond_signal(&_cond);
}
void WakeAll()
{
pthread_cond_broadcast(&_cond);
}
void HandlerTask(const std::string name)
{
while (1)
{
LockQueue();
while (QueueIsEmpty() && _isrunning)
{
_sleep_nums++;
Sleep();
_sleep_nums--;
}
if (QueueIsEmpty() && !_isrunning)
{
UnlockQueue();
break;
}
// 苏醒过来 执行任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
std::cout<<name<<" is running:";
t(); // 执行任务和获取任务一定要分开,一个在锁内,一个在锁外
}
}
public:
ThreadPool(int threadnums = DEFAULT_THREAD_NUMS)
: _threadnums(threadnums), _isrunning(false), _sleep_nums(0)
{
//_my_threads.resize(_threadnums); 不能resize会有问题
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void Init()
{
fun_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
// fun_t func = std::bind(&ThreadPool::test, this, std::placeholders::_1);
for (int i = 0; i < _threadnums; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
_my_threads.emplace_back(name, func); // 要将test改成处理Task的函数
}
}
void Start()
{
_isrunning = true;
for (auto &thread : _my_threads)
{
thread.Start();
}
}
void Stop()
{
LockQueue();
_isrunning = false;
WakeAll();
UnlockQueue();
}
void Enqueue(const T &in)
{
// 主线程向线程池中派发任务,让线程池中的线程执行任务
LockQueue();
if (_isrunning)
{
_task_queue.push(in);
if(_sleep_nums>0) Wake();//只要有休眠的线程就进行唤醒
}
UnlockQueue();
}
private:
int _threadnums;
std::vector<mythread> _my_threads;
int _sleep_nums;
bool _isrunning; // 也是临界资源
std::queue<T> _task_queue; // 临界资源
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
mythreadlib.hpp:
#pragma once
#include <pthread.h>
#include <iostream>
#include <string>
#include <functional>
using fun_t = std::function<void(const std::string &)>;
class mythread
{
// typedef void (*fun_t)(ThreadData *td);
public:
mythread(const std::string name, fun_t func)
: _name(name), _func(func)
{
_isrunning = false;
}
static void *threadRun(void *args)
{
mythread *thread = static_cast<mythread *>(args);
// thread->_func(thread->_td);
thread->_func(thread->_name);
thread->_isrunning = false; // 运行结束
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadRun, (void *)this);
if (n != 0)
return false;
else
{
_isrunning = true;
return true;
}
}
void Stop()
{
if (_isrunning)
{
_isrunning = false;
::pthread_cancel(_tid);
}
}
void Join()
{
pthread_join(_tid, nullptr);
}
~mythread()
{
if (_isrunning)
{
Stop();
Join();
}
}
private:
pthread_t _tid;
std::string _name;
bool _isrunning;
fun_t _func;
};
task.hpp:
#pragma once
#include<iostream>
#include<functional>
// typedef std::function<void()> task_t;
// using task_t = std::function<void()>;
// void Download()
// {
// std::cout << "我是一个下载的任务" << std::endl;
// }
// 要做加法
class Task
{
public:
Task()
{
}
Task(int x, int y) : _x(x), _y(y)
{
}
void Excute()
{
_result = _x + _y;
std::cout<<result()<<std::endl;
}
void operator ()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
main.cpp:
#include"ThreadPool.hpp"
#include"task.hpp"
#include<ctime>
#include<unistd.h>
int main()
{
//线程池
srand(time(nullptr)^getpid());
ThreadPool<Task> *thread_pool=new ThreadPool<Task>();
thread_pool->Init();
thread_pool->Start();
while(1)
{
int num1=rand()%9+1;
int num2=rand()%9+1;
Task ADD(num1,num2);
thread_pool->Enqueue(ADD);
sleep(1);
}
return 0;
}
2、日志
(1)日志概念
日志:软件运行的记录信息,可以向显示器打印,也可以向文件中打印,可以根据信息的重要程度使用不同的打印格式。
(2)基本格式
【日志等级】【pid】【filename】【filenumber】【time】日志内容(支持可变参数)
其中日志等级可分为:DEBUG(调试)、INFO(信息)、 WARNING(警告) 、ERROR(错误) 、FATAL(致命的)
可变参数的处理:用va_list指向可变部分, vsnprintf()
29. C语言 可变参数详解-CSDN博客
使用宏优化日志格式。
最终形成的日志:
#pragma once
#include <ctime>
#include <string>
#include <unistd.h>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include "LockGuard.hpp"
#include <pthread.h>
#define FLUSHSCREEN 1
#define FLUSHFILE 2
const std::string default_file = "log.txt";
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
class logmessage
{
public:
std::string _level;
int _id;
std::string _filename;
int _filenumber;
std::string _time;
std::string _content;
};
std::string LevelToString(int level)
{
switch (level)
{
case 1:
return "DEBUG";
break;
case 2:
return "INFO";
break;
case 3:
return "WARNING";
break;
case 4:
return "ERROR";
break;
case 5:
return "FATAL";
break;
}
}
std::string CurrentTime()
{
time_t now = time(nullptr);
struct tm *curr_time = localtime(&now);
char time_string[128];
snprintf(time_string, sizeof(time_string), "%d-%02d-%02d %02d:%02d:%02d", curr_time->tm_year + 1900, // 02d表示的是控制输出格式为2位整数,不够2位的用0填充
curr_time->tm_mon + 1,
curr_time->tm_mday,
curr_time->tm_hour,
curr_time->tm_min,
curr_time->tm_sec);
return time_string;
}
pthread_mutex_t _mutex=PTHREAD_MUTEX_INITIALIZER;
class Log
{
public:
Log(const std::string flush_file = default_file, int flush_type = FLUSHSCREEN)
: _flush_file(flush_file), _flush_type(flush_type)
{
//pthread_mutex_init(&_mutex, nullptr);
}
~Log()
{
//pthread_mutex_destroy(&_mutex);
}
void logMessage(int level, const std::string filename, int filenumber, const char *format, ...) // 可变参数
{
logmessage logm;
logm._level = LevelToString(level);
logm._id = getpid();
logm._filename = filename;
logm._filenumber = filenumber;
va_list vl;
va_start(vl, format);
char log_info[1024];
vsnprintf(log_info, sizeof(log_info), format, vl);
va_end(vl);
logm._content = log_info;
logm._time = CurrentTime();
FlushLog(logm);
}
void Enable(int type)
{
_flush_type = type;
}
void FlushLog(logmessage &log)
{
//pthread_mutex_lock(&_mutex);
lockguard lock(&_mutex);
switch (_flush_type)
{
case FLUSHSCREEN:
FlushToScreen(log);
break;
case FLUSHFILE:
FlushToFile(log);
break;
}
//pthread_mutex_unlock(&_mutex);
}
void FlushToScreen(logmessage &log)
{
printf("[%s][%d][%s][%d][%s] %s", log._level.c_str(), log._id, log._filename.c_str(), log._filenumber, log._time.c_str(), log._content.c_str());
}
void FlushToFile(logmessage &log)
{
std::ofstream out(_flush_file, std::ios::app);
char message[1024];
snprintf(message, sizeof(message), "[%s][%d][%s][%d][%s] %s", log._level.c_str(), log._id, log._filename.c_str(), log._filenumber, log._time.c_str(), log._content.c_str());
out.write(message, strlen(message));
out.close();
}
private:
std::string _flush_file;
int _flush_type;
};
// 优化
Log glog;
#define LOG(level, format, ...) \
do \
{ \
glog.logMessage(level, __FILE__, __LINE__, format, ##__VA_ARGS__); \
} while (0) // 带两个#是为了处理可变参数为空的情况
#define EnableScreen() \
do \
{ \
glog.Enable(1); \
} while (0)
#define EnableFile() \
do \
{ \
glog.Enable(2); \
} while (0)
3、单例模式
单例模式:一个类只创建一个对象
单例模式实现方式有两种:懒汉模式和饿汉模式。
4、最终代码
经过单例模式的修改,最终形成的线程池代码:
Threadpool.hpp:
#pragma once
#include <vector>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "mythreadlib.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"
#define DEFAULT_THREAD_NUMS 5
template <typename T> // 什么类型的任务
class ThreadPool
{
private:
void test(const std::string name)
{
while (1)
{
std::cout << name << " is running" << std::endl;
sleep(1);
}
}
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
bool QueueIsEmpty()
{
return _task_queue.empty();
}
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void Wake()
{
pthread_cond_signal(&_cond);
}
void WakeAll()
{
pthread_cond_broadcast(&_cond);
}
void HandlerTask(const std::string name)
{
while (1)
{
LockQueue();
while (QueueIsEmpty() && _isrunning)
{
_sleep_nums++;
LOG(DEBUG, "%s begin sleep\n", name.c_str());
Sleep();
LOG(DEBUG, "%s wake up\n", name.c_str());
_sleep_nums--;
}
if (QueueIsEmpty() && !_isrunning)
{
UnlockQueue();
break;
}
// 苏醒过来 执行任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
t(); // 执行任务和获取任务一定要分开,一个在锁内,一个在锁外
LOG(DEBUG, "%s Handler task done\n", name.c_str());
}
}
ThreadPool(int threadnums = DEFAULT_THREAD_NUMS)
: _threadnums(threadnums), _isrunning(false), _sleep_nums(0)
{
//_my_threads.resize(_threadnums); 不能resize会有问题
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T> &) = delete;//设置拷贝构造和=操作符重载为delete
void Init()
{
fun_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
// fun_t func = std::bind(&ThreadPool::test, this, std::placeholders::_1);
for (int i = 0; i < _threadnums; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
_my_threads.emplace_back(name, func); // 要将test改成处理Task的函数
LOG(DEBUG, "construct %s done,init success\n", name.c_str());
}
}
void Start()
{
_isrunning = true;
for (auto &thread : _my_threads)
{
thread.Start();
LOG(DEBUG, "start %s done\n", thread.getname().c_str());
}
}
public:
static ThreadPool<T> *Get_Instance()
{
if (_tp == nullptr)
{
lockguard lock(&_tp_mutex);
if (_tp == nullptr)
{
_tp = new ThreadPool<T>;
_tp->Init();
_tp->Start();
LOG(DEBUG,"Thread pool create success!\n");
}
}
return _tp;
}
void Stop()
{
LockQueue();
_isrunning = false;
WakeAll();
UnlockQueue();
LOG(DEBUG,"all thread stop success!\n");
}
void Enqueue(const T &in)
{
// 主线程向线程池中派发任务,让线程池中的线程执行任务
LockQueue();
if (_isrunning)
{
_task_queue.push(in);
if (_sleep_nums > 0)
Wake(); // 只要有休眠的线程就进行唤醒
}
UnlockQueue();
}
private:
int _threadnums;
std::vector<mythread> _my_threads;
int _sleep_nums;
bool _isrunning; // 也是临界资源
std::queue<T> _task_queue; // 临界资源
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> *_tp;
static pthread_mutex_t _tp_mutex;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_tp_mutex = PTHREAD_MUTEX_INITIALIZER;
main.cpp:
#include "ThreadPool.hpp"
#include "task.hpp"
#include "Log.hpp"
#include <ctime>
#include <unistd.h>
/*int main()
{
//线程池
srand(time(nullptr)^getpid());
ThreadPool<Task> *thread_pool=new ThreadPool<Task>();
thread_pool->Init();
thread_pool->Start();
while(1)
{
int num1=rand()%9+1;
int num2=rand()%9+1;
Task ADD(num1,num2);
thread_pool->Enqueue(ADD);
sleep(1);
}
return 0;
}*/
/*int main()
{
//测试未优化的日志
Log log1;
log1.logMessage(DEBUG, __FILE__, __LINE__, "this is a logmessage of %d,%f\n", 30, 3.14);
sleep(1);
log1.logMessage(DEBUG, __FILE__, __LINE__, "this is a logmessage of %d,%f\n", 30, 3.14);
log1.Enable(FLUSHFILE);
sleep(1);
log1.logMessage(DEBUG, __FILE__, __LINE__, "this is a logmessage of %d,%f\n", 30, 3.14);
sleep(1);
log1.logMessage(DEBUG, __FILE__, __LINE__, "this is a logmessage of %d,%f\n", 30, 3.14);
return 0;
}*/
/*int main()
{
EnableScreen();
//LOG(DEBUG,"YES");//依然支持
LOG(DEBUG,"this is a logmessage of %d,%f\n", 30, 3.14);
LOG(WARNING,"this is a logmessage of %d,%f\n", 30, 3.14);
LOG(ERROR,"this is a logmessage of %d,%f\n", 30, 3.14);
LOG(INFO,"this is a logmessage of %d,%f\n", 30, 3.14);
EnableFile();
LOG(DEBUG,"this is a logmessage of %d,%f\n", 30, 3.14);
LOG(WARNING,"this is a logmessage of %d,%f\n", 30, 3.14);
LOG(ERROR,"this is a logmessage of %d,%f\n", 30, 3.14);
LOG(INFO,"this is a logmessage of %d,%f\n", 30, 3.14);
return 0;
}*/
/*int main()
{
//线程池
srand(time(nullptr)^getpid());
ThreadPool<Task> *thread_pool=new ThreadPool<Task>();
thread_pool->Init();
thread_pool->Start();
while(1)
{
int num1=rand()%9+1;
int num2=rand()%9+1;
Task ADD(num1,num2);
thread_pool->Enqueue(ADD);
LOG(INFO,"task is product:%s\n",ADD.debug().c_str());
sleep(1);
}
return 0;
}*/
int main()
{
// 单例模式线程池的测试
srand(time(nullptr) ^ getpid());
int cnt=10;
while (cnt--)
{
int num1 = rand() % 9 + 1;
int num2 = rand() % 9 + 1;
Task ADD(num1, num2);
ThreadPool<Task>::Get_Instance()->Enqueue(ADD);
sleep(1);
}
ThreadPool<Task>::Get_Instance()->Stop();
return 0;
}
二、线程概念拓展
1、可重入和线程安全
(1)概念
(2)理解
一个函数是可重入的,那么它一定是线程安全的;但一个线程是安全的,不表示该线程内的函数是可重入的,其内部的函数可能是不可重入的,例如加锁但不释放的函数,是线程安全但不可重入函数。
2、死锁
(1)死锁概念
一个线程一把锁也可能形成死锁,例如对同一个锁加锁两次而未释放。
但更多的情况是俩个执行流,两把锁,要推进代码需要两把锁都申请到,但俩个执行流分别申请一把锁,而不释放,都同时申请对方的锁。
(2)死锁的必要条件
(3)避免死锁
破坏必要条件:
①尽量不使用锁
②当某优先级比较高的线程申请不到锁的时候强制释放锁
③申请锁的代码的顺序要一致