当前位置: 首页 > article >正文

【RabbitMQ 项目】服务端:数据管理模块之消息队列管理

文章目录

  • 一.编写思路
  • 二.代码实践

一.编写思路

  1. 定义消息队列
    1. 名字
    2. 是否持久化
  2. 定义队列持久化类(持久化到 sqlite3)
    1. 构造函数(只能成功,不能失败)
      1. 如果数据库(文件)不存在则创建
      2. 打开数据库
      3. 打开 msg_queue_table 数据库表
    2. 插入队列
    3. 移除队列
    4. 将数据库中的队列恢复到内存中
      传入一个哈希表,key 为名字,value 为队列的智能指针,填充该哈希表
  3. 定义队列管理类(包含内存管理和持久化管理)
    1. 构造函数:从数据库中恢复队列
    2. 声明队列
    3. 移除队列
    4. 获取队列

二.代码实践

MsgQueue.hpp:

#pragma once
#include "../common/Log.hpp"
#include "../common/Util.hpp"
#include "../common/Util.hpp"
#include <memory>
#include <unordered_map>
#include <mutex>
namespace ns_data
{

    class MsgQueue;
    using MsgQueuePtr = std::shared_ptr<MsgQueue>;

    /************
     * 定义消息队列
     * ****************/
    struct MsgQueue
    {
        std::string _name;
        bool _isDurable;
        MsgQueue(const std::string &name, bool isDurable)
            : _name(name),
              _isDurable(isDurable)
        {
        }
    };

    /*****************
     * 定义消息队列持久化类
     * ******************/
    class MsgQueueMapper
    {
    private:
        ns_util::Sqlite3Util _sqlite;

    public:
        MsgQueueMapper(const std::string &dbName)
            : _sqlite(dbName)
        {
            // 确保数据库文件已经存在,不存在就创建
            if (!ns_util::FileUtil::createFile(dbName))
            {
                LOG(FATAL) << "create database " << dbName << " fail" << endl;
                exit(1);
            }

            if (!_sqlite.open())
            {
                LOG(FATAL) << "open database " << dbName << " fail" << endl;
                exit(1);
            }
            createTable();
        }

        /*************
         * 插入消息队列
         * *************/
        bool insertMsgQueue(MsgQueuePtr msgQueuePtr)
        {
            char insertSql[1024];
            sprintf(insertSql, "insert into msg_queue_table values('%s', '%d');",
                    msgQueuePtr->_name.c_str(), msgQueuePtr->_isDurable);
            if (!_sqlite.exec(insertSql, nullptr, nullptr))
            {
                LOG(WARNING) << "insert MsgQueue fail, MsgQueue: " << msgQueuePtr->_name << endl;
                return false;
            }
            return true;
        }

        /**********
         * 移除消息队列
         * ***************/
        void removeMsgQueue(const std::string &name)
        {
            char deleteSql[1024];
            sprintf(deleteSql, "delete from msg_queue_table where name='%s';", name.c_str());
            if (!_sqlite.exec(deleteSql, nullptr, nullptr))
            {
                LOG(WARNING) << "remove MsgQueue fail, MsgQueue: " << name << endl;
            }
        }

        /***********
         * 从数据库中恢复消息队列到内存
         * *****************/
        void recoverMsgQueue(std::unordered_map<std::string, MsgQueuePtr> *mapPtr)
        {
            const std::string selectSql = "select * from msg_queue_table;";
            if (!_sqlite.exec(selectSql.c_str(), selectCallback, mapPtr))
            {
                LOG(FATAL) << "recover MsgQueue from msg_queue_table fail" << endl;
                exit(1);
            }
        }

        /**************
         * 删除数据库表(仅调试)
         * ***************/
        void removeTable()
        {
            const std::string dropSql = "drop table if exists msg_queue_table;";
            if (_sqlite.exec(dropSql.c_str(), nullptr, nullptr))
            {
                LOG(WARNING) << "remove table msg_queue_table fail" << endl;
            }
        }

    private:
        void createTable()
        {
            const std::string createSql = "create table if not exists msg_queue_table(\
                name varchar(32) primary key,\
                durable int\
            );";

            if (!_sqlite.exec(createSql.c_str(), nullptr, nullptr))
            {
                LOG(FATAL) << "create table msg_queue_table fail" << endl;
                exit(1);
            }
        }

        static int selectCallback(void *arg, int colNum, char **line, char **fields)
        {
            auto mapPtr = static_cast<std::unordered_map<std::string, MsgQueuePtr> *>(arg);
            std::string name = line[0];
            bool isDurable = std::stoi(line[1]);
            auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);

            mapPtr->insert({name, msgQueuePtr});
            return 0;
        }
    };

    class MsgQueueManager
    {
    private:
        MsgQueueMapper _mapper;
        std::unordered_map<std::string, MsgQueuePtr> _msgQueues;
        std::mutex _mtx;

    public:
        MsgQueueManager(const std::string &dbName)
            : _mapper(dbName)
        {
            _mapper.recoverMsgQueue(&_msgQueues);
        }

        /***********
         * 声明队列
         * ************/
        bool declareMsgQueue(const std::string &name, bool isDurable)
        {
            std::unique_lock<std::mutex> lck(_mtx);

            if (_msgQueues.count(name))
            {
                return true;
            }

            auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);
            _msgQueues[name] = msgQueuePtr;

            if (isDurable)
            {
                return _mapper.insertMsgQueue(msgQueuePtr);
            }
            return true;
        }

        /**********
         * 移除队列
         * ***********/
        void removeMsgQueue(const std::string &name)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            
            auto it = _msgQueues.find(name);
            if (it == _msgQueues.end())
            {
                return;
            }
            if (it->second->_isDurable)
            {
                _mapper.removeMsgQueue(name);
            }

            _msgQueues.erase(name);
        }

        /************
         * 获取指定队列
         * ***************/
        MsgQueuePtr getMsgQueue(const std::string &name)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            
            if (_msgQueues.count(name) == 0)
            {
                return nullptr;
            }
            return _msgQueues[name];
        }

        /*************
         * 清理所有队列(仅调试)
         * ******************/
        void clearMsgQueues()
        {
            std::unique_lock<std::mutex> lck(_mtx);

            _msgQueues.clear();
            _mapper.removeTable();
        }
    };

}

http://www.kler.cn/a/305430.html

相关文章:

  • Spring Boot3 配合ProxySQL实现对 MySQL 主从同步的读写分离和负载均衡
  • 关于地平线开发板使用nhwc格式的前向传播输出格式的理解
  • Springboot——钉钉(站内)实现登录第三方应用
  • 算法(二)——一维差分、等差数列差分
  • spring mvc源码学习笔记之八
  • 网络-ping包分析
  • 速盾:高防服务器租用需要注意什么事项
  • FPGA开发:模块 × 实例化
  • postgres_fdw访问存储在外部 PostgreSQL 服务器中的数据
  • 无线麦克风哪款好用,手机领夹麦克风哪个牌子好,麦克风推荐
  • 软件开发详解:同城O2O系统源码的架构设计与外卖跑腿APP的开发要点
  • 在Linux中安装FFmpeg
  • Brave编译指南2024 Windows篇:拉取Brave源码(六)
  • bin | hex
  • OpenCV class1-C#+winfrom显示控件并内存管理
  • 计算机网络 ---- 计算机网络的体系结构【计算机网络的分层结构】
  • uni-app开发安卓应用
  • 150+个流行的Instagram标签(及如何找到并正确使用它们)
  • 半导体芯闻--20240913
  • Ubuntu 22.04 LTS 上安装 Docker
  • 混合整数规划及其MATLAB实现
  • Arcgis api 4.x 专题图制作之分级色彩,采用自然间断法;(使用simple-statistics JS数学统计库生成自然间断点)
  • npm切换淘宝最新镜像
  • 玩转springboot之为什么springboot可以直接执行
  • 清理C盘缓存,删除电脑缓存指令是什么
  • http的请求方式