【项目实战】日志管理和异步任务处理系统
这是一个高效的日志管理和异步任务处理系统,提供了多级别的日志记录、灵活的日志格式化和多种日志输出目标(控制台、文件、文件滚动)。通过异步任务循环器和线程安全的任务队列,系统能够在高并发环境下处理任务,同时避免主线程的阻塞。模块化设计使得日志管理和任务处理功能可灵活扩展,适用于需要高性能和稳定性的多线程应用。
Log system 源代码地址
项目架构
1. 日志系统设计
- 功能描述:系统通过
Logger
类提供了同步和异步的日志记录功能,适用于多线程环境。日志记录可以输出到控制台、文件或其他自定义的LogSink
。使用不同级别的日志输出(DEBUG, INFO, WARN, ERROR, FATAL)可以帮助开发者根据需求进行日志的过滤。 - 日志输出机制:日志输出采用了格式化方式,通过
Formatter
类进行灵活的日志格式化,支持日期、时间、线程ID、日志级别、文件名、行号等信息的输出。此外,系统提供了StdoutSink
和FileSink
等多种Sink
类型来支持日志的输出目标,可以扩展自定义Sink
。 - 线程安全:同步日志器使用互斥锁(
mutex
)来确保日志的线程安全,异步日志器则利用AsyncLooper
将日志任务推送到后台线程进行处理,从而避免阻塞主线程。 - 日志滚动与文件管理:
RollSink
提供了日志文件滚动功能,当单个日志文件大小达到限制时,会自动切换到新的文件,确保日志管理不至于占满硬盘空间。
2. 异步任务处理
- 异步处理机制:
AsyncLooper
类通过工作线程和缓冲区实现异步任务处理。它使用条件变量(std::condition_variable
)和互斥锁(std::mutex
)来同步任务的推送和处理。通过push
方法,任务可以被推送到缓冲区,并通过worker_loop
方法异步执行回调函数处理任务。 - 任务队列管理:使用环形队列(
RingQueue
)来存储任务数据,确保队列的快速读写。环形队列适用于需要循环利用缓冲区的场景,具有较低的内存开销和较高的处理速度。
3. 缓冲区管理
- 动态缓冲区:
Buffer
类管理一个动态扩展的缓冲区,支持数据的读写和自动扩展。当写入数据超过当前缓冲区容量时,缓冲区会自动扩展(每次增大1MB)。通过push
和pop
操作,系统能高效地进行数据的读取和写入。 - 性能优化:缓冲区扩展机制设计避免了频繁的内存分配操作,从而提高了性能。通过确保有足够的可写空间,系统减少了内存重新分配的次数。
4. 数据同步与阻塞队列
- 阻塞队列设计:
BlockQueue
类提供了线程安全的阻塞队列,支持在队列为空时进行阻塞,直到有新元素可用;同样,当队列满时,推送操作会被阻塞,直到队列有空闲空间。这种设计适用于需要严格控制线程间任务同步的场景,避免了忙等待(busy-waiting)的问题。 - 生产者-消费者模式:结合环形队列和阻塞队列的设计,系统实现了生产者-消费者模型,确保了在多线程环境下任务的高效和安全处理。
模块设计
工具模块
文件util.hpp
主要包含两个类:
data
类:提供获取当前时间戳的功能。
file
类:提供与文件操作相关的功能,包括检查文件是否存在、获取文件路径、创建目录等。
在所有头文件的编写中都添加了
#ifndef ……
、#endif
,这是为了防止头文件被重复包含。这个预处理指令确保当文件被多次引用时,不会重复编译,避免编译错误。
1.1 data
类:获取当前系统时间
class date{
public:
static time_t now(){ return time(nullptr); }
};
解释与作用:
time_t
是C标准库中的时间类型,tiem(nullptr)
返回当前系统时间的Unix时间戳。- 这个函数在日志系统重可以用于记录日志的时间戳,以标记日志生成的时间。
注意:
- 静态成员函数:
static
关键字使得函数属于类,而不是类的实例。这意味着可以通过data::now()
直接调用,而无需创建data
类的对象。 - Unix 时间戳:是计算机系统表示时间的标准,即从 1970 年 1 月 1 日(称为“纪元”)到当前时间的秒数。
1.2 file
类:文件操作功能
// 检查文件或目录是否存在
static bool exists(const std::string &name){
struct stat st;
return stat(name.c_str(), &st) == 0;
}
解释与作用:
exists()
:检查文件或目录是否存在。
- 使用
stat()
系统调用获取文件信息,如果返回 0,则表示文件存在。 stat
是 POSIX 标准函数,因此仅在类 Unix 系统(如 Linux、macOS)上可用。
// 获取文件所在目录
static std::string path(const std::string &name) {
if (name.empty()) return ".";
size_t pos = name.find_last_of("/\\");
if (pos == std::string::npos) return ".";
return name.substr(0, pos + 1);
}
path()
:获取文件的目录路径。
- 如果文件路径为空,返回当前目录
"."
。 - 使用
find_last_of()
查找最后一个路径分隔符/
或\
,返回文件所在的目录部分。 - 主要用于日志系统在创建日志文件时获取文件所在目录路径。
// 递归地创建目录及其父目录
static void create_directory(const std::string &path) {
if (path.empty()) return;
if (exists(path)) return;
size_t pos, idx = 0;
while (idx < path.size()) {
pos = path.find_first_of("/\\", idx);
if (pos == std::string::npos) {
mkdir(path.c_str(), 0755);
return;
}
if (pos == idx) {
idx = pos + 1;
continue;
}
std::string subdir = path.substr(0, pos);
if (subdir == "." || subdir == "..") {
idx = pos + 1;
continue;
}
mkdir(subdir.c_str(), 0755);
idx = pos + 1;
}
}
解释与作用:
create_directory()
:递归地创建目录及其父目录。
-
先检查路径是否为空或目录是否已存在。
-
使用
mkdir()
创建目录,权限设置为0755
(所有者读写执行,组和其他用户只读执行)。 -
递归创建多级目录时,使用
find_first_of()
查找路径分隔符,依次创建子目录。
mkdir()
系统调用:用于创建目录。在 Linux 系统中,0755
表示所有者具有读写执行权限,而组和其他用户只有读执行权限。递归创建目录:通过循环查找路径分隔符,并逐层创建目录,从而支持创建多级目录。
日志级别模块
文件level.hpp
定义了一个日志级别管理类LogLevel
,用于表示和管理日志记录的严重性等级。日志级别从低到高依次为DEBUG < INFO < WARN < ERROR < FATAL
,并提供关闭日志功能(OFF
)。此外包含了将日志级别枚举值转换为字符串的方法。
代码解析与作用:
class LogLevel{
public:
enum value{
UNKNOWN = 0,// 未知/默认日志级别
DEBUG, // 调试级别
INFO, // 信息级别
WARN, // 警告级别
ERROR, // 错误级别
FATAL, // 指明错误级别
OFF // 关闭日志记录
};
// 将日志级别转换为对应的字符串表示
static const char *toString(LogLevel::value level){
switch(level){
case LogLevel::value::DEBUG: return "DEBUG";
case LogLevel::value::INFO: return "INFO";
case LogLevel::value::WARN: return "WARN";
case LogLevel::value::ERROR: return "ERROR";
case LogLevel::value::FATAL: return "FATAL";
case LogLevel::value::OFF: return "OFF";
}
return "UNKNOWN";
}
};
应用场景:
- 日志过滤:通过设置当前日志级别,筛选需要记录的日志,忽略低于当前级别的日志。
- 日志管理:明确日志的分类和优先级,为日志分析提供参考。
日志消息模块
文件message.hpp
定义了LogMsg
结构体,用于存储一条完整的日志记录。日志消息包含以下属性:
- 日志级别、时间戳、线程ID、文件名、行号、日志器名称和具体的日志内容。
代码解析与作用:
struct LogMsg{
using ptr = std::shared_ptr<LogMsg>;
size_t _line; // 行号
time_t _ctime; // 时间戳
std::string _name; // 日志器名称
std::string _file; // 文件名
std::thread::id _tid; // 线程ID
std::string _payload; // 日志消息
LogLevel::value _level; // 日志等级
LogMsg(std::string &name, std::string file, size_t line
, std::string &&payload, LogLevel::value level)
: _name(name), _file(file), _line(line), _payload(std::move(payload)), _level(level)
, _ctime(util::date::now()), _tid(std::this_thread::get_id()){
// std::cout << "构造msg\n" ;
}
//~LogMsg() { /*std::cout << "析构msg\n";*/}
};
解释与作用:
-
功能:表示日志的一条记录,记录了日志的所有关键信息。
-
成员变量解析:
_line
:记录日志生成代码的行号,便于定位问题。_ctime
:日志生成的时间戳,由util::data::now()
提供。_name
:日志器的名称,用于区分多个日志模块。_file
:日志生成的源文件路径,便于问题溯源。_tid
:线程ID,区分多线程环境下的日志来源。_payload
:日志的主要内容,即用户指定的日志信息。_level
:日志的级别,用于筛选和展示。
-
实现细节:
-
定义智能指针类型
ptr
,便于管理日志消息对象。避免手动管理内存,适合日志系统中共享日志对象的场景。 -
“日志级别”和“时间戳”直接初始化,避免后续修改。
-
使用
std::move(payload)
将日志内容移入,减少不必要的拷贝,提升性能。 -
调用
std::this_thread::get_id()
获取当前线程ID,支持多线程日志分析。
-
日志格式化模块
日志格式化
formatter.hpp
提供了日志格式化的核心模块,包含以下部分:
FormatItem
:定义日志格式化的基类,所有格式化功能通过子类实现。- 子类实现:包括消息、时间、线程ID、日志级别等格式化项的具体实现。
Formatter
类:负责解析格式化字符串,并动态生成格式化项目,最终将日志按指定格式输出。该模块实现了灵活的日志格式化能力,允许用户通过指定模式动态调整日志输出格式。
1.1 FormatItem
基类
class FormatItem {
public:
using ptr = std::shared_ptr<FormatItem>;
virtual ~FormatItem() {}
virtual void format(std::ostream &os, const LogMsg &msg) const = 0;
};
解释与作用:
- 功能:定义了一个抽象接口(纯虚函数)
format()
用于格式化日志消息,所有具体的格式化项都继承自该类,实现format()
方法。 - 策略模式(Strategy Pattern):
FormatItem
作为策略接口,具体的格式化项作为不同的策略实现。
优势:
- 开放封闭原则(OCP):可以方便地扩展新的格式化项,而无需修改现有代码。
- 多态性:通过基类指针调用子类实现,增强代码的灵活性。
1.2 具体的格式化项子类
FormatItem文件提供了具体的格式化项类,用于格式化日志消息中的各个部分,如时间、线程ID、日志级别、文件名等。每个格式化项都实现了FormatItem接口,提供了format方法用于格式化日志消息。根据格式化模式,可以组合不同的格式化项,生成格式化后的日志消息,用于输出到日志目标。
-
MsgFormatItem
(消息内容):virtual void format(std::ostream &os, const LogMsg &msg) const override { os << msg._payload; }
-
LevelFormatItem
(日志级别):virtual void format(std::ostream &os, const LogMsg &msg) const override { os << LogLevel::toString(msg._level); }
-
NameFormatItem
(日志器名称):virtual void format(std::ostream &os, const LogMsg &msg) const override { os << msg._name; }
-
ThreadFormatItem
(线程ID):virtual void format(std::ostream &os, const LogMsg &msg) const override { os << msg._tid; }
-
TimeFormatItem
(时间):virtual void format(std::ostream &os, const LogMsg &msg) const override { struct tm lt; localtime_r(&msg._ctime, <); // 将时间转化为本地时间结构 char buf[128] = { 0 }; strftime(buf, sizeof(buf) - 1, _format.c_str(), <); // 格式化时间 os << buf; }
-
CFileFormatItem
(文件名)和CLineFormatItem
(行号):输出日志产生的源文件名和行号,便于调试和错误定位。 -
TabFormatItem
和NLineFormatItem
:输出制表符\t
和换行符\n
,用于格式化日志布局。 -
OtherFormatItem
(其他字符):在格式化模式中出现的普通字符(非%
开头的部分)virtual void format(std::ostream &os, const LogMsg &msg) const override { os << _str; }
1.3 Formatter
类
Formatter类就是一个工厂,其可以根据传入格式化指定的不同创建不同的格式化子项(也就是Formatter子类的实例)
class Formatter {
public:
Formatter(const std::string &pattern = "[%d{%H:%M:%S}][%t][%p][%c][%f:%l] %m%n")
: _pattern(pattern) {
assert(parsePattern());
}
std::string format(const LogMsg &msg){ /*...*/ }
std::ostream& format(std::ostream &os, const LogMsg &msg){ /*...*/ }
private:
bool parsePattern(){ /*...*/ }
FormatItem::ptr createItem(const std::string &fmt_char, const std::string &subfmt){ /*...*/ }
std::string _pattern;
std::vector<FormatItem::ptr> _items;
};
解释与作用:解析格式化模式字符串,将其转换为格式化项列表_items
。提供format
方法,将日志消息按照格式化项序列转换为字符串或输出。
工厂方法:根据解析的格式化字符,创建对象的格式化子项指针。
实现细节:
- 解析模式字符串(
parsePattern()
):将模式字符串拆解为一系列的格式化项(FormatItem
)。
// 根据格式化字符和子格式创建对应的格式化选项
FormatItem::ptr createItem(const std::string &fmt_char, const std::string &subfmt){
if(fmt_char == "d") return FormatItem::ptr(new TimeFormatItem(subfmt));
if(fmt_char == "T") return FormatItem::ptr(new TabFormatItem());
if(fmt_char == "t") return FormatItem::ptr(new ThreadFormatItem());
if(fmt_char == "p") return FormatItem::ptr(new LevelFormatItem());
if(fmt_char == "c") return FormatItem::ptr(new NameFormatItem());
if(fmt_char == "f") return FormatItem::ptr(new CFileFormatItem());
if(fmt_char == "l") return FormatItem::ptr(new CLineFormatItem());
if(fmt_char == "m") return FormatItem::ptr(new MsgFormatItem());
if(fmt_char == "n") return FormatItem::ptr(new NLineFormatItem());
if(fmt_char.empty()) return FormatItem::ptr(new OtherFormatItem(subfmt));
std::cout << "使用非法格式化字符: %" << fmt_char << std::endl;
return FormatItem::ptr();
}
- 创建格式化项(
createItem()
):根据解析得到的格式化字符,实例化对应的FormatItem
子类对象。
工厂方法调用:解析逻辑调用,即根据格式化字符串解析成格式化子项后,将这些格式化子项存储到
_items
中,从而完成最终日志打印格式的目的。
bool parsePattern(){
//std::string _pattern = "sg{}fsg%d{%H:%M:%S}%Tsdf%t%T[%p]%T[%c]%T%f:%l%T%m%n";
/*
每个要素分为三部分:
格式化字符:%d %T %p……
对应的输出子格式:{%H:%M:%S}
对应数据的类型:0-表示原始字符串,也就是非格式化字符,1-表示格式化数据类型
默认格式:"%d{%H:%M:%S}%T%t%T[%p]%T[%c]%T%f:%l%T%m%n"
*/
std::vector<std::tuple<std::string, std::string, int>> tokens; // 存储解析后的格式化项信息,每个元素包含上面三部分
std::string format_key; // 存放 % 后的格式化字符
std::string format_val; // 存放格式化字符后 {} 中的子格式字符串
std::string string_row; // 存放原始的非格式字符
bool sub_format_error = false;
size_t pos = 0;
while(pos < _pattern.size()){
if(_pattern[pos] != '%'){
string_row += _pattern[pos++];
continue;
}
// 处理连续的 %%
if(pos + 1 < _pattern.size() && _pattern[pos + 1] == '%'){
string_row += '%';
pos += 2;
continue;
}
// 将已有的普通文本加入 tokens
if(!string_row.empty()){
tokens.emplace_back(string_row, "", 0);
string_row.clear();
}
// 当前位置是%字符位置
++pos; // 跳过 % ,pos指向格式化字符位置
if(pos < _pattern.size() && isalpha(_pattern[pos])){
format_key = _pattern[pos]; // 保存格式化字符
}
else{
std::cout << &_pattern[pos-1] << "位置附近格式错误!" << std::endl;
return false;
}
++pos; // pos指向格式化字符的下一个位置,判断是否包含有子格式 %d{%y-%m-%d}
if(pos < _pattern.size() && _pattern[pos] == '{'){
sub_format_error = true;
++pos; // 跳过 '{'
while(pos < _pattern.size()){
if(_pattern[pos] == '}'){
sub_format_error = false;
pos += 1; // 跳过 '}', 让pos指向 } 的下一个字符处
break;
}
format_val += _pattern[pos++];
}
}
// 将格式化项加入 tokens
tokens.emplace_back(format_key, format_val, 1);
format_key.clear();
format_val.clear();
}
// 将剩余的普通文本加入 tokens
if(sub_format_error){
std::cout << "子规则{}匹配出错" << std::endl;
return false;
}
if(string_row.empty() == false) tokens.emplace_back(string_row, "", 0);
if(format_key.empty() == false) tokens.emplace_back(format_key, format_val, 1);
for(auto &token : tokens){
if(std::get<2>(token) == 0){
FormatItem::ptr fi(new OtherFormatItem(std::get<0>(token)));
_items.push_back(fi);
}
else{
FormatItem::ptr item = createItem(std::get<0>(token), std::get<1>(token));
if(item.get() == nullptr){
std::cout << "没有对应的格式化字符:%" << std::get<0>(token) << std::endl;
return false;
}
_items.push_back(item);
}
}
return true;
}
pattern解析过程:
初始化状态:
- 初始化一些临时变量,
format_key
用于存储格式化字符串(%d, %p),format_val
用于存储子格式字符串(如时间格式),string_row
用于存储非格式字符。- 使用向量
tokens
来存储解析后的结果,也就是存储解析出来格式化指令和其信息(下文中的key:value)遍历模式字符串:
- 遍历格式化字符串,逐字符处理
- 如果遇到普通字符(非
%
),则将其加入string_row
中- 如果遇到
%
,需要进一步判断是格式化字符还是转义的%
如果是%%
,则将其视作单个%
字符,追加存储在string_row
中处理格式化字符:
- 如果遇到格式化字符,先将前面的非格式化字符串(如果有)存入
tokens
。- 读取格式化字符,并检查是否有子格式字符串,检查后面是否存在
{}
。(例如时间格式中的{%y-%m-%d}
)- 将格式化字符和子格式字符串存入
tokens
。(存储格式如下表)处理结果:
如果最后还有非格式化字符串未处理,将其存储
tokens
。遍历
tokens
,创建相应的格式化项实例并存入_items
向量中。
key
val
nullptrt abcd[ d %H:%M:%S nullptr ][ p nullptr nullptr ] T nullptr m nullptr n nullptr
3. **格式化日志消息**(`format()`):遍历`_items`列表,依次调用每个格式化项的`format()`方法,将结果拼接。
输出模块
sink.hpp
文件定义了日志系统的输出目标模块,通过多态、工厂模式等设计模式,构建灵活的日志输出架构,主要包括:
LogSink
:日志输出的抽象基类,定义了纯虚函数log()
。- 具体的Sink子类:
StdoutSink
:将日志输出到标准输出(控制台)。FileSink
:将日志输出到指定文件。RollSink
:支持日志文件滚动,当文件达到最大大小时,创建新文件。
SinkFactory
:工厂类,用于创建不同类型的 Sink 对象。
架构设计:该日志输出模块采用了一种分层和模块化的架构设计,核心思想是通过抽象基类
LogSink
定义统一的日志输出接口,并通过工厂模式SinkFactory
动态创建具体的日志输出实现(如控制台输出、文件输出、滚动文件输出等)。这种设计使得模块具有高度的可扩展性和灵活性。抽象工厂模式:
SinkFactory
类实现了抽象工厂模式,通过模板函数create
动态创建不同类型的日志输出目标(如StdoutSink
,FileSink
,RollSink
)。用户无需直接实例化具体类,只需通过工厂方法即可获得所需的日志输出对象。
优点:提高了代码的可维护性,新增日志输出类型时只需扩展工厂方法,无需修改现有代码。
策略模式:
LogSink
类及其子类(StdoutSink
,FileSink
,RollSink
)构成了策略模式的核心。- 每个子类实现不同的日志输出策略(如控制台输出、文件输出、滚动文件输出),并通过统一的
log
接口对外暴露。- 优点:支持多种日志输出方式,用户可以根据需求选择合适的策略。
1.1 LogSink
类
class LogSink{
public:
using ptr = std::shared_ptr<LogSink>;
LogSink(){}
virtual ~LogSink(){}
// 纯虚函数,子类需要实现具体的日志输出逻辑
virtual void log(const char* data, size_t len) = 0;
};
- 功能:定义日志输出的接口,纯虚函数将日志数据输出到目标介质。将日志记录与日志输出解耦,
LogSink
定义了抽象接口log(const char* data, size_t len)
,强制所有子类实现具体的日志输出逻辑。
1.2 StdoutSink
类
负责将日志数据输出到标准输出(即控制台)。它的核心功能是通过 std::cout.write()
方法直接写入二进制数据,并调用 std::cout.flush()
确保日志立即输出,避免缓冲区滞留。
// 将日志输出到标准输出,即控制台。
class StdoutSink : public LogSink {
public:
using ptr = std::shared_ptr<StdoutSink>;
StdoutSink() = default;
void log(const char* data, size_t len) override {
std::cout.write(data, len);
std::cout.flush(); // 确保日志立即输出
}
};
1.3 FileSink
类
负责将日志数据写入指定文件。它的核心功能包括:
- 确保日志文件所在的目录存在。
- 以二进制追加模式打开文件流。
- 将日志数据写入文件并刷新缓冲区。
- 检查文件流状态并在写入失败时输出错误信息。
class FileSink : public LogSink{
public:
using ptr = std::shared_ptr<FileSink>;
FileSink(const std::string &filename): _filename(filename) {
util::file::create_directory(util::file::path(filename)); // 创建文件所在目录
_ofs.open(_filename, std::ios::binary | std::ios::app);
assert(_ofs.is_open());
}
// 获取日志文件名
const std::string &file() { return _filename; }
// 实现日志输出到文件
void log(const char* data, size_t len) override {
_ofs.write(data, len);
_ofs.flush(); // 确保日志写入文件*
if(_ofs.good() == false){
std::cout << "日志输出文件失败!\n";
}
}
private:
std::string _filename; // 日志文件名
std::ofstream _ofs; // 输出文件流
};
实现细节:
- 在构造函数中,调用
util::file::create_directory()
确保日志文件所在的目录存在(目录不存在也能正常运行)。- 打开文件流
_ofs
,以二进制追加模式(std::ios::binary | std::ios::app
)打开文件。std::ios::binary
:以二进制模式打开文件,避免文本模式下的换行符转换问题。std::ios::app
:以追加模式打开文件,确保新日志不会覆盖已有内容。- 在
log()
方法中,写入日志数据并刷新缓冲区。- 检查文件流状态,输出错误信息。
1.4 RollSink
类
负责实现日志文件滚动功能,当当前日志文件大小超过设定的最大值时,自动创建新的日志文件。其核心功能包括:
- 自动管理日志文件大小,当文件达到设定的最大值时,创建新的日志文件。
- 生成包含时间戳和计数器的唯一文件名,便于日志文件的管理和查找。
class RollSink : public LogSink{
public:
using ptr = std::shared_ptr<RollSink>;
// 构造函数,接受日志文件基础名和单个文件的最大大小
RollSink(const std::string &basename, size_t max_fsize)
:_basename(basename), _max_fsize(max_fsize), _cur_fsize(0){
util::file::create_directory(util::file::path(basename));
}
// 实现日志输出,支持文件滚动
void log(const char* data, size_t len) override {
initLogFile(); // 检查并初始化日志文件
_ofs.write(data, len);
_ofs.flush(); // 确保日志写入文件
if(_ofs.good() == false){
std::cout << "日志输出文件失败!\n";
}
_cur_fsize += len; // 更新当前文件大小
}
private:
// 初始化日志文件,如果文件未打开或已达到最大大小,创建新文件
void initLogFile(){
if(_ofs.is_open() == false || _cur_fsize >= _max_fsize){
_ofs.close();
std::string name = createFilename();
_ofs.open(name, std::ios::binary | std::ios::app);
assert(_ofs.is_open());
_cur_fsize = 0;
}
return;
}
// 创建新的日志文件名,包含时间戳
std::string createFilename(){ /*...*/ }
private:
size_t _name_count;
std::string _basename; // 日志文件基础名
std::ofstream _ofs; // 输出文件流
size_t _max_fsize; // 单个日志文件的最大大小
size_t _cur_fsize; // 当前日志文件的大小
};
实现细节:
- 初始化日志文件(
initLogFile()
):
- 检查当前文件是否打开,或者文件大小是否超过最大值。
- 如果需要,关闭当前文件,生成新的文件名并打开新的文件流。
- 生成日志文件名(
createFilename()
):
- 根据当前时间(YYYYMMDDHHMMSS)和计数器
_name_count
生成唯一的日志文件名。- 文件名格式示例:
basename20231021153045-0.log
。- 日志写入:
- 在
log()
方法中,调用initLogFile()
确保文件状态正常。- 写入日志数据,刷新缓冲区,更新当前文件大小。
- 自动管理日志文件大小:避免单个日志文件过大,影响系统性能或占用过多磁盘空间。
- 文件命名有序:包含时间戳和计数器,便于日志文件的管理和查找。
1.5 SinkFactory
类
负责创建不同类型的 LogSink
对象。它的核心功能包括:
- 提供统一接口,封装对象创建过程。
- 支持通过模板函数和可变参数模板动态创建不同类型的日志输出目标(如
StdoutSink
,FileSink
,RollSink
)。
// 提供一个通用工厂方法,创建不同类型的 LogSink 对象。
class SinkFactory {
public:
template<typename SinkType, typename ...Args>
static LogSink::ptr create(Args &&...args){
return std::make_shared<SinkType>(std::forward<Args>(args)...);
}
};
设计思路:
- 通用工厂方法:工厂模式是一种创建设计模式,用于封装对象的创建过程,提供统一的接口。
- 使用模板函数
create
创建指定类型的LogSink
对象。- 模板参数
SinkType
表示具体的日志输出目标类型(如StdoutSink
,FileSink
,RollSink
)。- 参数包
Args &&...args
支持任意数量和类型的构造函数参数。- 完美转发:用于将函数参数的类型和值类别(左值/右值)精确地传递给另一个函数。
- 使用
std::forward<Args>(args)...
实现完美转发,保留参数的左值/右值属性。- 这确保了传递给构造函数的参数类型不会发生意外转换(如将右值转换为左值)。
缓存模块
Buffer
类实现了一个动态大小的缓冲区,用于高效地管理数据的读写操作。它通过自动扩展缓冲区大小避免了频繁的内存分配,从而提高了性能。
1.数据结构设计
Buffer
类的核心数据结构是一个std::vector<char>
,它用于存储缓冲区中的数据。std::vector
提供了动态调整大小的能力,使得缓冲区可以根据实际需求动态扩展。
- 读写索引:
_reader_idx
和_writer_idx
用于标记数据的读写位置,分别表示当前可读和可写的数据的起始位置。 vector<char> _v
:实际存储数据的容器,使用char
类型存储字节数据。
2.主要方法分析
- 检查缓冲区是否为空。
- 获取可读/可写数据的大小。
- 支持数据的写入和弹出操作。
- 自动扩展缓冲区大小以确保足够的写入空间。
- push(const char* data, size_t len)
void push(const char* data, size_t len){
assert(len <= writeAbleSize());
ensureEnoughSpace(len);
std::copy(data, data + len, &_v[_writer_idx]);
_writer_idx += len;
}
push()
方法向缓冲区写入数据。首先,确保写入的数据不会超过可写空间;如果数据量大于当前可写空间,ensureEnoughSpace()
方法会自动扩展缓冲区。接着,通过std::copy
将数据拷贝到缓冲区的合适位置,并更新写索引。
- pop(size_t len)
void pop(size_t len){
_reader_idx += len;
assert(_reader_idx <= _writer_idx);
}
pop()
方法用于从缓冲区中弹出数据。它通过增加读索引来标记已经读取的数据,并确保读索引不会超过写索引
- ensureEnoughSpace(size_t len)
void ensureEnoughSpace(size_t len){
if(len <= writeAbleSize()) return;
size_t new_capacity;
if(_v.size() < BUFFER_THRESHOLD_SIZE)
new_capacity = _v.size() * 2 + len;
else
new_capacity = _v.size() + BUFFER_INCREMENT_SIZE + len;
_v.resize(new_capacity);
}
ensureEnoughSpace()
方法确保缓冲区有足够的空间来写入新的数据。如果当前的可写空间不足,它会根据缓冲区的大小增长策略扩展缓冲区。若缓冲区大小小于阈值BUFFER_THRESHOLD_SIZE
,则使用倍增策略,否则采用增量扩展策略。扩展后的缓冲区大小由_v.resize(new_capacity)
来调整。
#define BUFFER_DEFAULT_SIZE (1*1024*1024) // 默认大小 1MB
#define BUFFER_INCREMENT_SIZE (1*1024*1024) // 每次增大 1MB
#define BUFFER_THRESHOLD_SIZE (10*1024*1024) // 阈值大小 10MB
// 检查缓冲区是否为空
bool empty() const { return _reader_idx == _writer_idx; }
// 获取可读/可写数据的大小
size_t readAbleSize() const { return _writer_idx - _reader_idx; }
size_t writeAbleSize() const { return _v.size() - _writer_idx; }
// 重置缓冲区的读写索引
void reset() { _reader_idx = _writer_idx = 0; }
// 交换当前缓冲区与另一个缓冲区的内容
void swap(Buffer &buf){
_v.swap(buf._v);
std::swap(_reader_idx, buf._reader_idx);
std::swap(_writer_idx, buf._writer_idx);
}
// 向缓冲区写入数据
void push(const char* data, size_t len){
assert(len <= writeAbleSize()); // 确保写入长度不超过可写大小
ensureEnoughSpace(len); // 确保有足够的空间
std::copy(data, data+len, &_v[_writer_idx]); // 拷贝数据
_writer_idx += len; // 更新索引
}
// 获取当前可读数据的开始地址
const char* begin() const { return &_v[_writer_idx]; }
// 从缓冲区弹出数据
void pop(size_t len){
_reader_idx += len;
assert(_reader_idx <= _writer_idx);
}
protected:
// 确保缓冲区有足够的空间来写入数据
void ensureEnoughSpace(size_t len){
if(len <= writeAbleSize()) return;
// 每次增大 1MB 大小
size_t new_capacity;
if(_v.size() < BUFFER_THRESHOLD_SIZE)
new_capacity = _v.size() * 2 + len;
else
new_capacity = _v.size() + BUFFER_INCREMENT_SIZE + len;
_v.resize(new_capacity);
}
3.性能优化思考
- 内存增长策略:
ensureEnoughSpace
方法的内存扩展策略很重要。缓冲区容量的增长策略可以通过BUFFER_THRESHOLD_SIZE
控制,当缓冲区大小超过该阈值后,扩展的速度变得更慢。这个策略在大数据量的日志或消息处理中尤为有效,可以减少内存碎片化和频繁扩展带来的性能瓶颈。 vector
的使用:vector
是基于动态数组实现的,因此当容量不足时,扩展会涉及到重新分配和数据拷贝。尽管vector
提供了很高的性能,但对于极大数据量的缓冲区,可能会存在性能瓶颈。未来可以考虑使用其他动态数据结构,如环形缓冲区(circular buffer)来进一步优化内存使用。环形队列代码
异步处理模块
AsyncLooper
类实现了异步任务处理机制,允许在后台线程中异步处理。它通过条件变量和缓冲区管理任务队列,从而使得任务的推送和处理分离,提升系统性能。
1.类结构设计
AsyncLooper
采用了生产者-消费者模式。生产者将任务(消息)推送到缓冲区,消费者(即工作线程)从缓冲区中提取任务并进行处理。关键数据成员:
std::mutex _mutex
:用于保护任务队列的互斥锁,避免多个线程不会同时修改缓冲区,确保数据一致性。
Buffer _tasks_push
和Buffer _tasks_pop
:这两个缓冲区分别用于任务的推送和弹出。
_tasks_push
:用于存储生产者推送进来的任务。任务会先进入这个缓冲区。_tasks_pop
:用于存储准备处理的任务,消费者(即工作线程)从这里获取任务进行处理。每当任务推送时,
_tasks_push
中的数据会被复制到_tasks_pop
,并有后台工作线程处理。
std::atomic<bool> _running
:标志异步任务处理器是否正在运行。stop()
方法会设置该标志为false
,从而停止任务的处理。线程同步:
std::condition_variable _push_cond
:用于任务推送时的同步,确保当缓冲区空间不足时,生产者线程会等待直到有足够空间。std::condition_variable _pop_cond
:用于工作线程的同步,确保工作线程在有任务时才开始执行,任务队列为空时会等待。
2.主要方法分析
-
stop()
:停止异步处理器。该方法通过设置_running
标志为false
来终止任务的处理,并通知工作线程结束处理。使用_pop_cond.notify_all()
通知工作线程退出循环,最后通知_thread.join()
等待工作工作线程完成任务处理。// 停止异步处理器 void stop(){ _running = false; _pop_cond.notify_all(); // 通知所有等待的线程 _thread.join(); // 等待工作线程结束 }
-
push(const std::string &msg)
:向任务队列中推送消息。该方法首先通过条件变量push_cond
等待,直到缓冲区有足够的空间来接收新消息。然后将数据拷贝到tasks_push
缓冲区中。在数据写入之后,调用pop_cond.notify_all()
通知工作线程有新的任务待处理。// 向任务队列中推送消息 void push(const std::string &msg){ if(_running == false) return; // 如果不在运行,直接返回 { std::unique_lock<std::mutex> lock(_mutex); // 等待直到缓冲区有足够空间来写入新消息 _push_cond.wait(lock, [&]{ return _tasks_push.writeAbleSize() >= msg.size(); }); _tasks_push.push(msg.c_str(), msg.size()); // 将消息写入缓冲区 // _pop_cond.notify_one(); } _pop_cond.notify_all(); // 通知工作线程有新任务 }
-
worker_loop()
:工作线程的主循环。工作线程会等待任务队列中有任务可处理(通过条件变量pop_cond
)。工作线程从_tasks_push
缓冲区中获取任务并执行:- 等待直到任务队列中有任务可处理,或者
_running
标志为false
表示需要停止任务处理。 - 使用
swap
将_tasks_push
的任务内容交换到_tasks_pop
,避免缓冲区的竞争。 - 通知生产者缓冲区已经准备好,可以继续添加任务。
- 调用回调函数
_looper_callback
来处理弹出的任务。 - 处理完成后,重置
_tasks_pop
,为下一个任务做好准备。
// 工作线程循环 void worker_loop(){ while(true){ { std::unique_lock<std::mutex> lock(_mutex); // 如果运行标志为false且任务队列为空,退出循环 if(_running == false && _tasks_push.empty()) { break; } // 等待直到有任务可处理或已请求停止 _pop_cond.wait(lock, [&]{ return !_tasks_push.empty() || !_running; }); _tasks_push.swap(_tasks_pop); // 交换任务队列 } _push_cond.notify_all(); // 通知处理任务 _looper_callback(_tasks_pop); // 执行回调函数处理任务 _tasks_pop.reset(); // 重置已处理任务的缓冲区 } return; }
- 等待直到任务队列中有任务可处理,或者
3.性能优化思考
- 双缓冲策略:使用两个缓冲区(
_tasks_push
和_tasks_pop
)来实现任务的并行推送和处理。这种双缓冲策略减少了任务处理过程中对缓冲区的锁竞争,提高了处理效率。生产者和消费者分别操作不同的缓冲区,避免了线程间的直接冲突。(之后深入探讨) - 条件变量的使用:
std::condition_variable
的使用确保了生产者和消费者能够有效地等待和通知。通过在push()
和worker_loop()
中使用条件变量,系统能够根据缓冲区状态进行灵活的任务调度。 - 任务处理的异步化:通过将任务推送和处理分离,
AsyncLooper
可以异步处理任务。这样,任务的推送不会阻塞主线程,能够提高系统的吞吐量和响应速度。
*AsyncLooper异步处理器与Buffer
graph TD %% 缓存模块部分 A[数据写入请求] --> B[检查缓冲区可写空间] B -->|有足够空间| C[数据写入到_tasks_push缓冲区] B -->|空间不足| D[扩展缓冲区大小] D --> C C --> E[更新_writer_idx索引] E --> F[数据存储完毕] %% 异步处理模块部分 F --> G[任务推送到_tasks_push缓冲区] G --> H[等待条件:_tasks_push是否有足够空间] H -->|空间足够| I[将任务推送到任务队列_tasks_push] H -->|空间不足| J[等待通知:_tasks_push空间可用] I --> K[任务队列满,通知工作线程处理] %% 工作线程部分 K --> L[工作线程等待任务] L --> M[从_tasks_push缓冲区获取任务] M --> N[执行任务回调函数] N --> O[任务处理完成,重置_tasks_push缓冲区] O --> L %% 辅助关系 subgraph 缓冲区管理 B D C E end subgraph 异步任务处理 G H I J K L M N O end
架构分析
Buffer类的设计是配合异步日志器使用,本质是一个异步任务处理器。设计的目的是将任务推送到队列中,并将任务推送到队列中,并由后台线程异步进行处理。核心思想则是将时间密集型和IO密集型任务(更准确来说是为了避免写日志过程中阻塞)放到后台线程中执行,从而减少主线程的任务负担,从而提高性能。双缓冲区设计确保了日志的处理,避免了频繁的内存申请与释放,并且通过异步线程处理日志信息,优化了日志系统的响应速度和吞吐量。
总体思路
缓冲区设计:为了避免缓冲区频繁申请和释放内存空间,采用了环形队列。这种设计可以确保内存空间的循环利用,从而避免内存碎片化,提高效率。缓冲区预分配一定的空间,并在运行过程中动态扩展,确保有足够的空间存储待处理的任务。
线程安全:在多线程环境中,为了避免对缓冲区的并发访问产生数据竞争,缓冲区操作必须是线程安全的。为此,采用了读写加锁的方式来保证线程安全。
线程分配:由于日志记录的I/O密集性较高,而CPU消耗较少,因此不需要多个线程来进行日志记录。一个工作线程足以处理所有的日志记录任务。
锁冲突问题:生产者线程和消费者线程之间的互斥是不可避免的。在高并发场景中,任务的生产和消费会涉及到锁的竞争,为了减少锁冲突,采用了双缓冲区设计。
双重缓冲区深度分析
日志器模块
头文件Logger.hpp
定义了一个完整的日志系统,支持同步和异步日志输出,并且使用了多种设计模式(如建造者模式、单例模式等)来组织日志器的创建和管理。下面我将对代码结构和核心功能进行详细分析。
1.代码结构概述
- Logger 类:主要负责提供各种日志记录功能,支持多种日志级别(DEBUG、INFO、WARN、ERROR、FATAL)。Logger 支持同步和异步日志,适合不同性能需求的应用场景。
- Logger 类型:通过
Logger::Type
区分日志器是同步还是异步。 - 日志级别控制:Logger 通过
_level
控制日志输出,日志级别低于设置的不会输出,以减少不必要的开销。 - 日志接口:
debug()
、info()
、warn()
、error()
和fatal()
函数用于不同级别的日志记录。
- Logger 类型:通过
- SyncLogger 与 AsyncLogger 类:
- SyncLogger:使用互斥锁保证线程安全,日志立即输出。适用于对实时性要求高的应用场景。
- AsyncLogger:使用异步循环器(
AsyncLooper
),通过后台线程输出日志。适合对性能要求较高的场景,避免主线程阻塞。
- Logger Builder(构造器模式):
- LocalLoggerBuilder:构建本地日志器,不加入全局管理器。适用于需要独立管理的日志器,避免与全局日志器产生冲突。
- GlobalLoggerBuilder:构建全局日志器,加入全局日志器管理器,提供全局可访问的日志接口。
- LoggerManager 类:管理所有日志器,负责维护日志器的生命周期和配置。
- 通过
LoggerManager
可以获取、添加、检查日志器的存在。它通常是单例模式,确保全局只有一个管理器。
- 通过
好的,我将通过添加一些代码片段来对各个重要部分进行更深入的解析。这些代码片段将帮助您理解代码的具体实现逻辑、各个组件之间的协作方式,以及它们在不同情景中的使用。
2.具体实现
2.1 Logger 类:日志记录的实现
以下代码片段展示了日志记录函数 debug()
的详细实现,以及如何调用它来记录日志。
void debug(const std::string &file, size_t line, const std::string &fmt, ...) {
if (LogLevel::value::DEBUG < _level) {
return; // 如果当前日志级别比DEBUG低,直接返回,不记录日志
}
va_list ap; // 用于处理可变参数
va_start(ap, fmt);
char *res;
int ret = vasprintf(&res, fmt.c_str(), ap); // 格式化字符串,将可变参数与格式化字符串组合成日志内容
if (ret == -1) {
std::cout << "vasprintf failed" << std::endl;
return; // 格式化失败,输出错误信息并返回
}
va_end(ap);
serialize(LogLevel::value::DEBUG, file, line, res); // 将格式化的字符串序列化以便进一步处理
free(res); // 释放动态分配的内存
}
深度解析
-
LogLevel::value::DEBUG < _level
:- 每次记录日志之前,首先判断当前设置的日志级别
_level
是否允许输出该日志。这是通过比较日志器的当前级别和请求的日志级别来实现的。 - 例如,如果当前日志级别为
WARN
,低于WARN
的DEBUG
和INFO
日志就不会输出,以此控制日志的详细程度。
- 每次记录日志之前,首先判断当前设置的日志级别
-
va_list ap
和vasprintf()
:va_list ap
用于处理变长参数函数中的参数。vasprintf()
函数将格式化字符串与参数结合,生成最终的日志内容。vasprintf()
会动态分配内存用于存储最终生成的字符串,所以需要在使用完毕后手动free(res)
。
-
serialize()
:- 调用
serialize()
将日志内容进行封装,它会进一步格式化日志消息,并调用具体的日志输出函数。
- 调用
调用示例:
Logger::ptr logger = std::make_shared<SyncLogger>("MyLogger", std::make_shared<Formatter>(), std::vector<LogSink::ptr>(), LogLevel::value::DEBUG);
logger->debug(__FILE__, __LINE__, "This is a debug message: %s", "Details about debug");
在上面的代码中:
- 创建了一个名为
"MyLogger"
的同步日志器,设置格式器和日志级别为DEBUG
。 - 调用
debug()
方法记录日志,__FILE__
和__LINE__
宏用于记录当前文件和行号。
2.2 SyncLogger 和 AsyncLogger:同步与异步日志的区别
以下代码展示了同步日志器和异步日志器的 logIt()
方法实现差异。
2.2.1 SyncLogger 的 logIt()
实现:
virtual void logIt(const std::string &msg, size_t len) override {
std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全
if (_sinks.empty()) {
return; // 如果没有任何输出目标,直接返回
}
for (const auto &it : _sinks) {
it->log(msg.c_str(), msg.size()); // 逐个调用每个 Sink 的 log 方法,将日志输出到目标
}
}
深度解析:
- 使用
std::unique_lock<std::mutex>
对_mutex
进行加锁,确保同一时间只有一个线程可以执行logIt()
。这避免了在多线程环境中同时写入同一日志目标的竞争问题。 - 遍历所有的
_sinks
(例如控制台输出、文件输出等),依次将日志内容写入输出目标。使用 Sink 提供了高度的灵活性,能够根据需求选择或组合多个输出方式。
2.2.2 AsyncLogger 的 logIt()
实现
virtual void logIt(const std::string &msg, size_t len) override {
_looper->push(msg); // 将日志消息推送到异步循环器中,由后台线程处理
}
深度解析:
- 与同步日志器不同,异步日志器直接将日志消息推送到异步循环器
_looper
,异步循环器会将消息添加到内部的队列中并由后台线程逐一处理。 - 这种设计能够显著提高日志的写入性能,尤其是在高并发场景中,可以避免主线程被阻塞。
2.3 AsyncLooper 的 backendLogIt()
void backendLogIt(Buffer &msg) {
if (_sinks.empty()) {
return; // 如果没有输出目标,直接返回
}
for (const auto &it : _sinks) {
it->log(msg.begin(), msg.readAbleSize()); // 从缓冲区读取日志消息,并输出到目标
}
}
backendLogIt()
是后台线程实际负责输出日志的函数,使用了Buffer
类来管理待输出的日志。- 这样实现的好处是将日志处理分为两部分,主线程负责将日志推送到队列中,后台线程负责实际的日志输出,从而实现非阻塞的日志处理。
2.3 LoggerManager:日志管理的实现
以下代码片段展示了 LoggerManager
类中日志器的获取与添加方法。
2.3.1 addLogger()
实现
void addLogger(const std::string &name, const Logger::ptr logger) {
std::unique_lock<std::mutex> lock(_mutex); // 加锁,保护共享资源
_loggers[name] = logger; // 将日志器以名称为键加入到日志器映射表中
}
深度解析:
- 通过
std::unique_lock
锁定_mutex
,以防止多线程环境中多个线程同时访问_loggers
,导致数据竞态条件。 _loggers
是一个std::unordered_map
,用于保存日志器实例。键是日志器名称,值是对应的Logger::ptr
。
2.3.2 getLogger()
实现
Logger::ptr getLogger(const std::string &name) {
std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全
auto it = _loggers.find(name);
if (it == _loggers.end()) {
return Logger::ptr(); // 如果找不到对应名称的日志器,返回空指针
}
return it->second; // 返回找到的日志器
}
深度解析:
- 使用
std::unique_lock
锁定_mutex
以保护共享的_loggers
资源。 - 使用
std::unordered_map::find()
来查找对应名称的日志器,返回对应的日志器指针。 - 如果找不到对应的日志器,返回空指针,这样调用者可以检查日志器是否存在并进行相应处理。
2.4 Sink 的扩展与应用
以下代码片段展示了如何创建一个标准输出 (StdoutSink
) 的 Sink 以及其 log()
方法的实现。
2.4.1 StdoutSink
类
class StdoutSink : public LogSink {
public:
void log(const char* msg, size_t len) override {
std::cout.write(msg, len); // 将日志消息写入标准输出
std::cout << std::endl; // 添加换行
}
};
深度解析
StdoutSink
是LogSink
的一种实现,重写了log()
方法,将日志输出到标准输出。- 通过
std::cout.write(msg, len)
可以直接输出消息内容,这种方式比std::cout << msg
更适合处理原始字符数组。
2.4.2 如何使用 StdoutSink
Logger::Builder::ptr builder = std::make_shared<LocalLoggerBuilder>();
builder->buildLoggerName("stdout_logger");
builder->buildFormatter("%d{%Y-%m-%d %H:%M:%S} [%p] %m%n");
builder->buildSink<StdoutSink>(); // 添加标准输出的 Sink
Logger::ptr logger = builder->build();
logger->info(__FILE__, __LINE__, "This is an info message to standard output.");
- 使用
LocalLoggerBuilder
来构建一个名为"stdout_logger"
的日志器。 - 设置日志格式为
"%d{%Y-%m-%d %H:%M:%S} [%p] %m%n"
,即输出时间、日志级别、消息。 - 使用
builder->buildSink<StdoutSink>()
为日志器添加一个标准输出的目标,这样所有日志都会输出到控制台。
3.小结
- 日志级别控制:日志器每次记录日志前会先检查日志级别,确保只输出需要的日志。
- 同步 vs 异步日志器:同步日志器在多线程环境中使用锁来保护共享资源,而异步日志器通过异步循环器将日志推送到后台线程处理,从而减少主线程阻塞。
- 日志器管理:
LoggerManager
是单例管理器,用于统一管理所有日志器,可以通过名称获取或添加日志器实例。 - 输出目标 (
Sink
):可以将日志输出到不同的目标,如控制台、文件或网络,并且通过Sink
的抽象,可以非常灵活地扩展日志输出方式。
这些代码片段展示了日志系统的核心实现和应用方式,通过深入分析每个部分的具体逻辑和协作方式,希望您能更好地理解和使用这个日志系统。
深入分析:双缓冲区机制
一言以蔽之:这份代码实现了一个双缓冲区执行流程,生产者线程(业务线程)将日志任务写入到第一个缓冲区(
_tasks_push
),并通过条件变量通知消费者线程有新的任务待处理。消费者线程(异步线程)则从第二个缓冲区(_tasks_pop
)中读取任务,并执行日志输出操作。为了避免生产者和消费者之间锁冲突,任务在两个任务之间进行交换,确保每个线程只访问自己负责的缓冲区。当任生产者写入数据后,交换操作将写入缓冲区的数据移到任务处理缓冲区,消费者线程则开始处理这些任务,直到缓冲区内容被完全消费(处理),并通知生产者继续写入新任务。这样,通过双重缓冲区的设计,有效减少了线程间的竞争,提高了系统的性能和吞吐量。交换缓冲区的操作是
“强制的”
、“无缝的”
,也就是说,无论消费者线程是否已完成处理当前任务,只要tasks_push
缓冲区中有新任务,都会触发交换操作。这是因为交换缓冲区的设计是为了确保生产者和消费者能够独立操作不同的缓冲区,从而避免锁竞争。生产者将任务按顺序写入
tasks_push
缓冲区,消费者从tasks_pop
中读取并处理任务,缓冲区交换操作仅在数据完全写入后执行,从而保持了任务的顺序性和完整性。
在 AsyncLooper
类中,采用了**“双缓冲区”**机制来管理任务的推送和处理。具体来说,使用了两个 Buffer
类实例:
_tasks_push
:任务推送缓冲区,生产者(主线程或其他线程)将任务数据写入该缓冲区。_tasks_pop
:任务弹出缓冲区,消费者(工作线程)从该缓冲区中提取任务进行处理。
这种设计采用了生产者-消费者模式,并利用缓冲区的交换机制实现了任务的异步处理。具体来说,这种双缓冲区机制的设计具有以下几个关键优势:
1.缓解读写冲突
- 并发读写:双缓冲区机制的核心优势之一就是缓解了生产者和消费者之间的读写冲突。在传统的单缓冲区设计中,生产者和消费者往往需要争夺对缓冲区的访问权限,造成频繁的锁竞争,进而影响系统性能。而在
AsyncLooper
中,生产者和消费者各自操作不同的缓冲区,生产者将任务写入_tasks_push
,消费者从_tasks_pop
中读取任务并处理。这种设计避免了在同一缓冲区中同时进行读写操作,从而降低了锁竞争的次数。 - 避免等待和阻塞:生产者在写入任务时并不需要等待消费者完成处理,它直接将任务放入
tasks_push
缓冲区。消费者在处理完当前缓冲区的任务后,会将tasks_push
与tasks_pop
交换,使得生产者继续往新的tasks_push
中写入数据,而消费者可以开始处理交换后的tasks_pop
数据。这意味着即便消费者的处理速度较慢,生产者依然可以持续推送任务,而不会受到影响。
2.提高系统响应性
-
任务积压的隔离:使用双缓冲区的另一个重要优势是可以隔离任务处理的延迟。在传统的单缓冲区设计中,如果任务的处理速度跟不上任务的生成速度,生产者会被迫等待消费者处理完任务才能继续推送。而双缓冲区设计使得生产者可以在一个缓冲区中继续推送任务,同时消费者在另一个缓冲区中独立处理任务,从而避免了阻塞和延迟,提高了系统的响应性。
-
任务处理的异步性:在双缓冲区机制下,任务的推送和任务的处理是完全独立的。生产者向
tasks_push
写入数据时不需要等待任务被处理完成,而消费者则可以在后台独立地处理任务。这种解耦的设计显著提高了系统的响应性,尤其是在任务积压较大的情况下。
3.提高系统吞吐量
-
异步处理机制:双缓冲区设计允许任务在两个缓冲区之间异步交换。生产者将任务写入
_tasks_push
后,不会因为消费者的处理而被阻塞,消费者则可以从_tasks_pop
中异步处理任务。这种设计减少了系统内的同步等待,充分利用了计算资源,最终提高了系统的吞吐量。 -
优化任务处理效率:消费者在处理缓冲区任务时,不会因为任务推送而被阻塞或延迟,可以快速地完成任务处理并处理下一批任务。因为生产者和消费者通过缓冲区交换任务,而不是相互依赖,因此每个部分都能够高效执行。
4.提高容错性与系统稳定性
-
减少内存分配频率:每次生产者将任务推送到
tasks_push
缓冲区时,缓冲区只会在空间不足时进行扩容。在AsyncLooper
中,Buffer
类会根据任务的写入量动态调整缓冲区的大小,避免了频繁的内存分配操作,从而优化了内存使用。双缓冲区机制的设计还通过将任务数据隔离到不同的缓冲区,避免了频繁的内存访问冲突。每次任务交换后,生产者与消费者各自操作不同的缓冲区,这样就减少了锁竞争带来的性能损失。 -
任务交换的容错性:采用双缓冲区机制后,交换缓冲区的操作是一种无缝的任务切换过程,减少了生产者与消费者之间同步和通信的复杂度。这意味着即使在某些情况下(例如消费者异常退出),缓冲区的数据不会丢失,生产者仍然可以继续将任务推送到另一个缓冲区。这种设计提高了系统的容错性和稳定性,确保了任务的可靠性。
-
灵活的任务流控制:双缓冲区设计使得系统可以根据任务的数量和处理速度动态调整工作流程。例如,当缓冲区内的任务积压较多时,可以通过动态调整任务交换的频率或优先级来平衡生产者与消费者之间的任务流动,从而避免因任务堆积导致的性能下降。
生产者消费者模型与双缓冲去结合分析
结合的主要目的在于提升性能、提高响应速度、提高稳定性以及日志输出一致性。首先,两种机制结合,系统能够有效处理高并发下的大量日志请求,显著提升系统性能;其次,主线程基本不会因为日志写入阻塞,从而提高系统的整体响应速度;最后,缓冲区机制确保了日志写入的顺序性和稳定性,避免因并发竞争而导致日志丢失和重复写入的情况。
实现流程
- 日志生成: 主线程作为生产者,不断生成日志消息,并通过
Logger
模块将消息添加到Buffer
中- 缓冲区切换: 当
current_buffer_
满时,Buffer
模块切换到write_buffer_
,并通知Looper
模块开始异步写入日志- 异步写入:
Looper
模块从write_buffer
中提取日志消息,异步写入到目标位置- 清空缓冲区: 写入完成后,清空
write_buffer
,等待下一次切换
双缓冲区机制的具体实现细节
在 AsyncLooper
类中,双缓冲区的实现主要体现在以下几行代码:
push()
方法:生产者将任务写入_tasks_push
缓冲区,并通过条件变量控制生产者等待缓冲区有足够空间。
// 向任务队列中推送消息
void push(const std::string &msg){
if(_running == false) return; // 如果不在运行,直接返回
{
std::unique_lock<std::mutex> lock(_mutex);
// 等待直到缓冲区有足够空间来写入新消息
_push_cond.wait(lock, [&]{ return _tasks_push.writeAbleSize() >= msg.size(); });
_tasks_push.push(msg.c_str(), msg.size()); // 将消息写入缓冲区
}
_pop_cond.notify_all(); // 通知工作线程有新任务
}
worker_loop()
方法:消费者从_tasks_push
中交换任务到_tasks_pop
,然后执行任务处理,处理完成后重置缓冲区,准备下一次任务处理。
// 工作线程循环
void worker_loop(){
while(true){
{
std::unique_lock<std::mutex> lock(_mutex);
// 等待直到有任务可处理或已请求停止
if(_running == false && _tasks_push.empty()) { break; }
// 等待直到有任务或已请求停止
_pop_cond.wait(lock, [&]{ return !_tasks_push.empty() || !_running; });
// 交换任务队列
_tasks_push.swap(_tasks_pop);
}
_push_cond.notify_all(); // 通知处理任务
_looper_callback(_tasks_pop); // 执行回调函数处理任务
_tasks_pop.reset(); // 重置已处理任务的缓冲区
}
return;
}
难点与挑战
该项目的主要难点在于处理高并发、确保数据一致性以及实现系统拓展性等方面。但通过引用异步处理、生产者消费者模型、双缓冲区机制以及模块化设计,解决了上述难点。从而构建一个高效可靠且灵活的日志系统。
1.高并发问题
- 主线程阻塞:日志写入需要进行IO操作,如果不加以优化主线程可能会长时间等待日志写入完成,无法及时响应其他请求。
- 锁竞争:在多线程环境下,如果多个线程同时写入日志,容易发生锁竞争,导致系统性能下降,尤其是对于高并发请求的处理。
- IO瓶颈:大量日志写入请求可能导致频繁的IO操作,从而成为系统的性能瓶颈。
解决方案:
- 异步处理机制:通过将日志的记录与写入分离,主线程只需将日志消息推送到线程安全的队列中,然后立即返回。实际的日志写入有后台线程异步完成,从而避免主线程阻塞。
- 生产者-消费者模型:主线程作为生产者,持续生成日志消息并推送到队列,消费者线程从队列中提取日志并进行写入。通过这种方式,能够平衡日志生成和写入速度,避免日志队列溢出和消息丢失。
- 双缓冲区机制:通过引入双缓冲区机制,一个缓冲区用于接收新生成的日志消息,另一个缓冲区用于异步写入日志。当写入缓冲区满时,系统会切换到下一个缓冲区,这样可以避免主线程等待日志写入完成并减少锁竞争。
2.日志丢失与数据一致性问题
- 异常情况下的日志丢失:在系统崩溃或突然断电时,正在写入的日志可能会丢失,这将影响系统的日志完整性和后续问题排查。
- 数据一致性问题:在并发环境下,如果日志消息的处理顺序被打乱,可能导致日志数据不一致,影响调试和问题排查。
解决方案:
- 持久化机制:系统在异步处理日志的过程中,采用日志持久化机制。在后台线程从队列中提取日志消息时,会先将其写入临时文件或缓冲区。这样可以确保及时系统崩溃,日志数据也不会丢失。恢复时,系统能够从临时文件中读取未写入的日志消息进行恢复。
- 顺序写入:通过严格的队列管理和双缓冲区切换机制,确保日志消息按生成的顺序被写入。这样避免了由于并发引发的数据不一致问题。
- 日志冗余机制:为了防止单一日志写入失败导致的数据丢失,系统支持日志冗余写入,将日志同时输出到多个目标位置(例如同时写入文件和远程服务器)。即使一个目标失败,其他目标仍能保证日志数据的完整性。
3.系统的拓展性和灵活性
- 多样化需求:不同的应用可能对日志格式和输出方式有不同的需求,因此日志系统需要能够灵活的配置,以适应各种环境。
- 拓展难度:随着系统功能的增加,需要确保新的功能或特性不会对现有系统的稳定性和性能造成影响。
解决方案:
- 模块化设计:系统采用模块化设计,将日志记录、格式化、输出等功能分离为独立的模块。通过这种方式,用户可以根据需求灵活组合和替换这些模块,而不会影响系统的核心功能。
- 设计模式的应用:在系统设计中广泛应用了工厂模式、策略模式和代理模式。例如,工厂模式用于灵活创建不同的
Sink
实例,策略模式用于动态选择或更改日志的格式化方式。通过这些设计模式,系统能够根据去求灵活拓展。- 动态配置支持:系统支持通过配置文件或环境变量动态调整日志级别、输出目标和格式化方式。用户可以在运行时配置自己的日志系统,而无需重新编译代码。这样可以更好地满足不同应用场景的需求。
性能测试
测试环境
云服务器环境
2核2G 2M Linux华为云服务器
Ubuntu系统
本地电脑环境
- 处理器:AMD Ryzen 7 4800H with Radeon Graphics2.90 GHz
- 已安装的内存(RAM):16.0GB(15.4GB可用)
- 系统类型:64位操作系统,基于x64的处理器
百万压力测试
// 同步日志器性能测试
void sync_bench(){
std::unique_ptr<GlobalLoggerBuilder> builder(new GlobalLoggerBuilder());
builder->buildLoggerName("sync_logger");
builder->buildLoggerType(cl::Logger::Type::LOGGER_SYNC);
builder->buildFormatter("%m%n");
builder->buildSink<cl::FileSink>("../logs/sync.log");
builder->build();
bench("sync_logger", 5, 1000000, 100);
}
// 异步日志器性能测试
void async_bench(){
std::unique_ptr<GlobalLoggerBuilder> builder(new GlobalLoggerBuilder());
builder->buildLoggerName("Async_logger");
builder->buildLoggerType(cl::Logger::Type::LOGGER_ASYNC);
builder->buildFormatter("%m%n");
builder->buildSink<cl::FileSink>("../logs/async.log");
builder->build();
bench("Async_logger", 5, 1000000, 100);
}
测试结果
- **响应:**耗时
1.52513
秒,高负载情况下实现百万并发。- **吞吐量:**每秒处理
655681条
日志,处理数据量63M
,实现高并发情况下的数据处理性能。- **多线程并发:**5个线程并行处理日志,充分利用CPU提高运行性能。
同步日志器 [root] 创建成功,级别:DEBUG
同步日志器 [sync_logger] 创建成功,级别:DEBUG
测试日志: 1000000 条,总大小: 97656KB
线程1: 输出日志数量: 200000,耗时: 3.67529s
线程2: 输出日志数量: 200000,耗时: 4.64539s
线程3: 输出日志数量: 200000,耗时: 4.58836s
线程4: 输出日志数量: 200000,耗时: 4.64894s
线程5: 输出日志数量: 200000,耗时: 4.55612s
总耗时: 4.64894s
每秒输出日志数量: 215102条
每秒输出日志大小: 21006KB
异步日志器 [Async_logger] 创建成功,级别:DEBUG
测试日志: 1000000 条,总大小: 97656KB
线程1: 输出日志数量: 200000,耗时: 1.51637s
线程2: 输出日志数量: 200000,耗时: 1.52513s
线程3: 输出日志数量: 200000,耗时: 1.49736s
线程4: 输出日志数量: 200000,耗时: 1.46013s
线程5: 输出日志数量: 200000,耗时: 1.50217s
总耗时: 1.52513s
每秒输出日志数量: 655681条
每秒输出日志大小: 64031KB
网络延时测试
模拟日志系统在网络波动时候的行为,经测试可以实现在高延迟网络条件下继续进行稳定的传输数据
#include"../Logs/cl_logs.hpp"
#include <cstdlib>
#include <iostream>
using namespace cl;
void testNetworkDelay()
{
LOG_INFO("开始网络延迟测试");
// 删除现有的 netem 规则,防止冲突
std::string deleteCommand = "sudo tc qdisc del dev eth0 root";
int deleteResult = system(deleteCommand.c_str());
if (deleteResult == 0) {
LOG_INFO("成功删除现有的网络延迟规则");
} else {
LOG_WARN("未找到现有的网络延迟规则,可能未设置或已被删除");
}
// 添加新的网络延迟规则
std::string command = "sudo tc qdisc add dev eth0 root netem delay 100ms";
int addResult = system(command.c_str());
if (addResult == 0) {
LOG_INFO("成功引入100ms的网络延迟");
} else {
LOG_ERROR("无法引入网络延迟,命令执行失败");
}
// 打印当前的网络配置
std::string showCommand = "sudo tc qdisc show dev eth0";
LOG_INFO("当前网络配置如下:");
int showResult = system(showCommand.c_str());
if (showResult != 0) {
LOG_ERROR("无法显示当前的网络配置");
}
LOG_INFO("网络延迟测试结束");
}
int main(){
// 执行网络延迟测试
testNetworkDelay();
return 0;
}
资源限制测试
- CPU压力测试:通过引入CPU高负载,测试系统在高CPU占用情况下的性能。希望看到系统是否能够继续高效地处理日志记录任务
- 内存压力测试:通过模拟高内存占用,测试系统在内存资源有限时的表现。希望确认系统在内存压力下的稳定性和响应能力
#include"../Logs/cl_logs.hpp"
#include <cstdlib>
using namespace cl;
void testCpuStress() {
LOG_INFO("开始CPU压力测试");
// 模拟CPU高负载
std::string command = "stress-ng --cpu 2 --timeout 60s";
if (system(command.c_str()) != 0) {
LOG_ERROR("CPU压力测试失败");
}
LOG_INFO("CPU压力测试结束");
}
void testMemoryStress() {
LOG_INFO("开始内存压力测试");
// 模拟内存压力
std::string command = "stress-ng --vm 1 --vm-bytes 90% --timeout 60s";
if (system(command.c_str()) != 0) {
LOG_ERROR("内存压力测试失败");
}
LOG_INFO("内存压力测试结束");
}
int main() {
// 执行CPU压力测试
testCpuStress();
// 执行内存压力测试
testMemoryStress();
return 0;
}
混合写入测试
实时读取写入测试,模拟高并发场景下的并发读写日志,测试日志的性能和稳定性
#include"../Logs/cl_logs.hpp"
#include <fstream>
#include <thread>
#include <vector>
#include <mutex>
#include <chrono>
#include <string>
using namespace cl;
void writeLogs(int numMessages, int threadId) {
for (int i = 0; i < numMessages; ++i) {
LOG_INFO("线程 %d 写入的日志消息 %d 时间戳: %ld", threadId, i, std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
}
}
void readLogs(const std::string &logFile, int numReads){
for (int i = 0; i < numReads; ++i) {
std::ifstream ifs(logFile);
if (!ifs.is_open()) {
LOG_ERROR("无法打开日志文件进行读取: %s", logFile.c_str());
return;
}
std::string line;
while (std::getline(ifs, line)) {
// 模拟读取操作,可以在实际使用时处理或输出日志内容// LOG_INFO("读取的日志内容: %s", line.c_str()); // 如果需要打印读取内容,可以启用这行
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟读取间隔
}
}
int main(){
const std::string logFile = "test_logs.txt";
const int numMessages = 1000;
const int numReads = 100;
// 使用 GlobalLoggerBuilder 初始化日志系统
GlobalLoggerBuilder::ptr lbp(new GlobalLoggerBuilder);
lbp->buildLoggerName("test_logger");
lbp->buildLoggerLevel(LogLevel::value::INFO); // 确保设置正确的日志级别
lbp->buildFormatter("%m");
lbp->buildSink<FileSink>(logFile); // 设置日志输出文件为 test_logs.txt
lbp->buildLoggerType(Logger::Type::LOGGER_SYNC); // 使用同步日志器
lbp->build();
// 创建写入和读取线程
std::thread writer1(writeLogs, numMessages, 1);
std::thread writer2(writeLogs, numMessages, 2);
std::thread reader(readLogs, logFile, numReads);
// 等待所有线程完成
writer1.join();
writer2.join();
reader.join();
LOG_INFO("日志写入与读取测试完成。");
return 0;
}