WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(01)- Qos概述
WebRTC服务质量(02)- RTP协议
WebRTC服务质量(03)- RTCP协议
WebRTC服务质量(04)- 重传机制(01) RTX NACK概述
WebRTC服务质量(05)- 重传机制(02) NACK判断丢包
WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包
WebRTC服务质量(07)- 重传机制(04) 接收NACK消息
WebRTC服务质量(08)- 重传机制(05) RTX机制
WebRTC服务质量(09)- Pacer机制(01) 流程概述
WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(11)- Pacer机制(03) IntervalBudget
WebRTC服务质量(12)- Pacer机制(04) 向Pacer中插入数据
一、前言:
RoundRobinPacketQueue
是Pacer模块当中一个非常重要的循环队列模块,主要目的是在多流场景中,根据每个流的 优先级 以及流内的 RTP 包的 入队顺序、类型 和其他因素,动态决定数据包的发送顺序。
二、类关系图:
2.1、类图:
关键部分加了注释,便于理解:
- 多流支持: 每个流按 SSRC(Synchronization Source Identifier,同步信源标识符)进行区分,每个流维护独立的优先级队列。
- 多级优先级调度: 不同流间按优先级调度(基于
StreamPrioKey
),单个流内的包按严格的队列顺序管理。 - 静态与动态属性结合: 例如,属性包括每个包的优先级(
priority
)、入队时间(enqueue_time
)、重传标志、带宽开销等。 - 分组调度: 使用轮询(Round-Robin)机制,从不同流根据流的权重和优先级选取包发送,提高音频、视频和反馈等多种类型数据的实时性。
2.2、重要成员关系:
- stream_priorities当中存放StreamPrioKey和ssrc对应关系;
- streams当中存放ssrc和Stream的对应关系;
- 这样就建立了StreamPrioKey和Stream之间的关系;
- 反过来,也可以通过Stream当中的priority_it找到stream_priorities当中的某一项;
小结下:重要成员变量的功能
成员变量 | 功能 |
---|---|
streams_ | 保存所有流,键是流的 ssrc ,每条流存有独立队列和优先级信息。 |
stream_priorities_ | 将所有流按照 StreamPrioKey 排列,用于实现流间优先级调度。 |
enqueue_times_ | 一个多重集合,保存所有包的入队时间,便于快速找到最早入队的包时间。 |
size_packets_ 和 size_ | 分别记录队列中包的个数和总字节数,动态调整。 |
transport_overhead_per_packet_ | 计算包传输的额外开销(如包头)。 |
time_last_updated_ | 用于统计队列的更新时间,辅助计算入队和等待时间等。 |
三、重要函数:
3.1、Push
Push
会根据包的优先级、流的权重、总队列大小,以及包的类型等,将包插入到对应的流(Stream
)队列,并更新其他与队列状态关联的元数据。总体思路如下:
处理一个新包时:
- 确定该包所属的流,如果不存在该流,则创建一个
Stream
对象。 - 将包包装成为
QueuedPacket
并插入到流的优先级队列。 - 更新流的优先级键
StreamPrioKey
,并在stream_priorities_
中重新排序。 - 更新队列元数据(如队列包大小、队列时间等)。
void RoundRobinPacketQueue::Push(int priority,
Timestamp enqueue_time,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet) {
RTC_DCHECK(packet->packet_type().has_value());
if (size_packets_ == 0) {
// Single packet fast-path.
single_packet_queue_.emplace(
QueuedPacket(priority, enqueue_time, enqueue_order,
enqueue_times_.end(), std::move(packet)));
UpdateQueueTime(enqueue_time);
single_packet_queue_->SubtractPauseTime(pause_time_sum_);
size_packets_ = 1;
size_ += PacketSize(*single_packet_queue_);
} else {
MaybePromoteSinglePacketToNormalQueue();
Push(QueuedPacket(priority, enqueue_time, enqueue_order,
enqueue_times_.insert(enqueue_time), std::move(packet)));
}
}
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
auto stream_info_it = streams_.find(packet.Ssrc());
if (stream_info_it == streams_.end()) {
stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;
stream_info_it->second.priority_it = stream_priorities_.end();
stream_info_it->second.ssrc = packet.Ssrc();
}
Stream* stream = &stream_info_it->second;
if (stream->priority_it == stream_priorities_.end()) {
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
} else if (packet.Priority() < stream->priority_it->first.priority) {
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
}
if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {
packet.UpdateEnqueueTimeIterator(enqueue_times_.insert(packet.EnqueueTime()));
} else {
UpdateQueueTime(packet.EnqueueTime());
packet.SubtractPauseTime(pause_time_sum_);
size_packets_ += 1;
size_ += PacketSize(packet);
}
stream->packet_queue.push(packet);
}
- 从streams中找packet所属的Ssrc的stream,如果没有,则在streams中插入一项;
- 查看stream的priority_it是否等于stream_priorities_的end()
- 如果相等,则在stream_priorities_中插入一项;
- 否则,如果新包的优先高(注意,优先级数值越小表示优先级越高),则更新其ssrc对应队列的优先级;
- 更新队列总时长;
- 入队时间减去暂停时间(一般不会有暂停);
- 队列总包数+1;
- 队列总字节大小 = 包的负载大小+Padding大小;
- 插入到stream中的packet_queue中;
3.2、Pop
Pop
会轮询不同的流并从当前优先级最高的流中取出一个包发送,同时维护包的发送顺序。总体思路如下:
- 调用
GetHighestPriorityStream
获取当前优先级最高的流。 - 从该流的优先级队列(
PriorityPacketQueue
)中取出队首包,并更新流的状态(如剩余大小、时间等)。 - 如果该流没有剩余包,删除对应的流优先级键。
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
if (single_packet_queue_.has_value()) {
RTC_DCHECK(stream_priorities_.empty());
std::unique_ptr<RtpPacketToSend> rtp_packet(
single_packet_queue_->RtpPacket());
single_packet_queue_.reset();
queue_time_sum_ = TimeDelta::Zero();
size_packets_ = 0;
size_ = DataSize::Zero();
return rtp_packet;
}
RTC_DCHECK(!Empty());
Stream* stream = GetHighestPriorityStream();
const QueuedPacket& queued_packet = stream->packet_queue.top();
stream_priorities_.erase(stream->priority_it);
// Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the |pause_time_sum_ms_| was
// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
// by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state.
TimeDelta time_in_non_paused_state =
time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;
queue_time_sum_ -= time_in_non_paused_state;
RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());
enqueue_times_.erase(queued_packet.EnqueueTimeIterator());
// Update |bytes| of this stream. The general idea is that the stream that
// has sent the least amount of bytes should have the highest priority.
// The problem with that is if streams send with different rates, in which
// case a "budget" will be built up for the stream sending at the lower
// rate. To avoid building a too large budget we limit |bytes| to be within
// kMaxLeading bytes of the stream that has sent the most amount of bytes.
DataSize packet_size = PacketSize(queued_packet);
stream->size =
std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);
max_size_ = std::max(max_size_, stream->size);
size_ -= packet_size;
size_packets_ -= 1;
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());
stream->packet_queue.pop();
// If there are packets left to be sent, schedule the stream again.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
if (stream->packet_queue.empty()) {
stream->priority_it = stream_priorities_.end();
} else {
int priority = stream->packet_queue.top().Priority();
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(priority, stream->size), stream->ssrc);
}
return rtp_packet;
}
-
获得优先级最高的stream;
-
从stream的packet_queue当中取出第一个包;
-
将stream在
stream_priorities_
中的项删除;-
思考下虽然包删除了,但是stream还在,为啥要删除?
答案:因为
stream_priorities_
是multimap,允许出现相同key,也就是说,这个包没有,它的stream优先级全靠它撑着(因为是先pop优先级高的包),所以,现在不应该由它撑着了,换下一个优先级最高的包撑着。
-
-
计算Packet入队后距离现在的时间(不包括暂停时间);
-
将这段时间从队列的总时间减去;
-
从
enqueue_times_
中将Packet的项删除; -
总包数减去1;
-
总字节数减去包的字节数;
-
将包从stream中的queue中弹出;
-
如果stream中的队列为空,则令
stream的priority_it
指向stream_priorities_
的end()
; -
否则,从stream队列头部取出Packet,将该Packet的priority插入到
stream_priorities_
中;
四、优先级调度:
-
每个流有单独的优先级队列(
PriorityPacketQueue
),保存QueuedPacket
对象。 -
QueuedPacket
是一个包装类,表示单个 RTP 数据包及其附属信息(例如入队时间、优先级、是否为重传包等)。 -
列表中的流按
StreamPrioKey
保存在stream_priorities_
中,通过此键决定流次序:struct StreamPrioKey { int priority; // 流的优先级,数值越低,优先级越高 DataSize size; // 数据包大小,用于平衡负载 };
优先规则:优先级低(priority 值小) > 数据包大小小(size 值小)。
五、轮询调度:
- 核心逻辑通过
RoundRobin
的方式轮询多个流。但由于流可能有不同的优先级,某些流会被更多次轮询到。 - 函数
GetHighestPriorityStream
定位当前最高优先级的流,从流对应的队列中取得包然后发送。
六、Stream定义与管理:
每个流由 Stream
类表示,Stream
是该流独有的数据结构,包括:
- 当前队列状态,如总字节数、包大小和优先级。
- 内部维护单独的优先级队列(
PriorityPacketQueue
)。 - 调度时实时更新优先级,确保新加入高优先级包时能调整队列次序。
七、其他特性:
7.1、重传支持:
- 标记是否重传的包,在出队时可能依据该标记进行特殊处理。
7.2、时间相关:
queue_time_sum_
和pause_time_sum_
用于统计包在队列中的存留时间,这对于带宽控制和流量管理很有用。- 提供接口如
AverageQueueTime
计算平均队列时间,用于监控流的实时性。
7.3、队列字节限制:
队列有最大可存储的字节数(max_size_
),以防止占用过多资源。
7.4、暂停/恢复功能:
可以通过 SetPauseState
暂停或者恢复队列处理。劝你最好别用!!!
八、总结:
RoundRobinPacketQueue
是一个高效的多流、多优先级调度队列,适用于 RTP 媒体数据的分组发送场景。它通过流内、流间的双重调度机制,结合优先级动态提升、统计队列时间和暂停控制等特性,确保在带宽有限的网络环境中最大程度提高数据的实时性和发送效率,是 WebRTC Pacer 模块的核心部分。