IM项目-----消息转发子服务
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 聊天会话表
- 表的数据库操作类
- 编写业务代码
- 封装消息转发服务器类
前言
消息转发子服务只提供一个服务功能,就是告知一条消息需要转发给谁。所以消息转发子服务不是转发消息的,而是告知消息应该转发给谁。
在收到了请求后,会先根据请求中的字段组织一个完整的消息结构。其中消息结构中有一个消息发送者信息,因此我们需要调用用户子服务获取用户信息。
为了完成实际业务,需要在聊天会话表中查询指定会话Id的会话成员。因此需要一个封装一个聊天会话表的数据库操作类。
同时为了支持离线消息,我们需要对消息进行持久化操作,而消息的持久化是消息存储自服务完成的,我们需要将完整的消息类型序列化后发布到MQ消息队列服务器中。待消息存储子服务进行消费。因此需要一个MQ客户端操作句柄。
聊天会话表
有三个字段:自增ID,会话Id和用户ID.其中会话Id需要建立一个索引,我们这个自服务就是通过会话ID来查询用户Id的。
#pragma db object table("chat_session_member")
class ChatSessionMember {
public:
ChatSessionMember(){}
ChatSessionMember(const std::string &ssid, const std::string &uid):
_session_id(ssid), _user_id(uid){}
~ChatSessionMember(){}
std::string session_id() const { return _session_id; }
void session_id(std::string &ssid) { _session_id = ssid; }
std::string user_id() const { return _user_id; }
void user_id(std::string &uid) { _user_id = uid; }
private:
friend class odb::access;
#pragma db id auto
unsigned long _id;
#pragma db type("varchar(64)") index
std::string _session_id;
#pragma db type("varchar(64)")
std::string _user_id;
};
表的数据库操作类
需要提供五个操作:
• 向指定会话中添加单个成员
• 向指定会话中添加多个成员。
• 从指定会话中删除单个成员
• 通过会话 ID,获取会话的所有成员 ID
• 删除会话所有成员:在删除会话的时候使用。
我们这个子服务只需要用到通过会话ID,获取会话的所有成员ID就行。其他的功能是好友自服务需要用到的。
//获取会话所有成员ID
std::vector<std::string> members(const std::string &ssid)
{
std::vector<std::string> res;
try {
odb::transaction trans(_db->begin());
typedef odb::query<ChatSessionMember> query;
typedef odb::result<ChatSessionMember> result;
result r(_db->query<ChatSessionMember>(query::session_id == ssid));
for (result::iterator i(r.begin()); i != r.end(); ++i) {
res.push_back(i->user_id());
}
trans.commit();
}catch (std::exception &e) {
LOG_ERROR("获取会话成员失败:{}-{}!", ssid, e.what());
}
return res;
}
编写业务代码
-
从请求中取出消息内容,会话 ID, 用户 ID
会话Id是为了获取消息转发的目标,用户Id是为了组织完整消息结构。 -
根据用户 ID 从用户子服务获取当前发送者用户信息
-
根据消息内容构造完成的消息结构(分配消息 ID,填充发送者信息,填充消息产生时间)
-
将消息序列化后发布到 MQ 消息队列中,让消息存储子服务对消息进行持久化存储
这个消息是不包含消息Id的(如果是非文本消息),在消息存储子服务消费这条消息后需要将消息中的文件上传到文件子服务中,并获取文 件ID,存储在数据库中和ES搜索引擎中。 -
从数据库获取目标会话所有成员 ID
-
组织响应(完整消息+目标用户 ID),发送给网关,告知网关该将消息发送给谁。
发送给网关的消息就是一个完整的消息结构,包含了消息正文,但没有消息ID。
void GetTransmitTarget(google::protobuf::RpcController* controller,
const ::lkm_im::NewMessageReq* request,
::lkm_im::GetTransmitTargetRsp* response,
::google::protobuf::Closure* done) override {
brpc::ClosureGuard rpc_guard(done);
auto err_response = [this, response](const std::string &rid,
const std::string &errmsg) -> void {
response->set_request_id(rid);
response->set_success(false);
response->set_errmsg(errmsg);
return;
};
//从请求中获取关键信息:用户ID,所属会话ID,消息内容
std::string rid = request->request_id();
std::string uid = request->user_id();
std::string chat_ssid = request->chat_session_id();
const MessageContent &content = request->message();
// 进行消息组织:发送者-用户子服务获取信息,所属会话,消息内容,产生时间,消息ID
auto channel = _mm_channels->choose(_user_service_name);
if (!channel) {
LOG_ERROR("{}-{} 没有可供访问的用户子服务节点!", rid, _user_service_name);
return err_response(rid, "没有可供访问的用户子服务节点!");
}
UserService_Stub stub(channel.get());
GetUserInfoReq req;
GetUserInfoRsp rsp;
req.set_request_id(rid);
req.set_user_id(uid);
brpc::Controller cntl;
stub.GetUserInfo(&cntl, &req, &rsp, nullptr);
if (cntl.Failed() == true || rsp.success() == false) {
LOG_ERROR("{} - 用户子服务调用失败:{}!", request->request_id(), cntl.ErrorText());
return err_response(request->request_id(), "用户子服务调用失败!");
}
MessageInfo message;
message.set_message_id(uuid());
message.set_chat_session_id(chat_ssid);
message.set_timestamp(time(nullptr));
message.mutable_sender()->CopyFrom(rsp.user_info());
message.mutable_message()->CopyFrom(content);
// 获取消息转发客户端用户列表
auto target_list = _mysql_session_member_table->members(chat_ssid);
// 将封装完毕的消息,发布到消息队列,待消息存储子服务进行消息持久化
bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);
if (ret == false) {
LOG_ERROR("{} - 持久化消息发布失败:{}!", request->request_id(), cntl.ErrorText());
return err_response(request->request_id(), "持久化消息发布失败:!");
}
//组织响应
response->set_request_id(rid);
response->set_success(true);
response->mutable_message()->CopyFrom(message);
for (const auto &id : target_list) {
response->add_target_id_list(id);
}
}
封装消息转发服务器类
这块和前面用户子服务差不多,就不总结了,其中多了一个MQ的句柄,另外还需要传递一个交换机名称和队列名称传递到服务类对象中,用于消息的发布。