【RabbitMQ 项目】服务端:数据管理模块之消息队列管理
文章目录
- 一.编写思路
- 二.代码实践
一.编写思路
- 定义消息队列
- 名字
- 是否持久化
- 定义队列持久化类(持久化到 sqlite3)
- 构造函数(只能成功,不能失败)
- 如果数据库(文件)不存在则创建
- 打开数据库
- 打开 msg_queue_table 数据库表
- 插入队列
- 移除队列
- 将数据库中的队列恢复到内存中
传入一个哈希表,key 为名字,value 为队列的智能指针,填充该哈希表- 定义队列管理类(包含内存管理和持久化管理)
- 构造函数:从数据库中恢复队列
- 声明队列
- 移除队列
- 获取队列
二.代码实践
MsgQueue.hpp:
#pragma once
#include "../common/Log.hpp"
#include "../common/Util.hpp"
#include "../common/Util.hpp"
#include <memory>
#include <unordered_map>
#include <mutex>
namespace ns_data
{
class MsgQueue;
using MsgQueuePtr = std::shared_ptr<MsgQueue>;
/************
* 定义消息队列
* ****************/
struct MsgQueue
{
std::string _name;
bool _isDurable;
MsgQueue(const std::string &name, bool isDurable)
: _name(name),
_isDurable(isDurable)
{
}
};
/*****************
* 定义消息队列持久化类
* ******************/
class MsgQueueMapper
{
private:
ns_util::Sqlite3Util _sqlite;
public:
MsgQueueMapper(const std::string &dbName)
: _sqlite(dbName)
{
// 确保数据库文件已经存在,不存在就创建
if (!ns_util::FileUtil::createFile(dbName))
{
LOG(FATAL) << "create database " << dbName << " fail" << endl;
exit(1);
}
if (!_sqlite.open())
{
LOG(FATAL) << "open database " << dbName << " fail" << endl;
exit(1);
}
createTable();
}
/*************
* 插入消息队列
* *************/
bool insertMsgQueue(MsgQueuePtr msgQueuePtr)
{
char insertSql[1024];
sprintf(insertSql, "insert into msg_queue_table values('%s', '%d');",
msgQueuePtr->_name.c_str(), msgQueuePtr->_isDurable);
if (!_sqlite.exec(insertSql, nullptr, nullptr))
{
LOG(WARNING) << "insert MsgQueue fail, MsgQueue: " << msgQueuePtr->_name << endl;
return false;
}
return true;
}
/**********
* 移除消息队列
* ***************/
void removeMsgQueue(const std::string &name)
{
char deleteSql[1024];
sprintf(deleteSql, "delete from msg_queue_table where name='%s';", name.c_str());
if (!_sqlite.exec(deleteSql, nullptr, nullptr))
{
LOG(WARNING) << "remove MsgQueue fail, MsgQueue: " << name << endl;
}
}
/***********
* 从数据库中恢复消息队列到内存
* *****************/
void recoverMsgQueue(std::unordered_map<std::string, MsgQueuePtr> *mapPtr)
{
const std::string selectSql = "select * from msg_queue_table;";
if (!_sqlite.exec(selectSql.c_str(), selectCallback, mapPtr))
{
LOG(FATAL) << "recover MsgQueue from msg_queue_table fail" << endl;
exit(1);
}
}
/**************
* 删除数据库表(仅调试)
* ***************/
void removeTable()
{
const std::string dropSql = "drop table if exists msg_queue_table;";
if (_sqlite.exec(dropSql.c_str(), nullptr, nullptr))
{
LOG(WARNING) << "remove table msg_queue_table fail" << endl;
}
}
private:
void createTable()
{
const std::string createSql = "create table if not exists msg_queue_table(\
name varchar(32) primary key,\
durable int\
);";
if (!_sqlite.exec(createSql.c_str(), nullptr, nullptr))
{
LOG(FATAL) << "create table msg_queue_table fail" << endl;
exit(1);
}
}
static int selectCallback(void *arg, int colNum, char **line, char **fields)
{
auto mapPtr = static_cast<std::unordered_map<std::string, MsgQueuePtr> *>(arg);
std::string name = line[0];
bool isDurable = std::stoi(line[1]);
auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);
mapPtr->insert({name, msgQueuePtr});
return 0;
}
};
class MsgQueueManager
{
private:
MsgQueueMapper _mapper;
std::unordered_map<std::string, MsgQueuePtr> _msgQueues;
std::mutex _mtx;
public:
MsgQueueManager(const std::string &dbName)
: _mapper(dbName)
{
_mapper.recoverMsgQueue(&_msgQueues);
}
/***********
* 声明队列
* ************/
bool declareMsgQueue(const std::string &name, bool isDurable)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_msgQueues.count(name))
{
return true;
}
auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);
_msgQueues[name] = msgQueuePtr;
if (isDurable)
{
return _mapper.insertMsgQueue(msgQueuePtr);
}
return true;
}
/**********
* 移除队列
* ***********/
void removeMsgQueue(const std::string &name)
{
std::unique_lock<std::mutex> lck(_mtx);
auto it = _msgQueues.find(name);
if (it == _msgQueues.end())
{
return;
}
if (it->second->_isDurable)
{
_mapper.removeMsgQueue(name);
}
_msgQueues.erase(name);
}
/************
* 获取指定队列
* ***************/
MsgQueuePtr getMsgQueue(const std::string &name)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_msgQueues.count(name) == 0)
{
return nullptr;
}
return _msgQueues[name];
}
/*************
* 清理所有队列(仅调试)
* ******************/
void clearMsgQueues()
{
std::unique_lock<std::mutex> lck(_mtx);
_msgQueues.clear();
_mapper.removeTable();
}
};
}