当前位置: 首页 > article >正文

微服务即时通信系统---(九)消息转发子服务

目录

功能设计

模块划分

业务接口/功能示意图

服务实现流程

服务代码实现

数据管理

MySQL(聊天会话成员管理)

chatSessionMember.hxx(ODB文件编写)

客户端操作编写(mysqlChatSessionMemberTable.hpp)

编写proto文件

消息元信息

消息转发proto

发送新消息

RPC调用

服务端创建子类(MessageTransmiteServiceImpl)完成RPC服务调用函数重写

GetTransmiteTargetID(获取转发目标ID并存储消息)

服务端完成消息转发子服务类(MessageTransmiteServer)

注意

实例化服务类对象,启动服务

工程系统构建配置文件(CMakeLists.txt)

服务测试

MySQL的会话成员表测试

向User表新增用户信息

新消息发送测试


本章节,主要对项目中消息转发子服务模块进行分析、开发与测试。

功能设计

消息转发子服务,主要用于对于一条消息内容,组织消息的消息ID以及各项所需要素,然后通知入口网关子服务,消息应该发送给谁。

而消息都是以聊天会话为基础的,因此根据聊天会话找到它的所有成员,这就是转发的目标。

除此之外,消息转发子服务还需要将受到的消息,放入消息队列中,由消息存储子服务进行消费存储。

因此,消息转发子服务提供一个功能性接口:

1、获取消息转发目标:针对消息内容,组织消息,告知入口网关子服务转发目标。

模块划分

参数/配置文件解析模块 基于gflags框架直接使用,进行参数/配置文件的解析。
日志模块 基于spdlog封装的logger 直接进行日志输出。
服务注册模块 基于etcd框架封装的注册模块 直接进行消息转发子服务模块的服务注册。
RPC服务模块 基于brpc框架 搭建消息转发子服务的RPC服务器。
服务发现与调用模块

基于etcd框架封装的服务发现与brpc框架封装的服务调用模块。

1、从用户管理子服务获取消息发送者的用户信息。

数据库数据操作模块

基于odb-mysql数据管理封装的模块,实现关系型数据库中数据的操作。

1、根据消息内容(会话ID),从数据库获取会话成员。

MQ发布模块 基于rabbitmq-client封装的模块将消息发布到消息队列,让消息存储子服务进行消费,对消息进行存储。

业务接口/功能示意图

服务实现流程

1、编写服务所需的proto文件,利用protoc工具生成RPC服务器所需的.pb.h 和 .pb.cc 项目文件。
2、服务端 创建子类,继承于proto文件中RPC调用类,并进行功能性接口函数重写。
3、服务端 完成消息转发子服务类。
4、实例化 服务类对象,启动服务

服务代码实现

数据管理

MySQL(聊天会话成员管理)

因为本服务需要通过聊天会话ID,获取到会话内成员ID,因此需要对聊天会话成员表进行管理。

chatSessionMember.hxx(ODB文件编写)
#pragma once
#include <iostream>
#include <odb/nullable.hxx>
#include <odb/core.hxx>

// odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time chatSessionMember.hxx

namespace yangz
{
#pragma db object table("ChatSessionMember")
    class ChatSessionMember
    {
    public:
        ChatSessionMember() {}
        ChatSessionMember(const std::string &cssid, const std::string &user_id)
            : _chat_session_id(cssid), _user_id(user_id)
        {
        }
        ~ChatSessionMember() {}

    public:
        void set_chat_session_id(const std::string &cssid) { _chat_session_id = cssid; }
        std::string get_chat_session_id() { return _chat_session_id; }

        void set_user_id(const std::string &user_id) { _user_id = user_id; }
        std::string get_user_id() { return _user_id; }

    private:
        friend class odb::access;
#pragma db id auto
        unsigned long _id; // 自增主键
#pragma db type("varchar(64)") index 
        std::string _chat_session_id; // 聊天会话id, varchar(64), 被索引
#pragma db type("varchar(64)")
        std::string _user_id; // 会话内的用户ID
    };
}

编译生成sql文件指令:

odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time chatSessionMember.hxx

 此时在 .sql文件里新增:

然后将该.sql 文件导入数据库中:

mysql -uroot -p 'MicroChat' < chatSessionMember.sql 
Enter password: 

现在数据库中就有对应的表了:

客户端操作编写(mysqlChatSessionMemberTable.hpp)

该模块主要提供五个接口:

1、新增一个ChatSessionMember信息。

2、新增vector个ChatSessionMember信息。

3、删除指定ChatSessionMember信息。

4、删除所有信息。

5、通过聊天会话ID,获取会话内所有成员ID。

#pragma once
#include "odbMysqlHandleFactory.hpp"
#include "chatSessionMember.hxx"
#include "chatSessionMember-odb.hxx"
#include "logger.hpp"

namespace yangz
{
    class ChatSessionMemberTableClient
    {
    public:
        using ptr = std::shared_ptr<ChatSessionMemberTableClient>;
        ChatSessionMemberTableClient(const std::shared_ptr<odb::core::database> &mysql_client) : _mysql_client(mysql_client) {}

    public:
        // 新增ChatSessionMember
        bool append(ChatSessionMember &csm)
        {
            try
            {
                odb::transaction trans(_mysql_client->begin());
                _mysql_client->persist(csm);
                trans.commit();
            }
            catch (const std::exception &e)
            {
                LOG_ERROR("新增一个会话成员失败, cssid: {}, user_id: {}, 失败原因: {}", csm.get_chat_session_id(), csm.get_user_id(), e.what());
                return false;
            }
        }

        // 新增vector<ChatSessionMember>
        bool append(std::vector<ChatSessionMember> &csm_list)
        {
            try
            {
                odb::transaction trans(_mysql_client->begin());
                for (auto &csm : csm_list)
                {
                    _mysql_client->persist(csm);
                }
                trans.commit();
            }
            catch (const std::exception &e)
            {
                LOG_ERROR("新增多个会话成员失败, cssid: {}, user_size: {}, 失败原因: {}", csm_list[0].get_chat_session_id(), csm_list.size(), e.what());
                return false;
            }
        }

        // 删除指定的ChatSessionMember
        bool remove(ChatSessionMember &csm)
        {
            try
            {
                odb::transaction trans(_mysql_client->begin());
                typedef odb::query<ChatSessionMember> query;
                typedef odb::result<ChatSessionMember> result;
                _mysql_client->erase_query<ChatSessionMember>(query::chat_session_id == csm.get_chat_session_id() && query::user_id == csm.get_user_id());
                trans.commit();
            }
            catch (const std::exception &e)
            {
                LOG_ERROR("删除指定会话成员失败, cssid: {}, user_id: {}, 失败原因: {}", csm.get_chat_session_id(), csm.get_user_id(), e.what());
                return false;
            }
        }

        // 删除所有会话成员
        bool remove(const std::string &cssid)
        {
            try
            {
                odb::transaction trans(_mysql_client->begin());
                typedef odb::query<ChatSessionMember> query;
                typedef odb::result<ChatSessionMember> result;
                _mysql_client->erase_query<ChatSessionMember>(query::chat_session_id == cssid);
                trans.commit();
            }
            catch (const std::exception &e)
            {
                LOG_ERROR("删除所有会话成员失败, cssid: {}, 失败原因: {}", cssid, e.what());
                return false;
            }
        }

        // 通过cssid获取会话内成员ID
        std::vector<std::string> get_user_id_list(const std::string &cssid)
        {
            std::vector<std::string> user_id_list;
            try
            {
                odb::transaction trans(_mysql_client->begin());
                typedef odb::query<ChatSessionMember> query;
                typedef odb::result<ChatSessionMember> result;
                result r(_mysql_client->query<ChatSessionMember>(query::chat_session_id == cssid));
                for (result::iterator i(r.begin()); i != r.end(); ++i)
                {
                    user_id_list.push_back(i->get_user_id());
                }

                trans.commit();
            }
            catch (const std::exception &e)
            {
                LOG_ERROR("获取会话内成员ID失败, cssid: {}, 失败原因: {}", cssid, e.what());
            }
            return user_id_list;
        }

    private:
        std::shared_ptr<odb::core::database> _mysql_client;
    };
}

编写proto文件

消息元信息

对于一条消息来说,需要包含:消息ID、消息所属聊天会话ID、消息产生时间、消息发送者信息、消息内容。

而其中:消息内容这里,可能是文件信息、图像信息、语音信息、文字信息。因此消息内容这里,又需要包含:消息类型、消息内容。

消息元信息(MessageInfo)成员(主要是用于好友管理子服务中获取最近的聊天信息所用,此处不用):

1、消息ID:标识消息唯一性。

2、消息所属聊天会话ID:通过该ID,查找聊天会话成员。

3、产生时间:以时间戳的方式记录。

4、消息发


http://www.kler.cn/a/594594.html

相关文章:

  • Sqlserver安全篇之_启用和禁用Named Pipes的案列介绍
  • 基于 HT 2D3D 渲染引擎的新能源充电桩可视化运营系统技术剖析
  • 网络安全证书培训机构有哪些
  • Godep是什么??在GoLang中作用是什么
  • Unity 从零开始的框架搭建1-7 FSM有限状态机与其应用示例
  • 自动生成树形目录结构:与 `el-tree` 和滚动定位结合的完整解决方案
  • WordPress系统获取webshell的攻略
  • 请为下面的html添加一个修改按钮,以便对书名、价格进行修改
  • 【FastGPT】利用知识库创建AI智能助手
  • C++动态库中的静态调用和动态调用,延迟加载
  • 再学:ERC721扩展、ERC1155、SBT,OpenSeaNFT市场 NFT Market 习题讲解
  • Multisim学习-01 特点安装使用和第一个仿真实例
  • 智慧人大系统(源码+文档+讲解+演示)
  • [极客大挑战 2019]Upload_3.19BUUCTF练习day3(2)
  • 《信息系统安全》(第一次上机实验报告)
  • 虚拟机如何扩容磁盘
  • PHP实现用户登录与注册功能
  • ruoyi 小程序使用笔记
  • 第四周日志-用网络请求理解bp(2)
  • React如何导入md5,把密码password进行md5加密