Muduo网络库解析---事件循环模块
文章目录
- 前言
- Channel
- Channel代码
- Channel.h
- Channel.cc
- Poller
- Poller代码
- Poller.h
- Poller.cc
- EpollPoller
- EpollPoller代码
- EpollPoller.h
- EpollPoller.cc
- EventLoop
- EventLoop代码
- EventLoop.h
- EventLoop.cc
- 类图
前言
重写Muduo库实现核心模块的Git仓库
注:本文将重点剖析 Muduo
网络库的核心框架,深入探讨作者精妙的代码设计思路,并针对核心代码部分进行重写,将原本依赖 boost
的实现替换为原生的 C++11 语法。需要说明的是,本文并不打算对整个 Muduo
库进行完整的重写。Muduo库源码链接
在前文中,我们对Muduo
的基础模块Logger
、Timestamp
和Buffer
有了初步了解(如果还未阅读过的同学可先参考这篇文章进行回顾)。接下来,我们将深入探讨Muduo
的事件循环相关模块,包括:
- Channel
- Poller
- EpollPoller
- EventLoop
通过对这些模块的剖析,我们将逐步了解事件循环的运行机制和高效的事件分发策略,从而进一步理解 Muduo
在构建高性能服务器时的设计思路与实现细节。接下来,我们将以事件循环的主执行单元 EventLoop
为线索,分别阐述 Channel
、Poller
、EpollPoller
、EventLoop
的职责、设计理念以及交互关系。
Channel
Channel
是 Muduo
事件循环框架中连接文件描述符与事件处理逻辑的中间抽象层。其主要作用是为某个特定的 I/O 通道(通常是一个套接字文件描述符)绑定对应的事件回调函数,并协助 EventLoop
对发生的事件进行分发处理。
简而言之,Channel
并不关心 I/O 操作的具体过程,它更像是一个“桥梁”或“适配器”,将底层的文件描述符事件与上层的业务回调关联起来。它的核心功能和责任包括:
- 文件描述符与事件的关联:
Channel
持有一个文件描述符(如套接字)和一组感兴趣的事件类型(如可读事件、可写事件)。当底层 I/O 多路复用机制(如epoll
)检测到该文件描述符有事件发生时,会通知对应的Channel
。 - 事件回调的注册与触发:
Channel
不负责事件的检测,但负责事件的回调分发。当Poller
(如EpollPoller
) 返回已发生的事件时,EventLoop
会调用Channel
的回调函数(如读回调、写回调、关闭回调、错误回调)。借助Channel
,框架实现了对不同 I/O 通道的统一管理。 - 与
EventLoop
、Poller
协作:
EventLoop
中维护一组与当前线程相关的Channel
。每个Channel
会在内部存储自己的感兴趣事件,并在必要时由EventLoop
通过Poller
注册、修改、删除对相应文件描述符的监听。当事件发生时,Poller
收集事件并通知EventLoop
,EventLoop
随后调用Channel
的相应回调函数。 - 简化上层逻辑:
有了Channel
这个抽象,上层应用代码只需负责定义事件发生时的处理逻辑(即回调函数),而不必直接与底层的多路复用接口交互。这大大简化了事件循环框架的使用难度和代码复杂度。
总之,Channel
是 Muduo
中连接底层事件检测机制与上层事件处理逻辑的关键组件,通过统一的接口与抽象,它极大地降低了事件处理的复杂性,使用户可以更直观地编写网络事件处理代码。
Channel代码
Channel.h
class Channel : noncopyable
{
public:
using EventCallback = std::function<void()>;
using ReadEventCallback = std::function<void(Timestamp)>;
Channel(EventLoop* eventloop, int fd);
~Channel();
void handleEvent(Timestamp receiveTime);
void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); }
void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }
// 将channel绑定到shared_ptr管理的owner对象,
// 防止在handleEvent中销毁owner对象
void tie(const std::shared_ptr<void>&);
int fd() const { return fd_; }
int events() const { return events_; }
// 被poller调用设置已就绪的事件
void set_revents(int revt) { revents_ = revt; }
bool isNonEvent() const { return events_ == kNoneEvent; }
void enableReading() { events_ |= kReadEvent; update();}
void disableReading(){ events_ &= ~kReadEvent; update();}
void enableWriting() { events_ |= kWriteEvent; update();}
void disableWriting(){ events_ &= ~kWriteEvent; update();}
void disableAll() { events_ = kNoneEvent; update();}
bool isReading()const{ return events_ & kReadEvent; }
bool isWriting()const{ return events_ & kWriteEvent; }
int index() const { return index_; }
void set_index(int index) { index_ = index; }
EventLoop* ownerLoop() const { return loop_; }
void remove();
private:
void update();
void handleEventWithGuard(Timestamp receiveTime);
private:
static const int kNoneEvent; // 无事件
static const int kReadEvent; // 读事件
static const int kWriteEvent;// 写事件
EventLoop* loop_; // 该Channel所绑定的EventLoop
const int fd_; // 封装的fd
int events_; // 注册事件
int revents_; // 就绪事件
/*index_描述当前Channel的状态:
-1 : 新添加,还未注册到epoll
1 : 已注册并添加到epoll
2 : 已删除
*/
int index_;
// 通过weak_ptr将Channel绑定到TcpConnection
std::weak_ptr<void> tie_;
// 是否绑定
bool tied_;
ReadEventCallback readCallback_;// 读回调
EventCallback writeCallback_; // 写回调
EventCallback closeCallback_; // 关闭回调
EventCallback errorCallback_; // 错误回调
};
解析:
几个重要的对外接口:
- 几个设置回调函数的公有函数
tie
:将Channel
绑定到给定的参数set_revents
:用来设置Channel
的就绪事件handleEvent
:根据就绪事件revents
,执行回调函数
Channel.cc
const int Channel::kNoneEvent = 0;
// EPOLLPRI: 当文件描述符上有紧急数据时,触发
const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
const int Channel::kWriteEvent = EPOLLOUT;
Channel::Channel(EventLoop *eventloop, int fd) :
loop_(eventloop),
fd_(fd),
events_(0),
revents_(0),
index_(-1),
tied_(false)
{}
Channel::~Channel() {}
void Channel::handleEvent(Timestamp receiveTime)
{
/*
处理回调函数的向外提供的接口
在EventLoop中调用
*/
if(tied_){
std::shared_ptr<void> guard = tie_.lock(); // 提升
if(guard){
handleEventWithGuard(receiveTime);
}
}else{
handleEventWithGuard(receiveTime);
}
}
// 在TcpConnection::connectEstablish中调用channel::tie
void Channel::tie(const std::shared_ptr<void>& obj)
{
tie_ = obj;
tied_ = true;
}
void Channel::remove()
{
loop_->removeChannel(this);
}
// 当改变channel所表示的fd的events后,需要在poller里面更改fd相应的事件epoll_ctl
void Channel::update()
{
loop_->updateChannel(this);
}
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
/*
调用回调函数的实施接口
*/
if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
{
/*
EPOLLHUP: 表示文件描述符挂起事件,通常表示远端关闭连接
*/
if(closeCallback_){
closeCallback_();
}
}
if(revents_ & EPOLLERR){
if(errorCallback_){
errorCallback_();
}
}
if(revents_ & (EPOLLIN | EPOLLPRI)){
if(readCallback_){
readCallback_(receiveTime);
}
}
if(revents_ & EPOLLOUT){
if(writeCallback_){
writeCallback_();
}
}
}
Poller
Poller
是 Muduo
中对底层事件分发器(Demultiplexer,如 epoll
、poll
等机制)的抽象封装。它负责对已注册的文件描述符进行监听,并在事件准备就绪(如套接字可读或可写)时返回相应事件。通过这种抽象,EventLoop
无需直接与底层的 I/O 多路复用机制交互,而可以统一调用 Poller
接口进行事件的获取和分发,从而提升代码的可扩展性与可维护性。
Poller代码
Poller.h
class Poller : noncopyable
{
public:
using ChannelList = std::vector<Channel*>;
Poller(EventLoop*);
virtual ~Poller() = default;
// 给所有IO复用提供统一的接口
virtual Timestamp poll(int timeout, ChannelList* activeChannels) = 0;
virtual void updateChannel(Channel*) = 0;
virtual void removeChannel(Channel*) = 0;
bool hasChannel(Channel* channel) const;
// Eventloop可以通过该接口获取默认的IO复用的对象
static Poller* newDefaultPoller(EventLoop* loop);
protected:
using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap channels_;
private:
EventLoop* ownerLoop_;
};
解析:
由于 Poller
是对事件分发器(Demultiplexer)的抽象,因此它作为抽象基类被其他具体的事件分发实现类(如 PollPoller
、EpollPoller
)继承和扩展。为此,Poller
必须提供统一的接口规范,以便各具体实现类能够按照这一统一接口进行定制和实现。具体接口如下:
poll(int timeout, ChannelList* activeChannels)
:事件分发器的核心函数。对于不同的事件分发器具体类,该函数会调用各自底层的IO多路复用系统调用(如poll()
、epoll_wait()
)。当有事件就绪时,它会将相应的事件填充到activeChannels
updateChannel(Channel*)
用于在底层事件分发器中更新指定Channel
对文件描述符所关注的事件类型(如可读、可写等)。emoveChannel(Channel*)
则用于将指定的Channel
从事件监听中移除。newDefaultPoller(EventLoop* loop)
:此方法为静态方法,用于其派生类构造一个新的Poller
。
Poller.cc
Poller::Poller(EventLoop *loop) : ownerLoop_(loop){}
bool Poller::hasChannel(Channel *channel) const
{
auto it = channels_.find(channel->fd());
return it != channels_.end() && it->second == channel;
}
这里有个不同点在于,作者将newDefaultPoller
方法定义到其他文件上(DefaultPoller.cc
):
Poller* Poller::newDefaultPoller(EventLoop* loop)
{
if(::getenv("MUDUO_USE_POLL")){
return nullptr;
}
return new EpollPoller(loop);
}
EpollPoller
EpollPoller
是继承自 Poller
的具体事件分发器实现类。在我们的项目中将只实现 EpollPoller
,而不涉及 PollPoller
,这是因为 Muduo
默认使用 EpollPoller
,并且相较于 poll
,epoll
的效率与性能表现更为优异。
epoll
IO多路复用的接口分别有:
epoll_create
epoll_ctl
- EPOLL_CTL_ADD
- EPOLL_CTL_MOD
- EPOLL_CTL_DEL
epoll_wait
EpollPoller
对这几个系统调用进行了一系列封装,并针对 epoll_wait
函数的 struct epoll_event *events
参数采用了动态扩容策略。具体做法如下:
使用
std::vector<struct epoll_event>
作为events
的承载容器,并初始分配16个元素的空间。当一次epoll_wait
调用返回后,如果就绪事件的数量等于当前vector
的容量,则将其容量扩充为原来的两倍,以应对后续可能出现的更多事件,从而避免频繁的内存分配操作。
EpollPoller代码
EpollPoller.h
class EpollPoller : public Poller
{
public:
EpollPoller(EventLoop* loop);
~EpollPoller() override;
// 重写基类接口
Timestamp poll(int timeout, ChannelList* activeChannels) override;
void updateChannel(Channel*) override;
void removeChannel(Channel*) override;
private:
void update(int operation, Channel* channel);
void fillActiveChannels(int numEvents, ChannelList* activeChannel);
private:
using EventList = std::vector<struct epoll_event>;
// Events初始化为16
static const int kInitEventListSize = 16;
int epollfd_; // epoll_create返回的文件描述符
// 作为epoll_wait中events的承载容器
EventList events_;
};
EpollPoller.cc
/*
表示Channel的状态
*/
const int kNew = -1;
const int kAdded = 1;
const int kDeleted = 2;
EpollPoller::EpollPoller(EventLoop *loop):
Poller(loop),
// EPOLL_CLOEXEC:当进程执行exec时,自动关闭该文件描述符
epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
events_(kInitEventListSize)
{
if(epollfd_ < 0)
{
LOG_ERROR("EPollPoller Create epollfd error\n");
}
}
EpollPoller::~EpollPoller()
{
::close(epollfd_);
}
/*
功能:
向EventLoop提供的接口,监听哪些fd发生事件
参数:
timeout(传入参数): 超时时间
activeChannels(传出参数): 通过fillActiveChannels函数push所有发生事件的Channel
*/
Timestamp EpollPoller::poll(int timeout, ChannelList *activeChannels)
{
//LOG_DEBUG("poll start");
int numEvents = epoll_wait(epollfd_, &*events_.begin(), events_.size(), timeout);
Timestamp now(Timestamp::now());
int saveError = errno;
if(numEvents > 0)
{
LOG_DEBUG("%d Events happened", numEvents);
fillActiveChannels(numEvents, activeChannels);
if(numEvents == events_.size())
{
events_.resize(2 * events_.size());
}
}
else if(numEvents == 0)
{
//LOG_DEBUG("%s timeout!", __FUNCTION__);
}
else
{
if(saveError != EINTR)
{
errno = saveError;
LOG_ERROR("EpollPoller::poll() err!");
}
}
return now;
}
/*
功能:
向EventLoop提供接口,修改Channel所注册的事件
*/
void EpollPoller::updateChannel(Channel* channel)
{
const int index = channel->index();
LOG_INFO("EpollPoller::%s => fd=%d events=%d index=%d", __FUNCTION__, channel->fd(), channel->events(), channel->index());
if(index == kNew || index == kDeleted)
{
int fd = channel->fd();
if(index == kNew)
{
channels_[fd] = channel;
}
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else
{
// update existing one with EPOLL_CTL_MOD/DEL
int fd = channel->fd();
if(channel->isNonEvent())
{
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
else
{
update(EPOLL_CTL_MOD, channel);
}
}
}
/*
向EventLoop提供的接口,删除Channel
*/
void EpollPoller::removeChannel(Channel* channel)
{
int fd = channel->fd();
LOG_INFO("func=%s => fd=%d \n", __FUNCTION__, fd);
channels_.erase(fd);
int index = channel->index();
if (index == kAdded)
{
update(EPOLL_CTL_DEL, channel);
}
channel->set_index(kNew);
}
/*
功能:
为Channel封装的文件描述符和Event注册进epoll的实施动作
参数:
operation:
1) EPOLL_CTL_ADD
2) EPOLL_CTL_MOD
3) EPOLL_CTL_DEL
*/
void EpollPoller::update(int operation, Channel *channel)
{
struct epoll_event event;
::bzero(&event, sizeof event);
event.data.ptr = channel;
event.events = channel->events();
int fd = channel->fd();
if(::epoll_ctl(epollfd_, operation, fd, &event) < 0){
if(operation == EPOLL_CTL_DEL)
{
LOG_ERROR("epollfd : %d, fd : %d op : %d, epoll_ctl error", epollfd_, fd, EPOLL_CTL_DEL);
}else{
LOG_FATAL("epollfd : %d, fd : %d op : %d, epoll_ctl error", epollfd_, fd, fd);
}
}
}
/*
功能:
1.设置所有Channel的就绪事件Channel->revents
2.向ChannelList中push发生事件的Channel
*/
void EpollPoller::fillActiveChannels(int numEvents,
ChannelList *activeChannel)
{
for(int i = 0; i < numEvents; i++)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
channel->set_revents(events_[i].events);
activeChannel->push_back(channel);
}
}
EventLoop
EventLoop
是 Muduo
框架的核心组件之一,它主要负责在单线程模型下驱动整个事件循环系统正常运行,从而实现高效的事件处理和回调分发。MainLoop
和SubLoop
运行在不同的线程下,符合one loop per thread
模型,MainLoop
负责接受新的连接(accept
)和将客户端新连接封装好根据负载均衡算法指定SubLoop
递送。故SubLoop
负责后续客户端的读、写事件等。
EventLoop
类中有一个私有成员变量 (threadId_
),用于缓存 EventLoop
所在的线程 ID,以确保线程安全性。这看似有些多余,因为我们通常遵循 “one loop per thread”(即一个线程只能运行一个 EventLoop
)的设计理念。那么,为什么还要考虑线程安全问题呢?
原因在于,尽管每个
EventLoop
本身只在它所属的线程中运行,但在实际应用中会存在跨线程操作EventLoop
的情况。例如,在TcpServer
类中存在MainLoop
和SubLoop
池。MainLoop
线程负责接收新连接,并通过轮询(负载均衡算法)将新连接分发给其他线程中的SubLoop
。这意味着MainLoop
线程在运行中,需要访问其他线程中EventLoop
的指针,进行相应的分发与调整。在多线程访问同一内存区域的情境下,线程同步问题便不可避免地出现。为了解决此类问题,
EventLoop
通过记录自身所在线程的 ID,来检查调用者是否在同一线程中执行操作。如果跨线程调用不合规,EventLoop
可以及时检测并采取相应措施(如断言失败或者通过安全的跨线程回调机制处理)。这样一来,即便保持 “one loop per thread” 的基本模型,也能在需要跨线程交互的场景中更好地控制与管理事件循环间的通信,从而确保系统的稳定性和安全性。
在实际运行中,当 SubLoop
没有事件需要处理时,它会一直阻塞在 epoll_wait
调用中等待事件。此时问题在于,MainLoop
如何向处于阻塞状态的 SubLoop
发出信号,让其知道有新客户端连接到来,需要 SubLoop
负责接收呢?
原作者的解决方案是:在
SubLoop
创建一个专门用于监听唤醒信号的文件描述符,这个文件描述符通过eventfd
系统调用创建。SubLoop
的EventLoop
会持续监听该文件描述符上的可读事件。一旦有新的客户端连接建立,MainLoop
就会调用EventLoop::wakeup
函数,向这个文件描述符中写入数据,触发可读事件,从而唤醒在epoll_wait
上阻塞的SubLoop
。这样,SubLoop
就能够立即响应新连接的到来并进行处理。
wakeup
的作用仅限于唤醒 SubLoop
(实际上只是在相关文件描述符中写入少量数据),除此之外并不会直接传递任何新客户端连接的信息。那么,MainLoop
又是如何将新客户端连接的信息传递给 SubLoop
,并让 SubLoop
的 Poller
为新连接的套接字进行注册呢?
答案是:在
MainLoop
中事先为SubLoop
注册了一个回调函数。当SubLoop
被wakeup
唤醒后,会调用EventLoop::handleRead()
,进而执行已经注册好的回调函数。若有多个此类回调函数,就需要借助一个队列对它们进行管理。这个维护回调函数的队列正是
EventLoop::pendingFunctors_
。
EventLoop代码
EventLoop.h
class EventLoop : noncopyable
{
public:
using Functor = std::function<void()>;
EventLoop();
~EventLoop();
// 开启事件循环
void loop();
// 退出事件循环
void quit();
Timestamp pollReturnTime() const { return pollReturnTime_; }
// 在当前loop中执行cb
void runInLoop(Functor cb);
void queueInLoop(Functor cb);
// 唤醒loop所在的线程
void wakeup();
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
// 判断当前线程是否为创建EventLoop中的线程
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
private:
// WakeupFd的回调函数
void handleRead();
void doPendingFunctor();
private:
using ChannelList = std::vector<Channel*>;
std::atomic_bool looping_; // 是否在epoll_wait循环中
std::atomic_bool quit_; // 是否退出
std::atomic_bool eventHanding_;// 是否在处理注册的回调函数
std::atomic_bool callingPendingFunctors_; /* atomic */
const pid_t threadId_; // 用来标识此EventLoop的线程号
Timestamp pollReturnTime_;// poller返回发生事件的时间点
std::unique_ptr<Poller> poller_;// EventLoop绑定的poller
// mainloop与subloop的通信方式, 在subloop中监听这个文件描述符,如果读事件就绪,代表mainloop派发了一个新的Channel
int wakeupFd_;
// 监听wakeupFd的Channel
std::unique_ptr<Channel> wakeupChannel_;
// epoll_wait中返回的活跃的Channel集合
ChannelList activeChannels;
std::mutex mutex_; // 保护pendingFunctors的线程安全
std::vector<Functor> pendingFunctors_; // 存储此loop需要执行的所有回调函数
};
解析:
- 构造函数:
在SubLoop
线程中,会在线程的栈(后面会讲)上构建一个EventLoop
对象,从而调用构造函数。threadId_(CurrentThread::tid())
其实就是为EventLoop
对象绑定其所属线程ID。(CurrentThread
的具体实现将在后文讨论)。 loop()
:
负责执行事件循环的主要逻辑,不断监听在poller_
所注册的事件。若有事件发生,则执行该事件对应的回调函数,之后调用doPendingFunctor()
方法。runInLoop(Functor cb)
:
该方法用于让cb
回调函数在EventLoop
所属的线程内执行。如果调用者不在其所属线程,则会通过queueInLoop
将cb
注册到pendingFunctors_
队列中,以便后续在适当的时候被调用。doPendingFunctor()
:
该方法执行pendingFunctors_
内的回调函数。作者用了一个很巧妙的代码思路:预先定义一个临时的std::vector<Functor>
对象,并在临界区内使用swap()
函数将pendingFunctors_
和临时对象进行交换。这样做的好处是,临界区内只需要执行swap()
操作,而不需要逐一执行回调函数。
按照正常逻辑,如果在临界区内直接执行pendingFunctors_
中的所有回调函数,会导致其他线程在尝试通过queueInLoop()
注册新的回调时,必须等待所有回调函数执行完毕并且锁释放。这在多线程环境下会显著降低效率。通过交换操作,doPendingFunctor()
能够快速地将当前的回调函数队列转移到临时对象中,释放锁后再在临时对象上执行回调函数,从而大幅提升并发性能和整体效率。
EventLoop.cc
// 防止一个线程创建多个Eventloop
__thread EventLoop* t_loopInThisThread = nullptr;
// 定义默认的Poller IO复用接口的超时函数
const int kPoolTimeMs = 10000;
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if(evtfd< 0)
{
LOG_FATAL("threadId : %d create eventfd error", CurrentThread::tid());
}
return evtfd;
}
EventLoop::EventLoop() :
looping_(false),
quit_(false),
eventHanding_(false),
callingPendingFunctors_(false),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("thread %d Eventloop created", CurrentThread::tid());
if(t_loopInThisThread){
LOG_FATAL("Another EventLoop %p exists in this thread %d.", t_loopInThisThread, threadId_);
}else{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this)
);
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop()
{
LOG_DEBUG("EventLoop %p of thread %d destructs in thread %d",
this, threadId_, CurrentThread::tid());
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
void EventLoop::loop()
{
looping_ = true;
quit_ = false;
LOG_DEBUG("EventLoop %p start looping ", this);
while(!quit_)
{
activeChannels.clear();
pollReturnTime_ = poller_->poll(kPoolTimeMs, &activeChannels);
eventHanding_ = true;
for(auto channel : activeChannels)
{
// Poller监听哪些channel发生事件了,上报给Event
channel->handleEvent(pollReturnTime_);
}
eventHanding_ = false;
// 执行当前EventLoop事件循环需要处理的回调操作
/*
*/
doPendingFunctor();
}
LOG_DEBUG("EventLoop %p stop looping", this);
looping_ = false;
}
void EventLoop::quit()
{
/*
事件循环退出时机:
1. 在自己的线程内调用
2. 在其他线程内调用该函数
2.1 如果EventLoop在阻塞中,wakeup可以唤醒进而退出循环
*/
quit_ = true;
if(!isInLoopThread())
{
wakeup();
}
}
void EventLoop::runInLoop(Functor cb)
{
/*
让某个任务(回调函数cb)在当前线程内执行。
主要用于线程安全和任务调度,确保任务的执行环境和EventLoop的线程一致
*/
if(isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}
void EventLoop::queueInLoop(Functor cb)
{
/*
将任务放入pendingFunctors:
1. 外部线程调用queueInLoop -- !isInLoopThread
2. 本线程正在执行回调函数 -- callingPendingFunctors_
2.1 如果不添加这个判断条件,很有可能导致EventLoop一直阻塞
*/
{
std::lock_guard<std::mutex> mutex(mutex_);
pendingFunctors_.emplace_back(std::move(cb));
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
void EventLoop::updateChannel(Channel *channel)
{
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel)
{
poller_->removeChannel(channel);
}
void EventLoop::handleRead()
{
u_int64_t one = 1;
size_t n = ::read(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %ld bytes instead of 8", n);
}
}
void EventLoop::doPendingFunctor()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::lock_guard<std::mutex> mutex(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}
void EventLoop::wakeup()
{
u_int64_t one = 1;
size_t n = ::write(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::wakeup() writes %ld bytes instead of 8", n);
}
}
void EventLoop::handleRead()
{
u_int64_t one = 1;
size_t n = ::read(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %ld bytes instead of 8", n);
}
}
void EventLoop::doPendingFunctor()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::lock_guard<std::mutex> mutex(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}
void EventLoop::wakeup()
{
u_int64_t one = 1;
size_t n = ::write(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::wakeup() writes %ld bytes instead of 8", n);
}
}