当前位置: 首页 > article >正文

WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包

WebRTC服务质量(01)- Qos概述
WebRTC服务质量(02)- RTP协议
WebRTC服务质量(03)- RTCP协议
WebRTC服务质量(04)- 重传机制(01) RTX NACK概述
WebRTC服务质量(05)- 重传机制(02) NACK判断丢包
WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包

一、前言:

上一篇文章我们记录了丢包,但是这些包都是一些可疑丢包,本文我们就分析下如何从可疑丢包中找到真正丢包

二、OnReceivedPacket

先看下上一节的OnReceivedPacket函数:

int DEPRECATED_NackModule::OnReceivedPacket(uint16_t seq_num,
                                            bool is_keyframe,
                                            bool is_recovered) {
  // ...

  // 此函数非常重要(拿到这个包到上一次的包newest_seq_num_中间所有丢失的包)
  AddPacketsToNack(newest_seq_num_ + 1, seq_num);
  // 这个就把这次的序列号seq_num赋值给newest_seq_num_,注意回影响后面逻辑
  newest_seq_num_ = seq_num;

  // Are there any nacks that are waiting for this seq_num.
  // 判断真丢包还是假丢包(乱序),最终拿到真正的丢包
  std::vector<uint16_t> nack_batch = GetNackBatch(kSeqNumOnly);
  if (!nack_batch.empty()) {
    // This batch of NACKs is triggered externally; the initiator can
    // batch them with other feedback messages.
    // 告诉对端,到底有哪些包丢失了,需要你重传
    nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/true);
  }

  return 0;
}

我们先分析下,什么情况下会走到这儿:

  • 不是第一个包,不进行初始化;

  • 不是一个重复的包;

  • 不是在newest_seq_num_之前的包(不是乱序包);

  • 不是一个恢复包;

  • 那么只剩下两种情况:

    • 这个包seq刚好是newest_seq_num_ + 1,
    • 这个包seq序号为:newest_seq_num_ 隔了好几个包(这种情况,将中间隔的那几个包都得插入到nack_list当中)

1)逻辑说明:

这个时候我们的AddPacketsToNack函数就天真地将所有跳过的包都加入重传列表nack_list当中,认为是丢包,实际上,我们知道了传输层UDP的特点就是不靠谱,都是分头行动,到目的地汇合,不会按照顺序来的,因此,我们首先要结合序列号时间戳排除的就是这种情况,这些逻辑都在GetNackBatch完成的。

三、GetNackBatch:

3.1、完整代码:

std::vector<uint16_t> DEPRECATED_NackModule::GetNackBatch(
    NackFilterOptions options) {
  bool consider_seq_num = options != kTimeOnly;
  bool consider_timestamp = options != kSeqNumOnly;
  Timestamp now = clock_->CurrentTime();

  std::vector<uint16_t> nack_batch;
  auto it = nack_list_.begin();
  while (it != nack_list_.end()) {
    TimeDelta resend_delay = TimeDelta::Millis(rtt_ms_);
    if (backoff_settings_) {
      resend_delay =
          std::max(resend_delay, backoff_settings_->min_retry_interval);
      if (it->second.retries > 1) {
        TimeDelta exponential_backoff =
            std::min(TimeDelta::Millis(rtt_ms_), backoff_settings_->max_rtt) *
            std::pow(backoff_settings_->base, it->second.retries - 1);
        resend_delay = std::max(resend_delay, exponential_backoff);
      }
    }

    bool delay_timed_out =
        now.ms() - it->second.created_at_time >= send_nack_delay_ms_;
    bool nack_on_rtt_passed =
        now.ms() - it->second.sent_at_time >= resend_delay.ms();
    bool nack_on_seq_num_passed =
        it->second.sent_at_time == -1 &&
        AheadOrAt(newest_seq_num_, it->second.send_at_seq_num);
    if (delay_timed_out && ((consider_seq_num && nack_on_seq_num_passed) ||
                            (consider_timestamp && nack_on_rtt_passed))) {
      nack_batch.emplace_back(it->second.seq_num);
      ++it->second.retries;
      it->second.sent_at_time = now.ms();
      if (it->second.retries >= kMaxNackRetries) {
        RTC_LOG(LS_WARNING) << "Sequence number " << it->second.seq_num
                            << " removed from NACK list due to max retries.";
        it = nack_list_.erase(it);
      } else {
        ++it;
      }
      continue;
    }
    ++it;
  }
  return nack_batch;
}

这个函数的作用是从 nack_list_(丢包列表)中筛选出当前需要发送 NACK 请求的包序号,并返回。

3.2、关键变量:

a)options:

  // 表示我们判断丢包是以seq_num作为判断条件
  bool consider_seq_num = options != kTimeOnly;
  // 表示我们判断丢包是以时间作为判断条件
  bool consider_timestamp = options != kSeqNumOnly;

optionsNackFilterOptions 枚举类型,它决定了判断丢包的条件:

  • kTimeOnly: 只根据时间戳判断。
  • kSeqNumOnly: 只根据序号先后顺序判断。
  • 其他值表示两种条件都考虑。

b)nack_list:

nack_list_:存储丢包信息的列表,类型是 std::list<NackInfo>,其中每个 NackInfo 包括以下关键字段:

  • created_at_time: 包的创建时间(加入丢包列表的时间)。
  • sent_at_time: 上次发送 NACK 的时间。
  • seq_num: 包序号。
  • retries: 发送的 NACK 次数(重试次数)。
  • send_at_seq_num: 期望 NACK 的序号。

c)backoff_settings_:

用于判断重试间隔的配置,可动态调整发送 NACK 请求的间隔时间,当 retries > 1 时,会使用指数型退避逻辑。

3.3、判断丢包的条件:

bool delay_timed_out = now.ms() - it->second.created_at_time >= send_nack_delay_ms_;
bool nack_on_rtt_passed = now.ms() - it->second.sent_at_time >= resend_delay.ms();
bool nack_on_seq_num_passed = it->second.sent_at_time == -1 && 
                              AheadOrAt(newest_seq_num_, it->second.send_at_seq_num);
  • delay_timed_out:超时判断

    • 延迟超时时间 = 现在的时间(now.ms()) - NackInfo创建的时间(it->second.created_at_time);
    • 表示当前包进入丢包列表的时间是否已经超过 send_nack_delay_ms_ 阈值(默认值为 0,即认为所有包都满足这个条件)。
  • nack_on_rtt_passed:是否超过一个RTT

    • resend_delay 会被动态调整,初始值是 RTT,如果设置了 backoff_settings_,会参与指数退避计算,延迟间隔可能变长。

    • 距离上次发送nack的时间 = 当前时间(now.ms()) - 上次nack发送时间(it->second.sent_at_time);

    • 判断 NACK 请求是否已经发送超过一个 RTT 的时间(避免频繁重传导致带宽浪费)。

  • nack_on_seq_num_passed:判断基于序号的优先级

    • it->second.sent_at_time == -1:上次发送nack时间为-1,说明是第一次发送nack;
    • AheadOrAt(newest_seq_num_, it->second.send_at_seq_num):且该包在前一次处理的包序号newest_seq_num_之前(注意和时间无关);
    • 之前没有请求过重传,并且,本次处理的包还是晚到的;

上面三个条件不能同时满足,就不能说明是真正的丢包。

3.4、重试和剔除策略:

如果满足上述条件,NACK 请求会加入结果列表,并更新重试次数和发送时间,但会区分以下两种情况:

  1. 重试次数 +1:

    ++it->second.retries;
    it->second.sent_at_time = now.ms();
    
    • 将当前时间更新为 NACK 包的发送时间。
  2. 超出最大尝试次数,移除包:

    if (it->second.retries >= kMaxNackRetries) {
        RTC_LOG(LS_WARNING) << "Sequence number " << it->second.seq_num
                            << " removed from NACK list due to max retries.";
        it = nack_list_.erase(it);
    } else {
        ++it;
    }
    
    • 如果重试次数 retries 超过了最大重试限制(kMaxNackRetries,通常为10),认为包已经完全丢失,无需再申请重传,将其从丢包表中剔除。

3.4、Backoff 重试间隔的指数退避逻辑:

如果设置了 backoff_settings_,重试间隔会动态调整,初始值为 RTT (resend_delay),之后会根据尝试次数增加:

TimeDelta exponential_backoff =
    std::min(TimeDelta::Millis(rtt_ms_), backoff_settings_->max_rtt) *
    std::pow(backoff_settings_->base, it->second.retries - 1);
resend_delay = std::max(resend_delay, exponential_backoff);
  • 初始间隔最小为 RTT 时间。
  • 随着重传次数增加,使用 backoff_settings_->base 指数增长(遇到高丢包率时缓解重传风暴)。
  • 间隔时间不会超过 backoff_settings_->max_rtt

四、Process:

Process() 是 WebRTC 中用于周期性执行丢包检测和 NACK 发送的关键函数。它的主要功能是通过时间戳判断 RTP 包是否需要申请重传,并调用 GetNackBatch 函数生成符合发送 NACK 请求的丢包序号列表,然后通过 nack_sender_ 发送 NACK 消息。

4.1、完整代码:

void DEPRECATED_NackModule::Process() {
  if (nack_sender_) {
    std::vector<uint16_t> nack_batch;
    {
      MutexLock lock(&mutex_);
      nack_batch = GetNackBatch(kTimeOnly);
    }

    if (!nack_batch.empty()) {
      // This batch of NACKs is triggered externally; there is no external
      // initiator who can batch them with other feedback messages.
      nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false);
    }
  }

  // Update the next_process_time_ms_ in intervals to achieve
  // the targeted frequency over time. Also add multiple intervals
  // in case of a skip in time as to not make uneccessary
  // calls to Process in order to catch up.
  int64_t now_ms = clock_->TimeInMilliseconds();
  if (next_process_time_ms_ == -1) {
    next_process_time_ms_ = now_ms + kProcessIntervalMs;
  } else {
    next_process_time_ms_ = next_process_time_ms_ + kProcessIntervalMs +
                            (now_ms - next_process_time_ms_) /
                                kProcessIntervalMs * kProcessIntervalMs;
  }
}

4.2、分步解析:

  1. 调用 GetNackBatch 生成丢包序号列表(时间戳判断)

    nack_batch = GetNackBatch(kTimeOnly);
    
    • 使用 kTimeOnly 参数调用 GetNackBatch,只根据“时间逻辑”进行丢包检测,忽略序号逻辑。
    • GetNackBatch 函数会从维护的丢包列表(nack_list_)中筛选出满足条件的包(即超时等待的包),并返回这些包的序号列表。
  2. 检查是否有需要发送的丢包序号

    if (!nack_batch.empty()) {
      nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false);
    }
    
    • 如果返回的 nack_batch 非空,调用 nack_sender_->SendNack 发送 NACK 消息,即通知远端媒体流发送端重传这些丢包。
    • 注意,其中的参数 buffering_allowed=false 表明这些 NACK 消息不需要进行额外的合并操作,立即发送,因为产生的 NACK 是周期性检测触发的,不是等待特定操作(例如缓冲区满)导致的批量合并。
  3. 更新下次执行时间 next_process_time_ms_

    int64_t now_ms = clock_->TimeInMilliseconds();
    if (next_process_time_ms_ == -1) {
      next_process_time_ms_ = now_ms + kProcessIntervalMs;
    } else {
      next_process_time_ms_ = next_process_time_ms_ + kProcessIntervalMs +
                              (now_ms - next_process_time_ms_) /
                                  kProcessIntervalMs * kProcessIntervalMs;
    }
    
    • 第一次执行时,初始化 next_process_time_ms_,定时下一次调用。
    • 如果已经执行过,则更新下一次调用时间,按照多周期间隔(kProcessIntervalMs)计算,即即使当前周期有跳过(例如时钟误差或者系统负载问题导致执行延迟),也会校准时间。

备注:计算偏差周期:

  • 首先,计算当前时间距离上一次预定执行时间 next_process_time_ms_ 的实际时间差,即 (now_ms - next_process_time_ms_)
  • 然后,将这个时间差除以周期间隔 kProcessIntervalMs,得到一个小数值,表示实际时间差相对于周期间隔的倍数。
  • 接着,将这个小数值乘以周期间隔 kProcessIntervalMs,得到实际时间差相对于周期间隔的整数倍数。
  • 最后,将这个整数倍数加上一个周期间隔 kProcessIntervalMs,即 next_process_time_ms_ + kProcessIntervalMs,以确保下一次执行时间在正确的周期间隔内。这个操作的目的是将下一次执行时间调整为当前时间点之后的一个完整周期。

通过这种方式,即使由于时钟误差或系统负载问题导致上次执行时间和当前时间之间存在偏差,也能保证下一次执行时间能够在正确的周期间隔内进行调整,以维持周期性执行的准确性。

4.3、Process的优势

Process() 是 WebRTC 中基于时间驱动的丢包检测和恢复的核心设计部分,其优势在于:

  1. 周期性执行保证实时性
    • Process() 每隔固定时间间隔被回调一次,确保丢包检测有足够的实时性,在高实时要求的 WebRTC 音视频场景中尤为重要。
  2. 多条件整合
    • Process() 不仅通过时间戳判断丢包,还可以封装其他判断逻辑(例如接收到的序号统计、丢包率分析等),高度模块化的设计使逻辑易于扩展。
  3. 动态调整节奏
    • 如果系统负载导致回调函数延迟,Process() 通过重新计算 next_process_time_ms_,动态调整后续执行时间,确保整体执行频率和节奏不受干扰。
  4. 与 NACK 发送模块的配合
    • Process() 不仅检测丢包,还直接推动 NACK 消息的发送,它与 nack_sender_ 完全集成,确保检测和发送一步到位,而不是分散在不同模块中增加复杂度。

五、总结:

本文主要介绍了从可疑丢包中找到真正丢包的逻辑,我们也看出来,webrtc不是简单的丢了包就要求重传,还对序列号跨度,超时时间等做了限制,这样才不会导致重传风暴。


http://www.kler.cn/a/448655.html

相关文章:

  • R9000P键盘失灵解决办法
  • WPSJS:让 WPS 办公与 JavaScript 完美联动
  • Linux 中 grep、sed、awk 命令
  • 登山第十六梯:深度恢复——解决机器人近视问题
  • Leetcode中最常用的Java API——util包
  • Python tkinter写的《电脑装配单》和 Html版 可打印 可导出 excel 文件
  • Linux之压缩解压相关命令
  • 网上球鞋竞拍系统|Java|SSM|VUE| 前后端分离
  • MVCC了解
  • 2024 高级爬虫笔记(四)协程、selenium
  • 11爬虫:使用requests和selenium分别抓取4399网页游戏名称
  • LeetCode 35. 搜索插入位置 (C++实现)
  • 12.18 web后端开发——数据库
  • 【代码随想录】刷题记录(61)-二叉搜索树中的众数
  • 【Java入门指南 Day12:Java集合框架】
  • PostgreSQL和Postgis安装
  • 正反向代理 Nginx简单使用
  • 麒麟操作系统服务架构保姆级教程(三)ssh远程连接
  • 【从零开始的LeetCode-算法】3285. 找到稳定山的下标
  • LeetCode:1387. 将整数按权重排序(记忆化搜索 Java)
  • 【漏洞复现】CVE-2023-29944 Expression Injection
  • React:闭包陷阱产生和解决
  • 前端面经每日一题Day18
  • 八字精批API接口PHP实现返回json数据
  • GESP CCF C++一级编程等级考试认证真题 2024年12月
  • 银行转账虚拟生成器app银行转账模拟器银行模拟器 手机银行模拟器