当前位置: 首页 > article >正文

【brpc学习实战三】同步、异步、半同步原理

同步访问

指的是:RPC请求(CallMethod、xxx_stub.xxxService)会阻塞到收到server端返回response或发生错误(包括超时)。

同步访问中的response/controller不会在CallMethod后被框架使用,它们都可以分配在栈上。注意,如果request/response字段特别多字节数特别大的话,还是更适合分配在堆上。

MyRequest request;
MyResponse response;
brpc::Controller cntl;
XXX_Stub stub(&channel);
 
request.set_foo(...);
cntl.set_timeout_ms(...);
// 发起请求,阻塞到返回
stub.some_method(&cntl, &request, &response, NULL);
if (cntl->Failed()) {
    // RPC失败了. response里的值是未定义的,勿用。
} else {
    // RPC成功了,response里有我们想要的回复数据。
}
警告: 请勿在持有pthread锁的情况下,调用brpc的同步CallMethod!否则很容易导致死锁。

解决方案(二选一):

    将pthread锁换成bthread锁(bthread_mutex_t)
    在CallMethod之前将锁释放

异步访问

指的是:给CallMethod传递一个额外的回调对象done,CallMethod在发出request后就结束了,而不是在RPC结束后。当server端返回response或发生错误(包括超时)时,done->Run()会被调用。对RPC的后续处理应该写在done->Run()里,而不是CallMethod后。

也就是说我发起请求后我就退出了,至于请求的响应及其处理,我给你提供一个回调来处理。

由于CallMethod结束不意味着RPC结束,response/controller仍可能被框架及done->Run()使用,它们一般得创建在堆上,并在done->Run()中删除。如果提前删除了它们,那当done->Run()被调用时,将访问到无效内存。

你可以独立地创建这些对象,并使用NewCallback生成done,也可以把Response和Controller作为done的成员变量,一起new出来,一般使用前一种方法。

发起异步请求后Request可以立刻析构。(SelectiveChannel是个例外,SelectiveChannel情况下必须在请求处理完成后再释放request对象)

发起异步请求后Channel可以立刻析构。

注意:这是说Request/Channel的析构可以立刻发生在CallMethod之后,并不是说析构可以和CallMethod同时发生,删除正被另一个线程使用的Channel是未定义行为(很可能crash)。

使用NewCallback

static void OnRPCDone(MyResponse* response, brpc::Controller* cntl) {
    // unique_ptr会帮助我们在return时自动删掉response/cntl,防止忘记。gcc 3.4下的unique_ptr是模拟版本。
    std::unique_ptr<MyResponse> response_guard(response);
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    if (cntl->Failed()) {
        // RPC失败了. response里的值是未定义的,勿用。
    } else {
        // RPC成功了,response里有我们想要的数据。开始RPC的后续处理。    
    }
    // NewCallback产生的Closure会在Run结束后删除自己,不用我们做。
}
 
MyResponse* response = new MyResponse;
brpc::Controller* cntl = new brpc::Controller;
MyService_Stub stub(&channel);
 
MyRequest request;  // 你不用new request,即使在异步访问中.
request.set_foo(...);
cntl->set_timeout_ms(...);
stub.some_method(cntl, &request, response, brpc::NewCallback(OnRPCDone, response, cntl));

由于protobuf 3把NewCallback设置为私有,r32035后brpc把NewCallback独立于src/brpc/callback.h(并增加了一些重载)。如果你的程序出现NewCallback相关的编译错误,把google::protobuf::NewCallback替换为brpc::NewCallback就行了。
继承google::protobuf::Closure

使用done->run()(需重载run()及将response和cntl作为成员变量)

使用NewCallback的缺点是要分配三次内存:response, controller, done。如果profiler证明这儿的内存分配有瓶颈,可以考虑自己继承Closure,把response/controller作为成员变量,这样可以把三次new合并为一次。但缺点就是代码不够美观,如果内存分配不是瓶颈,别用这种方法。

// 核心是通过继承重载Closure的run()函数及将response和cntl作为成员变量
class OnRPCDone: public google::protobuf::Closure {
public:
    void Run() {
        // unique_ptr会帮助我们在return时自动delete this,防止忘记。gcc 3.4下的unique_ptr是模拟版本。
        std::unique_ptr<OnRPCDone> self_guard(this);
          
        if (cntl->Failed()) {
            // RPC失败了. response里的值是未定义的,勿用。
        } else {
            // RPC成功了,response里有我们想要的数据。开始RPC的后续处理。
        }
    }
 
    MyResponse response;
    brpc::Controller cntl;
}
 
OnRPCDone* done = new OnRPCDone;
MyService_Stub stub(&channel);
 
MyRequest request;  // 你不用new request,即使在异步访问中.
request.set_foo(...);
done->cntl.set_timeout_ms(...);
stub.some_method(&done->cntl, &request, &done->response, done);

如果异步访问中的回调函数特别复杂会有什么影响吗?

没有特别的影响,回调会运行在独立的bthread中,不会阻塞其他的逻辑。你可以在回调中做各种阻塞操作。
rpc发送处的代码和回调函数是在同一个线程里执行吗?

一定不在同一个线程里运行,即使该次rpc调用刚进去就失败了,回调也会在另一个bthread中运行。这可以在加锁进行rpc(不推荐)的代码中避免死锁。
等待RPC完成

注意:当你需要发起多个并发操作时,可能ParallelChannel更方便。

rpc join及call_id用法

如下代码发起两个异步RPC后等待它们完成。

const brpc::CallId cid1 = controller1->call_id();
const brpc::CallId cid2 = controller2->call_id();

stub.method1(controller1, request1, response1, done1);
stub.method2(controller2, request2, response2, done2);

brpc::Join(cid1);
brpc::Join(cid2);

在发起RPC前调用Controller.call_id()获得一个id,发起RPC调用后Join那个id。

Join()的行为是等到RPC结束且done->Run()运行后,一些Join的性质如下:

如果对应的RPC已经结束,Join将立刻返回。
多个线程可以Join同一个id,它们都会醒来。
同步RPC也可以在另一个线程中被Join,但一般不会这么做。

Join()在之前的版本叫做JoinResponse(),如果你在编译时被提示deprecated之类的,修改为Join()。

在RPC调用后Join(controller->call_id())是错误的行为,一定要先把call_id保存下来。因为RPC调用后controller可能被随时开始运行的done删除。下面代码的Join方式是错误的。

static void on_rpc_done(Controller* controller, MyResponse* response) {
    ... Handle response ...
    delete controller;
    delete response;
}
 
Controller* controller1 = new Controller;
Controller* controller2 = new Controller;
MyResponse* response1 = new MyResponse;
MyResponse* response2 = new MyResponse;
...
stub.method1(controller1, &request1, response1, google::protobuf::NewCallback(on_rpc_done, controller1, response1));
stub.method2(controller2, &request2, response2, google::protobuf::NewCallback(on_rpc_done, controller2, response2));
...
brpc::Join(controller1->call_id());   // 错误,controller1可能被on_rpc_done删除了
brpc::Join(controller2->call_id());   // 错误,controller2可能被on_rpc_done删除了

半同步

Join可用来实现“半同步”访问:即等待多个异步访问完成。由于调用处的代码会等到所有RPC都结束后再醒来,所以controller和response都可以放栈上。一般来说工作中大都使用半同步的写法,因为我们不能等上一个请求结束了再去调用下一个,这样通常会让我们的服务平响增加,而异步的写法对服务响应的管理又不方便,我们有时是需要多个响应结合处理的,这时就需要半同步了。

brpc::Controller cntl1;
brpc::Controller cntl2;
MyResponse response1;
MyResponse response2;
// ...
stub1.method1(&cntl1, &request1, &response1, brpc::DoNothing());
// 上一步发起method1访问后就退出了,不会阻塞继续发起method2的访问
stub2.method2(&cntl2, &request2, &response2, brpc::DoNothing());
// 使用call_id等待所有访问结束后再一次性处理
brpc::Join(cntl1.call_id());
brpc::Join(cntl2.call_id());

brpc::DoNothing()可获得一个什么都不干的done,专门用于半同步访问。它的生命周期由框架管理,用户不用关心。

注意在上面的代码中,我们在RPC结束后又访问了controller.call_id(),这是没有问题的,因为DoNothing中并不会像上节中的on_rpc_done中那样删除Controller。但最好还是先在访问前将call_id保存下来


http://www.kler.cn/news/134811.html

相关文章:

  • VB.net读写S50/F08IC卡,修改卡片密码控制位源码
  • 警惕.360勒索病毒,您需要知道的预防和恢复方法。
  • IPKISS Tutorials 3------绘制矩形版图
  • Docker 安装 Oracle Database 23c
  • 前端图片转成base64
  • 8年资深测试,自动化测试常见问题总结,惊险避坑...
  • Docker基础知识总结
  • 医院陪诊服务预约小程序的作用如何
  • 源启容器平台KubeGien 打造云原生转型的破浪之舰
  • [uni-app]记录APP端跳转页面自动滚动到底部的bug
  • hiredis/examples /example-libevent.c
  • 如何进行手动脱壳
  • Hive客户端hive与beeline的区别
  • VR智慧景区:VR赋能文旅产业,激活消费潜能
  • EtherCAT 伺服控制功能块实现
  • 3D建模基础教程:编辑多边形功能命令快捷方式
  • SpringBoot 整合 Freemarker
  • 小程序判断是否授权位置信息和手动授权
  • 【每日一题】最大子数组和
  • 小程序商城免费搭建之java商城 电子商务Spring Cloud+Spring Boot+二次开发+mybatis+MQ+VR全景+b2b2c
  • 越南MIC新规针对ICT和ITE产品电气授权标准变更
  • 一起学docker系列之四docker的常用命令--系统操作docker命令及镜像命令
  • Springcloud可视化物联网智慧工地云SaaS平台源码 支持二开和私有化部署
  • 沸点 | Ultipa 图数据库金融应用场景优秀案例首批入选,金融街论坛年会发布
  • Chat GPT 用于论文润色,常用指令这里都全了
  • ts视频文件转为mp4(FFmpeg)
  • 『亚马逊云科技产品测评』活动征文|基于next.js搭建一个企业官网
  • 每天一道算法题(五)——判断一组数字是否连续,出现连续数字的时候以‘-’输出
  • Flutter笔记:目录与文件存储以及在Flutter中的使用(上)
  • Git 提交竟然还能这么用?