当前位置: 首页 > article >正文

[实现Rpc] 客户端 | Requestor | RpcCaller的设计实现

目录

Requestor类的实现

框架

完善

onResponse处理回复

完整代码

RpcCaller类的实现

1. 同步调用 call

2. 异步调用 call

3. 回调调用 call


Requestor类的实现

(1)主要功能:

  • 客户端发送请求的功能,进行请求描述
  • 对服务器响应的处理机制,并对返回信息 进行对应接收

(2)具体实现:

  • 意义针对客户端的每一条请求进行管理,以便于对请求对应的响应做出合适操作。
  • 对于客户端而言,其通常是主动发起请求服务的一方。然而,在多线程网络通信中,针对多个请求进行响应时可能会存在时序问题,导致无法保证一个线程发送的请求后接收到的响应就是针对这条请求的响应,这是非常危险的情况。
  • 异步IO挑战:类似于Muduo库这种异步IO网络通信库,通常IO操作都是异步的,即发送数据是将数据放入发送缓冲区,而何时发送由底层网络库协调,并且不提供recv接口,而是连接触发可读事件后,IO读取数据完成调用处理回调进行数据处理,因此在发送请求后无法直接等待该条请求的响应。

解决方案

  • 创建请求管理模块,通过给每个请求设定一个请求ID来解决上述问题。服务端响应时会标识出响应针对的是哪个请求(即响应信息包含请求ID)。
  • 客户端无论收到 哪条请求的响应,都将数据存储入hash_map中,以请求ID作为映射,并提供获取指定请求ID响应的阻塞接口。这样,只要知道自己的请求ID,就能准确获取到想要的响应。
  • 进一步优化:可以将每个请求封装描述,添加异步future控制或设置回调函数的方式,不仅支持阻塞获取响应,也能实现异步获取响应及 回调处理响应。

框架

namespace bitrpc{
namespace client
{
    //客户端 部分
    class Requestor
    {
    public:
        using ptr = std::shared_ptr<Requestor>;
        using RequestCallback = std::function<void(const BaseMessage::ptr&)>;
        using AsyncResponse = std::future<BaseMessage::ptr>;

        struct RequestDescribe
        {
            using ptr=std::shared_ptr<RequestDescribe>;//智能指针 管理
        };
        //请求 信息描述
        //之后 好调用 所需要的rsp函数

//Dispatcher调用
        void onResponse(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)
        {
            
        }
    //异步发送
        bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp)
        {}
    //同步发送
        bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp) 
        {}
   //回调发送
        bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb) 
        {}

     private:
     //对于 请求 描述进行CURD

     //增
            RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, 
                const RequestCallback &cb = RequestCallback()) {}
    //查 rid
            RequestDescribe::ptr getDescribe(const std::string &rid)
              {}
    //删
            void delDescribe(const std::string &rid)
            {}

        private:
            std::mutex _mutex;
            std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;
    };
}
}

完善

信息描述

struct RequestDescribe {
                    using ptr = std::shared_ptr<RequestDescribe>;
                    BaseMessage::ptr request;
                    RType rtype;
                    std::promise<BaseMessage::ptr> response;
                    RequestCallback callback;
                };

对收到的响应 通过 uid ,对应上是哪个请求发出的

实现了上面的解决问题

void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg){
    
    std::string rid = msg->rid();
    
    RequestDescribe::ptr rdp = getDescribe(rid);
    //根据先获取msg->rid() 来进行结果的调用
    
    if (rdp.get() == nullptr) {
        ELOG("收到响应 - %s,但是未找到对应的请求描述!", rid.c_str());
        return;
    }
    
    if (rdp->rtype == RType::REQ_ASYNC) {
        rdp->response.set_value(msg);//调用 不同的接口
        
    }else if (rdp->rtype == RType::REQ_CALLBACK){
        if (rdp->callback) rdp->callback(msg);
    }else {
        ELOG("请求类型未知!!");
    }
    
    delDescribe(rid);
}
onResponse处理回复
  • onResponse方法是对收到的消息进行处理的入口点。当收到服务器的响应时,该方法会被调用来匹配相应的请求描述(RequestDescribe),并通过请求类型(RType)来决定如何处理响应:
    • 如果是 异步请求(RType::REQ_ASYNC),则通过设置std::promise的值(response.set_value(msg))来完成对应的std::future,使得调用者可以通过未来对象获取响应。
    • 如果是带有回调的请求(RType::REQ_CALLBACK),则直接调用注册的回调函数(rdp->callback(msg))来处理响应。
    • 如果请求类型未知,则记录错误日志。

onResponse方法则是 对接收到的响应进行处理,


关于 promise set_value: C++11 异步操作 future类_文档学习

send方法负责构建和发送请求,

//异步操作
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp)
{
    RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);
    if (rdp.get() == nullptr)
    {
        ELOG("构造请求描述对象失败!");
        return false;
    }
    conn->send(req);
    async_rsp = rdp->response.get_future();
    return true;
}
//同步操作
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp)
{
    AsyncResponse rsp_future;
    bool ret = send(conn, req, rsp_future);
    if (ret == false)
    {
        return false;
    }
    rsp = rsp_future.get();
    return true;
}
//回调函数
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb)
{
    RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);
    if (rdp.get() == nullptr)
    {
        ELOG("构造请求描述对象失败!");
        return false;
    }
    conn->send(req);//!!!!!!!!!!!!!!!!
    return true;
}

1.

前文回顾:[C++#28][多态] 两个条件 | 虚函数表 | 抽象类 | override 和 final | 重载 重写 重定义

Requestor类中的 三个重载send方法,这些方法用于通过指定的连接对象发送消息

同:

  • 基本参数:所有三个send方法都接受两个相同的基本参数:一个指向BaseConnection的智能指针conn(表示网络连接)和一个指向BaseMessage的智能指针req(表示要发送的消息)。
  • 错误处理:每个send方法在无法成功创建请求描述对象时,都会记录错误日志并返回false,指示操作失败。
  • 消息发送:无论哪种方式,最终都是通过调用conn->send(req)来执行实际的消息发送。

异:

  • send方法(有三个重载版本)用于通过网络连接conn发送请求消息req到服务器:
    • 第一个send方法接受一个AsyncResponse &async_rsp参数,用于异步发送请求并返回一个std::future对象,以便于后续获取响应。(不阻塞
    • 第二个send方法是同步的,它 等待直到接收到服务器的响应并将结果赋值给BaseMessage::ptr &rsp
    • 第三个send方法允许用户在发送请求时提供一个回调函数const RequestCallback &cb,当收到响应时会自动调用该回调进行处理。(send 后,就不管了,不会阻塞等待)

📒对比第一种和第三种方式:

  1. 结果获取方式
    • 第一种方法需调用者主动通过future.get()获取结果,可能导致阻塞。
    • 第三种方法响应到达时自动调用回调函数处理结果,无需主动获取。
  1. 编程模型
    • 第一种更接近同步编程风格,通过异步手段避免长时间阻塞。
    • 第三种是典型的异步编程模型,更适合处理并发任务和事件驱动架构。
  1. 灵活性与复杂性
    • 第一种方法直观但可能引入复杂的依赖关系管理。
    • 第三种方法灵活,尤其适合链式异步操作,但可能导致“回调地狱”,增加代码维护难度。

<id,请求 desc> CURD

private:
                // 对于 请求 描述进行CURD
            
            //增
                RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype,
                                                 const RequestCallback &cb = RequestCallback())
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                 //加锁
                    RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();
                    rd->request = req;
                    rd->rtype = rtype;
                    if (rtype == RType::REQ_CALLBACK && cb)
                    {
                        rd->callback = cb;
                    }
                    _request_desc.insert(std::make_pair(req->GetId(), rd));
        //将id 和描述 进行对应
                    return rd;
                }

            //查
                RequestDescribe::ptr getDescribe(const std::string &rid)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _request_desc.find(rid);
                    if (it == _request_desc.end())
                    {
                        return RequestDescribe::ptr();
                    }
                    return it->second;
                }

            //删
                void delDescribe(const std::string &rid)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    _request_desc.erase(rid);
                }

完整代码

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future> //异步操作
#include <functional> //1.灵活的函数使用 bind function

namespace bitrpc{
namespace client
{
    //客户端 部分
    class Requestor
    {
    public:
        using ptr = std::shared_ptr<Requestor>;
        using RequestCallback = std::function<void(const BaseMessage::ptr&)>;

        using AsyncResponse = std::future<BaseMessage::ptr>;//异步处理信息

        struct RequestDescribe {
                    using ptr = std::shared_ptr<RequestDescribe>;
                    BaseMessage::ptr request;
                    RType rtype;
                    std::promise<BaseMessage::ptr> response;
                    RequestCallback callback;
                };

        //请求 信息描述
        //之后 好调用 所需要的rsp函数

//Dispatcher 给RSP_RPC回复调用的
         void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg){
                    std::string rid = msg->GetId();
                    RequestDescribe::ptr rdp = getDescribe(rid);
                    if (rdp.get() == nullptr) {
                        ELOG("收到响应 - %s,但是未找到对应的请求描述!", rid.c_str());
                        return;
                    }
                    if (rdp->rtype == RType::REQ_ASYNC) {
                        rdp->response.set_value(msg);
                    }else if (rdp->rtype == RType::REQ_CALLBACK){
                        if (rdp->callback) rdp->callback(msg);
                    }else {
                        ELOG("请求类型未知!!");
                    }
                    delDescribe(rid);
                }
//!!!!!!!对收到 的回复请求 进行id存储

    //异步详可见demo中的 使用
        //异步操作
                bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp)
                {
                    RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);
                    if (rdp.get() == nullptr)
                    {
                        ELOG("构造请求描述对象失败!");
                        return false;
                    }
                    conn->send(req);
                    async_rsp = rdp->response.get_future();
                    return true;
                }
        //同步操作
                bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp)
                {
                    AsyncResponse rsp_future;
                    bool ret = send(conn, req, rsp_future);
                    if (ret == false)
                    {
                        return false;
                    }
                    rsp = rsp_future.get();
                    return true;
                }
        //回调函数
                bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb)
                {
                    RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);
                    if (rdp.get() == nullptr)
                    {
                        ELOG("构造请求描述对象失败!");
                        return false;
                    }
                    conn->send(req);
                    return true;
                }

//
            private:
                // <id,请求 desc> CURD
            
            //增
                RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype,
                                                 const RequestCallback &cb = RequestCallback())
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();
                    rd->request = req;
                    rd->rtype = rtype;
                    if (rtype == RType::REQ_CALLBACK && cb)
                    {
                        rd->callback = cb;
                    }
                    _request_desc.insert(std::make_pair(req->GetId(), rd));
        //将id 和描述 进行对应
                    return rd;
                }

            //查
                RequestDescribe::ptr getDescribe(const std::string &rid)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _request_desc.find(rid);
                    if (it == _request_desc.end())
                    {
                        return RequestDescribe::ptr();
                    }
                    return it->second;
                }

            //删
                void delDescribe(const std::string &rid)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    _request_desc.erase(rid);
                }

            private:
                std::mutex _mutex;
                std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;
    };
}
}

RpcCaller类的实现

(Requestor 的处理 调用 RpcCaller)

(1)主要功能:

  • 给Requestor() 提供接口。

(2)具体实现:

  • 意义:向用户提供进行RPC调用的模块。这个模块相对简单,主要功能是向外提供几个RPC调用接口,内部实现向服务端发送请求并等待获取结果。
  • 调用方式
    1. 同步调用:发起调用后,等到收到响应结果后返回。
    2. 异步调用:发起调用后立即返回,可以在需要的时候获取结果。
    3. 回调调用:发起调用的同时设置结果的处理回调,收到响应后自动对结果进行回调处理。

❗❗❗❗ 

// requestor中的处理是针对BaseMessage进行处理的
// 用于在rpccaller中针对结果的处理是针对 RpcResponse里边的result进行的
#pragma once
#include "requestor.hpp"

// request 有 rpc topic server
// 其中 rpc部分的 调用函数的 实现

namespace bitrpc
{
    namespace client
    {
        class RpcCaller
        {
        public:
        using ptr = std::shared_ptr<RpcCaller>;
        using JsonAsyncResponse = std::future<Json::Value>;
        using JsonResponseCallback = std::function<void(const Json::Value &)>;
        RpcCaller(const Requestor::ptr &requestor) : _requestor(requestor) {}

        // requestor中的处理是针对BaseMessage进行处理的
        // 用于在rpccaller中针对结果的处理是针对 RpcResponse里边的result进行的

        //1.
        bool call(const BaseConnection::ptr &conn, const std::string &method,
        const Json::Value &params, Json::Value &result)
        {

            DLOG("开始同步rpc调用...");
            // 1. 组织请求
            auto req_msg = MessageFactory::create<RpcRequest>();
            req_msg->SetId(UUID::uuid());
            req_msg->setMethod(method);
            req_msg->setParams(params);
            req_msg->SetMType(MType::REQ_RPC);

            BaseMessage::ptr rsp_msg;
            // 2. 发送请求
            bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);
            if (ret == false)
            {
                ELOG("同步Rpc请求失败!");
                return false;
            }
            DLOG("收到响应,进行解析,获取结果!");
            // 3. 等待响应
            auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);
            if (!rpc_rsp_msg)
            {
                ELOG("rpc响应,向下类型转换失败!");
                return false;
            }
            if (rpc_rsp_msg->rcode() != RCode::RCODE_OK)
            {
                ELOG("rpc请求出错:%s", errReason(rpc_rsp_msg->rcode()));
                return false;
            }
            result = rpc_rsp_msg->result();
            DLOG("结果设置完毕!");
            return true;
        }


        /
        /

        //2.
        bool call(const BaseConnection::ptr &conn, const std::string &method,
        const Json::Value &params, JsonAsyncResponse &result)
        {
            // 向服务器发送异步回调请求,设置回调函数,回调函数中会传入一个promise对象,在回调函数中去堆promise设置数据
            auto req_msg = MessageFactory::create<RpcRequest>();
            req_msg->SetId(UUID::uuid());
            req_msg->SetMType(MType::REQ_RPC);
            req_msg->setMethod(method);
            req_msg->setParams(params);

            auto json_promise = std::make_shared<std::promise<Json::Value>>();
            result = json_promise->get_future();
            Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback,
            this, json_promise, std::placeholders::_1);
                bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);
                if (ret == false)
                {
                    ELOG("异步Rpc请求失败!");
                    return false;
                }
                return true;
            }

   
//3.

            bool call(const BaseConnection::ptr &conn, const std::string &method,
                      const Json::Value &params, const JsonResponseCallback &cb)
            {
                auto req_msg = MessageFactory::create<RpcRequest>();
                req_msg->SetId(UUID::uuid());
                req_msg->SetMType(MType::REQ_RPC);
                req_msg->setMethod(method);
                req_msg->setParams(params);

                Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1,
                                                              this, cb, std::placeholders::_1);
                bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);
                if (ret == false)
                {
                    ELOG("回调Rpc请求失败!");
                    return false;
                }
                return true;
            }

        private:
            void Callback1(const JsonResponseCallback &cb, const BaseMessage::ptr &msg)
            {
                auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
                if (!rpc_rsp_msg)
                {
                    ELOG("rpc响应,向下类型转换失败!");
                    return;
                }
                if (rpc_rsp_msg->rcode() != RCode::RCODE_OK)
                {
                    ELOG("rpc回调请求出错:%s", errReason(rpc_rsp_msg->rcode()));
                    return;
                }
                cb(rpc_rsp_msg->result());
            }
            void Callback(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr &msg)
            {
                auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
                if (!rpc_rsp_msg)
                {
                    ELOG("rpc响应,向下类型转换失败!");
                    return;
                }
                if (rpc_rsp_msg->rcode() != RCode::RCODE_OK)
                {
                    ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));
                    return;
                }
                result->set_value(rpc_rsp_msg->result());
            }

        private:
            Requestor::ptr _requestor;
        };
    }
}

RpcCaller类中 三个重载的call方法,这些方法提供了不同的方式来发起RPC调用,并处理从服务器返回的响应。每个call方法根据其参数和使用场景的不同,具有特定的功能和适用性:

1. 同步调用 call

  • 目的:同步地发起一个RPC请求,并等待直到收到服务器的响应。
  • 实现
    • 创建并配置一个RpcRequest消息。
    • 使用_requestor->send()发送请求,并阻塞等待响应。
    • 接收到响应后解析结果,并检查是否有错误发生。
    • 将结果设置到输出参数result中 返回给调用者。
  • 应用场景:适用于需要立即获取结果且可以接受当前线程被阻塞的情况。

2. 异步调用 call

  • 目的:异步地发起一个RPC请求,不阻塞当前线程,允许后续通过std::future机制获取结果。
  • 实现
    • 创建并配置一个RpcRequest消息。
    • 绑定一个回调函数Callback用于在接收到响应时设置std::promise的值。
    • 使用_requestor->send()发送请求,并立即返回(非阻塞)。
    • 结果 可以通过result.get()在未来某个时刻获取。
  • 应用场景:适合于那些希望避免阻塞主线程,但仍然需要明确获取结果的场景。

3. 回调调用 call

  • 目的:异步发起RPC请求并在接收到响应时自动调用用户提供的回调函数进行处理。
  • 实现
    • 创建并配置一个RpcRequest消息。
    • 绑定一个回调函数Callback1,该回调会在接收到响应时被调用,并进一步调用用户提供的回调函数处理结果。
    • 使用_requestor->send()发送请求,并立即返回(非阻塞)。
  • 应用场景:适用于不需要立即处理响应结果,到了 就回调 来处理响应数据的场景。

本节重点,通过 重载 来实现同步 回调 异步

  • 同步:阻塞 返回结果参数
  • 回调:非阻塞 到了就返回结果
  • 异步:非阻塞 .get()获取


http://www.kler.cn/a/560393.html

相关文章:

  • JVM view(1)
  • rust笔记9-引用与原始指针
  • 浏览器JS打不上断点,一点就跳到其他文件里。浏览器控制台 js打断点,指定的位置打不上断点,一打就跳到其他地方了。
  • 精准识别IP应用场景
  • 【运维】内网服务器借助通过某台可上外网的服务器实现公网访问
  • 玩机日记 12 fnOS使用lucky反代https转发到外网提供服务
  • MTK Android12 预装apk可卸载
  • Flutter 上的 Platform 和 UI 线程合并是怎么回事?它会带来什么?
  • Gin从入门到精通 (七)文件上传和下载
  • 自定义SpringBoot Starter
  • 1.✨Java学习笔记
  • Win10登录Samba服务器报用户名密码错误问题解决
  • Windows 11【1001问】如何下载Windows 11系统镜像
  • 安装可视化jar包部署平台JarManage
  • 【排序算法】堆排序详解
  • 金融行业数据安全:KSP密钥管理系统如何保障支付交易与客户信息零泄露
  • springcloud负载均衡策略有哪些
  • 芯谷D1308:低成本、高性能的便携式音频解决方案
  • 【数据处理】COCO 数据集掩码 Run-Length Encoding (RLE) 编码转二进制掩码
  • UE5 Gameplay框架及继承关系详解