【RabbitMQ 项目】服务端:数据管理模块之虚拟机模块
文章目录
- 一.编写思路
- 二.代码实践
一.编写思路
前置说明:
该项目是 RabbitMQ 项目的简化版,服务器上只会有一个虚拟机,所以虚拟机模块就是最终的数据模块,并且只需定义虚拟机类,无需虚拟机管理类
成员变量:
- 交换机管理句柄
- 消息队列管理句柄
- 绑定管理句柄
- 消息管理句柄
句柄建议使用智能指针,防止将来虚拟机定义在栈上,给栈带来不小的负担
成员方法- 声明交换机
- 删除交换机
与之相关的绑定也要随之删除- 声明队列
同时要调用消息管理句柄中的接口,初始化队列消息管理句柄- 删除队列
与之相关的绑定也要随之删除,同时删除队列消息- 绑定交换机和队列
- 解绑交换机和队列
- 向某个队列发布消息
- 从某个队列消费消息
- 确认应答指定队列的指定消息
- 获取指定交换机相关的所有绑定:给服务器模块用的,用于路由交换
二.代码实践
#pragma once
#include "Exchange.hpp"
#include "MsgQueue.hpp"
#include "Binding.hpp"
#include "Message.hpp"
#include "../common/Log.hpp"
#include <memory>
namespace ns_data
{
using ExchangeManagerPtr = std::shared_ptr<ExchangeManager>;
using MsgQueueManagerPtr = std::shared_ptr<MsgQueueManager>;
using BindingManagerPtr = std::shared_ptr<BindingManager>;
using MessageManagerPtr = std::shared_ptr<MessageManager>;
class VirtualHost
{
private:
ExchangeManagerPtr _exchangeManagerPtr;
MsgQueueManagerPtr _msgQueueManagerPtr;
BindingManagerPtr _bindingManagerPtr;
MessageManagerPtr _messageManagerPtr;
public:
VirtualHost(const std::string &dbName, const std::string &baseDir)
: _exchangeManagerPtr(std::make_shared<ExchangeManager>(dbName)),
_msgQueueManagerPtr(std::make_shared<MsgQueueManager>(dbName)),
_bindingManagerPtr(std::make_shared<BindingManager>(dbName))
{
std::vector<std::string> qnames;
_msgQueueManagerPtr->getAllQueueName(&qnames);
_messageManagerPtr = std::make_shared<MessageManager>(baseDir, qnames);
}
/**********
* 声明交换机
* *****************/
bool declareExchange(const std::string &name, ExchangeType type, bool isDurable)
{
return _exchangeManagerPtr->declareExechange(name, type, isDurable);
}
/***************
* 删除交换机
* 交换机删除,与之相关的绑定也要删除
* *************/
void deleteExchange(const std::string &name)
{
_exchangeManagerPtr->removeExchange(name);
_bindingManagerPtr->removeExchangeBindings(name);
}
/***********
* 声明队列
* 新增一个队列,消息管理句柄中也要新增一个队列消息管理句柄
* ****************/
bool declareMsgQueue(const std::string &name, bool isDurable)
{
_messageManagerPtr->initQueueMessageManager(name);
return _msgQueueManagerPtr->declareMsgQueue(name, isDurable);
}
/************
* 删除队列
* 删除一个队列,与之相关的绑定也要随之删除
* 同时要移除与之对应的队列消息
* **********/
void deleteMsgQueue(const std::string &name)
{
_bindingManagerPtr->removeMsqQueueBindings(name);
_messageManagerPtr->removeQueueMessages(name);
_msgQueueManagerPtr->removeMsgQueue(name);
}
/***************
* 绑定交换机和队列
* ***********/
bool bind(const std::string &ename, const std::string &qname, const std::string& bindingKey)
{
//内部判断绑定是否需要持久化
bool isDurable = false;
if (_msgQueueManagerPtr->getMsgQueue(qname)->_isDurable && _exchangeManagerPtr->getExchange(ename))
{
isDurable = true;
}
return _bindingManagerPtr->bind(ename, qname, bindingKey, isDurable);
}
/*************
* 解绑交换机和队列
* *************/
void unbind(const std::string &ename, const std::string &qname)
{
auto exchanePtr = _exchangeManagerPtr->getExchange(ename);
if (exchanePtr == nullptr)
{
return;
}
auto msgQueuePtr = _msgQueueManagerPtr->getMsgQueue(qname);
if (msgQueuePtr == nullptr)
{
return;
}
bool isDurable = false;
if (msgQueuePtr->_isDurable && exchanePtr->_isDurable)
{
isDurable = true;
}
_bindingManagerPtr->unbind(ename, qname, isDurable);
}
/**********
* 向指定队列发布一条消息
* ***************/
bool publish(const std::string &qname, const std::string &id, const std::string &routingKey,
const std::string &body, DeliveryMode deliveryMode)
{
return _messageManagerPtr->insertMessage(qname, id, routingKey, body, deliveryMode);
}
/*************
* 从指定队列消费一条消息
* *************/
MessagePtr consume(const std::string& qname)
{
return _messageManagerPtr->popFront(qname);
}
/***************
* 应答指定队列的指定消息
* **************/
void ack(const std::string& qname, const std::string& msgId)
{
_messageManagerPtr->ack(qname, msgId);
}
/************
* 获取与指定交换机相关的所有绑定
* *mapPtr:key->队列名, value->BindingPtr
* ****************/
bool getExchangeBindings(const std::string& ename, std::unordered_map<std::string, BindingPtr>* mapPtr)
{
return _bindingManagerPtr->getExchangeBindings(ename, mapPtr);
}
};
}