+int Start(const bthread_attr_t* consumer_thread_attr)
+int AddConsumer(SocketId socket_id, int fd)
+int RegisterEvent(SocketId socket_id, int fd, bool pollin)
+int UnregisterEvent(SocketId socket_id, int fd, bool pollin)
-static void* RunThis(void* arg)
-void Run()
-int RemoveConsumer(int fd)



// Allocates the global dispatcher array `g_edisp' and starts every dispatcher.
// The array holds FLAGS_event_dispatcher_num dispatchers for each of the
// FLAGS_task_group_ntags tags; the j-th dispatcher of the i-th tag slot lives
// at index i * FLAGS_event_dispatcher_num + j.
// Run once via pthread_once from GetGlobalEventDispatcher().
void InitializeGlobalDispatchers() {
    g_edisp = new EventDispatcher[FLAGS_task_group_ntags * FLAGS_event_dispatcher_num];
    for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
        for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {
            // The consumer thread uses the pthread attribute when
            // FLAGS_usercode_in_pthread is set, the normal bthread attribute
            // otherwise.
            bthread_attr_t attr =
                FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
            // Tag the consumer thread so that it is created under the tag
            // this dispatcher slot is serving.
            attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
            // Every dispatcher is required to start successfully.
            CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));
    // This atexit handler will be run before g_task_control.stop() because the
    // Start() calls above initialize g_task_control by creating bthreads (to
    // run epoll/kqueue).
    CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));


// Returns the global EventDispatcher responsible for `fd' under bthread tag
// `tag'. The dispatchers are created on the first call (guarded by
// pthread_once).
// NOTE(review): `tag' is used directly as an index into g_edisp without a
// range check; presumably callers guarantee
// 0 <= tag < FLAGS_task_group_ntags -- confirm.
EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
    pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
    // Fast path: with exactly one tag and one dispatcher there is nothing to
    // choose from, so skip the hashing below.
    if (FLAGS_task_group_ntags == 1 && FLAGS_event_dispatcher_num == 1) {
        return g_edisp[0];
    // Hash the fd to pick one of the dispatchers belonging to `tag'.
    int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
    return g_edisp[tag * FLAGS_event_dispatcher_num + index];



options.on_edge_triggered_events = OnNewConnections;


        if (am->_use_rdma) {
            options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
        } else {
            options.on_edge_triggered_events = InputMessenger::OnNewMessages;



// Creates a non-blocking socket and starts connecting it to remote_side().
//
// abstime:    optional deadline. With `on_connect' it is used to arm a timer
//             on the EpollOutRequest; without it, it is handed to
//             WaitEpollOut().
// on_connect: if non-NULL, the callback is stored together with `data' in an
//             EpollOutRequest owned by a temporary Socket, and the fd is
//             registered with the global event dispatcher. If NULL, the
//             caller waits via WaitEpollOut() and then verifies the
//             connection with CheckConnected().
// data:       opaque pointer stored in the EpollOutRequest next to
//             `on_connect'.
//
// Returns the fd, released from the local fd_guard, or -1 on failure.
// NOTE(review): on the -1 paths the fd is presumably closed by the
// destructor of butil::fd_guard -- confirm.
int Socket::Connect(const timespec* abstime,
                    int (*on_connect)(int, int, void*), void* data) {
    // The SSL state depends only on whether an SSL context is configured.
    if (_ssl_ctx) {
        _ssl_state = SSL_CONNECTING;
    } else {
        _ssl_state = SSL_OFF;
    struct sockaddr_storage serv_addr;
    socklen_t addr_size = 0;
    if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0) {
        PLOG(ERROR) << "Fail to get sockaddr";
        return -1;
    butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
    if (sockfd < 0) {
        PLOG(ERROR) << "Fail to create socket";
        return -1;
    CHECK_EQ(0, butil::make_close_on_exec(sockfd));
    // We need to do async connect (to manage the timeout by ourselves).
    CHECK_EQ(0, butil::make_non_blocking(sockfd));
    const int rc = ::connect(
        sockfd, (struct sockaddr*)&serv_addr, addr_size);
    // EINPROGRESS is the expected outcome of connect() on a non-blocking
    // socket; any other error is a real failure.
    if (rc != 0 && errno != EINPROGRESS) {
        PLOG(WARNING) << "Fail to connect to " << remote_side();
        return -1;
    if (on_connect) {
        EpollOutRequest* req = new(std::nothrow) EpollOutRequest;
        if (req == NULL) {
            LOG(FATAL) << "Fail to new EpollOutRequest";
            return -1;
        req->fd = sockfd;
        req->timer_id = 0;
        req->on_epollout_event = on_connect;
        req->data = data;
        // A temporary Socket to hold `EpollOutRequest', which will
        // be added into epoll device soon
        SocketId connect_id;
        SocketOptions options;
        options.bthread_tag = _bthread_tag;
        options.user = req;
        if (Socket::Create(options, &connect_id) != 0) {
            LOG(FATAL) << "Fail to create Socket";
            // Creation failed, so `req' is still owned here and must be
            // freed manually.
            delete req;
            return -1;
        // From now on, ownership of `req' has been transferred to
        // `connect_id'. We hold an additional reference here to
        // ensure `req' to be valid in this scope
        SocketUniquePtr s;
        CHECK_EQ(0, Socket::Address(connect_id, &s));

        // Add `sockfd' into epoll so that `HandleEpollOutRequest' will
        // be called with `req' when epoll event reaches
        if (GetGlobalEventDispatcher(sockfd, _bthread_tag).RegisterEvent(connect_id, sockfd, false) !=
            0) {
            // Save errno first: the logging call below may overwrite it.
            const int saved_errno = errno;
            PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
            s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",
                         (int)sockfd, berror(saved_errno));
            return -1;
        // Register a timer for EpollOutRequest. Note that the timeout
        // callback has no race with the one above as both of them try
        // to `SetFailed' `connect_id' while only one of them can succeed.
        // It also works when `HandleEpollOutRequest' has already been
        // called before adding the timer since it will be removed
        // inside destructor of `EpollOutRequest' after leaving this scope
        if (abstime) {
            // NOTE(review): this `rc' shadows the connect() result declared
            // earlier in the function.
            int rc = bthread_timer_add(&req->timer_id, *abstime,
            if (rc) {
                LOG(ERROR) << "Fail to add timer: " << berror(rc);
                s->SetFailed(rc, "Fail to add timer: %s", berror(rc));
                return -1;
    } else {
        // No callback supplied: wait for the fd here, bounded by `abstime',
        // then check that the connection was actually established.
        if (WaitEpollOut(sockfd, false, abstime) != 0) {
            PLOG(WARNING) << "Fail to wait EPOLLOUT of fd=" << sockfd;
            return -1;
        if (CheckConnected(sockfd) != 0) {
            return -1;
    // Hand the fd to the caller; the guard no longer owns it after release().
    return sockfd.release();



