当前位置: 首页 > article >正文

brpc之io事件分发器

结构

EventDispatcher
+int Start(const bthread_attr_t* consumer_thread_attr)
+int AddConsumer(SocketId socket_id, int fd)
+int RegisterEvent(SocketId socket_id, int fd, bool pollin)
+int UnregisterEvent(SocketId socket_id, int fd, bool pollin)
-static void* RunThis(void* arg)
-void Run()
-int RemoveConsumer(int fd)

初始化

brpc的io事件分发器,使用多线程Reactor模式
通过InitializeGlobalDispatchers来初始化全局io事件分发器
分为task_group_ntags组,每组有event_dispatcher_num

void InitializeGlobalDispatchers() {
    g_edisp = new EventDispatcher[FLAGS_task_group_ntags * FLAGS_event_dispatcher_num];
    for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
        for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {
            bthread_attr_t attr =
                FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
            attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
            CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));
        }
    }
    // This atexit is will be run before g_task_control.stop() because above
    // Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
    CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
}

分发策略

EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
    pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
    if (FLAGS_task_group_ntags == 1 && FLAGS_event_dispatcher_num == 1) {
        return g_edisp[0];
    }
    int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
    return g_edisp[tag * FLAGS_event_dispatcher_num + index];
}

读事件

对于acceptor,读事件处理函数为OnNewConnections

options.on_edge_triggered_events = OnNewConnections;

连接后新socket的读事件处理函数为OnNewDataFromTcp或者OnNewMessages

#if BRPC_WITH_RDMA
        if (am->_use_rdma) {
            options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
        } else {
#else
        {
#endif
            options.on_edge_triggered_events = InputMessenger::OnNewMessages;
        }

写事件

对于非阻塞connect时,会调用事件分发器的RegisterEvent注册EPOLLOUT

int Socket::Connect(const timespec* abstime,
                    int (*on_connect)(int, int, void*), void* data) {
    if (_ssl_ctx) {
        _ssl_state = SSL_CONNECTING;
    } else {
        _ssl_state = SSL_OFF;
    }
    struct sockaddr_storage serv_addr;
    socklen_t addr_size = 0;
    if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0) {
        PLOG(ERROR) << "Fail to get sockaddr";
        return -1;
    }
    butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
    if (sockfd < 0) {
        PLOG(ERROR) << "Fail to create socket";
        return -1;
    }
    CHECK_EQ(0, butil::make_close_on_exec(sockfd));
    // We need to do async connect (to manage the timeout by ourselves).
    CHECK_EQ(0, butil::make_non_blocking(sockfd));
    
    const int rc = ::connect(
        sockfd, (struct sockaddr*)&serv_addr, addr_size);
    if (rc != 0 && errno != EINPROGRESS) {
        PLOG(WARNING) << "Fail to connect to " << remote_side();
        return -1;
    }
    if (on_connect) {
        EpollOutRequest* req = new(std::nothrow) EpollOutRequest;
        if (req == NULL) {
            LOG(FATAL) << "Fail to new EpollOutRequest";
            return -1;
        }
        req->fd = sockfd;
        req->timer_id = 0;
        req->on_epollout_event = on_connect;
        req->data = data;
        // A temporary Socket to hold `EpollOutRequest', which will
        // be added into epoll device soon
        SocketId connect_id;
        SocketOptions options;
        options.bthread_tag = _bthread_tag;
        options.user = req;
        if (Socket::Create(options, &connect_id) != 0) {
            LOG(FATAL) << "Fail to create Socket";
            delete req;
            return -1;
        }
        // From now on, ownership of `req' has been transferred to
        // `connect_id'. We hold an additional reference here to
        // ensure `req' to be valid in this scope
        SocketUniquePtr s;
        CHECK_EQ(0, Socket::Address(connect_id, &s));

        // Add `sockfd' into epoll so that `HandleEpollOutRequest' will
        // be called with `req' when epoll event reaches
        if (GetGlobalEventDispatcher(sockfd, _bthread_tag).RegisterEvent(connect_id, sockfd, false) !=
            0) {
            const int saved_errno = errno;
            PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
            s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",
                         (int)sockfd, berror(saved_errno));
            return -1;
        }
        
        // Register a timer for EpollOutRequest. Note that the timeout
        // callback has no race with the one above as both of them try
        // to `SetFailed' `connect_id' while only one of them can succeed
        // It also work when `HandleEpollOutRequest' has already been
        // called before adding the timer since it will be removed
        // inside destructor of `EpollOutRequest' after leaving this scope
        if (abstime) {
            int rc = bthread_timer_add(&req->timer_id, *abstime,
                                       HandleEpollOutTimeout,
                                       (void*)connect_id);
            if (rc) {
                LOG(ERROR) << "Fail to add timer: " << berror(rc);
                s->SetFailed(rc, "Fail to add timer: %s", berror(rc));
                return -1;
            }
        }
        
    } else {
        if (WaitEpollOut(sockfd, false, abstime) != 0) {
            PLOG(WARNING) << "Fail to wait EPOLLOUT of fd=" << sockfd;
            return -1;
        }
        if (CheckConnected(sockfd) != 0) {
            return -1;
        }
    }
    return sockfd.release();
}

http://www.kler.cn/news/318705.html

相关文章:

  • 【会议征稿通知】第三届图像处理、计算机视觉与机器学习国际学术会议(ICICML 2024)
  • Java使用Map数据结构配合函数式接口存储方法引用
  • 洛谷P2571.传送带
  • request库的使用 | get请求
  • 微软Active Directory:组织身份与访问管理的基石
  • 字符串——String
  • 量子计算如何引发第四次工业革命——解读加来道雄的量子物理观
  • Android平台使用VIA创建语音交互应用
  • 【ArcGIS微课1000例】0122:经纬网、方里网、参考格网绘制案例教程
  • 0基础带你入门Linux之使用
  • 初识C#(一)
  • 2.以太网
  • 毕设基于SSM+Vue3实现设备维修管理系统四:后台框架及基础增删改查功能实现
  • 常见区块链数据模型介绍
  • [leetcode]113_路径总和II_输出所有路径
  • 【AIGC】ChatGPT RAG提取文档内容,高效制作PPT、论文
  • day-59 四数之和
  • 【React】(推荐项目)使用 React、Socket.io、Nodejs、Redux-Toolkit、MongoDB 构建聊天应用程序 (2024)
  • 数据集-目标检测系列-海洋鱼类检测数据集 fish>> DataBall
  • short-link笔记
  • ubuntu 24.04 输入设备显示没有,系统没有找到电脑麦克风
  • web平台搭建-LAMP(CentOS-7)
  • 【自动驾驶】基于车辆几何模型的横向控制算法 | Stanley 算法详解与编程实现
  • ai写论文哪个平台好?分享4款ai论文写作平台软件
  • Python范例总结
  • 【计算机视觉】YoloV8-训练与测试教程
  • javascript是什么语言?它是干什么的?
  • Element UI在工程中使用方式
  • 79、Python之鸭子类型:没有听过鸭子类型?关键在于认知的转变
  • 网络安全-长亭雷池waf的sql绕过,安全狗绕过(5种绕过3+2)