当前位置: 首页 > article >正文

【从零实现Json-Rpc框架】- 项目实现 - Dispatcher模块实现篇

📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨

在这里插入图片描述

在这里插入图片描述

文章目录

  • 📢前言
  • 🏳️‍🌈一、通信模拟
    • 1.1 服务端模拟
    • 1.2 客户端模拟
  • 🏳️‍🌈二、Dispatcher 的作用与设计意义
    • ​2.1 核心作用
    • 2.2 注册消息处理函数
    • 2.2 消息分发逻辑
  • 🏳️‍🌈三、应用场景
    • 3.1 服务端
    • 3.2 客户端
  • 🏳️‍🌈补充 - dispatcher 模板化
    • 客户端
    • 服务端
  • 👥总结


📢前言

前几篇文章中,笔者介绍了rpc的原理和目的,也介绍了需要使用的部分第三方库和我们所需实现的功能

现在我们着手项目实现篇章,目前零碎接口项目消息字段类型 都已经完成了

截至上一篇文章,抽象层及其封装都已经完善了

所以我们现在需要去使用这些代码,去简单实现单线程的通信传播是否可行

并由此引出 Dispatcher路由


🏳️‍🌈一、通信模拟

这是利用代码实现的服务端通信流程图

sequenceDiagram
    participant Client
    participant MuduoServer
    participant LVProtocol
    participant Dispatcher

    Client->>MuduoServer: 发送二进制数据流
    MuduoServer->>LVProtocol: canProcessed()检查数据完整性
    LVProtocol->>LVProtocol: onMessage()解析消息头
    LVProtocol->>MessageFactory: 创建具体消息对象
    MessageFactory-->>LVProtocol: 返回消息对象
    LVProtocol->>Dispatcher: 传递消息对象
    Dispatcher->>Handler: 调用注册的回调函数
    Handler->>MuduoServer: 生成响应消息
    MuduoServer->>Client: 发送响应

1.1 服务端模拟

所以我们实现服务端的流程就是

  1. 创建服务端 对象
  2. 对请求数据的回调函数 设置进 服务端对象
  3. 开始循环获取 客户端连接数据
  4. 根据 协议 判断是否合法
  5. 合法,根据 回调函数 获取信息,并 设置响应信息
  6. 发送响应信息给 客户端
void onMessage(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
    std::string body = msg->serialize();
    std::cout << body << std::endl;
    auto rpc_rsp = rpc::MessageFactory::create<rpc::RpcResponse>();
    rpc_rsp->setId("0001");
    rpc_rsp->setMType(fields::MType::RSP_RPC);
    rpc_rsp->setRcode(fields::RCode::RCODE_OK);
    rpc_rsp->setResult("success");
    conn->send(rpc_rsp);
}


int main(){
    auto server = rpc::ServerFactory::create(9090);


    server->setMessageCallback(onMessage);
    server->start();
    return 0;
}

1.2 客户端模拟

  1. 创建客户端对象,确保它可以连接到服务端的ip和端口
  2. 设置其对响应数据的回调函数
  3. 创建与服务端之间的连接
  4. 设置请求数据
  5. 发送请求数据到服务端
  6. 服务端根据流程将响应数据通过连接返回
  7. 根据回调函数处理响应数据
void onMessage(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
    std::string body = msg->serialize();
    std::cout << body << std::endl;
}


int main(){
    auto client = rpc::ClientFactory::create("127.0.0.1", 9090);
    client->setMessageCallback(onMessage);
    DLOG("setMessageCallback is OK");

    client->connect();
    DLOG("connect is OK");

    if(client->connected() == false)
        ELOG("client is not connected");

    auto rpc_req = rpc::MessageFactory::create<rpc::RpcRequest>();
    rpc_req->setId("0002");
    rpc_req->setMType(fields::MType::REQ_RPC);
    rpc_req->setMethod("client_method_add");
    DLOG("rpc_req is OK");
    
    Json::Value param;
    param["num1"] = 11;
    param["num2"] = 22;
    rpc_req->setParams(param);
    DLOG("setParams is OK");

    client->send(rpc_req);
    DLOG("send is OK");

    std::this_thread::sleep_for(std::chrono::seconds(10));

    // client->shutdown();
    // DLOG("shutdown is OK");
    return 0;
}

最终情况大约这样
在这里插入图片描述

🏳️‍🌈二、Dispatcher 的作用与设计意义

Dispatcher 的作用是作为消息分发中心,根据消息类型将消息路由到对应的处理函数,实现了解耦和灵活性。通过注册机制,可以方便地扩展支持新的消息类型,而无需修改 Dispatcher 本身的代码。线程安全通过互斥锁保证,确保在多线程环境下正确运行。

​2.1 核心作用

​消息路由中心:根据消息类型(MType)将接收到的消息动态分发给预先注册的处理函数。
​解耦业务逻辑:分离消息接收与处理逻辑,新增消息类型时无需修改分发逻辑。

2.2 注册消息处理函数

void registerMessage(MType mtype, const MessageCallBack& handler) {
    std::unique_lock<std::mutex> lock(_mutex); // 线程安全
    _handlers[mtype] = handler; // 存储消息类型与处理函数的映射
}

功能:将特定消息类型绑定到对应的处理函数(如处理RPC请求、主题订阅等)。
​线程安全:使用互斥锁保护 _handlers,防止多线程并发修改冲突。

2.2 消息分发逻辑

void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {
    std::unique_lock<std::mutex> lock(_mutex);
    auto it = _handlers.find(msg->mtype()); // 查找处理函数
    if (it != _handlers.end()) {
        it->second(conn, msg); // 调用注册的回调
    } else {
        ELOG("未知消息类型: %d", static_cast<int>(msg->mtype()));
        conn->shutdown(); // 关闭非法连接
    }
}

流程

  • 根据消息类型查找注册的处理函数。
  • 找到则执行回调,否则记录错误并关闭连接。

​健壮性:对未知消息类型强制关闭连接,防止无效或恶意请求影响系统。

在这里插入图片描述

🏳️‍🌈三、应用场景

3.1 服务端

#include "message.hpp"
#include "net.hpp"
#include "dispatcher.hpp"
#include <thread>


// version - 2
// 设置Rpc响应的处理函数
void onRpcRespond(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
    std::cout << "收到了Rpc响应:";
    std::string body = msg->serialize();
    std::cout << body << std::endl;
    std::cout << "----------------------------" << std::endl;
}

// 设置Topic响应的处理函数
void onTopicRespond(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
    std::cout << "收到了Topic响应:";
    std::string body = msg->serialize();
    std::cout << body << std::endl;
    std::cout << "----------------------------" << std::endl;
}



int main(){
    auto dispatcher = std::make_shared<rpc::Dispatcher>();
    dispatcher->registerMessage(fields::MType::RSP_RPC, onRpcRespond);          // 注册Rpc响应处理函数
    dispatcher->registerMessage(fields::MType::RSP_TOPIC, onTopicRespond);      // 注册Topic响应处理函数

    auto client = rpc::ClientFactory::create("127.0.0.1", 9090);

    // 调用 message_cb(conn, msg) → 实际执行 dispatcher->onMessage(conn, msg)
    auto message_cb = std::bind(&rpc::Dispatcher::onMessage, dispatcher.get(), std::placeholders::_1, std::placeholders::_2); 



    // 模拟发送rpc请求
    client->setMessageCallback(message_cb);
    DLOG("client setMessageCallback is OK");

    client->connect();
    DLOG("client connect is OK");

    if(client->connected() == false)
        ELOG("client is not connected");

    auto rpc_req = rpc::MessageFactory::create<rpc::RpcRequest>();
    rpc_req->setId("0001");
    rpc_req->setMType(fields::MType::REQ_RPC);
    rpc_req->setMethod("client_method_add");
    DLOG("rpc_req is OK");
    
    Json::Value param;
    param["num1"] = 11;
    param["num2"] = 22;
    rpc_req->setParams(param);
    DLOG("setParams is OK");

    client->send(rpc_req);
    DLOG("send is OK");





    // 模拟发送topic请求
    auto topic_req = rpc::MessageFactory::create<rpc::TopicRequest>();
    topic_req->setId("0002");
    topic_req->setMType(fields::MType::REQ_TOPIC);
    topic_req->setOptype(fields::TopicOptype::TOPIC_CREATE);
    topic_req->setTopicKey("client_topic_key");
    topic_req->setTopicMsg("client_topic_msg");

    client->send(topic_req);   



    std::this_thread::sleep_for(std::chrono::seconds(10));

    // client->shutdown();
    // DLOG("shutdown is OK");
    return 0;
}

3.2 客户端

#include "message.hpp"
#include "net.hpp"
#include "dispatcher.hpp"

//version - 2
// 设置PRC请求的处理函数
void onRpcRequest(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
    std::cout << "收到了Rpc请求:";
    std::string body = msg->serialize();
    std::cout << body << std::endl;
    std::cout << "----------------------------" << std::endl;

    auto rpc_req = rpc::MessageFactory::create<rpc::RpcResponse>();
    rpc_req->setId("0001");
    rpc_req->setMType(fields::MType::RSP_RPC);
    rpc_req->setRcode(fields::RCode::RCODE_OK);
    rpc_req->setResult("success");

    conn->send(rpc_req);
}

// 设置主题请求的处理函数
void onTopicRequest(const rpc::BaseConnection::ptr& conn, rpc::BaseMessage::ptr& msg){
    std::cout << "收到了Topic请求:";
    std::string body = msg->serialize();
    std::cout << body << std::endl;
    std::cout << "----------------------------" << std::endl;

    auto topic_req = rpc::MessageFactory::create<rpc::TopicResponse>();
    topic_req->setId("0002");
    topic_req->setMType(fields::MType::RSP_TOPIC);
    topic_req->setRcode(fields::RCode::RCODE_OK);
    
    conn->send(topic_req);
}


int main(){
    auto dispatcher = std::make_shared<rpc::Dispatcher>();
    dispatcher->registerMessage(fields::MType::REQ_RPC, onRpcRequest);
    dispatcher->registerMessage(fields::MType::REQ_TOPIC, onTopicRequest);

    auto server = rpc::ServerFactory::create(9090);

    auto message_cb = std::bind(&rpc::Dispatcher::onMessage, dispatcher.get(), std::placeholders::_1, std::placeholders::_2);


    server->setMessageCallback(message_cb);
    server->start();
    return 0;
}

在这里插入图片描述

🏳️‍🌈补充 - dispatcher 模板化

因为我们现在的下述语句中是直接传进子类给 dispatcher

auto dispatcher = std::make_shared<rpc::Dispatcher>();
    dispatcher->registerMessage(fields::MType::RSP_RPC, onRpcRespond);          // 注册Rpc响应处理函数
    dispatcher->registerMessage(fields::MType::RSP_TOPIC, onTopicRespond);      // 注册Topic响应处理函数

    auto client = rpc::ClientFactory::create("127.0.0.1", 9090);

    // 调用 message_cb(conn, msg) → 实际执行 dispatcher->onMessage(conn, msg)
    auto message_cb = std::bind(&rpc::Dispatcher::onMessage, dispatcher.get(), std::placeholders::_1, std::placeholders::_2); 

但是我们在定义 dispatcher 时采用的是父类对象定义的,所以在调用这个方法时,会提高代码的冗余性,就比如说,每次要使用这个方法,都要判断一下他是属于哪一个子类,因此我们可以给 dispatcher 封装一个模板,提高效率
在这里插入图片描述

#include "message.hpp"
#include "net.hpp"

// Dispatcher 类在 RPC 框架中担任 ​消息路由中心 的角色,负责将接收到的不同消息类型动态分发给对应的处理函数
namespace rpc{
    class Callback{
        public:
            using ptr = std::shared_ptr<Callback>;
            virtual void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg) = 0;
    };

    template<typename T>
    class CallbackT : public Callback{
        public:
            using ptr = std::shared_ptr<CallbackT<T>>;
            using MessageCallBack = std::function<void(const BaseConnection::ptr& conn, std::shared_ptr<T>& msg)>;


            CallbackT(const MessageCallBack& handler)
                : _handler(handler)
            {}

            void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg){
                auto type_msg = std::dynamic_pointer_cast<T>(msg);
                _handler(conn, type_msg);
            }
        private:
            MessageCallBack _handler;
    };

    class Dispatcher{
        public:
            using ptr = std::shared_ptr<Dispatcher>;
            
            // 注册消息类型与处理函数的映射
            template<typename T>
            void registerHandler(MType mtype, const typename CallbackT<T>::MessageCallBack& handler){
                std::unique_lock<std::mutex> lock(_mutex);
                auto cb = std::make_shared<CallbackT<T>>(handler);
                _handlers.insert(std::make_pair(mtype, cb));
            }


            // 处理消息的分发逻辑
            void onMessage(const BaseConnection::ptr& conn, BaseMessage::ptr& msg){
                // 找到消息类型对应的业务处理函数,并调用
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _handlers.find(msg->mtype());
                if(it != _handlers.end()){
                    it->second->onMessage(conn, msg);
                }
                else{
                    // 没有找到指定类型的处理回调 - 因为客户端和服务端都是我们自己设计的,因此不可能出现这种情况
                    ELOG("收到未知类型的消息: %d!", static_cast<int>(msg->mtype()));
                    conn->shutdown();
                }
            }
        private:
             std::mutex _mutex;
             std::unordered_map<MType, Callback::ptr> _handlers;
    };
}

当然因为 dispatcher 设置了模板,所以我们还是需要对服务端和客户端重新的相应方法改一下

客户端

两者直接传入目标消息类,不用传基类

void onRpcRespond(const rpc::BaseConnection::ptr& conn, rpc::RpcResponse::ptr& msg)
void onTopicRespond(const rpc::BaseConnection::ptr& conn, rpc::TopicResponse::ptr& msg)

使用时添加模板

dispatcher->registerHandler<rpc::RpcResponse>(fields::MType::RSP_RPC, onRpcRespond);          // 注册Rpc响应处理函数
dispatcher->registerHandler<rpc::TopicResponse>(fields::MType::RSP_TOPIC, onTopicRespond);      // 注册Topic响应处理函数

服务端

void onRpcRequest(const rpc::BaseConnection::ptr& conn, rpc::RpcRequest::ptr& msg)
void onTopicRequest(const rpc::BaseConnection::ptr& conn, rpc::TopicRequest::ptr& msg)
    dispatcher->registerHandler<rpc::RpcRequest>(fields::MType::REQ_RPC, onRpcRequest);
    dispatcher->registerHandler<rpc::TopicRequest>(fields::MType::REQ_TOPIC, onTopicRequest);

👥总结

本篇博文对 【从零实现Json-Rpc框架】- 项目实现 - Dispatcher路由实现篇 做了一个较为详细的介绍,不知道对你有没有帮助呢

觉得博主写得还不错的三连支持下吧!会继续努力的~

请添加图片描述


http://www.kler.cn/a/614732.html

相关文章:

  • AI+Xmind自动生成测试用例(思维导图格式)
  • RabbitMQ消息相关
  • 垃圾回收机制的几种实现机制简介
  • 【keil】单步调试
  • ✨分享我在飞书多维表格中使用DeepSeek的经历✨
  • c++第三课(基础c)
  • Android Jetpack学习总结(源码级理解)
  • 《云原生安全攻防》-- K8s容器安全:权限最小化与SecurityContext
  • 模块化革命:树莓派CM5嵌入式工业计算机如何重构嵌入式系统开发边界
  • 在IDEA中使用TortoiseSVN
  • C语言笔记数据结构(链表)
  • LangChain 基础系列之 Prompt 工程详解:从设计原理到实战模板
  • 爬虫问题整理(2025.3.27)
  • 软件的常用设计模式。可参考一个一个学习
  • 数据结构与算法——顺序表之手撕OJ题
  • 【商城实战(95)】Ansible自动化运维,开启高效部署新篇章
  • 2025清华大学:DeepSeek教程全集(PDF+视频精讲,共10份).zip
  • 前端工程化--gulp的使用
  • 思维链技术(Chain-of-Thought, CoT)
  • Python库()