微服务即时通信系统---(九)消息转发子服务
目录
功能设计
模块划分
业务接口/功能示意图
服务实现流程
服务代码实现
数据管理
MySQL(聊天会话成员管理)
chatSessionMember.hxx(ODB文件编写)
客户端操作编写(mysqlChatSessionMemberTable.hpp)
编写proto文件
消息元信息
消息转发proto
发送新消息
RPC调用
服务端创建子类(MessageTransmiteServiceImpl)完成RPC服务调用函数重写
GetTransmiteTargetID(获取转发目标ID并存储消息)
服务端完成消息转发子服务类(MessageTransmiteServer)
注意
实例化服务类对象,启动服务
工程系统构建配置文件(CMakeLists.txt)
服务测试
MySQL的会话成员表测试
向User表新增用户信息
新消息发送测试
本章节,主要对项目中消息转发子服务模块进行分析、开发与测试。
功能设计
消息转发子服务,主要用于对于一条消息内容,组织消息的消息ID以及各项所需要素,然后通知入口网关子服务,消息应该发送给谁。
而消息都是以聊天会话为基础的,因此根据聊天会话找到它的所有成员,这就是转发的目标。
除此之外,消息转发子服务还需要将受到的消息,放入消息队列中,由消息存储子服务进行消费存储。
因此,消息转发子服务提供一个功能性接口:
1、获取消息转发目标:针对消息内容,组织消息,告知入口网关子服务转发目标。
模块划分
参数/配置文件解析模块 | 基于gflags框架直接使用,进行参数/配置文件的解析。 |
日志模块 | 基于spdlog封装的logger 直接进行日志输出。 |
服务注册模块 | 基于etcd框架封装的注册模块 直接进行消息转发子服务模块的服务注册。 |
RPC服务模块 | 基于brpc框架 搭建消息转发子服务的RPC服务器。 |
服务发现与调用模块 | 基于etcd框架封装的服务发现与brpc框架封装的服务调用模块。 1、从用户管理子服务获取消息发送者的用户信息。 |
数据库数据操作模块 | 基于odb-mysql数据管理封装的模块,实现关系型数据库中数据的操作。 1、根据消息内容(会话ID),从数据库获取会话成员。 |
MQ发布模块 | 基于rabbitmq-client封装的模块将消息发布到消息队列,让消息存储子服务进行消费,对消息进行存储。 |
业务接口/功能示意图
服务实现流程
1、编写服务所需的proto文件,利用protoc工具生成RPC服务器所需的.pb.h 和 .pb.cc 项目文件。 |
2、服务端 创建子类,继承于proto文件中RPC调用类,并进行功能性接口函数重写。 |
3、服务端 完成消息转发子服务类。 |
4、实例化 服务类对象,启动服务 |
服务代码实现
数据管理
MySQL(聊天会话成员管理)
因为本服务需要通过聊天会话ID,获取到会话内成员ID,因此需要对聊天会话成员表进行管理。
chatSessionMember.hxx(ODB文件编写)
#pragma once
#include <iostream>
#include <odb/nullable.hxx>
#include <odb/core.hxx>
// odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time chatSessionMember.hxx
namespace yangz
{
#pragma db object table("ChatSessionMember")
class ChatSessionMember
{
public:
ChatSessionMember() {}
ChatSessionMember(const std::string &cssid, const std::string &user_id)
: _chat_session_id(cssid), _user_id(user_id)
{
}
~ChatSessionMember() {}
public:
void set_chat_session_id(const std::string &cssid) { _chat_session_id = cssid; }
std::string get_chat_session_id() { return _chat_session_id; }
void set_user_id(const std::string &user_id) { _user_id = user_id; }
std::string get_user_id() { return _user_id; }
private:
friend class odb::access;
#pragma db id auto
unsigned long _id; // 自增主键
#pragma db type("varchar(64)") index
std::string _chat_session_id; // 聊天会话id, varchar(64), 被索引
#pragma db type("varchar(64)")
std::string _user_id; // 会话内的用户ID
};
}
编译生成sql文件指令:
odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time chatSessionMember.hxx
此时在 .sql文件里新增:
然后将该.sql 文件导入数据库中:
mysql -uroot -p 'MicroChat' < chatSessionMember.sql
Enter password:
现在数据库中就有对应的表了:
客户端操作编写(mysqlChatSessionMemberTable.hpp)
该模块主要提供五个接口:
1、新增一个ChatSessionMember信息。
2、新增vector个ChatSessionMember信息。
3、删除指定ChatSessionMember信息。
4、删除所有信息。
5、通过聊天会话ID,获取会话内所有成员ID。
#pragma once
#include "odbMysqlHandleFactory.hpp"
#include "chatSessionMember.hxx"
#include "chatSessionMember-odb.hxx"
#include "logger.hpp"
namespace yangz
{
class ChatSessionMemberTableClient
{
public:
using ptr = std::shared_ptr<ChatSessionMemberTableClient>;
ChatSessionMemberTableClient(const std::shared_ptr<odb::core::database> &mysql_client) : _mysql_client(mysql_client) {}
public:
// 新增ChatSessionMember
bool append(ChatSessionMember &csm)
{
try
{
odb::transaction trans(_mysql_client->begin());
_mysql_client->persist(csm);
trans.commit();
}
catch (const std::exception &e)
{
LOG_ERROR("新增一个会话成员失败, cssid: {}, user_id: {}, 失败原因: {}", csm.get_chat_session_id(), csm.get_user_id(), e.what());
return false;
}
}
// 新增vector<ChatSessionMember>
bool append(std::vector<ChatSessionMember> &csm_list)
{
try
{
odb::transaction trans(_mysql_client->begin());
for (auto &csm : csm_list)
{
_mysql_client->persist(csm);
}
trans.commit();
}
catch (const std::exception &e)
{
LOG_ERROR("新增多个会话成员失败, cssid: {}, user_size: {}, 失败原因: {}", csm_list[0].get_chat_session_id(), csm_list.size(), e.what());
return false;
}
}
// 删除指定的ChatSessionMember
bool remove(ChatSessionMember &csm)
{
try
{
odb::transaction trans(_mysql_client->begin());
typedef odb::query<ChatSessionMember> query;
typedef odb::result<ChatSessionMember> result;
_mysql_client->erase_query<ChatSessionMember>(query::chat_session_id == csm.get_chat_session_id() && query::user_id == csm.get_user_id());
trans.commit();
}
catch (const std::exception &e)
{
LOG_ERROR("删除指定会话成员失败, cssid: {}, user_id: {}, 失败原因: {}", csm.get_chat_session_id(), csm.get_user_id(), e.what());
return false;
}
}
// 删除所有会话成员
bool remove(const std::string &cssid)
{
try
{
odb::transaction trans(_mysql_client->begin());
typedef odb::query<ChatSessionMember> query;
typedef odb::result<ChatSessionMember> result;
_mysql_client->erase_query<ChatSessionMember>(query::chat_session_id == cssid);
trans.commit();
}
catch (const std::exception &e)
{
LOG_ERROR("删除所有会话成员失败, cssid: {}, 失败原因: {}", cssid, e.what());
return false;
}
}
// 通过cssid获取会话内成员ID
std::vector<std::string> get_user_id_list(const std::string &cssid)
{
std::vector<std::string> user_id_list;
try
{
odb::transaction trans(_mysql_client->begin());
typedef odb::query<ChatSessionMember> query;
typedef odb::result<ChatSessionMember> result;
result r(_mysql_client->query<ChatSessionMember>(query::chat_session_id == cssid));
for (result::iterator i(r.begin()); i != r.end(); ++i)
{
user_id_list.push_back(i->get_user_id());
}
trans.commit();
}
catch (const std::exception &e)
{
LOG_ERROR("获取会话内成员ID失败, cssid: {}, 失败原因: {}", cssid, e.what());
}
return user_id_list;
}
private:
std::shared_ptr<odb::core::database> _mysql_client;
};
}
编写proto文件
消息元信息
对于一条消息来说,需要包含:消息ID、消息所属聊天会话ID、消息产生时间、消息发送者信息、消息内容。
而其中:消息内容这里,可能是文件信息、图像信息、语音信息、文字信息。因此消息内容这里,又需要包含:消息类型、消息内容。
消息元信息(MessageInfo)成员(主要是用于好友管理子服务中获取最近的聊天信息所用,此处不用):
1、消息ID:标识消息唯一性。
2、消息所属聊天会话ID:通过该ID,查找聊天会话成员。
3、产生时间:以时间戳的方式记录。
4、消息发