【RabbitMQ 项目】服务端:数据管理模块之消息管理
文章目录
- 一.编写思路
- 1.定义消息类
- 2.定义消息持久化类
- 3.定义队列消息管理类
- 4.定义消息管理类
- 二.代码实践
一.编写思路
1.定义消息类
因为消息要在网络中传输,并且还要持久化到磁盘,所以使用 Protobuf 来定义,目的就是使用它的序列化反序列化方法。
首先需要客户端填充传递过来的字段:
- 消息 id:在服务端上用来唯一标识一条消息
- 路由关键字 routing_key:和路由交换有关
- 投递模式 delivery_mode:分为持久化和非持久化两种,非持久化表示这条消息不太重要,丢了也就丢了
- 消息主体 body
再加上一个字段:- 消息有效标志位 is_valid:服务端持久化时要用到的。磁盘上的消息是顺序存储的,想要删除一条消息,直接删除需要把大量消息往前移动覆盖,成本太高。解决方案就是将消息的有效标志位置为无效,具体来说先把消息提取上来反序列化,修改有效标志位,然后覆盖式写回
以上四个字段是需要存储到磁盘上的,下面还有 3 个和持久化相关字段,要在内存中记录- 磁盘上消息偏移量 offset:你要去将把消息置为无效,首先要在文件中找到那条消息
- 磁盘上消息的长度 length:找到消息起点后,还要把它提取上来
补充说明:
Protobuf 中 bool 类型 true 和 false 序列化后长度不一样,如果消息有效标志位使用 bool 类型,修改该字段然后序列化得到的结果长短不一,写回磁盘会覆盖其它消息,所以这个字段要用 string,“0”表示无效,“1”表示有效,都占 1 个字节
2.定义消息持久化类
前置说明:
- 为什么不用数据库而用磁盘?
- 有的消息体积很大,不适合用 sqlite 这样的小型数据库存储
- 消息持久化和确认应答删除是很频繁的动作,比交换机,队列,绑定这些要频繁地多,RabbitMQ 作为一个成熟的中间件,要追求效率,所以自己存储方案,更加灵活且高效
- 1 个文件还是多个文件?
每个队列中的消息最好分开持久化,因为持久化相关的操作不仅涉及到文件,还有内存,比如移动消息的位置,内存中的 offset 字段就会修改,这个时候如果所有消息都放在一起管理,文件锁,互斥锁竞争非常激烈,所以最好的办法以队列为单位管理消息,每个队列的消息有自己独立的文件。- 文件存储方案?
消息中有偏移量和长度字段,可以根据内存中的消息找到磁盘消息,但是程序刚启动时,内存是没有消息的,而我们的目的是把消息恢复到内存。那我们怎么保证读取到一个个完整消息呢?所以需要定制存储协议。定义如下:
先存储 8 字节消息长度,然后再存储消息。这和网络协议是类似的,4 字节消息长度就是报头,消息就是有效载荷,怎么把报头和有效载荷分离?定长 4 字节报头,怎么读取一个完整报文?4 字节 + 消息长度即为一个完整的报文
成员变量:
- 队列名称
- 存储队列消息的文件路径
- 转移队列消息的临时文件路径
成员方法:- 构造函数:确保数据文件存在
- 插入消息:根据传入的消息智能指针,将消息需要持久化的部分写入到文件末尾,并且修改内存中的 offset 和 length 字段
- 删除消息:根据传入的消息智能指针,在磁盘上找到消息并反序列化,把有效标志位置“0”,重新写回原位
- 恢复消息:顺序地读取消息文件,把有效的消息插入链表中,返回给外界
- 垃圾回收:由于删除消息只是修改了数据内容,并没有删除数据,所以消息不断累积给磁盘带来巨大压力,所以我们要定期清理掉无效的消息。怎么清理呢?先把磁盘上有效消息都加载到内存,处理完后把删除原来存储消息的文件,对临时文件重命名。最后把链表返回给外界
补充说明:
为什么恢复消息要返回链表?
它的上层,队列消息管理类会在程序启动时调用它,从文件中恢复出来的消息都会插入到待推送链表中,所以 recover 接口返回的链表可以直接作为待推送的消息链表
3.定义队列消息管理类
成员变量:
- 队列名称
- 队列消息持久化句柄
- 持久化消息的总数量
- 持久化消息中有效消息的总数量
这两个字段是为了判断是否需要垃圾回收,当文件中消息总数大于 2000,且有效消息比例不足 50% 时就垃圾回收- 待消费的消息(链表)
选择链表是因为有大量的队头和队尾操作,并且链表比队列更加灵活,可以访问中间元素(本项目没有用到)- 待应答的消息(哈希表,key:message id, value:MessagePtr)
客户端拿着消息 id 来确认的,所有要用 id 作为键值- 持久化的消息(哈希表,key:message id, value:MessagePtr)
其一,当要在文件中删除一条持久化的消息,需要找到它的偏移量以及长度,从哪找呢?就拿着消息 id,到这个成员中去找。为什么要用 id 做键值,因为一般是消息确认之后,就要删除持久化消息,而消息确认用的就是 id。
其二,垃圾回收后,需要更新内存中消息的 offset 字段,在哪找到消息呢?就从这里找
成员方法:- 构造函数:恢复文件中的消息到待消费的消息链表(后续改进)
- 插入消息:传入的参数告诉我是否需要持久化,先持久化,并在持久化消息列表中添加该消息,再尾插到待消费的消息链表中
- 移除消息:传入的是消息 id,根据 id 再在待应答消息列表中找到消息,如果已经持久化就先删除持久化消息,然后在在持久化消息列表中移除该消息,最后再 erase 待应答列表中的消息
- 取队首消息:从待消费链表中头部取出一条消息,再把它插入待应答列表,最后再返回给外部
重要的私有成员方法:- 垃圾回收:先要判断是否需要垃圾回收,如果需要再调用队列消息持久化句柄的垃圾回收方法,得到一个新链表,用每个结点的 offset 字段去更新持久化列表中的每个消息。每次移除持久化的消息后,就要检查是否需要垃圾回收
4.定义消息管理类
成员变量:
- 队列消息管理句柄(哈希表:key:队列名称,value:队列消息管理句柄指针)
成员方法:- 构造函数
传入所有队列名称,消息文件所在目录,去构造队列消息管理句柄(在它们构造函数中恢复消息)- 向指定队列插入消息
- 向指定队列应答消息
调用队里消息管理句柄的移除消息方法- 移除指定队列的所有消息
当客户端把队列删除时会调用此接口
二.代码实践
Message.hpp:
#pragma once
#include "../common/message.pb.h"
#include "../common/Util.hpp"
#include <memory>
#include <list>
#include <mutex>
namespace ns_data
{
class Message;
class QueueMessageManager;
using MessagePtr = std::shared_ptr<Message>;
using QueueMessageManagerPtr = std::shared_ptr<QueueMessageManager>;
static const std::string dataFileSuffix = ".data";
static const std::string tmpFileSuffix = ".data.tmp";
class QueueMessageMapper
{
private:
std::string _qname;
std::string _dataFilePath;
std::string _tmpFilePath;
public:
QueueMessageMapper(std::string baseDir, const std::string &qname)
: _qname(qname)
{
if (baseDir.back() != '/')
{
baseDir += '/';
}
_dataFilePath = baseDir + _qname + dataFileSuffix;
_tmpFilePath = baseDir + _qname + tmpFileSuffix;
// 确保数据文件存在
if (!ns_util::FileUtil::createFile(_dataFilePath))
{
LOG(FATAL) << "create message file fail, file: " << _dataFilePath << endl;
exit(1);
}
}
/***************
* 插入消息
* 注意:*messagePtr是一个输入输出型参数
* ******************/
bool insertMessage(MessagePtr msgPtr)
{
return insertMessageIntoFile(msgPtr, _dataFilePath);
}
/***************
* 移除消息
* 注意:*messagePtr是一个输入输出型参数,但这不重要,因为messagePtr在外界一定会从链表中移除
* ******************/
bool removeMessage(MessagePtr msgPtr)
{
size_t pos = msgPtr->offset();
size_t msgLen = msgPtr->length();
ns_util::FileUtil fileUtil(_dataFilePath);
if (!fileUtil.isOpen())
{
LOG(WARNING) << "open message file " << _dataFilePath << " fail" << endl;
return false;
}
// 1.读取消息
std::string msgBytes;
if (!fileUtil.read(&msgBytes, pos, msgLen))
{
LOG(WARNING) << "read msg fail" << endl;
return false;
}
// 2.反序列化
if (!msgPtr->mutable_saved_info()->ParseFromString(msgBytes)) // 注意!!!
{
LOG(WARNING) << "parse Message fail, offset: " << pos << ", msgLen: " << msgLen << ", msgBytes: "
<< msgBytes << endl;
return false;
}
// 3.修改标志位为“0”
msgPtr->mutable_saved_info()->set_valid("0");
// 4.序列化
std::string newMsgBytes = msgPtr->saved_info().SerializeAsString(); // 注意
if (newMsgBytes.size() != msgLen)
{
cout << "修改有效标志位后序列化长度与之前不一致, old: " << msgLen << ", new: " << newMsgBytes.size() << endl;
return false;
}
// 5.写回原处
if (!fileUtil.write(newMsgBytes.c_str(), pos, msgLen))
{
LOG(WARNING) << "rewrite msg fail" << endl;
return false;
}
return true;
}
/*************
* 程序启动时调用,只能成功,失败直接退出
* listPtr是输出型参数
* ******************/
void recoverMessages(std::list<MessagePtr> *listPtr)
{
ns_util::FileUtil fileUtil(_dataFilePath);
if (!fileUtil.isOpen())
{
LOG(WARNING) << "open message file " << _dataFilePath << " fail" << endl;
exit(1);
}
size_t pos = 0;
size_t fsize = fileUtil.size();
while (pos < fsize)
{
size_t msgLen;
// 先读取8字节长度
if (!fileUtil.read(&msgLen, pos, sizeof(msgLen)))
{
LOG(WARNING) << "read 8 byte msgLen fail, pos=" << pos << ", fsize=" << fsize << endl;
exit(1);
}
// LOG(INFO) << "read msgLen success, msgLen: " << msgLen << endl;
pos += sizeof(msgLen);
// 再读取消息
std::string msgBytes;
if (!fileUtil.read(&msgBytes, pos, msgLen))
{
LOG(WARNING) << "read msg fail" << endl;
exit(1);
}
// LOG(INFO) << "read msg success, msgBytes: " << msgBytes << endl;
// 反序列化
MessagePtr msgPtr = std::make_shared<Message>();
if (!msgPtr->mutable_saved_info()->ParseFromString(msgBytes))
{
LOG(WARNING) << "parse Message fail" << endl;
exit(1);
}
if (msgPtr->saved_info().valid() == "1")
{
msgPtr->set_length(msgBytes.size());
msgPtr->set_offset(pos);
listPtr->push_back(msgPtr);
}
pos += msgLen;
}
}
/***************
* 垃圾回收
* *******************/
bool gc(std::list<MessagePtr> *listPtr)
{
recoverMessages(listPtr);
// 确保临时文件存在,且是一个崭新的
ns_util::FileUtil::removeFileOrDir(_tmpFilePath);
if (!ns_util::FileUtil::createFile(_tmpFilePath))
{
LOG(FATAL) << "create message file fail, file: " << _tmpFilePath << endl;
return false;
}
for (auto it = listPtr->begin(); it != listPtr->end(); it++)
{
if (!insertMessageIntoFile(*it, _tmpFilePath))
{
return false;
}
}
// 删除数据文件,给临时文件重命名
assert(ns_util::FileUtil::removeFileOrDir(_dataFilePath));
assert(ns_util::FileUtil::renameFile(_tmpFilePath, _dataFilePath));
return true;
}
/***************
* 删除消息文件
* ****************/
void removeDataFile()
{
assert(ns_util::FileUtil::removeFileOrDir(_dataFilePath));
}
private:
bool insertMessageIntoFile(MessagePtr msgPtr, const std::string &pathName)
{
// 1.有效标志位设为“1”
msgPtr->mutable_saved_info()->set_valid("1");
// 2.对需要存储的部分序列化
std::string msg = msgPtr->saved_info().SerializeAsString();
ns_util::FileUtil fileUtil(pathName);
if (!fileUtil.isOpen())
{
LOG(WARNING) << "open message file " << pathName << " fail" << endl;
return false;
}
// 3.先写入8字节长度
size_t msgSize = msg.size();
size_t fsize = fileUtil.size();
if (!fileUtil.write(&msgSize, fsize, sizeof(msgSize)))
{
LOG(WARNING) << "write 8 Byte msgSize fail" << endl;
return false;
}
// 4.写入消息
if (!fileUtil.write(msg))
{
LOG(WARNING) << "write msg fail" << endl;
return false;
}
// 5.填充offset和length字段
msgPtr->set_offset(fsize + sizeof(msgSize));
msgPtr->set_length(msgSize);
return true;
}
};
class QueueMessageManager
{
private:
std::string _qname;
QueueMessageMapper _mapper;
size_t _validDurableNum;
size_t _totalDurableNum;
std::list<MessagePtr> _waitConsumeMessages;
std::unordered_map<std::string, MessagePtr> _waitAckMessages;
std::unordered_map<std::string, MessagePtr> _durableMessages;
std::mutex _mtx;
public:
QueueMessageManager(const std::string &qname, const std::string &baseDir)
: _qname(qname),
_mapper(baseDir, _qname),
_validDurableNum(0),
_totalDurableNum(0)
{
// 恢复队列消息
_mapper.recoverMessages(&_waitConsumeMessages);
for (const auto &msgPtr : _waitConsumeMessages)
{
_durableMessages[msgPtr->saved_info().id()] = msgPtr;
}
_validDurableNum = _totalDurableNum = _durableMessages.size();
}
/***************
* 向待消费队列尾插一条消息,根据需要看是否要持久化
* 我们允许一条消息没有id,如果没有id,还需要持久化,我们就给它手动设置一个uuid
* **************/
bool insertMessage(const std::string &id, const std::string &routingKey, const std::string &body,
DeliveryMode deliveryMode)
{
auto msgPtr = std::make_shared<Message>();
if (id == "")
{
msgPtr->mutable_saved_info()->set_id(ns_util::UUIDUtil::uuid());
}
else
{
msgPtr->mutable_saved_info()->set_id(id);
}
msgPtr->mutable_saved_info()->set_routing_key(routingKey);
msgPtr->mutable_saved_info()->set_delivery_mode(deliveryMode);
msgPtr->mutable_saved_info()->set_body(body);
std::unique_lock<std::mutex> lck(_mtx);
// 判断是否要持久化
if (deliveryMode == DeliveryMode::DURABLE)
{
if (!_mapper.insertMessage(msgPtr))
{
LOG(WARNING) << "持久化消息失败, message body: " << msgPtr->saved_info().body() << endl;
return false;
}
_durableMessages[msgPtr->saved_info().id()] = msgPtr;
_totalDurableNum++;
_validDurableNum++;
}
_waitConsumeMessages.push_back(msgPtr);
return true;
}
/**************
* 移除一条待确认应答消息
* **************/
bool removeMessage(const std::string &id)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_waitAckMessages.count(id) == 0)
{
LOG(INFO) << "message not found in waitAckMessages, id: " << id << endl;
return true;
}
auto msgPtr = _waitAckMessages[id];
// 看看它是否持久化了
if (msgPtr->saved_info().delivery_mode() == DeliveryMode::DURABLE)
{
if (!_mapper.removeMessage(msgPtr))
{
LOG(WARNING) << "remove durable message in file fail, message body: "
<< msgPtr->saved_info().body() << endl;
return false;
}
_validDurableNum--;
_durableMessages.erase(id);
}
_waitAckMessages.erase(id);
// 最后检查是否需要垃圾回收
if (gcCheck())
{
gc();
}
return true;
}
/**************
* 取走队头消息
* ***************/
MessagePtr popFront()
{
std::unique_lock<std::mutex> lck(_mtx);
if (_waitConsumeMessages.empty())
{
return nullptr;
}
auto msgPtr = _waitConsumeMessages.front();
_waitConsumeMessages.pop_front();
_waitAckMessages[msgPtr->saved_info().id()] = msgPtr;
return msgPtr;
}
/***************
* 清空队列消息
* **************/
void clear()
{
std::unique_lock<std::mutex> lck(_mtx);
_mapper.removeDataFile();
_waitConsumeMessages.clear();
_waitAckMessages.clear();
_durableMessages.clear();
_validDurableNum = _totalDurableNum = 0;
}
/************
* 以下成员仅作调试用
* ***************/
size_t waitConsumeSize()
{
std::unique_lock<std::mutex> lck(_mtx);
return _waitConsumeMessages.size();
}
size_t waitAckSize()
{
std::unique_lock<std::mutex> lck(_mtx);
return _waitAckMessages.size();
}
size_t durableSize()
{
std::unique_lock<std::mutex> lck(_mtx);
return _durableMessages.size();
}
private:
bool gcCheck()
{
if (_totalDurableNum >= 2000 && _validDurableNum * 10 / _totalDurableNum < 5)
{
return true;
}
return false;
}
bool gc()
{
std::list<MessagePtr> newMessageList;
if (!_mapper.gc(&newMessageList))
{
LOG(WARNING) << "gc fail" << endl;
return false;
}
for (const auto &msgPtr : newMessageList)
{
std::string id = msgPtr->saved_info().id();
if (_durableMessages.count(id) == 0)
{
LOG(WARNING) << "one durable message not be managered, message body: "
<< msgPtr->saved_info().body() << endl;
// 补救措施:添加到待消费链表和持久化列表中
_waitConsumeMessages.push_back(msgPtr);
_durableMessages[id] = msgPtr;
}
else
{
// 更新offset
_durableMessages[id]->set_offset(msgPtr->offset());
assert(_durableMessages[id]->length() == msgPtr->length());
}
}
_validDurableNum = _totalDurableNum = _durableMessages.size();
return true;
}
};
class MessageManager
{
private:
std::unordered_map<std::string, QueueMessageManagerPtr> _qMsgManagers;
std::mutex _mtx;
public:
MessageManager(const std::string &baseDir, const std::vector<std::string> &qnames)
{
// 构造函数无需锁住
for (const auto &qname : qnames)
{
_qMsgManagers[qname] = std::make_shared<QueueMessageManager>(qname, baseDir); // 完成后消息已经恢复了
}
}
/***************
* 向指定队列插入消息
* ***************/
bool insertMessage(const std::string &qname, const std::string &id, const std::string &routingKey,
const std::string &body, DeliveryMode deliveryMode)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qMsgManagers.count(qname) == 0)
{
LOG(WARNING) << "insert message fail, because MessageQueue not found, qname: " << qname << endl;
return false;
}
return _qMsgManagers[qname]->insertMessage(id, routingKey, body, deliveryMode);
}
/************
* 对指定队列的消息应答
* *************/
void ack(const std::string &qname, const std::string &msgId)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qMsgManagers.count(qname) == 0)
{
LOG(INFO) << "ack error, because MessageQueue not found, qname: " << qname << endl;
return;
}
_qMsgManagers[qname]->removeMessage(msgId);
}
/***********
* 移除指定队列的消息,当删除一个队列时会用到
* ***********/
void removeQueueMessages(const std::string &qname)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qMsgManagers.count(qname) == 0)
{
LOG(WARNING) << "removeQueueMessages fail, because MessageQueue not found, qname: " << qname << endl;
return;
}
_qMsgManagers[qname]->clear();
_qMsgManagers.erase(qname);
}
/*********
* 取出指定队列队首消息
* **************/
MessagePtr popFront(const std::string &qname)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qMsgManagers.count(qname) == 0)
{
LOG(WARNING) << "popFront fail, because MessageQueue not found, qname: " << qname << endl;
return nullptr;
}
return _qMsgManagers[qname]->popFront();
}
/***********
* 清除所有消息(仅调试)
* *****************/
void clear()
{
std::unique_lock<std::mutex> lck(_mtx);
for (auto it = _qMsgManagers.begin(); it != _qMsgManagers.end(); it++)
{
it->second->clear();
}
}
/**********
* 以下成员仅作调试用
* ***************/
size_t waitConsumeSize(const std::string &qname)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qMsgManagers.count(qname) == 0)
{
LOG(WARNING) << qname << " not found" << qname << endl;
return 0;
}
return _qMsgManagers[qname]->waitConsumeSize();
}
size_t waitAckSize(const std::string &qname)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qMsgManagers.count(qname) == 0)
{
LOG(WARNING) << qname << " not found" << qname << endl;
return 0;
}
return _qMsgManagers[qname]->waitAckSize();
}
size_t durableSize(const std::string &qname)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qMsgManagers.count(qname) == 0)
{
LOG(WARNING) << qname << " not found" << qname << endl;
return 0;
}
return _qMsgManagers[qname]->durableSize();
}
};
}
Util.hpp:
#pragma once
#include "Log.hpp"
#include <string>
#include <sqlite3.h>
#include <iostream>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <fstream>
#include <sstream>
#include <random>
#include <iomanip>
#include <atomic>
using namespace ns_log;
namespace ns_util
{
class Sqlite3Util
{
private:
std::string _dbfile;
sqlite3 *_handler;
bool _isOpen;
public:
Sqlite3Util(const std::string &dbfile)
: _dbfile(dbfile),
_handler(nullptr),
_isOpen(false)
{
open();
}
~Sqlite3Util()
{
close();
}
bool open(int safeLevel = SQLITE_OPEN_FULLMUTEX)
{
if (_isOpen)
{
return true;
}
// 可读可写,不存在就创建,默认串行化访问
int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safeLevel,
nullptr);
if (ret != SQLITE_OK)
{
LOG(WARNING) << sqlite3_errmsg(_handler) << endl;
return false;
}
_isOpen = true;
return true;
}
// int sqlite3_exec(sqlite3*, char *sql, int (*callback) (void* arg,int colNum ,char** lines,char** fields), void* arg, char **err)
bool exec(const std::string &sql, int (*cb)(void *, int, char **, char **), void *arg)
{
if (sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr) != SQLITE_OK)
{
LOG(WARNING) << "execute fail: error: " << sqlite3_errmsg(_handler) << endl;
return false;
}
return true;
}
void close()
{
if (_handler)
{
sqlite3_close_v2(_handler);
}
}
};
class FileUtil
{
private:
std::string _pathName;
std::fstream _fs;
public:
FileUtil(const std::string &pathName)
: _pathName(pathName),
_fs(_pathName, std::ios::in | std::ios::out | std::ios::binary)
{
if (!_fs.is_open())
{
LOG(WARNING) << "open file fail: " << _pathName << endl;
}
}
bool isOpen()
{
return _fs.is_open();
}
/***************
* 从指定位置开始写指定个字节的内容
* *****************/
bool write(const void *buf, size_t pos, size_t len)
{
_fs.seekp(pos);
_fs.write((char *)buf, len);
if (!_fs.good())
{
LOG(WARNING) << "write fail, error: " << strerror(errno) << endl;
return false;
}
return true;
}
/******************
* 向文件末尾写string
* ***************/
bool write(const std::string &content)
{
return write(content.c_str(), size(), content.size());
}
/**************
* 从指定位置开始读取指定字节的内容
* ************/
bool read(void* buf, size_t pos, size_t len)
{
_fs.seekg(pos);
_fs.read((char *)buf, len);
if (!_fs.good())
{
LOG(WARNING) << "read fail, error: " << strerror(errno) << endl;
return false;
}
return true;
}
bool read(std::string *contentPtr, size_t pos, size_t len)
{
contentPtr->resize(len, '\0');
return read((void*)(contentPtr->c_str()), pos, len);
}
/********************
* 读取整个文件
* *****************/
bool read(std::string *contentPtr)
{
size_t fsize = size();
contentPtr->resize(fsize, '\0');
return read((void *)contentPtr->c_str(), 0, fsize);
}
size_t size()
{
size_t curPos = _fs.tellg();
_fs.seekg(0, std::ios::end);
size_t ret = _fs.tellg();
// 恢复原来的位置
_fs.seekg(curPos);
return ret;
}
static void getParentDirectory(const std::string &pathName, std::string *dirPtr)
{
// 从后往前找,找到第一个“/”
auto pos = pathName.rfind('/');
if (pos == std::string::npos)
{
*dirPtr = "./";
return;
}
*dirPtr = pathName.substr(0, pos);
}
static bool createDirectory(const std::string &dirName)
{
// 从第一个父目录开始,逐层创建
size_t prev = 0;
while (true)
{
auto pos = dirName.find('/', prev);
if (pos == std::string::npos)
{
break;
}
std::string dir = dirName.substr(0, pos);
int ret = mkdir(dir.c_str(), 0775);
if (ret != 0 && errno != EEXIST)
{
LOG(WARNING) << "创建目录" << dir << "失败, error: " << strerror(errno) << endl;
return false;
}
prev = pos + 1;
}
int ret = mkdir(dirName.c_str(), 0775);
if (ret != 0 && errno != EEXIST)
{
LOG(WARNING) << "创建目录" << dirName << "失败, error: " << strerror(errno) << endl;
return false;
}
return true;
}
static bool createFile(const std::string pathName)
{
// 先创建它的父目录
std::string parentDir;
getParentDirectory(pathName, &parentDir);
if (!createDirectory(parentDir))
{
LOG(WARNING) << "创建文件失败,因为创建父目录失败" << endl;
return false;
}
// 再创建文件
int fd = open(pathName.c_str(), O_CREAT, 0775);
if (fd == -1)
{
LOG(WARNING) << "创建文件失败, error: " << strerror(errno) << endl;
return false;
}
close(fd);
return true;
}
static bool removeFileOrDir(const std::string &name)
{
if (remove(name.c_str()) != 0)
{
LOG(WARNING) << "remove " << name << " fail, error: " << strerror(errno) << endl;
return false;
}
return true;
}
static bool renameFile(const std::string &oldPathName, const std::string &newPathName)
{
if (rename(oldPathName.c_str(), newPathName.c_str()) == -1)
{
LOG(WARNING) << "rename " << oldPathName << " to " << newPathName << " fail, error: "
<< strerror(errno) << endl;
return false;
}
return true;
}
};
class UUIDUtil
{
public:
static std::string uuid()
{
std::random_device rd;
std::mt19937_64 gernator(rd());
std::uniform_int_distribution<int> distribution(0, 255);
std::stringstream ss;
for (int i = 0; i < 8; i++)
{
ss << std::setw(2) << std::setfill('0')
<< std::hex << distribution(gernator);
if (i == 3 || i == 5 || i == 7)
{
ss << "-";
}
}
static std::atomic<size_t> seq(1);
size_t num = seq.fetch_add(1);
for (int i = 7; i >= 0; i--)
{
ss << std::setw(2) << std::setfill('0') << std::hex << ((num >> (i * 8)) & 0xff);
if (i == 6)
ss << "-";
}
return ss.str();
}
};
}