【modou网络库】Reactor架构与TCP通信机制分析
Reactor模式
EventLoop
实现逻辑分析
针对于EventLoop的设计还是严格遵循其核心思想one loop per thread思想,也就是说一个线程只可以拥有一个EventLoop实例,那么为什么这样实现?主要有以下两点原因
- 线程安全性:多线程环境下,确保每一个线程只有一个EventLoop实例,这样就可以避免线程竞争条件,因为EventLoop内部大部分操作都是线程不安全的,必须让其所属线程来执行
- 并发性能:将每个EventLoop限定在一个特定的线程中,mudou库可以在多线程环境中并行处理事件,避免了线程之间频繁加锁和竞争的性能开销
EventLoop每次执行的时候,都会检查当前线程是否已经创建了EventLoop实例
EventLoop::EventLoop()
: looping_(false),
threadId_(CurrentThread::tid()) // 获取当前线程ID
{
LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
if (t_loopInThisThread) // 检查当前线程是否已有 EventLoop 实例
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this; // 绑定当前线程的 EventLoop 实例
}
}
EventLoop内部提供了线程安全检查接口
- isInLoopThread():判断调用是否在EventLoop创建的线程中执行,如果当前线程的ID与EventLoop中保存的线程ID不匹配,那么就可以证明不是EventLoop所属的线程调用的
- assertInLoopThread():该函数的主要作用就是调用线程不在EventLoop创建的线程中,则终止程序运行
void EventLoop::assertInLoopThread()
{
if (!isInLoopThread())
{
abortNotInLoopThread(); // 终止程序
}
}
bool EventLoop::isInLoopThread() const
{
return threadId_ == CurrentThread::tid(); // 检查当前线程ID是否与EventLoop线程一致
}
EventLoop的生命周期跟随其线程相同,线程退出其也会跟着退出,它会析构之间进行检查
EventLoop::~EventLoop()
{
assert(!looping_); // 事件循环必须已经停止
t_loopInThisThread = NULL; // 清空当前线程的EventLoop指针
}
EventLoop::loop()是时间循环的核心,负责监听文件描述符等操作(代码对源码进行了缩减)
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread(); // 检查是否在EventLoop线程中调用
looping_ = true;
while (!quit_) // 事件循环,直到 quit_ 被设置为 true
{
poller_->poll(kPollTimeMs, &activeEvents); // 轮询I/O事件
for (EventList::iterator it = activeEvents.begin();
it != activeEvents.end(); ++it)
{
(*it)->handleEvent(); // 处理事件
}
doPendingFunctors(); // 执行待处理的任务
}
looping_ = false;
}
总结EventLoop的设计原则
- 每个线程只拥有一个EventLoop实例
- EventLoop的所有操作都是与其绑定线程去执行
- 内部设计的有线程安全检查机制,确保线程正确性和安全性
线程池与多进程方式对该架构优化思路
线程池优化思路
- 主线程:该线程持有一个Reactor,专门负责接收新连接,然后分发新连接给工作线程
- 线程池:多个工作线程组成,每一个工作线程都有自己的EventLoop,专门处理交付给自己的连接
- 任务分发:新连接到来的时候,主线程可以根据自己的需要使用轮询或者哈希或者其他更加高效的方式交付给任务给工作线程
- 任务处理:工作线程开始处理交付给自己的任务
多进程优化思路
- 主进程:创建监听套接字,并通过fork()创建多个子进程
- 工作进程:每一个工作进程都运行自己的EventLoop,专门负责处理来自客户端的请求,可以通过进程间通信的方式分发信息
- 负载均衡:新连接到来后,主进程就将闲着的工作进程干活
I/O线程在modou网络库中作用分析
服务器中采用I/O线程管理客户端连接(一个I/O线程也就是一个Reactor)
- 非阻塞I/O:I/O线程与非阻塞I/O配合使用功能,这样就可以避免因等待I/O操作完成而被阻塞,从而提升系统的吞吐量
- 事件驱动: I/O线程通过事件驱动机制来管理I/O事件,只有当I/O事件发生的时候才进行处理,从而有效提高性能
I/O线程在mudou的作用分析
每一个EventLoop对象都会绑定到一个线程,这个线程也就是I/O线程,然后EventLoop负责管理和处理该线程中所有的I/O事件
- socket读写事件:当有数据可读或者可写的时候,I/O线程会被唤醒,并且通过回调函数来处理对应的事件
- 定时器事件:I/O线程可能会管理一些定时任务,确保在特定的时间点触发对应的回调函数
- 任务队列:一些异步任务可以通过runInLoop()函数传递给I/O线程中,从而确保任务在I/O线程中安全的被执行
I/O线程的理解
I/O线程的核心功能就是等待并处理I/O事件,而不阻塞其他任务的执行,利用该设计允许服务器在处理大量的I/O操作的时候可以保持高效快速的效应,不需要为每一个连接创建单独的线程
因为I/O线程非常适合高并发场景,所以其应用主要在于以下几个方面
- HTTP/HTTPS服务器:处理多个客户端请求,同时保障每个连接的I/O操作都是非阻塞的
- 即时通讯:如果在大规模消息通信过程中,通过事件驱动机制,可以实现延迟消息的转发
- 数据流处理系统:对来自多个客户端的数据进行读取、处理和存储
Reactor重要结构
Channel类
核心功能分析
- I/O事件的监听和回调处理
- Channel是不拥有文件描述符的,而是绑定一个fd监听这个fd上发生的I/O事件,真正管理fd的是Connection
- Channel对不同的事件进行监听,并且可以为每一种事件类型设置对应的回调函数
- Channel与EventLoop关系
- 每个Channel对象只属于一个EventLoop,EventLoop负责管理Channel并调用其预先设置好的回到函数
- 具体实现逻辑,I/O事件发生的时候,EventLoop调用Poller首先检查fd的状态,然后通知对应Channel来处理函数
- 事件管理
- Channel内部通过标记events_ 和 revents来管理所关心的事件和当前发生的事件
- 文件描述符的管理
- Channel类中fd_表示的是当前监听的文件描述符,events_则是表示Channel关心的事件,revents_则是实际发生的事件
Poller类
Poller功能分析
- Poller类核心作用:监控多个文件描述符,然后将这些文件描述符分发给对应的Channel进行处理
- Poller是一个I/O多路复用机制的封装,在modou库中既封装了poll 还封装了epoll ,其生命周期是和EventLoop的生命周期一致的
内部机制分析
- pollfds_:因为Poller是通过pollfds_保存所有正在监听的文件描述符和其事件
- 内部也提供函数,当找到活跃的fd的时候,通知对应的Channel进行处理
void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
for (PollFdList::const_iterator pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0)
{
--numEvents;
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
Channel* channel = ch->second;
channel->set_revents(pfd->revents); // 设置事件
activeChannels->push_back(channel); // 添加到活跃的Channel列表中
}
}
}
梳理Channel与Poller类之间交互逻辑
- 新连接到达服务器后,服务器为该连接的socket创建一个Channel,同时将socket的文件描述符传递给Channel(Channel并不管理文件描述符,文件描述符的生命周期是由Connection来进行管理的)
- 服务器设置该Channel的回调函数(例如可以设置其读写异常等回调处理函数)
- 服务器通过Poller::updateChannel()会将Channel注册到Poller类中进行管理,让Poller类监听socket事件
- 当socket上有可读事件的时候,此时Poller会通知到对应的Channel,然后调用的预先设定好的回调函数
核心成员作用分析
- pollfds_
- vector数组,存储Poll()中需要关心的事件:vector<struct pollfd>
- pollfd结构体
- fd:文件描述符
- events:关心的事件类型(读写或异常)
- revents:实际发生的事件类型
- Channes_
- map<int,Channel*>:文件描述符映射到对应的fd对象上
- 作用:快速定位某个fd对应的Channel,然后在事件发生的时候找到对应的Channel进行处理
Poller::updateChannel()
void Poller::updateChannel(Channel* channel)
{
assertInLoopThread(); // 确保在正确的线程中调用
if (channel->index() < 0) // index() < 0 表示这是一个新的 Channel
{
// 构造 pollfd 结构体
struct pollfd pfd;
pfd.fd = channel->fd(); // 设置文件描述符
pfd.events = static_cast<short>(channel->events()); // 设置感兴趣的事件类型
pfd.revents = 0; // 初始化为 0,表示暂时没有事件发生
pollfds_.push_back(pfd); // 将新的 pollfd 添加到 pollfds_ 中
int idx = static_cast<int>(pollfds_.size()) - 1; // 记录该 pollfd 在 pollfds_ 中的索引
channel->set_index(idx); // 设置 Channel 的索引
channels_[pfd.fd] = channel; // 在 channels_ 映射表中保存该 Channel
}
else // 更新已有的 Channel
{
int idx = channel->index(); // 获取 Channel 在 pollfds_ 中的索引
struct pollfd& pfd = pollfds_[idx]; // 根据索引找到对应的 pollfd
pfd.fd = channel->fd(); // 更新文件描述符
pfd.events = static_cast<short>(channel->events()); // 更新感兴趣的事件类型
pfd.revents = 0; // 重置事件
if (channel->isNoneEvent()) // 如果 Channel 不关心任何事件
{
pfd.fd = -1; // 将 fd 设置为 -1,表示不关注该文件描述符
}
}
}
EventLoop类
核心函数执行过程分析 EventLoop::loop()
- 调用poll方法,阻塞等待I/O事件,获取当前活跃的文件描述符
- Poller将响应的Channel加入到activeChannels_中
- 最后对于活跃的channel,调用其回调方法处理其预先绑定的文件描述符
void EventLoop::loop()
{
assert(!looping_); // 确保当前没有在循环中
assertInLoopThread(); // 确保在正确的线程中调用
looping_ = true; // 标记开始循环
quit_ = false; // 退出标志初始化为false
while (!quit_) // 当退出标志为false时继续循环
{
activeChannels_.clear(); // 清空活跃的Channel列表
poller_->poll(kPollTimeMs, &activeChannels_); // 调用Poller,获取活跃事件
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
(*it)->handleEvent(); // 逐个处理Channel上的事件
}
}
looping_ = false; // 事件循环结束,标记停止
}
EVentLoop::quit函数的设计
该函数就是用户通过调用该方法结束事件循环的方法,也就是将quit_变量设置为true;但是其内部具有延迟处理机制,也就是该调用执行后,不会立即退出EVentLoop,而是会等待EventLoop中所有的活动完成后再进行退出
void EventLoop::quit()
{
quit_ = true;
// 如果EventLoop不在当前线程中,可能需要唤醒
// 比如通过eventfd或者其他方法唤醒正在poll()中的线程
}
TimerQueue定时器
定时器类的设计
- 该类就是一个定时器队列的实现,主要功能就是定时任务按时间排序,并且保证线程安全
- addTimer:向定时器队列中添加定时任务,可以指定定时触发的时间和重复间隔
- cancel:取消已经添加的定时任务
数据结构的设计与选择
定时器类中选择了两种数据结构类型,其一是set容器,主要用于按照时间顺序来保存Timer;其二Timer是以pair<Timestamp,Timer*>的形式来存储的。
- set:TimerQueue的存储,使用的就是set容器,因为利用该数据结构可以以O(log n)的时间复杂度高效的查询到即将过期的Timer
- multimap :则是用来解决时间戳相同的定时任务存储问题,采用该数据结构主要就是允许多个任务具有相同的触发时间
EventLoop中增加定时器相关接口
- EventLoop类中增加定时器相关接口
- runAt():指定的时间执行某个回调函数
- 根据指定的时间,直接执行回调函数即可
- funAfter():延迟一段时间再执行回调
- 在当前时间基础上延迟一段时间执行回调,会将当前时间加上delay,然后再调用runAt
- runEvery():每隔一段时间重复执行回到
- runAt():指定的时间执行某个回调函数
TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
{
return timerQueue_->addTimer(cb, time, 0.0); // 第三个参数为 0.0,表示这是一次性定时器
}
TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
{
Timestamp time(addTime(Timestamp::now(), delay)); // 计算延迟后的时间
return runAt(time, cb); // 在计算出的时间点运行回调
}
TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
{
Timestamp time(addTime(Timestamp::now(), interval)); // 计算首次触发的时间
return timerQueue_->addTimer(cb, time, interval); // 第三个参数为 interval,表示周期性定时器
}
跨线程安全性问题
EventLoop的定时器接口是允许跨线程调用的,也就是用户可以从不同的线程去调用这些定时器的函数,不需要担心线程安全问题。mudou库在此的解决办法则是通过runInLoop()函数,将定时器任务的操作转移到IO线程中执行,而不是加锁。
简单来说起流程,首先runInLoop函数会检查当前线程是否是EventLoop所属于的IO线程,如果是则直接执行任务,如果不是,则将任务添加到一个任务队列中,然后由IO线程执行
分析从EventLoop中的事件循环如何处理定时任务
- EventLoop事件循环
- EventLoop::loop()方法启动循环,开始监听文件描述符事件,检查I/O事件或者定时器事件是否响应
- Poller监听事件
- 多路复用(epoll 或者 poll)来监听多个文件描述符,等待事件发生,当定时器超时,poller::poll()就返回,然后告诉EventLoop中发生了什么事件
- 激活Channel的列表返回给EventLoop
- 只要检测到Channel对象,就会将这些对象交给EventLoop,然后定时器事件与TimerQueue中的Channel对象相关联,表示该定时器已经超时,准备执行回调函数
- EventLoop处理Channel事件
- TimerQueue获取已经过期的定时器
- 如果激活了定时器的回调函数,则执行该回调函数
EventLoop::runInLoop()函数
runInLoop函数逻辑分析
该函数作用:允许程序在运行的时候,在当前事件循环线程中安全的执行某个用户提供的回调函数
执行逻辑
- 如果用户在当前事件循环线程中调用runInLoop(),那么回调函数会立即执行
- 如果用户是在其他线程中调用runInLoop(),则回调函数会被加入任务队列,等待当前事件循环的线程来进行处理(通过该中方式,避免了切换线程所带来的线程安全问题)
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread()) {
cb(); // 如果是在IO线程中,直接执行回调函数
} else {
queueInLoop(cb); // 否则,将回调函数放入队列,稍后在IO线程中执行
}
}
根据主从Reactor模型理解该处执行逻辑,例如两个线程分别运行一个Reactor,A线程向B线程发送了一个任务,为了保证线程安全,A线程是无法直接操作B线程的资源的(需要将它们两个的资源进行开,从而避免线程争夺临界资源的问题),而是通过runInLoop()函数将任务交给B线程来处理
跨线程的任务队列
如果回调函数不可以立即执行,比如主线程中的runInLoop(),任务会被加入一个等待队列中等待执行,为了保证线程安全,这个任务需要进行加锁保护
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb); // 将任务加入队列
}
if (!isInLoopThread() || callingPendingFunctors_) {
wakeup(); // 如果当前线程不是IO线程,或者正处于执行其他任务,唤醒IO线程
}
}
weakup()唤醒机制
该处逻辑可以简单的理解为一个人通过一种特定的方式唤醒另一个人起来干活
wakeup()的作用就是通过向IO线程发出信号,例如通过eventfd机制发出信号,从而打断poll()的阻塞,让IO线程可以立刻处理新加入的任务,这也是一种跨线程通信的经典方法
通过该机制可以实现即使IO线程处于阻塞状态,也可以被快速唤醒,处理主线程提交的任务
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = ::write(wakeupFd_, &one, sizeof(one)); // 向wakeupFd写数据,唤醒IO线程
}
类似于双缓冲区机制的doPendingFunctors()函数的执行逻辑
该函数的执行逻辑就是执行所有已经加入到任务队列中的任务,其底层逻辑不是通过遍历一个一个的执行任务,而是通过任务队列与局部变量进行交换,从而实现缩短临界区的长度,避免阻塞其他线程向队列中添加新任务
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_); // 将任务队列与局部变量交换
}
for (const Functor& functor : functors) {
functor(); // 执行每一个任务
}
callingPendingFunctors_ = false;
}
跨线程的唤醒机制分析
- pipe机制
- 通过一个pipe来实现线程间的通信,例如Nginx中就是使用该方式,每个worker进程都监听自己的pipe端点,当其他线程想要唤醒该线程的时候,可以通过写入pipe,这样可以监听pipe的epoll_wait就会被唤醒
- eventfd机制
- pipe高效的替换方案,是一种更轻量级的文件描述符,可以在不同线程之间发送事件信号
- 当一个线程需要通知另一个线程的时候,可以向eventfd中写入一个值,这就会触发该eventfd的epoll_wait()返回,从而唤醒事件循环
- modou库就是典型的使用者
- bufferevent机制
- 多线程之间传递数据机制,一个线程可以写入数据到bufferevent,然后另一个线程通过监听bufferevent的读事件来获取数据
EventLoopThread类
modou库中提供这个类的作用就是允许我们在任意线程中创建并运行的EVentLoop,该类是一个封装了EventLoop和线程管理的类,不需要担心线程的管理细节
class EventLoopThread {
public:
EventLoopThread();
~EventLoopThread();
EventLoop* startLoop();
private:
void threadFunc();
EventLoop* loop_;
Thread thread_;
MutexLock mutex_;
Condition cond_;
};
TCP网络库
Accept class
Accept主要就是用于监听新连接并通知上层应用层,该类主要的作用就是用来管理服务端的监听套接字,并在新客户端连接超时时进行相应的处理
主要功能
- 初始化并设置监听套接字:创建一个非阻塞的TCP套接字,并通过bind()函数绑定服务器的IP地址和端口号
- 管理监听事件:当新连接到来的时候,Channel触发器回调函数处理新连接,然后通知EVentLoop对其进行处理
- RAII管理:通过RAII模式管理监听套接字的生命周期,确保套接字在对象析构的时候可以正确的关闭,避免资源泄漏
- 通过Channel处理I/O:所有的操作都通过Channel的回调函数进行处理
使用简单示例
void newConnection(int sockfd, const muduo::InetAddress& peerAddr) {
printf("New connection from %s\n", peerAddr.toHostPort().c_str());
::write(sockfd, "Hello, World!\n", 14); // 向新连接发送一条信息
muduo::sockets::close(sockfd); // 关闭连接
}
int main() {
muduo::InetAddress listenAddr(9981); // 监听9981端口
muduo::EventLoop loop;
muduo::Acceptor acceptor(&loop, listenAddr);
acceptor.setNewConnectionCallback(newConnection); // 设置新的连接回调
acceptor.listen(); // 开始监听
loop.loop(); // 事件循环开始
}
TcpServer接收新连接
处理新连接过程分析
- EventLoop事件循环
- 该类负责管理和监听所有的I/O事件,当有新事件到来的时候(也就是新连接到来),EventLoop就会调用相关的函数来处理
- Channel监听可读事件
- 一个Channel监听一个文件描述符状态,当Channel检测到套接字变成可读状态的时候,就会触发handleEvent()函数(因为之前已经在Channel中设置了回调函数,所以在使用的时候通过该函数进行调用即可)
- Accept处理新连接
- Accept::handleRead():当Channel回调触发的时候,就证明有新连接了,Acceptor通过调用其内部accept函数来处理新连接
- accept系统调用从监听套接字中提取客户端的连接请求,并返回一个新的套接字,然后使用该套接字与客户端进行通信
- newConn():在accept执行完成后,就会执行该函数创建一个新连接(在modou库中,这个类中有关于该连接的所有信息)
- TcpServer创建TcpConnection对象
- Acceptor通知TcpServer有新连接到来的时候,TcpServer会给这个新连接创建一个对应的TcpConnection对象(该对象封装了新连接的所有信息,其中包括读写处理、事件处理、回调等)
- 连接建立完成
- 连接创建就绪后,调用TcpConnection::established()执行后续的数据传输、读取或者写入
- connCb():这是一个连接回调函数,也就是连接建立完成后,就会自动执行的函数,主要任务就是通知应用层有新的连接到来了
TcpServer类的主要作用分析
- 管理新连接的创建:获取新连接,然后创建一个新的TcpConnection对象,该连接对象中拥有关于连接的所有信息
- 回调机制:很多操作都是通过回调函数完成,例如当新连接建立或者有信息到达的时候
- 线程安全和生命周期管理:因为连接对象的生命周期是由shared_ptr来管理,目的就是为了不让连接对象过早的销毁。TcpServer持有的连接也是通过其哈希表来存储的,用户同样也是访问这个连接哈希获取的活跃连接,通过这种方式,确保了连接在使用期间的线程安全
TcpConnection
作用分析
- 管理一个TCP连接,同时可以处理读写事件,管理套接字的数据传输
- 这是一个不可再生的TCP连接,连接一旦断开,这个连接对象就会销毁
- 其内部继承了enable_shared_form_this实现智能指针共享,这也就表明它是线程安全
状态管理
- kConnection:连接正在建立中
- kConnectioned:连接已经建立成功
数据传输
连接对象主要就是依靠Channel来监听和处理套接字上的读写事件,同时通过handleRead()函数来处理接收到的数据,当read()返回0的时候就是表示连接断开了
数据处理则主要是在连接建立后,通过连接回调机制将接收到的数据,传递给回调函数进行处理
TcpServer与TcpConnection之间的关联
- TcpServer类主要就是负责接受新连接,同时为每一个连接都创建一个连接对象,但是不管理连接对象的生命周期(由shared_ptr来进行管理),确保连接断开后对象可以自动销毁
- TcpServer会给连接对象设置相应的回调函数
TcpConnection断开连接
监听连接状态 -- 检测到断开 -- 执行关闭操作 -- 通知应用程序 -- 清理资源
- 事件监听:首先服务器会不断的监听客户端的请求,同时监视其连接状态。例如当客户端主动关闭连接,或者到了定时器设定的超时事件,服务器可以通过EventLoop机制来发现该连接不在活跃
- 检测连接是否已经关闭:服务器会去确认该连接是否真正的关闭了,也就是通过一次read操作返回0来确认。也就是说服务器会从这个连接读取数据,如果都读不到数据,自然也就证明了已经没有数据可读了,这也就意味着客户端已经断开的连接
- 开始关闭流程:服务器执行关闭流程,也就是在该连接的标记为准备关闭状态,同时进入关闭的执行流程
- 移除连接:服务器将这个断开的连接从活跃连接列表中移除,也就是告知服务器抛弃该连接
- 通知上层:服务器关闭后,会通知应用程序不需要在关注该连接了,也不需要处理其数据了
- 清理资源:服务器清理这个连接相关的资源,也就是关闭套接字、释放内存等操作
Channel类处理关闭连接回调的思路
关闭回调调用的场景则是客户端断开连接后,服务器需要知道该连接已经关闭并且释放相应的资源,其中会涉及到断开连接的情况主要有:套接字文件描述符被关闭了、连接管理器中的对象被移除了,触发了用户定义的回调函数机制
总体实现的逻辑
- Channel对象监视fd上的事件
- 如果发现连接关闭,那么PHLLHUP事件会被处罚
- handleEvent()检查PHLLHUP事件,同时调用关闭回调函数
- 在回调的函数执行逻辑中,也就会执行清理资源等操作
void Channel::handleEvent() {
eventHandling_ = true;
if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) {
LOG_WARN << "Channel::handleEvent() POLLHUP";
if (closeCallback_) closeCallback_();
}
// 其他事件处理
eventHandling_ = false;
}
TcpConnection类中关闭连接处理逻辑
主要通过两个函数执行关闭逻辑,handleRead():当连接关闭的时候,read()返回0,触发handClose()函数 ;如果还有数据可读的时候,则调用上层消息回调处理逻辑
void TcpConnection::handleRead() {
char buf[65536];
ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
if (n > 0) {
messageCallback_(shared_from_this(), buf, n);
} else if (n == 0) {
handleClose(); // 连接关闭,调用 handleClose
} else {
handleError(); // 错误处理
}
}
handClose()函数:主要就是先禁用事件监听,然后通过回调函数将连接移除出去,最后调用回调释放资源
void TcpConnection::handleClose() {
loop_->assertInLoopThread();
LOG_TRACE << "TcpConnection::handleClose state = " << state_;
assert(state_ == kConnected);
channel_->disableAll(); // 禁用所有事件监听
closeCallback_(shared_from_this()); // 调用 closeCallback 通知 TcpServer 移除连接
}
TcpServer类在创建爱连接的时候,就设置了关闭连接的回调函数
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
loop_->assertInLoopThread();
char buf[64];
snprintf(buf, sizeof buf, "#%d", nextConnId_);
++nextConnId_;
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn; // 将新建的连接加入 ConnectionMap
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, _1)); // 设置关闭回调
conn->connectEstablished(); // 设置连接已建立
}
removeConnection移除连接咯几,首先将其从连接管理的哈希表中移除,确保不会处理该连接的事件,最后清理该连接即可
void TcpServer::removeConnection(const TcpConnectionPtr& conn) {
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnection [" << conn->name() << "]";
connections_.erase(conn->name()); // 从 ConnectionMap 中移除连接
loop_->queueInLoop(
std::bind(&TcpConnection::connectDestroyed, conn)); // 清理连接
}
Poller中removeChannel移除Channel对象实现逻辑
该函数的目标就是高效的移除监听事件,通过维护一个pollfds_数组和channels_哈希表,从而实现一个高效率移除操作,该种方式适用于大规模的并发操作上,可以减轻系统负担,提高服务器的响应速度
void Poller::removeChannel(Channel* channel) {
assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd();
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
assert(channel->isNoneEvent()); // 确保Channel已经没有事件要监听
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
const struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
size_t n = channels_.erase(channel->fd()); // 从channels_表中移除
assert(n == 1);
if (implicit_cast<size_t>(idx) == pollfds_.size()-1) {
pollfds_.pop_back(); // 如果是最后一个元素,直接移除
} else {
int channelAtEnd = pollfds_.back().fd; // 用最后一个元素覆盖当前元素
iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
if (channelAtEnd < 0) {
channelAtEnd = -channelAtEnd-1;
}
channels_[channelAtEnd]->set_index(idx);
pollfds_.pop_back(); // 移除最后一个元素
}
}
Buffer缓冲区
Buffer作用理解
主要就是负责从网络中接收数据,用于确保完成收到的发送端发来的数据,Buffer可以理解成一个本子,可以帮助记下还没有完全收到的消息,等收到完整的消息后,再将其一起处理即可。
Buffer的主要功能就是高效的管理网络中的I/O数据,非阻塞机制则是用来缓存收到的数据。因为再非阻塞情况下,数据并不是一次性达到完成,而是不确定什么时候数据可以完全到达,所以需要设计一个缓冲区来存放这些临时数据,这也就是Buffer的租用
核心执行逻辑
- 当数据到达的时候,Buffer接收并存储这些数据信息
- 如果一次接收的数据是不完整的,则Bufferu会继续保存,直到接收完整的数据
- 等数据传输完整后,从Buffer中读取这些数据进行处理
TcpConnection使用Buffer作为输入缓冲区
首先是在连接类中,新增了Buffer作为成员变量,专门用于存储接收到的数据,也就是先把接收到的数据放入缓冲区中,然后找到合适的时机再对其进行统一的处理
其次TcpConnection::handleRead()则是主要负责从网络中读取数据的关键,在此也是通过Buffer接收缓存数据
void TcpConnection::handleRead(Timestamp receiveTime) {
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0) {
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
} else if (n == 0) {
handleClose();
} else {
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
OnMesssage()函数则是从连接读取接收到的内容,也就是读取不是直接从连接中读取,而是直接从缓冲区中读取
void onMessage(const muduo::TcpConnectionPtr& conn,
muduo::Buffer* buf,
muduo::Timestamp receiveTime) {
printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
buf->readableBytes(),
conn->name().c_str(),
receiveTime.toFormattedString().c_str());
printf("onMessage(): [%s]\n", buf->retrieveAsString().c_str());
}
Buffer使用的优点
- 数据分段接收:也就是说数据可以分段进行接受,即使到达的数据不完整,也可以临时存放并等待下一次的接收数据
- 避免丢失重要数据:因为直接处理read()的数据可能会导致数据的丢失,通过Buffer缓存则可以避免该情况的发生
- 优化性能:减少I/O的频率,提高网络整体性能
详细的分析Buffer::readFd()函数的作用
ssize_t Buffer::readFd(int fd, int* savedErrno) {
char extrabuf[65536]; // 临时缓冲区,用于接收大于 Buffer 剩余空间的数据
struct iovec vec[2]; // 使用 scatter/gather IO,允许一次 read 调用写入两个缓冲区
const size_t writable = writableBytes(); // 获取 Buffer 中可写的字节数
// vec[0] 用来存储 Buffer 中的可写空间
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
// vec[1] 用来存储超出 Buffer 可写空间的数据
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof(extrabuf);
const ssize_t n = readv(fd, vec, 2); // 一次性从 fd 中读取数据,写入两个缓冲区
if (n < 0) {
*savedErrno = errno; // 保存读取错误信息
} else if (implicit_cast<size_t>(n) <= writable) {
writerIndex_ += n; // 数据全写入 Buffer,无需使用 extrabuf
} else {
writerIndex_ = buffer_.size(); // Buffer 写满了,剩下的数据写入 extrabuf
append(extrabuf, n - writable); // 将 extrabuf 中的数据追加到 Buffer
}
return n; // 返回读取的字节数
}
重点分析
- 该方法使用了scatter/gather I/O技术
- 该技术就是在一次read()系统调用中,允许将数据写入多个非连续的缓冲区的技术
- 先将数据写入Buffer中剩余可用的空间中,然后将多余的数据写入到临时缓冲区中
- 可以理解为手中已经拿满了东西,其他东西没有地方放置了,就可以选择先将东西放入袋子中,等有时间的时候,再就爱那个数据放入箱子中
- 仅调用readv()
- 通过readv()调用一次I/O操作,就可以同时把数据写入到Buffer和临时缓冲区中,这样就避免的频繁的I/O操作,从而节省系统调用开销,并提高性能
- 例如有两袋东西要放上车,最好的方式还是一次性将物品都拿着在手里,而不是先拿一袋后再拿一袋
- 水平触发机制
TcpConnection 发送数据
发送数据逻辑分析
- 检查线程安全
- 首先就是检查当前的send()函数是否在I/O线程中(是否在当前Reactor线程中)
- 如果不再当前I/O线程中,则需要通过funInLoop()把任务分配到正确的线程中进行
- 直接发送数据
- 如果当前的socket是可写的,则sendInLoop()会直接调用write()函数尝试将数据发送出去
- 如果成功将所有数据发送完毕,那么整个发送操作就结束了
- 如果没有完全发送所有数据,那么就要将剩下的数据存入到outputBuffer_中
- 监听可写事件
- 如果数据还没有发送完,那么outputBuffer_中的数据会等待write事件触发
- 系统继续监听socket中的可写事件,当socket再次变成可写的时候,则触发其可写回调函数
- 处理可写函数
- 当socket变的可写的时候,handleWrite()函数会从outputBuffer_中取得数据,然后通过write()系统调用继续发送
- 如果outputBuffer_为空,那么就要停止监听write事件,避免出现不必要的循环
- 处理错误
- 如果在write()的过程中遇到错误,则会将数据存储在outputBuffer_,同时等待下次可写事件触发
代码逻辑事例
class TcpConnection {
public:
void send(const std::string& message) {
// 检查是否在 I/O 线程
if (loop_->isInLoopThread()) {
sendInLoop(message);
} else {
loop_->runInLoop([this, message]() { sendInLoop(message); });
}
}
private:
void sendInLoop(const std::string& message) {
ssize_t nwrote = 0;
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
// 尝试直接发送数据
nwrote = ::write(channel_->fd(), message.data(), message.size());
if (nwrote >= 0) {
if (implicit_cast<size_t>(nwrote) < message.size()) {
// 剩余数据写入缓冲区
outputBuffer_.append(message.data() + nwrote, message.size() - nwrote);
if (!channel_->isWriting()) {
// 开启写事件监听
channel_->enableWriting();
}
}
} else if (errno != EWOULDBLOCK) {
// 处理错误
handleError();
}
} else {
// 如果缓冲区已有数据,将当前数据加入缓冲区
outputBuffer_.append(message);
if (!channel_->isWriting()) {
// 开启写事件监听
channel_->enableWriting();
}
}
}
void handleWrite() {
// socket 可写时调用此函数
ssize_t n = ::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
if (n > 0) {
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) {
// 停止写事件监听
channel_->disableWriting();
if (state_ == kDisconnecting) {
shutdownInLoop();
}
}
} else {
// 处理写入错误
handleError();
}
}
// 缓冲区
Buffer outputBuffer_;
EventLoop* loop_;
Channel* channel_;
enum StateE { kConnected, kDisconnecting };
StateE state_;
};
TcpConnection 连接状态转变分析
- 连接的生命周期是从Connecting 开始,一旦连接成功就会进入到connected状态,此时就可以进行正常数据的读写了
- 当调用shutdown()函数的时候,连接并不会立刻就断开了,而是先进入Disconnecting状态,这个状态表示系统在清理数据或者等待未完成的写操作的。
- 当数据发送完成后,连接就会进入Disconnected状态
- 连接同样也可以在Connected状态的时候,由对方关闭连接而直接进入Disconnected,表示连接终止
Connector
mudou库设计这个类的主要作用就是发起TCP连接,然后创建socket,发起连接请求,处理连接失败的重试,然后确保连接的建立
主要功能分析
- 发起连接:通过socket调用connect()方法,来与指定的服务器建立连接
- 管理连接的生命周期:Connector不仅需要发起连接,还需要处理错误情况,比如拒绝连接、网络错误等情况。如果要是连接失败,还需要不断尝试新的连接,直到成功为止
- 可重复使用:Connector是可以被重复使用的,直到连接成功
需要注意的重点
- socket是一次性的:Socket只要建立连接失败,就会重新打开一个socket,这也就是说,每次连接都会创建一个socket
- 错误处理:如果Connect()返回EAGAIN错误,就表示临时错误,也就是连接无法立即建立
- Connector会定时重试,直到连接成功,这也就涉及到了socket的状态管理方面的问题
- 连接超时和自连接:因为在尝试连接的过程中,Connector可能会遇到自连接的情况,也就是客户端和服务器使用的相同的源IP和目的IP地址以及相同的端口号,这就会导致自连接,Connector需要处理这种情况,也就是通过检测自连接来避免其发生
参考文章
- modou源码
- 《Linux多线程服务器编程:使用muduo C++ 网络库》 陈硕
- 《UNIX 网络编程1》
- Boost.Asio