【brpc学习实战三】同步、异步、半同步原理
同步访问
指的是:RPC请求(CallMethod、xxx_stub.xxxService)会阻塞到收到server端返回response或发生错误(包括超时)。
同步访问中的response/controller不会在CallMethod后被框架使用,它们都可以分配在栈上。注意,如果request/response字段特别多字节数特别大的话,还是更适合分配在堆上。
MyRequest request;
MyResponse response;
brpc::Controller cntl;
XXX_Stub stub(&channel);
request.set_foo(...);
cntl.set_timeout_ms(...);
// 发起请求,阻塞到返回
stub.some_method(&cntl, &request, &response, NULL);
if (cntl->Failed()) {
// RPC失败了. response里的值是未定义的,勿用。
} else {
// RPC成功了,response里有我们想要的回复数据。
}
警告: 请勿在持有pthread锁的情况下,调用brpc的同步CallMethod!否则很容易导致死锁。
解决方案(二选一):
将pthread锁换成bthread锁(bthread_mutex_t)
在CallMethod之前将锁释放
异步访问
指的是:给CallMethod传递一个额外的回调对象done,CallMethod在发出request后就结束了,而不是在RPC结束后。当server端返回response或发生错误(包括超时)时,done->Run()会被调用。对RPC的后续处理应该写在done->Run()里,而不是CallMethod后。
也就是说我发起请求后我就退出了,至于请求的响应及其处理,我给你提供一个回调来处理。
由于CallMethod结束不意味着RPC结束,response/controller仍可能被框架及done->Run()使用,它们一般得创建在堆上,并在done->Run()中删除。如果提前删除了它们,那当done->Run()被调用时,将访问到无效内存。
你可以独立地创建这些对象,并使用NewCallback生成done,也可以把Response和Controller作为done的成员变量,一起new出来,一般使用前一种方法。
发起异步请求后Request可以立刻析构。(SelectiveChannel是个例外,SelectiveChannel情况下必须在请求处理完成后再释放request对象)
发起异步请求后Channel可以立刻析构。
注意:这是说Request/Channel的析构可以立刻发生在CallMethod之后,并不是说析构可以和CallMethod同时发生,删除正被另一个线程使用的Channel是未定义行为(很可能crash)。
使用NewCallback
static void OnRPCDone(MyResponse* response, brpc::Controller* cntl) {
// unique_ptr会帮助我们在return时自动删掉response/cntl,防止忘记。gcc 3.4下的unique_ptr是模拟版本。
std::unique_ptr<MyResponse> response_guard(response);
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
if (cntl->Failed()) {
// RPC失败了. response里的值是未定义的,勿用。
} else {
// RPC成功了,response里有我们想要的数据。开始RPC的后续处理。
}
// NewCallback产生的Closure会在Run结束后删除自己,不用我们做。
}
MyResponse* response = new MyResponse;
brpc::Controller* cntl = new brpc::Controller;
MyService_Stub stub(&channel);
MyRequest request; // 你不用new request,即使在异步访问中.
request.set_foo(...);
cntl->set_timeout_ms(...);
stub.some_method(cntl, &request, response, brpc::NewCallback(OnRPCDone, response, cntl));
由于protobuf 3把NewCallback设置为私有,r32035后brpc把NewCallback独立于src/brpc/callback.h(并增加了一些重载)。如果你的程序出现NewCallback相关的编译错误,把google::protobuf::NewCallback替换为brpc::NewCallback就行了。
继承google::protobuf::Closure
使用done->run()(需重载run()及将response和cntl作为成员变量)
使用NewCallback的缺点是要分配三次内存:response, controller, done。如果profiler证明这儿的内存分配有瓶颈,可以考虑自己继承Closure,把response/controller作为成员变量,这样可以把三次new合并为一次。但缺点就是代码不够美观,如果内存分配不是瓶颈,别用这种方法。
// 核心是通过继承重载Closure的run()函数及将response和cntl作为成员变量
class OnRPCDone: public google::protobuf::Closure {
public:
void Run() {
// unique_ptr会帮助我们在return时自动delete this,防止忘记。gcc 3.4下的unique_ptr是模拟版本。
std::unique_ptr<OnRPCDone> self_guard(this);
if (cntl->Failed()) {
// RPC失败了. response里的值是未定义的,勿用。
} else {
// RPC成功了,response里有我们想要的数据。开始RPC的后续处理。
}
}
MyResponse response;
brpc::Controller cntl;
}
OnRPCDone* done = new OnRPCDone;
MyService_Stub stub(&channel);
MyRequest request; // 你不用new request,即使在异步访问中.
request.set_foo(...);
done->cntl.set_timeout_ms(...);
stub.some_method(&done->cntl, &request, &done->response, done);
如果异步访问中的回调函数特别复杂会有什么影响吗?
没有特别的影响,回调会运行在独立的bthread中,不会阻塞其他的逻辑。你可以在回调中做各种阻塞操作。
rpc发送处的代码和回调函数是在同一个线程里执行吗?
一定不在同一个线程里运行,即使该次rpc调用刚进去就失败了,回调也会在另一个bthread中运行。这可以在加锁进行rpc(不推荐)的代码中避免死锁。
等待RPC完成
注意:当你需要发起多个并发操作时,可能ParallelChannel更方便。
rpc join及call_id用法
如下代码发起两个异步RPC后等待它们完成。
const brpc::CallId cid1 = controller1->call_id();
const brpc::CallId cid2 = controller2->call_id();
…
stub.method1(controller1, request1, response1, done1);
stub.method2(controller2, request2, response2, done2);
…
brpc::Join(cid1);
brpc::Join(cid2);
在发起RPC前调用Controller.call_id()获得一个id,发起RPC调用后Join那个id。
Join()的行为是等到RPC结束且done->Run()运行后,一些Join的性质如下:
如果对应的RPC已经结束,Join将立刻返回。
多个线程可以Join同一个id,它们都会醒来。
同步RPC也可以在另一个线程中被Join,但一般不会这么做。
Join()在之前的版本叫做JoinResponse(),如果你在编译时被提示deprecated之类的,修改为Join()。
在RPC调用后Join(controller->call_id())是错误的行为,一定要先把call_id保存下来。因为RPC调用后controller可能被随时开始运行的done删除。下面代码的Join方式是错误的。
static void on_rpc_done(Controller* controller, MyResponse* response) {
... Handle response ...
delete controller;
delete response;
}
Controller* controller1 = new Controller;
Controller* controller2 = new Controller;
MyResponse* response1 = new MyResponse;
MyResponse* response2 = new MyResponse;
...
stub.method1(controller1, &request1, response1, google::protobuf::NewCallback(on_rpc_done, controller1, response1));
stub.method2(controller2, &request2, response2, google::protobuf::NewCallback(on_rpc_done, controller2, response2));
...
brpc::Join(controller1->call_id()); // 错误,controller1可能被on_rpc_done删除了
brpc::Join(controller2->call_id()); // 错误,controller2可能被on_rpc_done删除了
半同步
Join可用来实现“半同步”访问:即等待多个异步访问完成。由于调用处的代码会等到所有RPC都结束后再醒来,所以controller和response都可以放栈上。一般来说工作中大都使用半同步的写法,因为我们不能等上一个请求结束了再去调用下一个,这样通常会让我们的服务平响增加,而异步的写法对服务响应的管理又不方便,我们有时是需要多个响应结合处理的,这时就需要半同步了。
brpc::Controller cntl1;
brpc::Controller cntl2;
MyResponse response1;
MyResponse response2;
// ...
stub1.method1(&cntl1, &request1, &response1, brpc::DoNothing());
// 上一步发起method1访问后就退出了,不会阻塞继续发起method2的访问
stub2.method2(&cntl2, &request2, &response2, brpc::DoNothing());
// 使用call_id等待所有访问结束后再一次性处理
brpc::Join(cntl1.call_id());
brpc::Join(cntl2.call_id());
brpc::DoNothing()可获得一个什么都不干的done,专门用于半同步访问。它的生命周期由框架管理,用户不用关心。
注意在上面的代码中,我们在RPC结束后又访问了controller.call_id(),这是没有问题的,因为DoNothing中并不会像上节中的on_rpc_done中那样删除Controller。但最好还是先在访问前将call_id保存下来