当前位置: 首页 > article >正文

muduo库TcpConnection类源码解析——链接管理

目录

引言

TcpConnection类定义的数据

链接的状态

链接管理与事件循环

Socket与Channel

链接的缓冲区

发送数据

一些成员函数的功能

        关闭链接


引言

muduo库EventLoop类源码解析-CSDN博客

muduo库Buffer类源码解析-CSDN博客

        上述两篇文章讲解了muduo库两个比较重要的模块——事件循环和缓冲区。而这篇文章讲解muduo库另一个重要的模块——链接管理。TcpConnection类是代码最多的一个类,我会从源码的角度带大家理解muduo库是怎么管理链接的。

        TcpConnection类管理的是通信套接字,他是专门管理如何和客户端进行IO通信的类,封装了各种事件的回调,链接的状态,链接在用户级的缓冲区,事件管理等。

TcpConnection类定义的数据

  enum StateE { kDisconnected,  //断开连接状态
                kConnecting,    //尝试建立连接的状态
                kConnected,     //成功建立连接的状态
                kDisconnecting  //正在断开连接的状态
                };
  EventLoop* loop_; //把一个链接挂接到EventLoop上进行事件监控
  const string name_; //链接的名字
  StateE state_;  //链接的状态  // FIXME: use atomic variable
  bool reading_;  //为true表示链接正在进行读事件监控
  // we don't expose those classes to client.
  std::unique_ptr<Socket> socket_;
  std::unique_ptr<Channel> channel_;
  const InetAddress localAddr_;                    //IP地址的简单封装
  const InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;           //新建链接的回调
  MessageCallback messageCallback_;                 //消息到来的回调
  WriteCompleteCallback writeCompleteCallback_;     //低水位线回调
  HighWaterMarkCallback highWaterMarkCallback_;     //高水位线回调
  CloseCallback closeCallback_;
  size_t highWaterMark_;
  Buffer inputBuffer_;  //客户代码从 inputBuffer_ 读,
  Buffer outputBuffer_; //客户代码往 outputBuffer_ 写 // FIXME: use list<Buffer> as output buffer.
  boost::any context_;  //链接的上下文


typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;  //用智能指针管理链接

链接的状态

        kConnected表示成功建立链接,在链接建立之前需要有一个kConnecting这个表示在底层已经和客户端完成握手,而且已经拿到了通信的文件描述符,但在用户层需要给链接各种初始化,初始化成功就是kConnected状态。

        对应的,链接成功关闭之前还有一个正在关闭链接的状态,这个状态表示链接正在做一些善后工作,比如缓冲区还有数据没处理,还要是否资源等等。

链接管理与事件循环

        EventLoop* loop_每个链接都指向一个事件循环,这种指向是单向的,EventLoop感受不到链接的存在,也不知道有多少个链接挂到了自己身上,EventLoop只负责处理事件。而链接需要通过loop_把事件抛给EventLoop。

Socket与Channel

        Channel是管理文件描述符的各种事件,以及事件的回调,这个类我在EventLoop类源码解析里提过。

        Socket就是对地址套接字编程原生接口的封装,下面是关于Socket接口和接口的功能

  //获取文件描述符
  int fd() const { return sockfd_; }
  // return true if success.
  //获取TCP链接的各项状态
  bool getTcpInfo(struct tcp_info*) const;
  //获取TCP链接的各项状态
  bool getTcpInfoString(char* buf, int len) const;
  //绑定
  void bindAddress(const InetAddress& localaddr);
  /// abort if address in use
  //监听
  void listen();
  //获取通信文件描述符,非阻塞
  int accept(InetAddress* peeraddr);
  //关闭通信文件描述符的写操作
  void shutdownWrite();
  /是否降低延时,当 TCP_NODELAY 选项被设置为 1 时,Nagle 算法被禁用,这意味着小的数据包会立即被发送
  void setTcpNoDelay(bool on);
  ///是否开启地址和端口重用,防止TIME_WAIT状态而不能快速的重启服务器
  void setReuseAddr(bool on);
  /是否允许在相同的地址和端口上启动多个工作进程
  void setReusePort(bool on);
  //是否启动功能——如果连接在一段时间内没有数据传输,TCP栈将自动发送这些探测包
  void setKeepAlive(bool on);

作为和客户端通信的文件描述符,不用监听和绑定接口。

链接的缓冲区

        inputBuffer_是读缓冲区,outputBuffer_是写缓冲区,这里的读和写是相对于用户来说的。在链接内部是从链接的写缓存区读数据,把读出来的数据写入到TCP发送缓冲区,链接内部是从TCP接收缓冲区读数据,读出来的数据写入读缓冲区。

        关于为什么要有用户级缓冲区,我在Buffer类源码解析有过阐述。

发送数据

        TcpConnection::send 接口是发送数据,数据会放到outputBuffer_缓冲区中,发送数据时,链接必须是kConnected状态。send接口会判断是否是和EventLoop绑定的IO线程在执行该函数,如果是,直接调用TcpConnection::sendInLoop发送数据,如果不是,会把TcpConnection::sendInLoop抛入到EventLoop的任务队列中。(TcpConnection::sendInLoop是私有成员函数)

void TcpConnection::send(const StringPiece& message)
{
  //链接必须是建立成功状态
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(message);
    }
    else
    {
      void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
    //把TcpConnection::sendInLoop抛入到EventLoop的任务队列中  
    loop_->runInLoop(
          std::bind(fp,
                    this,     // FIXME
                    message.as_string()));
                    //std::forward<string>(message)));
    }
  }
}

        如果outputBuffer_缓冲区中没有数据,TcpConnection::sendInLoop会向TCP发送缓冲区发送数据,而不是直接保存在outputBuffer_缓冲区中,此时会有三种情况

1. 数据全部发送成功,直接返回

2.数据部分发送成功,把剩余数据放到outputBuffer_缓冲区中,启动写事件监控,让链接的写回调处理下次的发送

3.出错了,可能客户端把链接关闭了,也可能收到信号了。

        那如果outputBuffer_缓冲区中本来就有数据呢?就不能直接向TCP发送缓冲区发送数据了,这样会让数据乱序。直接把数据发到outputBuffer_缓冲区中,然后启动写事件监控。

        我们来看下源码的实现,我加了一些注释,方便大家理解

void TcpConnection::sendInLoop(const StringPiece& message)
{
  sendInLoop(message.data(), message.size());
}

void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;           //向TCP缓冲区写了多少数据
  size_t remaining = len;       //还剩多少要发送的数据
  bool faultError = false;      //向TCP缓冲区发送数据的时候是否出错了
  //如果链接是断开状态,直接返回
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  //连接必须触发了写事件,并且写缓冲区没有数据
  //这里必须判断outputBuffer_里是没有数据的才可以尝试向TCP缓冲区发送数据,不然会使数据乱序
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    //尝试直接向TCP缓冲区发送数据
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      if (remaining == 0 && writeCompleteCallback_)
      {
        //放到具有线程安全的任务队列,缓冲区为0,触发低水位回调
        loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      nwrote = 0;
      //尝试从一个非阻塞套接字读取数据时,如果没有数据可读,函数可能会返回 -1 并设置 errno 为 EWOULDBLOCK
      if (errno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        /*EPIPE 是一个错误码,用于指示写入端已经关闭的管道(pipe)或FIFO(命名管道)。例如,当进程试图写入一个它已经不再拥有写权限的管道时,可能会发生这种错误。
        ECONNRESET 是一个错误码,用于指示在传输过程中连接被对方重置。在网络编程中,当TCP连接被对方强制关闭时,可能会遇到这个错误。*/
        if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
        {
          faultError = true;
          
        }
      }
    }
  }

  assert(remaining <= len);
  if (!faultError && remaining > 0)
  {
    //进入这个代码块里说明,前面向TCP写入数据成功了,并且只写入了一部分数据,剩下的数据需要保存到缓冲区中,然后启动写事件监控
    //也可能是因为outputBuffer_里有数据,没有尝试向TCP缓冲区里发送数据
    size_t oldLen = outputBuffer_.readableBytes();
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      //这里说明缓冲区的数据大小超过了高水位线,并且用户设置高水位线的回调
      loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    //把数据保存到缓冲区
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    //启动写事件监控
    if (!channel_->isWriting())
    {
      channel_->enableWriting();
    }
  }
}

        上述代码中还出现了水位线的概念,在muduo库中,如果outputBuffer_缓冲区数据为0且设置了低水位线回调就会调用低水位线回调,如果在发送的时候outputBuffer_缓冲区数据超过了设置的高水位线且设置了高水位线回调,就会调用高水位线回调。

一些成员函数的功能

  //关闭链接,用loop的queueInLoop把forceCloseInLoop抛入到具有线程安全的任务队列中,forceCloseInLoop又会去调用链接关闭回调
  void forceClose();
  //等待seconds秒关闭链接,如果不是Event Loop绑定的IO线程,这个方法也会被抛入到任务队列中
  void forceCloseWithDelay(double seconds);
  //是否降低延时,设置TCP_NODELAY,即使小数据包也会被立即发送出去
  void setTcpNoDelay(bool on);
  //启动读事件监控,会调用runInLoop来保证线程安全
  void startRead();
  //停止读事件监控
  void stopRead();
  //是否在进行读事件监控
  bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop
  //设置链接上下文
  void setContext(const boost::any& context)
  { context_ = context; }
  //获取链接上下文
  const boost::any& getContext() const
  { return context_; }
  //获取链接上下文
  boost::any* getMutableContext()
  { return &context_; }
  //设置链接的链接建立回调
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }
  //设置链接的消息到来的回调
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }
  //设置低水位回调,如果发送缓冲区被清空就去调用它,低水位回调会在send()和handleWrite()的时候会触发(读事件回调)
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }
  //设置高水位回调,和高水位线,高水位回调在send()发送数据的时候会触发
  void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
  //获取读缓冲区
  Buffer* inputBuffer()
  { return &inputBuffer_; }
  //获取写缓冲区
  Buffer* outputBuffer()
  { return &outputBuffer_; }
  //设置链接关闭回调
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }

        关闭链接

TcpConnection::shutdown 其实是关闭了链接的写操作,通过EventLoop::runInLoop确保是IO线程执行TcpConnection::shutdownInLoop,最终会调用::shutdown(sockfd, SHUT_WR)关闭文件描述发的写操作。

//关闭通信文件描述符的写操作
void TcpConnection::shutdown()
{
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
    //设置链接状态
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
  }
}
//关闭通信文件描述符的写操作
void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())
  {
    // we are not writing
    socket_->shutdownWrite();
  }
}


http://www.kler.cn/a/373787.html

相关文章:

  • 大数据技术实训:Hadoop完全分布式运行模式配置
  • 数学建模学习(130):使用Python基于模糊TOPSIS算法的多准则决策分析
  • 聆听用户声音的3个方法,挖掘客户真实潜在需求
  • 24年10月Google Play政策更新通知
  • 字符串逆序(c语言)
  • 在Android开发中实现静默拍视频
  • 数智时代:以低代码开发为催化剂 加速中国制造转型升级
  • 适配器模式适用的场景
  • PostgreSQL用load语句加载插件
  • Spring 设计模式之装饰器模式
  • Vue组件学习 | 八、 v-bind指令
  • Spring Boot 集成 RabbitMQ
  • linux上使用scp从windows往linux传数据
  • 易优cms webshell
  • STM32 第17章 EXIT--外部中断/事件控制器
  • 【福建医科大学附属第一医院-注册安全分析报告】
  • nvm 版本管理工具
  • 【Python爬虫实战】网络爬虫的完整指南:基础、工作原理与实战
  • 成都云腾五洲科技“智联引擎”服务平台已发布
  • 明达云:赋能化工园区,智绘安全高效新蓝图
  • 道路安全员题库分享