当前位置: 首页 > article >正文

日志系统扩展一:日志落地数据库:MySQL、SQLite3

日志系统扩展一:日志落地数据库:MySQL、SQLite3

  • 一、设计
    • 1.怎么落地
    • 2.落地的具体设计
    • 3.表的设计
      • 1.MySQL
      • 2.SQLite3
  • 二、数据库访问Helper的实现
    • 1.需要事务,但是无需回滚,如何理解?
      • 1.需要事务
      • 2.无需回滚
    • 2.SqliteHelper
      • 1.SQLite3常用接口介绍
      • 2.实现
    • 3.MySQLHelper
      • 1.MySQL常用接口介绍
      • 2.实现
    • 4.FileHelper补充
  • 三、数据库系列LogSink实现
    • 1.DBLogSink实现
    • 2.SqliteSink实现
    • 3.MySQLSink实现
    • 4.LogSinkFactory完善
  • 四、同步日志器的修改
    • 1.Logger抽象类的修改
    • 2.SyncLogger的修改
    • 3.LoggerBuilder的修改
    • 4.LocalLoggerBuilder和GlobalLoggerBuilder的修改
  • 五、数据库日志缓冲区的实现
    • 1.底层容器的选择
    • 2.原初的vector?
    • 3.具体代码
  • 六、AsyncLooper的完善
  • 七、异步日志器的完善
  • 八、测试
    • 1.前置工作
    • 2.SQLite3测试
      • 1.单线程
        • 1.同步
        • 2.异步安全
        • 3.异步不安全
    • 2.多线程
      • 1.同步
      • 2.异步安全
      • 3.异步不安全
    • 3.MySQL测试
      • 1.单线程
        • 1.同步
        • 2.异步安全
        • 3.异步不安全
      • 2.多线程
        • 1.同步
        • 2.异步安全
        • 3.异步不安全

一、设计

1.怎么落地

将日志落地到数据库,首先肯定是要建表的,而日志落地其实就是向对应表当中插入数据

那么怎么落地呢?
文件落地方式:virtual void log_fs(const char *data, size_t len) = 0;
因为文件是面向字节流的,所以这么落地日志是完全OK的

而数据库落地是向表当中插入数据,因此他的落地应该是这样的:

virtual void log_db(const LogMessage &message) = 0;

将一条LogMessage插入表当中

因此我们的LogSink日志落地基类无法满足数据库的落地需求,要不然就给他增加一个log_db函数
让子类选择性的对其中一个进行重写,另一个进行空实现,这是一种方式

不过不便于扩展,为了遵循高内聚,低耦合的程序设计原则,我们将LogSink分为两个日志基类:
FSLogSink和DBLogSink

新增加的MySQLSink和SqliteSink都继承于DBLogSink

2.落地的具体设计

我们的日志落地方式是允许用户灵活的进行格式控制的,这是为了让用户能够选择性的只记录自己想要的数据

而将日志落地到数据库,是非字符串形式的,因此格式化的意义不大

所以我们的日志落地到数据库当中是无需进行格式化的,这也印证了log_db的参数为何只需要一个LogMessage即可

virtual void log_db(const LogMessage &message) = 0;

所以我们再将日志落地到数据库时,是直接将所有字段全部都进行记录的

3.表的设计

1.MySQL

在这里插入图片描述

2.SQLite3

同样的,SQLite3当中LigData的创建也是如此,只不过具体细节要求不同:
在这里插入图片描述

二、数据库访问Helper的实现

1.需要事务,但是无需回滚,如何理解?

1.需要事务

在这里插入图片描述

2.无需回滚

在这里插入图片描述
不过我们贴心的给了大家savepoint和rollback的使用,大家可以使用

2.SqliteHelper

1.SQLite3常用接口介绍

SQLite3 官方文档
在这里插入图片描述
在这里插入图片描述

2.实现

class SqliteHelper
{
public:
    using SqliteCallback = int (*)(void *, int, char **, char **);
    SqliteHelper(const std::string &dbfile)
        : _dbfile(dbfile), _handler(nullptr) {}

    ~SqliteHelper()
    {
        close();
    }

    bool open()
    {
        if (sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK)
        {
            std::cout << "打开数据库失败 " << errmsg() << " _dbfile:" << _dbfile << "\n";
            return false;
        }
        return true;
    }

    void close()
    {
        if (_handler != nullptr)
        {
            sqlite3_close_v2(_handler);
            _handler = nullptr;
        }
    }

    bool begin()
    {
        if (sqlite3_exec(_handler, "begin;", nullptr, nullptr, nullptr) != SQLITE_OK)
        {
            std::cout << "SQLite3开启事务失败," << errmsg() << "\n";
            return false;
        }
        return true;
    }

    bool exec(const std::string &sql, SqliteCallback cb, void *arg)
    {
        if (sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr) != SQLITE_OK)
        {
            std::cout << "执行sql语句:" << sql << " 失败," << errmsg() << std::endl;
            return false;
        }
        return true;
    }

    bool commit()
    {
        if (sqlite3_exec(_handler, "commit;", nullptr, nullptr, nullptr) != SQLITE_OK)
        {
            std::cout << "SQLite3提交事务失败," << errmsg() << "\n";
            return false;
        }
        return true;
    }

	bool savePoint(const std::string &point)
    {
        std::string savepoint_sql = "savepoint " + point + ";";
        if (sqlite3_exec(_handler, savepoint_sql.c_str(), nullptr, nullptr, nullptr) != SQLITE_OK)
        {
            std::cout << "SQLite3 设置事务保存点失败:" << errmsg() << "\n";
            return false;
        }
        return true;
    }

    bool rollback(const std::string &point = "")
    {
        std::string rollback_sql = "rollback";
        if (!point.empty())
        {
            rollback_sql += " to " + point + ";";
        }
        if (sqlite3_exec(_handler, rollback_sql.c_str(), nullptr, nullptr, nullptr) != SQLITE_OK)
        {
            std::cout << "SQLite3 事务回滚失败:" << errmsg() << "\n";
            return false;
        }
        return true;
    }

    std::string errmsg()
    {
        if (_handler != nullptr)
            return sqlite3_errmsg(_handler);
        else
            return "sqlite3句柄为空";
    }

private:
    std::string _dbfile;
    sqlite3 *_handler;
};

3.MySQLHelper

1.MySQL常用接口介绍

在这里插入图片描述

2.实现

我们要求用户将MySQL连接所需字段放到配置文件当中,将配置文件传递过来
我们内部进行解析

格式要求:

一个字段占一行
以key:value的形式进行传递

host:你MySQL服务器所在机器的IP地址
user:你的用户名
passwd:你的密码
db:你提前创建好并赋予权限了的数据库
port:你MySQL服务器的端口号
class MySQLHelper
{
public:
    MySQLHelper(const std::string &conf_file) : _handler(mysql_init(nullptr)), _conf_file(conf_file){}

    bool begin()
    {
        if (mysql_query(_handler, "begin;") != 0)
        {
            std::cout << "mysql开启事务失败," << errmsg() << "\n";
            return false;
        }
        return true;
    }

    bool commit()
    {
        if (mysql_query(_handler, "commit;") != 0)
        {
            std::cout << "mysql提交事务失败," << errmsg() << "\n";
            return false;
        }
        return true;
    }
	
	bool savePoint(const std::string &point)
    {
        std::string savepoint_sql = "savepoint " + point + ";";
        if (mysql_query(_handler, savepoint_sql.c_str()) != 0)
        {
            std::cout << "MySQL 设置事务保存点失败:" << errmsg() << "\n";
            return false;
        }
        return true;
    }

    bool rollback(const std::string &point = "")
    {
        std::string rollback_sql = "rollback";
        if (!point.empty())
        {
            rollback_sql += " to " + point + ";";
        }
        if (mysql_query(_handler, rollback_sql.c_str()) != 0)
        {
            std::cout << "MySQL 事务回滚失败:" << errmsg() << "\n";
            return false;
        }
        return true;
    }

    bool open()
    {
        // 1. 读取配置文件,拿到host、user、passwd、db、port
        if (!load())
        {
            mysql_close(_handler);
            _handler = nullptr;
            return false;
        }
        // 2. 连接MySQL数据库
        _handler = mysql_real_connect(_handler, _conf_map["host"].c_str(), _conf_map["user"].c_str(),
                                      _conf_map["passwd"].c_str(), _conf_map["db"].c_str(), std::stoi(_conf_map["port"]), nullptr, 0);

        if (_handler == nullptr)
        {
            std::cout << "MySQL连接失败, 原因: " << mysql_error(_handler) << "\n";
            mysql_close(_handler);
            _handler = nullptr;
            return false;
        }
        return true;
    }

    ~MySQLHelper()
    {
        close();
    }

    void close()
    {
        if (_handler != nullptr)
        {
            mysql_close(_handler);
            _handler = nullptr;
        }
    }

    bool exec(const std::string &sql)
    {
        if (0 != mysql_query(_handler, sql.c_str()))
        {
            std::cout << "sql语句执行失败: " << sql << " ,原因: " << errmsg() << std::endl;
            return false;
        }
        return true;
    }

    std::string errmsg()
    {
        if (_handler != nullptr)
            return mysql_error(_handler);
        else
            return "mysql句柄为空";
    }

private:
    bool load()
    {
        std::ifstream ifs(_conf_file);
        std::string line;
        while (std::getline(ifs, line))
        {
            size_t pos = line.find(':');
            if (pos == std::string::npos)
            {
                std::cout << "MySQL配置文件解析失败,某一行不符合规范: " << line << "\n";
                return false;
            }
            _conf_map[line.substr(0, pos)] = line.substr(pos + 1);
        }
        return true;
    }

    MYSQL *_handler;
    std::string _conf_file;
    std::unordered_map<std::string, std::string> _conf_map;
};

4.FileHelper补充

FileHelper里面又加了一个createFile接口,用来创建文件

static bool createFile(const std::string &filename)
{
    // 1. 先看该文件是否存在
    if (exists(filename))
        return true;
    // 2. 写方式打开(即创建)
    // 这里以追加写打开,防止多线程重入该函数导致意想不到的bug,提高代码健壮性
    std::ofstream ofs(filename, std::ios::app);
    if (!ofs.is_open())
    {
        std::cout << "创建文件失败,filename: " << filename << "\n";
        return false;
    }
    ofs.close();
    return true;
}

三、数据库系列LogSink实现

1.DBLogSink实现

class DBLogSink
{
public:
    using ptr = std::shared_ptr<DBLogSink>;
    virtual ~DBLogSink() {}
    virtual bool log(const LogMessage &message) = 0;

    // 开启事务 ->  多次insert -> 关闭事务
    virtual void log(const std::vector<LogMessage> &message_vec, size_t sz) = 0;
};

2.SqliteSink实现

不要忘了先创建数据库目录和文件,再用_helper打开该文件

class SqliteSink : public DBLogSink
{
public:
    SqliteSink(const std::string &filename)
        : _helper(filename)
    {
        // 1. 创建文件所在目录
        if (!FileHelper::createDir(FileHelper::getPath(filename)))
        {
            std::cout << "SQLite3数据库文件所在目录创建失败\n";
            abort();
        }
        // 2. 创建文件
        if (!FileHelper::createFile(filename))
        {
            std::cout << "SQLite3数据库文件创建失败\n";
            abort();
        }
        // 3. 打开数据库文件
        if (!_helper.open())
        {
            std::cout << "SQLite3数据库文件打开失败\n";
            abort();
        }
        // 4. 建表
        if (!createTable())
        {
            std::cout << "SQLite3数据库的建表失败\n";
            abort();
        }
    }

    virtual void log(const LogMessage &message)
    {
        std::ostringstream insert_sql;
        insert_sql << "insert into LogData(level,file,line,logger_name,thread_id,body) values(";
        insert_sql << "'" << LogLevel::LogLevel_Name(message._level) << "',";
        insert_sql << "'" << message._file << "',";
        insert_sql << message._line << ",";
        insert_sql << "'" << message._logger_name << "',";
        insert_sql << message._thread_id << ",";
        insert_sql << "'" << message._body << "');";
        if (!_helper.exec(insert_sql.str(), nullptr, nullptr))
        {
            std::cout << "插入数据失败,表名:LogData\n";
        }
    }

    // 开启事务 ->  多次insert -> 关闭事务
    virtual void log(const std::vector<LogMessage> &message_vec, size_t sz)
    {
        if (!_helper.begin())
        {
            std::cout << "SQLlite3:开启事务失败\n";
            return;
        }
        for (int i = 0; i < sz; i++)
        {
            log(message_vec[i]);
        }
        if (!_helper.commit())
        {
            std::cout << "SQLlite3:提交事务失败\n";
            return;
        }
    }

private:
    bool createTable()
    {
        // 指明autoincrement必须要用integer,不能用int
        // SQLite3的current_timestamp是格林威治时间,需要加上8小时改为北京时间
        static std::string create_sql = R"(create table if not exists LogData(
                                            id integer primary key autoincrement,
                                            date text default(datetime(current_timestamp,'+8 hours')),
                                            level varchar(10),
                                            file varchar(32),
                                            line int,
                                            logger_name varchar(32) not null,
                                            thread_id int,
                                            body text
                                        );)";
        if (!_helper.exec(create_sql, nullptr, nullptr))
        {
            std::cout << "创建SQLite3数据库表失败,表名:LogData\n";
            return false;
        }
        return true;
    }

    SqliteHelper _helper;
};

3.MySQLSink实现

注意:这里是要解析配置文件

class MySQLSink : public DBLogSink
{
public:
    MySQLSink(const std::string &conf_file)
        : _helper(conf_file)
    {
        // 打开数据库文件
        if (!_helper.open())
        {
            std::cout << "MySQL数据库文件打开失败\n";
            abort();
        }
        if (!createTable())
        {
            std::cout << "MySQL数据库的建表失败\n";
            abort();
        }
    }

    virtual void log(const LogMessage &message)
    {
        std::ostringstream insert_sql;
        insert_sql << "insert into LogData(level,file,line,logger_name,thread_id,body) values(";
        insert_sql << "'" << LogLevel::LogLevel_Name(message._level) << "',";
        insert_sql << "'" << message._file << "',";
        insert_sql << message._line << ",";
        insert_sql << "'" << message._logger_name << "',";
        insert_sql << message._thread_id << ",";
        insert_sql << "'" << message._body << "');";
        if (!_helper.exec(insert_sql.str()))
        {
            std::cout << "插入数据失败,表名:LogData\n";
        }
    }

    // 开启事务 ->  多次insert -> 关闭事务
    virtual void log(const std::vector<LogMessage> &message_vec, size_t sz)
    {
        if (!_helper.begin())
        {
            std::cout << "MySQL:开启事务失败\n";
            return;
        }
        for (int i = 0; i < sz; i++)
        {
            log(message_vec[i]);
        }
        if (!_helper.commit())
        {
            std::cout << "MySQL:提交事务失败\n";
            return;
        }
    }

private:
    bool createTable()
    {
        std::string create_sql = R"(create table if not exists LogData(
                                            id int primary key auto_increment,
                                            date datetime default current_timestamp,
                                            level varchar(10),
                                            file varchar(32),
                                            line int,
                                            logger_name varchar(32) not null,
                                            thread_id bigint,
                                            body text
                                        );)";
        if (!_helper.exec(create_sql))
        {
            std::cout << "创建MySQL数据库表失败,表名:LogData\n";
            return false;
        }
        return true;
    }

    MySQLHelper _helper;
};

4.LogSinkFactory完善

增加一个create_db即可

class LogSinkFactory
{
public:
    // 函数模板
    template <class SinkType, class... Args>
    static FSLogSink::ptr create_fs(Args &&...args)
    {
        return std::make_shared<SinkType>(std::forward<Args>(args)...);
    }

    template <class SinkType, class... Args>
    static DBLogSink::ptr create_db(Args &&...args)
    {
        return std::make_shared<SinkType>(std::forward<Args>(args)...);
    }
};

四、同步日志器的修改

1.Logger抽象类的修改

Logger需要加一个成员:std::vector<DBLogSink::ptr> _db_sinks;

同时增加一个用来进行数据库日志落地的接口:

virtual void log_db(const LogMessage &message) = 0;

并且在construct当中继续复用该LogMessage:

void construct(LogLevel::value level, const std::string &file, size_t line, char *body)
{
    // 1. 构造LogMessage
    LogMessage message(level, file, line, _logger_name, body);
    // 2. free掉body,否则会内存泄漏
    free(body);
    body = nullptr;
    // 3. 将LogMessage进行格式化
    std::string real_message = _formatter->format(message);
    // 4. 复用log进行实际的日志落地
    log_fs(real_message.c_str(), real_message.size());
    log_db(message);
}

2.SyncLogger的修改

同步日志器需要重写父类的log_db函数

virtual void log_db(const LogMessage &message)
{
     // 这里必须要加锁,因为存在多线程同时调用同一个日志器对象的log函数的情况
    std::unique_lock<std::mutex> ulock(_mutex);
    for (auto &sp : _db_sinks)
    {
        sp->log(message);
    }
}

直接加锁并且复用即可

3.LoggerBuilder的修改

LoggerBuilder需要加一个 能够往_db_sinks当中添加成员的函数
并且在reset的时候还要清理那里

template <class SinkType, class... Args>
void buildDBLogSink(Args &&...args)
{
    _db_sinks.push_back(LogSinkFactory::create_db<SinkType>(std::forward<Args>(args)...));
}
// 允许一个建造者在调用build接口构造完对象之后将建造者当中保存的原数据进行重置
void reset()
{
    _logger_name.clear();
    _logger_type = LoggerType::SYNC;
    _limit_level = LogLevel::value::DEBUG;
    _fs_sinks.clear();
    _db_sinks.clear();
    _formatter.reset();
}

4.LocalLoggerBuilder和GlobalLoggerBuilder的修改

  1. 修改一下同步和异步日志器的构造函数,将_db_sinks进行传入
  2. 只有当对应日志器既不向文件当中落地,也不向数据库当中落地时,才会默认加上标准输出方向的落地
virtual Logger::ptr build()
{
    if (_logger_name.empty())
    {
        std::cout << "日志器名称为空!!\n";
        abort();
    }
    if (_formatter.get() == nullptr)
    {
        _formatter = std::make_shared<Formatter>();
    }
    if (_fs_sinks.empty() && _db_sinks.empty())
    {
        _fs_sinks.push_back(LogSinkFactory::create_fs<StdoutSink>());
    }
    Logger::ptr ret;
    if (_logger_type == LoggerType::SYNC)
    {
        ret = std::make_shared<SyncLogger>(_logger_name, _limit_level, _formatter, _fs_sinks, _db_sinks);
    }
    else
    {
        // 后面实现完异步日志器之后完善
        ret = std::make_shared<AsyncLogger>(_logger_name, _limit_level, _formatter, _fs_sinks, _db_sinks, _async_type);
    }
    // 重置
    this->reset();
    return ret;
}

五、数据库日志缓冲区的实现

1.底层容器的选择

文件日志缓冲区当中存的是vector<char>,这是可行的,因为文件是面向字节流的,日志输出时以字节为单位没毛病

而对于数据库而言,日志的落地是需要一条一条进行落地的,因此vector行不通,除非进行LogMessage的序列化和反序列化

如果序列化+反序列化,那不还是得一条一条插入吗,直接传结构体他不香吗?
折腾来折腾去,毫无意义的好吗

因此我们搞成:vector<LogMessage>

2.原初的vector?

class DBBuffer0
{
public:
    void push(const LogMessage &message)
    {
        _buffer.push_back(message);
    }

    bool empty()
    {
        return _buffer.empty();
    }

    void swap(DBBuffer2 &buffer)
    {
        _buffer.swap(buffer._buffer);
    }

    void clear()
    {
        _buffer.clear();
    }

    const std::vector<LogMessage> &getBuffer()
    {
        return _buffer;
    }

private:
    std::vector<LogMessage> _buffer;
};

没啥毛病,清晰易懂,可是我们知道:vector一般的size变化是:
插入1024*1024条数据,这种扩容方式在面对海量数据时,扩容代价还是太大
在这里插入图片描述
因此,我们可以用一下reserve
单论扩容而言,reserve是比resize快的,因为resize还会初始化元素,
而LogMessage的默认构造没用,因此我们不用resize,而是选择reserve

3.具体代码

class DBBuffer
{
public:
    DBBuffer(size_t default_size = default_buff_size) 
    {
        _buffer.reserve(default_buff_size);
    }
    void push(const LogMessage &message)
    {
        if (_buffer.size() == _buffer.capacity())
        {
            _buffer.reserve(2 * _buffer.capacity());
        }
        _buffer.push_back(message);
    }

    bool empty()
    {
        return _buffer.empty();
    }
    
	bool full()
    {
        return _buffer.size() == _buffer.capacity();
    }

    void swap(DBBuffer &buffer)
    {
        _buffer.swap(buffer._buffer);
    }

    void clear()
    {
        _buffer.clear();
    }

    size_t readableSize()
    {
        return _buffer.size();
    }
    
    const std::vector<LogMessage> &getBuffer()
    {
        return _buffer;
    }

private:
    std::vector<LogMessage> _buffer;
};

这个bool full()是给异步安全模式使用的

六、AsyncLooper的完善

跟文件日志缓冲区类似,就是多加一份资源的事
没什么特别的,跟文件日志缓冲区那里一样的

using AsyncFSCallback = std::function<void(FSBuffer &)>;
using AsyncDBCallback = std::function<void(DBBuffer &)>;

class AsyncLooper
{
    struct Resource
    {
        std::mutex _mutex;
        std::condition_variable _cond_produce;
        std::condition_variable _cond_consume;
    };
public:
    using ptr = std::shared_ptr<AsyncLooper>;
    AsyncLooper(AsyncFSCallback fs_callback,AsyncDBCallback db_callback, AsyncType async_type = AsyncType::ASYNC_SAFE)
        : _fs_callback(fs_callback),_db_callback(db_callback), _async_type(async_type), _isrunning(true),
        _fs_worker(&AsyncLooper::fs_thread_routine, this),_db_worker(&AsyncLooper::db_thread_routine,this) {}

    ~AsyncLooper()
    {
        if (_isrunning)
        {
            _isrunning = false;
            if (_fs_worker.joinable())
            {
                _fs_resource._cond_produce.notify_all();
                _fs_resource._cond_consume.notify_all();
                _fs_worker.join();
            }
            if(_db_worker.joinable())
            {
                _db_resource._cond_produce.notify_all();
                _db_resource._cond_consume.notify_all();
                _db_worker.join();
            }
        }
    }

    void push(const char *data, size_t len)
    {
        {
            std::unique_lock<std::mutex> ulock(_fs_resource._mutex);
            if (_async_type == AsyncType::ASYNC_SAFE && _fs_buffer_produce.writeableSize() < len)
            {
                _fs_resource._cond_produce.wait(ulock, [this, len]() -> bool
                                   { return !_isrunning || _fs_buffer_produce.writeableSize() >= len; });
            }
            _fs_buffer_produce.push(data, len);
        }
        // 唤醒消费者
        _fs_resource._cond_consume.notify_all();
    }

    void push(const LogMessage& message)
    {
        {
            std::unique_lock<std::mutex> ulock(_db_resource._mutex);
            if (_async_type == AsyncType::ASYNC_SAFE && _db_buffer_produce.full())
            {
                _db_resource._cond_produce.wait(ulock, [this]() -> bool
                                   { return !_isrunning || !_db_buffer_produce.full(); });
            }
            _db_buffer_produce.push(message);
        }
        _db_resource._cond_consume.notify_all();
    }

private:
    void fs_thread_routine()
    {
        while (true)
        {
            {
                // 当停止运行,且生产缓冲区没有数据了,才能退出
                if (!_isrunning && _fs_buffer_produce.empty())
                    break;
                std::unique_lock<std::mutex> ulock(_fs_resource._mutex);
                _fs_resource._cond_consume.wait(ulock, [this]() -> bool
                                   { return !_isrunning || !_fs_buffer_produce.empty(); });
                // 交换生产和消费缓冲区
                _fs_buffer_consume.swap(_fs_buffer_produce);
            }
            if (_async_type == AsyncType::ASYNC_SAFE)
            {
                // 唤醒生产者
                _fs_resource._cond_produce.notify_all();
            }
            // 调用日志落地回调函数
            _fs_callback(_fs_buffer_consume);
            // 把_buffer_consume重置
            _fs_buffer_consume.reset();
        }
    }

    void db_thread_routine()
    {
        while (true)
        {
            {
                std::unique_lock<std::mutex> ulock(_db_resource._mutex);
                // 当停止运行,且生产缓冲区没有数据了,才能退出
                if (!_isrunning && _db_buffer_produce.empty())
                    break;
                _db_resource._cond_consume.wait(ulock, [this]() -> bool
                                   { return !_isrunning || !_db_buffer_produce.empty(); });
                // 醒了之后即使发现当停止运行,且生产缓冲区没有数据了,也不能退, 因为停止时业务线程有可能还需要放数据
                // 因为二者想要醒来需要竞争锁, 因此不敢说当前生产缓冲区无数据我就退出,不行,需要等到下次循环时的判断

                // 交换生产和消费缓冲区
                _db_buffer_consume.swap(_db_buffer_produce);
            }
            if (_async_type == AsyncType::ASYNC_SAFE)
            {
                // 唤醒生产者
                _db_resource._cond_produce.notify_all();
            }
            // 调用日志落地回调函数
            _db_callback(_db_buffer_consume);
            // 把_buffer_consume重置
            _db_buffer_consume.clear();
        }
    }

    std::atomic<bool> _isrunning;
    Resource _fs_resource;
    Resource _db_resource;

    AsyncFSCallback _fs_callback;
    AsyncDBCallback _db_callback;
    AsyncType _async_type;

    FSBuffer _fs_buffer_produce;
    FSBuffer _fs_buffer_consume;

    DBBuffer _db_buffer_produce;
    DBBuffer _db_buffer_consume;

    std::thread _fs_worker;
    std::thread _db_worker;
};

七、异步日志器的完善

重写父类的log_db和realDBLog即可

virtual void log_db(const LogMessage &message)
{
    _looper->push(message);
}

void realDBLog(DBBuffer &buffer)
{
    // 无需加锁, 因为_looper内部本来就加锁了
    size_t readableSize = buffer.readableSize();
    for (auto &sink : _db_sinks)
    {
        sink->log(buffer.getBuffer(), readableSize);
    }
}

八、测试

1.前置工作

const std::string sync_logger_db = "./logdb/sync.db";
const std::string async_safe_logger_db = "./logdb/async_safe.db";
const std::string async_unsafe_logger_db = "./logdb/async_unsafe.db";

const std::string mysql_conf = "./mysql.conf";

void init_sqlite()
{
    // 创建全局日志器
    LoggerBuilder::ptr builder = std::make_shared<GlobalLoggerBuilder>();
    // 1. 同步日志器
    builder->buildLoggerName(sync_logger_name);
    builder->buildLoggerType(LoggerType::SYNC);
    builder->buildDBLogSink<SqliteSink>(sync_logger_db);
    builder->build();
    // 2. 异步安全日志器
    builder->buildLoggerName(async_safe_logger_name);
    builder->buildLoggerType(LoggerType::ASYNC);
    builder->buildDBLogSink<SqliteSink>(async_safe_logger_db);
    builder->buildAsyncType(AsyncType::ASYNC_SAFE);
    builder->build();
    // 3. 异步不安全日志器
    builder->buildLoggerName(async_unsafe_logger_name);
    builder->buildLoggerType(LoggerType::ASYNC);
    builder->buildDBLogSink<SqliteSink>(async_unsafe_logger_db);
    builder->buildAsyncType(AsyncType::ASYNC_UNSAFE);
    builder->build();
}

void init_mysql()
{
    // 创建全局日志器
    LoggerBuilder::ptr builder = std::make_shared<GlobalLoggerBuilder>();
    // 1. 同步日志器
    builder->buildLoggerName(sync_logger_name);
    builder->buildLoggerType(LoggerType::SYNC);
    builder->buildDBLogSink<MySQLSink>(mysql_conf);
    builder->build();
    // 2. 异步安全日志器
    builder->buildLoggerName(async_safe_logger_name);
    builder->buildLoggerType(LoggerType::ASYNC);
    builder->buildDBLogSink<MySQLSink>(mysql_conf);
    builder->buildAsyncType(AsyncType::ASYNC_SAFE);
    builder->build();
    // 3. 异步不安全日志器
    builder->buildLoggerName(async_unsafe_logger_name);
    builder->buildLoggerType(LoggerType::ASYNC);
    builder->buildDBLogSink<MySQLSink>(mysql_conf);
    builder->buildAsyncType(AsyncType::ASYNC_UNSAFE);
    builder->build();
}

依然还是:
const int log_num = 1024*1024; const int per_size = 100;

2.SQLite3测试

因为同步是一条一条插入的,所以磁盘IO非常频繁,因此
同步我们就给2万条就行了,实际当中不推荐用同步

1.单线程

1.同步

在这里插入图片描述
在这里插入图片描述

2.异步安全

在这里插入图片描述
在这里插入图片描述

3.异步不安全

在这里插入图片描述

2.多线程

以三个线程为例,先把数据库文件删掉

1.同步

在这里插入图片描述
在这里插入图片描述

2.异步安全

在这里插入图片描述
在这里插入图片描述

3.异步不安全

在这里插入图片描述
在这里插入图片描述

3.MySQL测试

1.单线程

1.同步

在这里插入图片描述
在这里插入图片描述
这速度,算了吧,打印2万条就行啊

2.异步安全

异步的话最多耽误业务线程10s,可是要耽误工作线程几分钟。。。
这里数据量大的时候才能看出异步的好处
在这里插入图片描述
在这里插入图片描述
然后把表删了

3.异步不安全

在这里插入图片描述
在这里插入图片描述

2.多线程

依然是以3个线程为例

1.同步

2万条
在这里插入图片描述
在这里插入图片描述

2.异步安全

在这里插入图片描述
在这里插入图片描述

3.异步不安全

在这里插入图片描述
在这里插入图片描述

以上就是日志系统扩展一:日志落地数据库:MySQL、SQLite3的全部内容哦~


http://www.kler.cn/news/317222.html

相关文章:

  • 《C++中打造绚丽红色主题图形界面》
  • Qt 文件操作
  • C++ Mean Shift算法
  • Llamaindex 使用过程中的常见问题 (FAQ)
  • 云原生周刊:Artifact Hub 成为 CNCF 孵化项目|2024.9.23
  • 【深度学习】03-神经网络3-1梯度下降网络优化方法
  • 2024年信息安全企业CRM选型与应用研究报告
  • 『功能项目』3D模型动态UI显示【76】
  • MovieLife 电影生活
  • 彻底删除国际版OneDrive for Business上的数据
  • 责任链模式实现规则校验
  • 智慧交通,智能消防系统助力高铁站安全
  • Anaconda 安装
  • Directives Vue3 自定义指令
  • 平衡二叉树(AVL树):原理、常见算法及其应用
  • cccccccccccc
  • Qt_布局管理器
  • 【漏洞复现】HIKVISION 视频编码设备接入网关 showFile.php 任意文件下载漏洞
  • tomcat 配置jenkins_home 目录
  • 动态时间【JavaScript】
  • 使用【Sa-Token】实现Http Basic 认证
  • 输电线塔目标检测数据集yolo格式该数据集包括2644张输电线塔高清图像,该数据集已经过yolo格式标注,具有完整的txt标注文件和yaml配置文件。
  • 论文阅读--Planning-oriented Autonomous Driving(二)
  • C++堆(优先队列)priority_queue
  • 删除topic提示admin token
  • The NCCoE’s Automation of the CMVP
  • 【Oauth2整合gateway网关实现微服务单点登录】
  • k8s云平台部署文档
  • 【MySQL】使用C语言连接数据库
  • Http-浏览器发出⼀个请求到收到响应经历了哪些步骤?