brpc之io事件分发器
结构
初始化
brpc的io事件分发器,使用多线程Reactor模式
通过InitializeGlobalDispatchers
来初始化全局io事件分发器
分为task_group_ntags
组,每组有event_dispatcher_num
void InitializeGlobalDispatchers() {
g_edisp = new EventDispatcher[FLAGS_task_group_ntags * FLAGS_event_dispatcher_num];
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {
bthread_attr_t attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));
}
}
// This atexit is will be run before g_task_control.stop() because above
// Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
}
分发策略
EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
if (FLAGS_task_group_ntags == 1 && FLAGS_event_dispatcher_num == 1) {
return g_edisp[0];
}
int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
return g_edisp[tag * FLAGS_event_dispatcher_num + index];
}
读事件
对于acceptor,读事件处理函数为OnNewConnections
options.on_edge_triggered_events = OnNewConnections;
连接后新socket的读事件处理函数为OnNewDataFromTcp
或者OnNewMessages
#if BRPC_WITH_RDMA
if (am->_use_rdma) {
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
} else {
#else
{
#endif
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
}
写事件
对于非阻塞connect时,会调用事件分发器的RegisterEvent
注册EPOLLOUT
int Socket::Connect(const timespec* abstime,
int (*on_connect)(int, int, void*), void* data) {
if (_ssl_ctx) {
_ssl_state = SSL_CONNECTING;
} else {
_ssl_state = SSL_OFF;
}
struct sockaddr_storage serv_addr;
socklen_t addr_size = 0;
if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0) {
PLOG(ERROR) << "Fail to get sockaddr";
return -1;
}
butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
if (sockfd < 0) {
PLOG(ERROR) << "Fail to create socket";
return -1;
}
CHECK_EQ(0, butil::make_close_on_exec(sockfd));
// We need to do async connect (to manage the timeout by ourselves).
CHECK_EQ(0, butil::make_non_blocking(sockfd));
const int rc = ::connect(
sockfd, (struct sockaddr*)&serv_addr, addr_size);
if (rc != 0 && errno != EINPROGRESS) {
PLOG(WARNING) << "Fail to connect to " << remote_side();
return -1;
}
if (on_connect) {
EpollOutRequest* req = new(std::nothrow) EpollOutRequest;
if (req == NULL) {
LOG(FATAL) << "Fail to new EpollOutRequest";
return -1;
}
req->fd = sockfd;
req->timer_id = 0;
req->on_epollout_event = on_connect;
req->data = data;
// A temporary Socket to hold `EpollOutRequest', which will
// be added into epoll device soon
SocketId connect_id;
SocketOptions options;
options.bthread_tag = _bthread_tag;
options.user = req;
if (Socket::Create(options, &connect_id) != 0) {
LOG(FATAL) << "Fail to create Socket";
delete req;
return -1;
}
// From now on, ownership of `req' has been transferred to
// `connect_id'. We hold an additional reference here to
// ensure `req' to be valid in this scope
SocketUniquePtr s;
CHECK_EQ(0, Socket::Address(connect_id, &s));
// Add `sockfd' into epoll so that `HandleEpollOutRequest' will
// be called with `req' when epoll event reaches
if (GetGlobalEventDispatcher(sockfd, _bthread_tag).RegisterEvent(connect_id, sockfd, false) !=
0) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",
(int)sockfd, berror(saved_errno));
return -1;
}
// Register a timer for EpollOutRequest. Note that the timeout
// callback has no race with the one above as both of them try
// to `SetFailed' `connect_id' while only one of them can succeed
// It also work when `HandleEpollOutRequest' has already been
// called before adding the timer since it will be removed
// inside destructor of `EpollOutRequest' after leaving this scope
if (abstime) {
int rc = bthread_timer_add(&req->timer_id, *abstime,
HandleEpollOutTimeout,
(void*)connect_id);
if (rc) {
LOG(ERROR) << "Fail to add timer: " << berror(rc);
s->SetFailed(rc, "Fail to add timer: %s", berror(rc));
return -1;
}
}
} else {
if (WaitEpollOut(sockfd, false, abstime) != 0) {
PLOG(WARNING) << "Fail to wait EPOLLOUT of fd=" << sockfd;
return -1;
}
if (CheckConnected(sockfd) != 0) {
return -1;
}
}
return sockfd.release();
}