【从零实现Json-Rpc框架】- 项目实现 - Dispatcher模块实现篇
📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 📢前言
- 🏳️🌈一、通信模拟
- 1.1 服务端模拟
- 1.2 客户端模拟
- 🏳️🌈二、Dispatcher 的作用与设计意义
- 2.1 核心作用
- 2.2 注册消息处理函数
- 2.2 消息分发逻辑
- 🏳️🌈三、应用场景
- 3.1 服务端
- 3.2 客户端
- 🏳️🌈补充 - dispatcher 模板化
- 客户端
- 服务端
- 👥总结
📢前言
前几篇文章中,笔者介绍了rpc
的原理和目的,也介绍了需要使用的部分第三方库
和我们所需实现的功能
现在我们着手项目实现篇章
,目前零碎接口 和 项目消息字段类型 都已经完成了
截至上一篇文章,抽象层及其封装都已经完善了
所以我们现在需要去使用这些代码,去简单实现单线程的通信传播是否可行
并由此引出 Dispatcher路由
🏳️🌈一、通信模拟
这是利用代码实现的服务端通信流程图
sequenceDiagram
participant Client
participant MuduoServer
participant LVProtocol
participant Dispatcher
Client->>MuduoServer: 发送二进制数据流
MuduoServer->>LVProtocol: canProcessed()检查数据完整性
LVProtocol->>LVProtocol: onMessage()解析消息头
LVProtocol->>MessageFactory: 创建具体消息对象
MessageFactory-->>LVProtocol: 返回消息对象
LVProtocol->>Dispatcher: 传递消息对象
Dispatcher->>Handler: 调用注册的回调函数
Handler->>MuduoServer: 生成响应消息
MuduoServer->>Client: 发送响应
1.1 服务端模拟
所以我们实现服务端的流程就是
- 创建服务端 对象
- 将 对请求数据的回调函数 设置进 服务端对象 中
- 开始循环获取 客户端连接数据
- 根据 协议 判断是否合法
- 合法,根据 回调函数 获取信息,并 设置响应信息
- 发送响应信息给 客户端
void onMessage(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
std::string body = msg->serialize();
std::cout << body << std::endl;
auto rpc_rsp = rpc::MessageFactory::create<rpc::RpcResponse>();
rpc_rsp->setId("0001");
rpc_rsp->setMType(fields::MType::RSP_RPC);
rpc_rsp->setRcode(fields::RCode::RCODE_OK);
rpc_rsp->setResult("success");
conn->send(rpc_rsp);
}
int main(){
auto server = rpc::ServerFactory::create(9090);
server->setMessageCallback(onMessage);
server->start();
return 0;
}
1.2 客户端模拟
- 创建客户端对象,确保它可以连接到服务端的ip和端口
- 设置其对响应数据的回调函数
- 创建与服务端之间的连接
- 设置请求数据
- 发送请求数据到服务端
- 服务端根据流程将响应数据通过连接返回
- 根据回调函数处理响应数据
void onMessage(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
std::string body = msg->serialize();
std::cout << body << std::endl;
}
int main(){
auto client = rpc::ClientFactory::create("127.0.0.1", 9090);
client->setMessageCallback(onMessage);
DLOG("setMessageCallback is OK");
client->connect();
DLOG("connect is OK");
if(client->connected() == false)
ELOG("client is not connected");
auto rpc_req = rpc::MessageFactory::create<rpc::RpcRequest>();
rpc_req->setId("0002");
rpc_req->setMType(fields::MType::REQ_RPC);
rpc_req->setMethod("client_method_add");
DLOG("rpc_req is OK");
Json::Value param;
param["num1"] = 11;
param["num2"] = 22;
rpc_req->setParams(param);
DLOG("setParams is OK");
client->send(rpc_req);
DLOG("send is OK");
std::this_thread::sleep_for(std::chrono::seconds(10));
// client->shutdown();
// DLOG("shutdown is OK");
return 0;
}
最终情况大约这样
🏳️🌈二、Dispatcher 的作用与设计意义
Dispatcher
的作用是作为消息分发中心,根据消息类型将消息路由到对应的处理函数,实现了解耦和灵活性。通过注册机制,可以方便地扩展支持新的消息类型,而无需修改 Dispatcher
本身的代码。线程安全通过互斥锁保证,确保在多线程环境下正确运行。
2.1 核心作用
消息路由中心:根据消息类型(MType)将接收到的消息动态分发给预先注册的处理函数。
解耦业务逻辑:分离消息接收与处理逻辑,新增消息类型时无需修改分发逻辑。
2.2 注册消息处理函数
void registerMessage(MType mtype, const MessageCallBack& handler) {
std::unique_lock<std::mutex> lock(_mutex); // 线程安全
_handlers[mtype] = handler; // 存储消息类型与处理函数的映射
}
功能:将特定消息类型绑定到对应的处理函数(如处理RPC请求、主题订阅等)。
线程安全:使用互斥锁保护 _handlers
,防止多线程并发修改冲突。
2.2 消息分发逻辑
void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _handlers.find(msg->mtype()); // 查找处理函数
if (it != _handlers.end()) {
it->second(conn, msg); // 调用注册的回调
} else {
ELOG("未知消息类型: %d", static_cast<int>(msg->mtype()));
conn->shutdown(); // 关闭非法连接
}
}
流程:
- 根据消息类型查找注册的处理函数。
- 找到则执行回调,否则记录错误并关闭连接。
健壮性:对未知消息类型强制关闭连接,防止无效或恶意请求影响系统。
🏳️🌈三、应用场景
3.1 服务端
#include "message.hpp"
#include "net.hpp"
#include "dispatcher.hpp"
#include <thread>
// version - 2
// 设置Rpc响应的处理函数
void onRpcRespond(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
std::cout << "收到了Rpc响应:";
std::string body = msg->serialize();
std::cout << body << std::endl;
std::cout << "----------------------------" << std::endl;
}
// 设置Topic响应的处理函数
void onTopicRespond(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
std::cout << "收到了Topic响应:";
std::string body = msg->serialize();
std::cout << body << std::endl;
std::cout << "----------------------------" << std::endl;
}
int main(){
auto dispatcher = std::make_shared<rpc::Dispatcher>();
dispatcher->registerMessage(fields::MType::RSP_RPC, onRpcRespond); // 注册Rpc响应处理函数
dispatcher->registerMessage(fields::MType::RSP_TOPIC, onTopicRespond); // 注册Topic响应处理函数
auto client = rpc::ClientFactory::create("127.0.0.1", 9090);
// 调用 message_cb(conn, msg) → 实际执行 dispatcher->onMessage(conn, msg)
auto message_cb = std::bind(&rpc::Dispatcher::onMessage, dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
// 模拟发送rpc请求
client->setMessageCallback(message_cb);
DLOG("client setMessageCallback is OK");
client->connect();
DLOG("client connect is OK");
if(client->connected() == false)
ELOG("client is not connected");
auto rpc_req = rpc::MessageFactory::create<rpc::RpcRequest>();
rpc_req->setId("0001");
rpc_req->setMType(fields::MType::REQ_RPC);
rpc_req->setMethod("client_method_add");
DLOG("rpc_req is OK");
Json::Value param;
param["num1"] = 11;
param["num2"] = 22;
rpc_req->setParams(param);
DLOG("setParams is OK");
client->send(rpc_req);
DLOG("send is OK");
// 模拟发送topic请求
auto topic_req = rpc::MessageFactory::create<rpc::TopicRequest>();
topic_req->setId("0002");
topic_req->setMType(fields::MType::REQ_TOPIC);
topic_req->setOptype(fields::TopicOptype::TOPIC_CREATE);
topic_req->setTopicKey("client_topic_key");
topic_req->setTopicMsg("client_topic_msg");
client->send(topic_req);
std::this_thread::sleep_for(std::chrono::seconds(10));
// client->shutdown();
// DLOG("shutdown is OK");
return 0;
}
3.2 客户端
#include "message.hpp"
#include "net.hpp"
#include "dispatcher.hpp"
//version - 2
// 设置PRC请求的处理函数
void onRpcRequest(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
std::cout << "收到了Rpc请求:";
std::string body = msg->serialize();
std::cout << body << std::endl;
std::cout << "----------------------------" << std::endl;
auto rpc_req = rpc::MessageFactory::create<rpc::RpcResponse>();
rpc_req->setId("0001");
rpc_req->setMType(fields::MType::RSP_RPC);
rpc_req->setRcode(fields::RCode::RCODE_OK);
rpc_req->setResult("success");
conn->send(rpc_req);
}
// 设置主题请求的处理函数
void onTopicRequest(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
std::cout << "收到了Topic请求:";
std::string body = msg->serialize();
std::cout << body << std::endl;
std::cout << "----------------------------" << std::endl;
auto topic_req = rpc::MessageFactory::create<rpc::TopicResponse>();
topic_req->setId("0002");
topic_req->setMType(fields::MType::RSP_TOPIC);
topic_req->setRcode(fields::RCode::RCODE_OK);
conn->send(topic_req);
}
int main(){
auto dispatcher = std::make_shared<rpc::Dispatcher>();
dispatcher->registerMessage(fields::MType::REQ_RPC, onRpcRequest);
dispatcher->registerMessage(fields::MType::REQ_TOPIC, onTopicRequest);
auto server = rpc::ServerFactory::create(9090);
auto message_cb = std::bind(&rpc::Dispatcher::onMessage, dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
server->setMessageCallback(message_cb);
server->start();
return 0;
}
🏳️🌈补充 - dispatcher 模板化
因为我们现在的下述语句中是直接传进子类给 dispatcher
的
auto dispatcher = std::make_shared<rpc::Dispatcher>();
dispatcher->registerMessage(fields::MType::RSP_RPC, onRpcRespond); // 注册Rpc响应处理函数
dispatcher->registerMessage(fields::MType::RSP_TOPIC, onTopicRespond); // 注册Topic响应处理函数
auto client = rpc::ClientFactory::create("127.0.0.1", 9090);
// 调用 message_cb(conn, msg) → 实际执行 dispatcher->onMessage(conn, msg)
auto message_cb = std::bind(&rpc::Dispatcher::onMessage, dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
但是我们在定义 dispatcher
时采用的是父类对象定义的,所以在调用这个方法时,会提高代码的冗余性,就比如说,每次要使用这个方法,都要判断一下他是属于哪一个子类,因此我们可以给 dispatcher
封装一个模板,提高效率
#include "message.hpp"
#include "net.hpp"
// Dispatcher 类在 RPC 框架中担任 消息路由中心 的角色,负责将接收到的不同消息类型动态分发给对应的处理函数
namespace rpc{
class Callback{
public:
using ptr = std::shared_ptr<Callback>;
virtual void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg) = 0;
};
template<typename T>
class CallbackT : public Callback{
public:
using ptr = std::shared_ptr<CallbackT<T>>;
using MessageCallBack = std::function<void(const BaseConnection::ptr& conn, std::shared_ptr<T>& msg)>;
CallbackT(const MessageCallBack& handler)
: _handler(handler)
{}
void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg){
auto type_msg = std::dynamic_pointer_cast<T>(msg);
_handler(conn, type_msg);
}
private:
MessageCallBack _handler;
};
class Dispatcher{
public:
using ptr = std::shared_ptr<Dispatcher>;
// 注册消息类型与处理函数的映射
template<typename T>
void registerHandler(MType mtype, const typename CallbackT<T>::MessageCallBack& handler){
std::unique_lock<std::mutex> lock(_mutex);
auto cb = std::make_shared<CallbackT<T>>(handler);
_handlers.insert(std::make_pair(mtype, cb));
}
// 处理消息的分发逻辑
void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg){
// 找到消息类型对应的业务处理函数,并调用
std::unique_lock<std::mutex> lock(_mutex);
auto it = _handlers.find(msg->mtype());
if(it != _handlers.end()){
it->second->onMessage(conn, msg);
}
else{
// 没有找到指定类型的处理回调 - 因为客户端和服务端都是我们自己设计的,因此不可能出现这种情况
ELOG("收到未知类型的消息: %d!", static_cast<int>(msg->mtype()));
conn->shutdown();
}
}
private:
std::mutex _mutex;
std::unordered_map<MType, Callback::ptr> _handlers;
};
}
当然因为 dispatcher 设置了模板,所以我们还是需要对服务端和客户端重新的相应方法改一下
客户端
两者直接传入目标消息类,不用传基类
void onRpcRespond(const rpc::BaseConnection::ptr& conn, rpc::RpcResponse::ptr& msg)
void onTopicRespond(const rpc::BaseConnection::ptr& conn, rpc::TopicResponse::ptr& msg)
使用时添加模板
dispatcher->registerHandler<rpc::RpcResponse>(fields::MType::RSP_RPC, onRpcRespond); // 注册Rpc响应处理函数
dispatcher->registerHandler<rpc::TopicResponse>(fields::MType::RSP_TOPIC, onTopicRespond); // 注册Topic响应处理函数
服务端
void onRpcRequest(const rpc::BaseConnection::ptr& conn, rpc::RpcRequest::ptr& msg)
void onTopicRequest(const rpc::BaseConnection::ptr& conn, rpc::TopicRequest::ptr& msg)
dispatcher->registerHandler<rpc::RpcRequest>(fields::MType::REQ_RPC, onRpcRequest);
dispatcher->registerHandler<rpc::TopicRequest>(fields::MType::REQ_TOPIC, onTopicRequest);
👥总结
本篇博文对 【从零实现Json-Rpc框架】- 项目实现 - Dispatcher路由实现篇 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~