当前位置: 首页 > article >正文

Muduo网络库解析---事件循环模块

文章目录

  • 前言
  • Channel
    • Channel代码
      • Channel.h
      • Channel.cc
  • Poller
    • Poller代码
      • Poller.h
      • Poller.cc
  • EpollPoller
    • EpollPoller代码
      • EpollPoller.h
      • EpollPoller.cc
  • EventLoop
    • EventLoop代码
      • EventLoop.h
      • EventLoop.cc
    • 类图

前言

重写Muduo库实现核心模块的Git仓库

注:本文将重点剖析 Muduo 网络库的核心框架,深入探讨作者精妙的代码设计思路,并针对核心代码部分进行重写,将原本依赖 boost 的实现替换为原生的 C++11 语法。需要说明的是,本文并不打算对整个 Muduo 库进行完整的重写。Muduo库源码链接

在前文中,我们对Muduo的基础模块LoggerTimestampBuffer有了初步了解(如果还未阅读过的同学可先参考这篇文章进行回顾)。接下来,我们将深入探讨Muduo的事件循环相关模块,包括:

  • Channel
  • Poller
  • EpollPoller
  • EventLoop

通过对这些模块的剖析,我们将逐步了解事件循环的运行机制和高效的事件分发策略,从而进一步理解 Muduo 在构建高性能服务器时的设计思路与实现细节。接下来,我们将以事件循环的主执行单元 EventLoop 为线索,分别阐述 ChannelPollerEpollPollerEventLoop 的职责、设计理念以及交互关系。

Channel

ChannelMuduo 事件循环框架中连接文件描述符事件处理逻辑的中间抽象层。其主要作用是为某个特定的 I/O 通道(通常是一个套接字文件描述符)绑定对应的事件回调函数,并协助 EventLoop 对发生的事件进行分发处理。

简而言之,Channel 并不关心 I/O 操作的具体过程,它更像是一个“桥梁”或“适配器”,将底层的文件描述符事件与上层的业务回调关联起来。它的核心功能和责任包括:

  1. 文件描述符与事件的关联
    Channel 持有一个文件描述符(如套接字)和一组感兴趣的事件类型(如可读事件、可写事件)。当底层 I/O 多路复用机制(如 epoll)检测到该文件描述符有事件发生时,会通知对应的 Channel
  2. 事件回调的注册与触发
    Channel 不负责事件的检测,但负责事件的回调分发。当 Poller (如 EpollPoller) 返回已发生的事件时,EventLoop 会调用 Channel 的回调函数(如读回调、写回调、关闭回调、错误回调)。借助 Channel,框架实现了对不同 I/O 通道的统一管理。
  3. EventLoopPoller协作:
    EventLoop 中维护一组与当前线程相关的 Channel。每个 Channel 会在内部存储自己的感兴趣事件,并在必要时由 EventLoop 通过 Poller 注册、修改、删除对相应文件描述符的监听。当事件发生时,Poller 收集事件并通知 EventLoopEventLoop 随后调用 Channel 的相应回调函数。
  4. 简化上层逻辑
    有了 Channel 这个抽象,上层应用代码只需负责定义事件发生时的处理逻辑(即回调函数),而不必直接与底层的多路复用接口交互。这大大简化了事件循环框架的使用难度和代码复杂度。

总之,ChannelMuduo 中连接底层事件检测机制上层事件处理逻辑的关键组件,通过统一的接口与抽象,它极大地降低了事件处理的复杂性,使用户可以更直观地编写网络事件处理代码。

Channel代码

Channel.h

class Channel : noncopyable
{
public:
    using EventCallback = std::function<void()>;
    using ReadEventCallback = std::function<void(Timestamp)>;

    Channel(EventLoop* eventloop, int fd);
    ~Channel();

    void handleEvent(Timestamp receiveTime);

    void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
    void setWriteCallback(EventCallback cb)   { writeCallback_ = std::move(cb); }
    void setCloseCallback(EventCallback cb)   { closeCallback_ = std::move(cb); }
    void setErrorCallback(EventCallback cb)   { errorCallback_ = std::move(cb); }
    
    // 将channel绑定到shared_ptr管理的owner对象,
    // 防止在handleEvent中销毁owner对象
    void tie(const std::shared_ptr<void>&);

    int fd() const { return fd_; }
    int events() const { return events_; }
    // 被poller调用设置已就绪的事件
    void set_revents(int revt) { revents_ = revt; }
    bool isNonEvent() const { return events_ == kNoneEvent; }

    void enableReading() { events_ |= kReadEvent; update();}
    void disableReading(){ events_ &= ~kReadEvent; update();}
    void enableWriting() { events_ |= kWriteEvent; update();}
    void disableWriting(){ events_ &= ~kWriteEvent; update();}
    void disableAll()    { events_ = kNoneEvent; update();}
    bool isReading()const{ return events_ & kReadEvent; }
    bool isWriting()const{ return events_ & kWriteEvent; }

    int index() const { return index_; }
    void set_index(int index) { index_ = index; }

    EventLoop* ownerLoop() const { return loop_; }
    void remove();
private:
    void update();
    void handleEventWithGuard(Timestamp receiveTime);

private:
    static const int kNoneEvent; // 无事件
    static const int kReadEvent; // 读事件
    static const int kWriteEvent;// 写事件

    EventLoop* loop_;   // 该Channel所绑定的EventLoop
    const int fd_;      // 封装的fd
    int events_;        // 注册事件
    int revents_;       // 就绪事件
    /*index_描述当前Channel的状态:
        -1 : 新添加,还未注册到epoll
        1 : 已注册并添加到epoll
        2 : 已删除 
    */
    int index_;
	// 通过weak_ptr将Channel绑定到TcpConnection
    std::weak_ptr<void> tie_;	
    // 是否绑定
    bool tied_;

    ReadEventCallback readCallback_;// 读回调
    EventCallback writeCallback_;	// 写回调
    EventCallback closeCallback_;	// 关闭回调
    EventCallback errorCallback_;	// 错误回调
};

解析:

几个重要的对外接口:

  • 几个设置回调函数的公有函数
  • tie:将Channel绑定到给定的参数
  • set_revents:用来设置Channel的就绪事件
  • handleEvent:根据就绪事件revents,执行回调函数

Channel.cc

const int Channel::kNoneEvent = 0;
// EPOLLPRI: 当文件描述符上有紧急数据时,触发
const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
const int Channel::kWriteEvent = EPOLLOUT;

Channel::Channel(EventLoop *eventloop, int fd) :
  loop_(eventloop),
  fd_(fd), 
  events_(0),
  revents_(0),
  index_(-1),
  tied_(false)
{}

Channel::~Channel() {}

void Channel::handleEvent(Timestamp receiveTime)
{
  /*
  处理回调函数的向外提供的接口
  在EventLoop中调用
  */
    if(tied_){
      std::shared_ptr<void> guard = tie_.lock();  // 提升
      if(guard){
        handleEventWithGuard(receiveTime);
      }

    }else{
      handleEventWithGuard(receiveTime);
    }
}

// 在TcpConnection::connectEstablish中调用channel::tie
void Channel::tie(const std::shared_ptr<void>& obj)
{
    tie_ = obj;
    tied_ = true;
}

void Channel::remove()
{
  loop_->removeChannel(this);
}

// 当改变channel所表示的fd的events后,需要在poller里面更改fd相应的事件epoll_ctl
void Channel::update()
{
  loop_->updateChannel(this);
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  /*
  调用回调函数的实施接口
  */
    if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
    {
      /*
      EPOLLHUP: 表示文件描述符挂起事件,通常表示远端关闭连接
      */
        if(closeCallback_){
           closeCallback_();
        }
    }
    
    if(revents_ & EPOLLERR){
      if(errorCallback_){
        errorCallback_();
      }
    }

    if(revents_ & (EPOLLIN | EPOLLPRI)){
      if(readCallback_){
        readCallback_(receiveTime);
      }
    }

    if(revents_ & EPOLLOUT){
      if(writeCallback_){
        writeCallback_();
      }
    }
}

Poller

PollerMuduo 中对底层事件分发器(Demultiplexer,如 epollpoll 等机制)的抽象封装。它负责对已注册的文件描述符进行监听,并在事件准备就绪(如套接字可读或可写)时返回相应事件。通过这种抽象,EventLoop 无需直接与底层的 I/O 多路复用机制交互,而可以统一调用 Poller 接口进行事件的获取和分发,从而提升代码的可扩展性与可维护性。

Poller代码

Poller.h

class Poller : noncopyable
{
public:
    using ChannelList = std::vector<Channel*>;

    Poller(EventLoop*);
    virtual ~Poller() = default;

    // 给所有IO复用提供统一的接口
    virtual Timestamp poll(int timeout, ChannelList* activeChannels) = 0;
    virtual void updateChannel(Channel*) = 0;
    virtual void removeChannel(Channel*) = 0;
    bool hasChannel(Channel* channel) const;
    // Eventloop可以通过该接口获取默认的IO复用的对象
    static Poller* newDefaultPoller(EventLoop* loop);
protected:
    using ChannelMap = std::unordered_map<int, Channel*>; 
    ChannelMap channels_;
private:
    EventLoop* ownerLoop_;
};

解析:

由于 Poller 是对事件分发器(Demultiplexer)的抽象,因此它作为抽象基类被其他具体的事件分发实现类(如 PollPollerEpollPoller)继承和扩展。为此,Poller 必须提供统一的接口规范,以便各具体实现类能够按照这一统一接口进行定制和实现。具体接口如下:

  • poll(int timeout, ChannelList* activeChannels):事件分发器的核心函数。对于不同的事件分发器具体类,该函数会调用各自底层的IO多路复用系统调用(如poll()epoll_wait())。当有事件就绪时,它会将相应的事件填充到activeChannels
  • updateChannel(Channel*) 用于在底层事件分发器中更新指定 Channel 对文件描述符所关注的事件类型(如可读、可写等)。
  • emoveChannel(Channel*) 则用于将指定的 Channel 从事件监听中移除。
  • newDefaultPoller(EventLoop* loop):此方法为静态方法,用于其派生类构造一个新的Poller

Poller.cc

Poller::Poller(EventLoop *loop) : ownerLoop_(loop){}

bool Poller::hasChannel(Channel *channel) const 
{
    auto it = channels_.find(channel->fd());
    return it != channels_.end() && it->second == channel;
}

这里有个不同点在于,作者将newDefaultPoller方法定义到其他文件上(DefaultPoller.cc):

Poller* Poller::newDefaultPoller(EventLoop* loop)
{
    if(::getenv("MUDUO_USE_POLL")){
        return nullptr;
    }
    return new EpollPoller(loop);
}

EpollPoller

EpollPoller 是继承自 Poller 的具体事件分发器实现类。在我们的项目中将只实现 EpollPoller,而不涉及 PollPoller,这是因为 Muduo 默认使用 EpollPoller,并且相较于 pollepoll 的效率与性能表现更为优异。

epollIO多路复用的接口分别有:

  1. epoll_create
  2. epoll_ctl
    • EPOLL_CTL_ADD
    • EPOLL_CTL_MOD
    • EPOLL_CTL_DEL
  3. epoll_wait

EpollPoller对这几个系统调用进行了一系列封装,并针对 epoll_wait 函数的 struct epoll_event *events 参数采用了动态扩容策略。具体做法如下:

使用std::vector<struct epoll_event>作为events的承载容器,并初始分配16个元素的空间。当一次 epoll_wait 调用返回后,如果就绪事件的数量等于当前 vector 的容量,则将其容量扩充为原来的两倍,以应对后续可能出现的更多事件,从而避免频繁的内存分配操作。

EpollPoller代码

EpollPoller.h

class EpollPoller : public Poller
{
public:
    EpollPoller(EventLoop* loop);
    ~EpollPoller() override;

    // 重写基类接口
    Timestamp poll(int timeout, ChannelList* activeChannels) override;
    void updateChannel(Channel*) override;
    void removeChannel(Channel*) override;

private:
    void update(int operation, Channel* channel);
    void fillActiveChannels(int numEvents, ChannelList* activeChannel);

private:
    using EventList = std::vector<struct epoll_event>;

    // Events初始化为16
    static const int kInitEventListSize = 16;

    int epollfd_;	// epoll_create返回的文件描述符
    // 作为epoll_wait中events的承载容器
    EventList events_;
};

EpollPoller.cc

/*
    表示Channel的状态
*/
const int kNew = -1;
const int kAdded = 1;
const int kDeleted = 2;

EpollPoller::EpollPoller(EventLoop *loop):
    Poller(loop),
    // EPOLL_CLOEXEC:当进程执行exec时,自动关闭该文件描述符
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    events_(kInitEventListSize)
{
    if(epollfd_ < 0)
    {
        LOG_ERROR("EPollPoller Create epollfd error\n");
    }
}

EpollPoller::~EpollPoller()
{
    ::close(epollfd_);
}


/*
功能:
    向EventLoop提供的接口,监听哪些fd发生事件
参数:
    timeout(传入参数): 超时时间
    activeChannels(传出参数): 通过fillActiveChannels函数push所有发生事件的Channel
*/
Timestamp EpollPoller::poll(int timeout, ChannelList *activeChannels)
{
    //LOG_DEBUG("poll start");
    int numEvents = epoll_wait(epollfd_, &*events_.begin(), events_.size(), timeout);
    Timestamp now(Timestamp::now());
    int saveError = errno;
    if(numEvents > 0)
    {
        LOG_DEBUG("%d Events happened", numEvents);
        fillActiveChannels(numEvents, activeChannels);
        if(numEvents == events_.size())
        {
            events_.resize(2 * events_.size());
        }
    }
    else if(numEvents == 0) 
    {
        //LOG_DEBUG("%s timeout!", __FUNCTION__);
    }
    else
    {
        if(saveError != EINTR)
        {
            errno = saveError;
            LOG_ERROR("EpollPoller::poll() err!");
        }
    }
    return now;
}

/*
功能: 
    向EventLoop提供接口,修改Channel所注册的事件
*/
void EpollPoller::updateChannel(Channel* channel)
{
    const int index = channel->index(); 
    LOG_INFO("EpollPoller::%s => fd=%d events=%d index=%d", __FUNCTION__, channel->fd(), channel->events(), channel->index());
    if(index == kNew || index == kDeleted)
    {
        int fd = channel->fd();
        if(index == kNew)
        {
            channels_[fd] = channel;
        }
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else
    {
        // update existing one with EPOLL_CTL_MOD/DEL
        int fd = channel->fd();
        if(channel->isNonEvent())
        {
            update(EPOLL_CTL_DEL, channel);
            channel->set_index(kDeleted);
        }
        else
        {
            update(EPOLL_CTL_MOD, channel);
        }
    }
}

/*
向EventLoop提供的接口,删除Channel
*/
void EpollPoller::removeChannel(Channel* channel)
{
    int fd = channel->fd();
    LOG_INFO("func=%s => fd=%d \n", __FUNCTION__, fd);
    channels_.erase(fd); 
    int index = channel->index();
    if (index == kAdded)
    {
        update(EPOLL_CTL_DEL, channel);
    }
    channel->set_index(kNew);
}
/*
功能:
    为Channel封装的文件描述符和Event注册进epoll的实施动作
参数:
    operation:
        1) EPOLL_CTL_ADD
        2) EPOLL_CTL_MOD
        3) EPOLL_CTL_DEL
*/
void EpollPoller::update(int operation, Channel *channel)
{
    struct epoll_event event;
    ::bzero(&event, sizeof event);
    event.data.ptr = channel;
    event.events = channel->events();
    int fd = channel->fd();
    if(::epoll_ctl(epollfd_, operation, fd, &event) < 0){
        if(operation == EPOLL_CTL_DEL)
        {
            LOG_ERROR("epollfd : %d, fd : %d op : %d, epoll_ctl error", epollfd_, fd, EPOLL_CTL_DEL);
        }else{
            LOG_FATAL("epollfd : %d, fd : %d op : %d, epoll_ctl error", epollfd_, fd, fd);
        }
    } 
}


/*
功能:
    1.设置所有Channel的就绪事件Channel->revents
    2.向ChannelList中push发生事件的Channel 
*/
void EpollPoller::fillActiveChannels(int numEvents,
                                     ChannelList *activeChannel)
{
    for(int i = 0; i < numEvents; i++)
    {
        Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        activeChannel->push_back(channel);
    }
}

EventLoop

EventLoopMuduo 框架的核心组件之一,它主要负责在单线程模型下驱动整个事件循环系统正常运行,从而实现高效的事件处理和回调分发。MainLoopSubLoop运行在不同的线程下,符合one loop per thread模型,MainLoop负责接受新的连接(accept)和将客户端新连接封装好根据负载均衡算法指定SubLoop递送。故SubLoop负责后续客户端的读、写事件等。

EventLoop 类中有一个私有成员变量 (threadId_),用于缓存 EventLoop 所在的线程 ID,以确保线程安全性。这看似有些多余,因为我们通常遵循 “one loop per thread”(即一个线程只能运行一个 EventLoop)的设计理念。那么,为什么还要考虑线程安全问题呢?

原因在于,尽管每个 EventLoop 本身只在它所属的线程中运行,但在实际应用中会存在跨线程操作 EventLoop 的情况。例如,在 TcpServer 类中存在 MainLoopSubLoop 池。MainLoop 线程负责接收新连接,并通过轮询(负载均衡算法)将新连接分发给其他线程中的 SubLoop。这意味着 MainLoop 线程在运行中,需要访问其他线程中 EventLoop 的指针,进行相应的分发与调整。在多线程访问同一内存区域的情境下,线程同步问题便不可避免地出现。

为了解决此类问题,EventLoop 通过记录自身所在线程的 ID,来检查调用者是否在同一线程中执行操作。如果跨线程调用不合规,EventLoop 可以及时检测并采取相应措施(如断言失败或者通过安全的跨线程回调机制处理)。这样一来,即便保持 “one loop per thread” 的基本模型,也能在需要跨线程交互的场景中更好地控制与管理事件循环间的通信,从而确保系统的稳定性和安全性。

在实际运行中,当 SubLoop 没有事件需要处理时,它会一直阻塞在 epoll_wait 调用中等待事件。此时问题在于,MainLoop 如何向处于阻塞状态的 SubLoop 发出信号,让其知道有新客户端连接到来,需要 SubLoop 负责接收呢?

原作者的解决方案是:在 SubLoop 创建一个专门用于监听唤醒信号的文件描述符,这个文件描述符通过 eventfd 系统调用创建。SubLoopEventLoop 会持续监听该文件描述符上的可读事件。一旦有新的客户端连接建立,MainLoop 就会调用 EventLoop::wakeup 函数,向这个文件描述符中写入数据,触发可读事件,从而唤醒在 epoll_wait 上阻塞的 SubLoop。这样,SubLoop 就能够立即响应新连接的到来并进行处理。

wakeup 的作用仅限于唤醒 SubLoop(实际上只是在相关文件描述符中写入少量数据),除此之外并不会直接传递任何新客户端连接的信息。那么,MainLoop 又是如何将新客户端连接的信息传递给 SubLoop,并让 SubLoopPoller 为新连接的套接字进行注册呢?

答案是:在 MainLoop 中事先为 SubLoop 注册了一个回调函数。当 SubLoopwakeup 唤醒后,会调用 EventLoop::handleRead(),进而执行已经注册好的回调函数。若有多个此类回调函数,就需要借助一个队列对它们进行管理。

这个维护回调函数的队列正是 EventLoop::pendingFunctors_

EventLoop代码

EventLoop.h

class EventLoop : noncopyable
{
public:
    using Functor = std::function<void()>;

    EventLoop();
    ~EventLoop();

    // 开启事件循环
    void loop();
    // 退出事件循环
    void quit();

    Timestamp pollReturnTime() const { return pollReturnTime_; }
    // 在当前loop中执行cb
    void runInLoop(Functor cb);
    void queueInLoop(Functor cb);
    // 唤醒loop所在的线程
    void wakeup();

    void updateChannel(Channel* channel);
    void removeChannel(Channel* channel);
    // 判断当前线程是否为创建EventLoop中的线程
    bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

private:
    // WakeupFd的回调函数
    void handleRead();
    void doPendingFunctor();

private:
    using ChannelList = std::vector<Channel*>;

    std::atomic_bool looping_;  // 是否在epoll_wait循环中
    std::atomic_bool quit_;     // 是否退出
    std::atomic_bool eventHanding_;// 是否在处理注册的回调函数
    std::atomic_bool callingPendingFunctors_; /* atomic */
    const pid_t threadId_;   // 用来标识此EventLoop的线程号
    Timestamp pollReturnTime_;// poller返回发生事件的时间点
    std::unique_ptr<Poller> poller_;// EventLoop绑定的poller

    // mainloop与subloop的通信方式, 在subloop中监听这个文件描述符,如果读事件就绪,代表mainloop派发了一个新的Channel 
    int wakeupFd_;  
    // 监听wakeupFd的Channel
    std::unique_ptr<Channel> wakeupChannel_;

    // epoll_wait中返回的活跃的Channel集合
    ChannelList activeChannels;

    std::mutex mutex_; // 保护pendingFunctors的线程安全
    std::vector<Functor> pendingFunctors_; // 存储此loop需要执行的所有回调函数
};

解析

  • 构造函数:
    SubLoop线程中,会在线程的(后面会讲)上构建一个EventLoop对象,从而调用构造函数。threadId_(CurrentThread::tid())其实就是为EventLoop对象绑定其所属线程ID。(CurrentThread 的具体实现将在后文讨论)。
  • loop()
    负责执行事件循环的主要逻辑,不断监听在poller_所注册的事件。若有事件发生,则执行该事件对应的回调函数,之后调用doPendingFunctor()方法。
  • runInLoop(Functor cb)
    该方法用于让 cb 回调函数在 EventLoop 所属的线程内执行。如果调用者不在其所属线程,则会通过 queueInLoopcb 注册到 pendingFunctors_ 队列中,以便后续在适当的时候被调用。
  • doPendingFunctor()
    该方法执行pendingFunctors_内的回调函数。作者用了一个很巧妙的代码思路:预先定义一个临时的 std::vector<Functor> 对象,并在临界区内使用 swap() 函数将 pendingFunctors_ 和临时对象进行交换。这样做的好处是,临界区内只需要执行 swap() 操作,而不需要逐一执行回调函数。
    按照正常逻辑,如果在临界区内直接执行 pendingFunctors_ 中的所有回调函数,会导致其他线程在尝试通过 queueInLoop() 注册新的回调时,必须等待所有回调函数执行完毕并且锁释放。这在多线程环境下会显著降低效率。通过交换操作,doPendingFunctor() 能够快速地将当前的回调函数队列转移到临时对象中,释放锁后再在临时对象上执行回调函数,从而大幅提升并发性能和整体效率。

EventLoop.cc

// 防止一个线程创建多个Eventloop
__thread EventLoop* t_loopInThisThread = nullptr;
// 定义默认的Poller IO复用接口的超时函数
const int kPoolTimeMs = 10000;


int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if(evtfd< 0)
    {
        LOG_FATAL("threadId : %d create eventfd error", CurrentThread::tid());
    }
    return evtfd;
}

EventLoop::EventLoop() :
    looping_(false),
    quit_(false),
    eventHanding_(false),
    callingPendingFunctors_(false),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_)) 
{
    LOG_DEBUG("thread %d Eventloop created", CurrentThread::tid());
    if(t_loopInThisThread){
        LOG_FATAL("Another EventLoop %p exists in this thread %d.", t_loopInThisThread, threadId_);
    }else{
        t_loopInThisThread = this;
    }

    wakeupChannel_->setReadCallback(
        std::bind(&EventLoop::handleRead, this)
    );
        
    wakeupChannel_->enableReading();
}

EventLoop::~EventLoop()
{
    LOG_DEBUG("EventLoop %p of thread %d destructs in thread %d", 
        this, threadId_, CurrentThread::tid());
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = nullptr;
}

void EventLoop::loop()
{
    looping_ = true;
    quit_ = false;
    LOG_DEBUG("EventLoop %p start looping ", this);

    while(!quit_)
    {
        activeChannels.clear();
        pollReturnTime_ = poller_->poll(kPoolTimeMs, &activeChannels);

        eventHanding_ = true;
        for(auto channel : activeChannels)
        {
            // Poller监听哪些channel发生事件了,上报给Event
            channel->handleEvent(pollReturnTime_);
        }
        eventHanding_ = false;
        // 执行当前EventLoop事件循环需要处理的回调操作
        /*
        
        */
        doPendingFunctor();
    }

    LOG_DEBUG("EventLoop %p stop looping", this);
    looping_ = false;
}


void EventLoop::quit()
{
    /*
    事件循环退出时机:
        1. 在自己的线程内调用
        2. 在其他线程内调用该函数
             2.1 如果EventLoop在阻塞中,wakeup可以唤醒进而退出循环
    */
    quit_ = true;

    if(!isInLoopThread())
    {
        wakeup();
    }
}

void EventLoop::runInLoop(Functor cb)
{
    /*
    让某个任务(回调函数cb)在当前线程内执行。
    主要用于线程安全和任务调度,确保任务的执行环境和EventLoop的线程一致
    */
    if(isInLoopThread())
    {
        cb();
    }
    else
    {
        queueInLoop(std::move(cb));
    }
}

void EventLoop::queueInLoop(Functor cb)
{
    /*
    将任务放入pendingFunctors:
        1. 外部线程调用queueInLoop -- !isInLoopThread
        2. 本线程正在执行回调函数 -- callingPendingFunctors_
            2.1 如果不添加这个判断条件,很有可能导致EventLoop一直阻塞
    */
    {
        std::lock_guard<std::mutex> mutex(mutex_);
        pendingFunctors_.emplace_back(std::move(cb));
    }
    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}

void EventLoop::updateChannel(Channel *channel)
{
    poller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel *channel)
{
    poller_->removeChannel(channel);
}

void EventLoop::handleRead()
{
    u_int64_t one = 1;
    size_t n = ::read(wakeupFd_, &one, sizeof one);
    if(n != sizeof one)
    {
        LOG_ERROR("EventLoop::handleRead() reads %ld bytes instead of 8", n);
    }
}

void EventLoop::doPendingFunctor()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::lock_guard<std::mutex> mutex(mutex_);
        functors.swap(pendingFunctors_);
    }

    for(const Functor& functor : functors)
    {
        functor();
    }

    callingPendingFunctors_ = false;
}

void EventLoop::wakeup()
{
    u_int64_t one = 1;
    size_t n = ::write(wakeupFd_, &one, sizeof one);
    if(n != sizeof one)
    {
        LOG_ERROR("EventLoop::wakeup() writes %ld bytes instead of 8", n);
    }
}


void EventLoop::handleRead()
{
	u_int64_t one = 1;
	size_t n = ::read(wakeupFd_, &one, sizeof one);
    if(n != sizeof one)
    {
        LOG_ERROR("EventLoop::handleRead() reads %ld bytes instead of 8", n);
    }
}

void EventLoop::doPendingFunctor()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::lock_guard<std::mutex> mutex(mutex_);
        functors.swap(pendingFunctors_);
    }

    for(const Functor& functor : functors)
    {
        functor();
    }

    callingPendingFunctors_ = false;
}

void EventLoop::wakeup()
{
    u_int64_t one = 1;
    size_t n = ::write(wakeupFd_, &one, sizeof one);
    if(n != sizeof one)
    {
        LOG_ERROR("EventLoop::wakeup() writes %ld bytes instead of 8", n);
    }
}

类图

image-20241210162647589


http://www.kler.cn/a/430627.html

相关文章:

  • X Window System 架构概述
  • 强化学习笔记(5)——PPO
  • Haskell语言的多线程编程
  • 笔试-排列组合
  • unity学习23:场景scene相关,场景信息,场景跳转
  • SQL Server查询计划操作符(7.3)——查询计划相关操作符(5)
  • Java 基础之 XQuery:强大的 XML 查询语言
  • 【C++指南】类和对象(七):友元
  • html小白初学
  • go语言zero框架对接阿里云消息队列MQ的rabbit的配置与调用
  • Java项目--仿RabbitMQ的消息队列--需求分析
  • PVE修改IP地址
  • 基于 SSM 的个性化商铺系统:先进架构保障系统稳定性
  • el-table手动触发懒加载
  • 【优选算法】哈希表
  • 文件下载和图片预览 Blob FileReader
  • elementUI修改table样式
  • SQL面试题——连续问题 连续点击三次用户
  • 5G中的随机接入过程可以不用收RAR?
  • Android可长按拖拽item的列表
  • U2F和FIDO2 两种安全认证技术优劣势对比
  • 【万字详解】三维重建(二)——NeRF、NeuS、MeshUDF、NeuralUDF、3DGS、GShell
  • C语言单元总结
  • 虚幻引擎游戏开发专题-1-引擎术语
  • 关于转包关系和挂靠关系的认定
  • 【JavaEE初阶】CSS