【Linux】信号量,线程池
目录
信号量
初始化编辑
销毁
等待
发布
基于环形队列的生产消费模型
问题解答:
代码:
线程池
线程池的实现
(1)初始化,构造大致框架
(2)创建线程
(3)创建任务
(4)完善线程要执行的任务
(5)更新初始化Init()函数
代码:
日志
信号量
POSIX 信号量和 SystemV 信号量作用相同, 都是用于同步操作, 达到无冲突的访问共
享资源目的。 但 POSIX 可以用于线程间同步。
信号量本质就是一个计数器,对公共资源的预定机制;
初始化
sem_t _data_sem;
sem_init(&_data_sem,0,0);
//pshared:0 表示线程间共享, 非零表示进程间共享
//value: 信号量初始值
销毁
sem_destroy(&_data_sem);
等待
功能: 等待信号量, 会将信号量的值减 1;
void P(sem_t &s)//申请信号量,--
{
sem_wait(&s);
}
发布
功能: 发布信号量, 表示资源使用完毕, 可以归还资源了。 将信号量值加 1。
void V(sem_t &s)//释放资源,++
{
sem_post(&s);
}
基于环形队列的生产消费模型
之前我们写的生产消费模型是基于 queue 的,其空间可以动态分配,现在基于固定大小
的环形队列重写这个程序(POSIX 信号量) ;
环形队列采用数组模拟, 用模运算来模拟环状特性;
(1)初始化
(2)完善生产消费代码
(3)完善main
当然任务不止是参数,也可以是类:
问题解答:
(1)上面的是单生产单消费的例子,那多生产多消费呢?
两个锁:多个生产者竞争一个锁,多个消费者竞争一个锁;其实本质还是单生产单消费;但是由于处理数据和构造数据都需要时间,所以多生产多消费效率更改高;
(2)在多生产多消费时,是先加锁还是先申请信号量?
先申请信号量;这个问题就好比你去电影院看电影,是先排队(此时你并没有买票)还是先买票的问题,肯定是先买票效率更高,要不然排到你,你没有电影票还是进不去;
(3)为什么信号量对资源进行使用、申请时,不判断一下条件是否满足?
因为信号量本身就是一个判断条件;
代码:
#pragma once
#include<iostream>
#include<pthread.h>
#include<semaphore.h>
#include<vector>
#include<string>
#include<unistd.h>
#include<sys/types.h>
using namespace std;
const int defaultcp =5;
template<typename T>
class Ringqueue
{
private:
void P(sem_t &s)//申请信号量,--
{
sem_wait(&s);
}
void V(sem_t &s)//释放资源,++
{
sem_post(&s);
}
public:
Ringqueue(int max_cp = defaultcp):_max_cp(max_cp),_ringqueue(max_cp),_c_step(0),_p_step(0)
{
sem_init(&_data_sem,0,0);
sem_init(&_space_sem,0,max_cp);
}
~Ringqueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
}
void Push(const T &in)//生产
{
P(_space_sem);
_ringqueue[_p_step]=in;
_p_step++;
_p_step%=_max_cp;
V(_data_sem);
}
void Pop(T *out)//消费
{
P(_data_sem);
*out=_ringqueue[_c_step];
_c_step++;
_c_step%=_max_cp;
V(_space_sem);
}
private:
vector<T>_ringqueue;
int _p_step;
int _c_step;
int _max_cp;
sem_t _data_sem;
sem_t _space_sem;
};
线程池
线程池其实就是一种线程的使用模式;
线程过多会带来调度开销, 进而影响缓存局部性和整体性能。 而线程池维护着多个线程, 等待着监督管理者分配可并发执行的任务。 这避免了在处理短时间任务时创建与销毁线程的代价。 线程池不仅能够保证内核的充分利用, 还能防止过分调度。
线程池的实现
(1)初始化,构造大致框架
大致框架要有多个线程(用vector维护),要有任务队列(task_queue)能生产任务;
(2)创建线程
完成线程的创建,这里我直接用的上一篇文章自己封装的线程;
(3)创建任务
(4)完善线程要执行的任务
(5)更新初始化Init()函数
这样就完成了线程池的主要内容了,剩下的就是修改一下细节部分即可;
线程池的实现:构成出大致框架,在任务的函数中,注意如果任务列表中没有任务,那么线程就要处于等待状态,如果创建出一个任务后,就可以唤醒一个线程去执行即可;
代码:
#pragma once
#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include "Thread.hpp"
using namespace std;
const int defaultnum = 5;
template <typename T>
class ThreadPool
{
private:
void WakeUpAll()
{
pthread_cond_broadcast(&_cond);
}
void Lock()
{
pthread_mutex_lock(&_mutex);
}
void Unlock()
{
pthread_mutex_unlock(&_mutex);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _task_queue.empty();
}
void HandlerTask(const string &name)
{
while (true)
{
// 取任务
Lock();
while (isEmpty() && _isrunning)
{
// 休眠
_sleep_num++;
pthread_cond_wait(&_cond, &_mutex);
_sleep_num--;
}
if (isEmpty() && !_isrunning)
{
cout << name << "quit..." << endl;
Unlock();
break;
}
// 有任务
T t = _task_queue.front();
_task_queue.pop();
Unlock();
// 处理任务
t();
cout << name << t.Excute() << "任务处理完" << endl;
}
}
public:
ThreadPool(int thread_num = defaultnum) : _thread_num(thread_num), _isrunning(false), _sleep_num(0)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void Init()
{
func_t func = bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
// 创建线程
for (int i = 0; i < _thread_num; i++)
{
string name = "thread-" + to_string(i + 1);
_threads.emplace_back(name, func);
}
}
void Start()
{
_isrunning = true;
for (auto &thread : _threads)
{
thread.start();
}
}
void Stop()
{
Lock();
_isrunning = false;
WakeUpAll();
Unlock();
}
void Equeue(const T &in)
{
Lock();
if (_isrunning)
{
// 生产任务
_task_queue.push(in);
// 唤醒线程
if (_sleep_num > 0)
{
WakeUp();
}
}
Unlock();
}
private:
int _thread_num;
vector<Thread> _threads; // 线程
queue<T> _task_queue; // 任务,共享资源
bool _isrunning;
int _sleep_num; // 休眠的个数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
日志
日志:软件运行的记录信息、向显示器打印、向文件打印、特定的格式;
【日志等级】【pid】【filename】【filenumber】【time】日志内容(支持可变参数)
日志等级:DEBUG、INFO、WARNING、ERROR、FATAL(致命的);
在初始化的时候,主要就是可变参数的初始化
其实写到上面这一步就以及完成的日志的实现;
我们来运行一下代码来看看:
main函数:
cout<<gettime()<<endl;
Log lg;
lg.logMessage("main.cc",10,DEBUG,"hello %d,world%c,hello %f\n",10,'A',3.14);
下面我们只需要完善一下使该日志信息可以向显示器中打印,也可以向文件中打印;
设置一个类型_type;默认是向显示器打印;在执行打印代码时,只需要判断一下_type即可;
如果是向显示器打印,直接printf即可;如果是文件中打印,需要先打开对应的文件,在将日志信息写入;
完整代码:
#pragma once
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <ctime>
#include <stdarg.h>
// #include<stdio.h>
#include<ostream>
#include <fstream>
using namespace std;
#define SCREEN_TYPE 1
#define FILE_TYPE 2
const string glogfile ="./log.txt";
// 日志等级
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
string levelTo_string(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
string gettime()
{
time_t now = time(nullptr); // now就是时间戳
struct tm *curr_time = localtime(&now);
char buff[128];
snprintf(buff, sizeof(buff), "%d-%02d-%02d : %02d-%02d-%02d",
curr_time->tm_year + 1900,
curr_time->tm_mon + 1,
curr_time->tm_mday,
curr_time->tm_hour,
curr_time->tm_min,
curr_time->tm_sec);
return buff;
}
class Logmessage
{
public:
string _level;
pid_t _pid;
string _filename;
int _filenumber;
string _curr_time;
string _message_info;
};
class Log
{
private:
void FlushLogScreen(Logmessage &lg)
{
printf("[%s][%d][%s][%d] %s",lg._level.c_str(),lg._pid,lg._filename.c_str(),lg._filenumber,lg._message_info.c_str());
}
void FlushLogFile(Logmessage &lg)
{
ofstream out(_logfile.c_str());
if(!out.is_open())return;
char buff[2048];
snprintf(buff,sizeof(buff),"[%s][%d][%s][%d] %s",lg._level.c_str(),lg._pid,lg._filename.c_str(),lg._filenumber,lg._message_info.c_str());
out.write(buff,strlen(buff));
out.close();
}
public:
Log(const string &logfile = glogfile) : _type(SCREEN_TYPE), _logfile(logfile)
{
}
~Log()
{
}
void Enable(int type)
{
_type = type;
}
void FlushLog(Logmessage &lg)
{
switch (_type)
{
case SCREEN_TYPE:
FlushLogScreen(lg);
break;
case FILE_TYPE:
FlushLogFile(lg);
break;
}
}
void logMessage(string filename, int filenumber, int level, const char *format, ...)
{
Logmessage lg;
lg._level = levelTo_string(level);
lg._pid = getpid();
lg._filename = filename;
lg._filenumber = filenumber;
lg._curr_time = gettime();
va_list ap;
va_start(ap, format);
char log_info[1024];
vsnprintf(log_info, sizeof(log_info), format, ap);
va_end(ap);
lg._message_info = log_info;
FlushLog(lg);
}
private:
int _type;
string _logfile;
};