【WebRTC】视频发送链路中类的简单分析(下)
目录
- 1.任务队列节流发送器(TaskQueuePacedSender)
- 1.1 节流控制器添加RTP数据包(PacingController::EnqueuePacket())
- 1.2 监测是否要处理Packet(PacingController::MaybeProcessPackets())
- 2.数据包路由(PacketRouter)
- 3.模块RtpRtcp的实现(ModuleRtpRtcpImpl2)
- 4.RTP出站发送器(RTPSenderEgress)
- 5.媒体通道的实现(MediaChannelUtil)
- 6.基础通道(BaseChannel)
- 6.安全RTP传输(SrtpTransport)
- 7.DTLS-SRTP 传输(DtlsSrtpTransport)
- 8.DTLS传输(DtlsTransport)
- 9.P2P传输通道(P2PTransportChannel)
WebRTC中类的简单分析:
【WebRTC】视频发送链路中类的简单分析(上)
在前一部分当中,记录视频流已经传输到RTPSender当中的paced_sender_->EnqueuePackets(),这个函数会逐渐向底层深入,发送RTP数据包,下面记录后一部分的传输流程
1.任务队列节流发送器(TaskQueuePacedSender)
paced_sender_->EnqueuePackets()的具体实现位于TaskQueuePacedSender这个类中,声明在modules/pacing/task_queue_paced_sender.h,其中会对任务队列进行一些处理,例如暂停队列,重启队列,检查队列信息等,最核心的函数是EnqueuePackets(),用于将RTP数据包添加到队列当中
class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
public:
static const int kNoPacketHoldback;
// The pacer can be configured using `field_trials` or specified parameters.
//
// The `hold_back_window` parameter sets a lower bound on time to sleep if
// there is currently a pacer queue and packets can't immediately be
// processed. Increasing this reduces thread wakeups at the expense of higher
// latency.
//
// The taskqueue used when constructing a TaskQueuePacedSender will also be
// used for pacing.
/*
节流器可以通过 `field_trials` 进行配置,或者指定参数。
`hold_back_window` 参数设置了如果当前有节流队列且数据包不能立即被处理时,睡眠时间的下限。
增加这个值可以减少线程唤醒次数,但代价是增加延迟。
在构造 TaskQueuePacedSender 时使用的 taskqueue 也将用于节流。
*/
TaskQueuePacedSender(Clock* clock,
PacingController::PacketSender* packet_sender,
const FieldTrialsView& field_trials,
TimeDelta max_hold_back_window,
int max_hold_back_window_in_packets);
~TaskQueuePacedSender() override;
// The pacer is allowed to send enqued packets in bursts and can build up a
// packet "debt" that correspond to approximately the send rate during
// 'burst_interval'.
// 节流器被允许以突发的方式发送排队的数据包,并可以累积一个“债务”数据包数量,
// 这个数量大致对应于在 'burst_interval' 期间的发送速率。
// 设置发送突发间隔
void SetSendBurstInterval(TimeDelta burst_interval);
// A probe may be sent without first waing for a media packet.
// 一个探测包可能会在没有首先等待媒体数据包的情况下被发送
void SetAllowProbeWithoutMediaPacket(bool allow);
// Ensure that necessary delayed tasks are scheduled.
// 确保必要的延迟任务被安排
void EnsureStarted();
// Methods implementing RtpPacketSender.
// Adds the packet to the queue and calls
// PacingController::PacketSender::SendPacket() when it's time to send.
// 将数据包添加到队列当中
void EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
// Remove any pending packets matching this SSRC from the packet queue.
// 从数据包队列中移除所有与此SSRC匹配的待处理数据包
void RemovePacketsForSsrc(uint32_t ssrc) override;
// Methods implementing RtpPacketPacer.
// 创建探测集群(cluster),网络拥塞控制中,探测集群是一种用来估计网络带宽的方法
void CreateProbeClusters(
std::vector<ProbeClusterConfig> probe_cluster_configs) override;
// Temporarily pause all sending.
// 暂停发送数据包
void Pause() override;
// Resume sending packets.
// 重新发送数据包
void Resume() override;
// 设置网络是否处于拥塞状态
void SetCongested(bool congested) override;
// Sets the pacing rates. Must be called once before packets can be sent.
// 设置发送RTP包的节流速率和填充速率
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;
// Currently audio traffic is not accounted for by pacer and passed through.
// With the introduction of audio BWE, audio traffic will be accounted for
// in the pacer budget calculation. The audio traffic will still be injected
// at high priority.
// 指示节流器是否应该考虑音频流量
void SetAccountForAudioPackets(bool account_for_audio) override;
// 是否应该在计算发送速率时考虑传输开销。如果启用,发送器将在计算节流速率时包括传输开销,如IP和UDP头部等
void SetIncludeOverhead() override;
// 设置每个传输包的开销大小。overhead_per_packet参数指定了每个RTP包的固定开销大小,包括IP、UDP和可能的其他协议头部
void SetTransportOverhead(DataSize overhead_per_packet) override;
// Returns the time since the oldest queued packet was enqueued.
// 返回队列中最老的包自入队以来的等待时间
TimeDelta OldestPacketWaitTime() const override;
// Returns total size of all packets in the pacer queue.
// 返回节流器队列中所有包的总大小
DataSize QueueSizeData() const override;
// Returns the time when the first packet was sent;
// 返回发送的第一个数据包的时间戳
std::optional<Timestamp> FirstSentPacketTime() const override;
// Returns the number of milliseconds it will take to send the current
// packets in the queue, given the current size and bitrate, ignoring prio.
// 根据当前队列中数据包的大小和比特率,忽略优先级,返回发送当前队列中数据包所需的毫秒数
TimeDelta ExpectedQueueTime() const override;
// Set the max desired queuing delay, pacer will override the pacing rate
// specified by SetPacingRates() if needed to achieve this goal.
// 设置最大期望的排队延迟,如果需要的话,节流器将覆盖通过 SetPacingRates() 指定的节流速率以实现这一目标
void SetQueueTimeLimit(TimeDelta limit) override;
protected:
// Exposed as protected for test.
struct Stats {
Stats()
: oldest_packet_enqueue_time(Timestamp::MinusInfinity()),
queue_size(DataSize::Zero()),
expected_queue_time(TimeDelta::Zero()) {}
Timestamp oldest_packet_enqueue_time;
DataSize queue_size;
TimeDelta expected_queue_time;
std::optional<Timestamp> first_sent_packet_time;
};
void OnStatsUpdated(const Stats& stats);
private:
// Call in response to state updates that could warrant sending out packets.
// Protected against re-entry from packet sent receipts.
// 检查当前是否是发送数据包的合适时机,如果是,则直接调用ProcessPackets()来发送数据包;
// 如果不是,则安排一个延迟任务,在将来的某个时间点再次检查
// 决定是否需要根据状态更新来发送数据包。它通常在网络状态变化、新数据包到达或其他可能影响发送计划的事件发生时被调用
void MaybeScheduleProcessPackets() RTC_RUN_ON(task_queue_);
// Check if it is time to send packets, or schedule a delayed task if not.
// Use Timestamp::MinusInfinity() to indicate that this call has _not_
// been scheduled by the pacing controller. If this is the case, check if we
// can execute immediately otherwise schedule a delay task that calls this
// method again with desired (finite) scheduled process time.
// 检查是否是一个合适的时机去发送数据包
/*
1.如果scheduled_process_time是Timestamp::MinusInfinity(),这表示这个调用没有被节流控制器安排,
而是由于其他原因(如新数据包到达)被触发。在这种情况下,函数会检查是否可以立即执行ProcessPackets();
如果不可以,则安排一个延迟任务,在合适的时间再次调用MaybeProcessPackets(),并传入一个具体的计划发送时间
2.如果scheduled_process_time是一个具体的有限值,函数会检查当前时间是否已经到达或超过了这个计划时间。
如果是,它会调用ProcessPackets()来发送数据包;如果不是,它会安排一个延迟任务,在计划时间到达时再次调用
MaybeProcessPackets()
*/
void MaybeProcessPackets(Timestamp scheduled_process_time);
void UpdateStats() RTC_RUN_ON(task_queue_);
Stats GetStats() const;
Clock* const clock_;
// The holdback window prevents too frequent delayed MaybeProcessPackets()
// calls. These are only applicable if `allow_low_precision` is false.
const TimeDelta max_hold_back_window_;
const int max_hold_back_window_in_packets_;
// 节流控制器
PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_);
// We want only one (valid) delayed process task in flight at a time.
// If the value of `next_process_time_` is finite, it is an id for a
// delayed task that will call MaybeProcessPackets() with that time
// as parameter.
// Timestamp::MinusInfinity() indicates no valid pending task.
/*
我们希望在任何时候只有一个(有效的)延迟处理任务在执行中。如果 `next_process_time_` 的值是有限的,
那么它就是一个延迟任务的标识符,这个任务将会使用那个时间作为参数调用 `MaybeProcessPackets()`。
Timestamp::MinusInfinity() 表示没有有效的待处理任务
*/
Timestamp next_process_time_ RTC_GUARDED_BY(task_queue_);
// Indicates if this task queue is started. If not, don't allow
// posting delayed tasks yet.
// 如果当前任务队列没有开始,不允许延时任务
bool is_started_ RTC_GUARDED_BY(task_queue_);
// Indicates if this task queue is shutting down. If so, don't allow
// posting any more delayed tasks as that can cause the task queue to
// never drain.
// 如果任务队列关闭,不允许添加延时任务
bool is_shutdown_ RTC_GUARDED_BY(task_queue_);
// Filtered size of enqueued packets, in bytes.
rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_);
bool include_overhead_ RTC_GUARDED_BY(task_queue_);
Stats current_stats_ RTC_GUARDED_BY(task_queue_);
// Protects against ProcessPackets reentry from packet sent receipts.
bool processing_packets_ RTC_GUARDED_BY(task_queue_) = false;
ScopedTaskSafety safety_;
TaskQueueBase* task_queue_;
};
TaskQueuePacedSender::EnqueuePackets()的实现方式如下
void TaskQueuePacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
task_queue_->PostTask(
SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(task_queue_);
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets");
for (auto& packet : packets) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets::Loop",
"sequence_number", packet->SequenceNumber(),
"rtp_timestamp", packet->Timestamp());
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
// 将RTP数据包添加到PacingController当中
pacing_controller_.EnqueuePacket(std::move(packet));
}
// 检查当前时刻是否可以发送数据包
MaybeProcessPackets(Timestamp::MinusInfinity());
}));
}
上面核心的处理函数为pacing_controller_.EnqueuePacket()和MaybeProcessPackets(),功能分别是添加RTP数据包到PacingController中和检查是否可以发送数据包。
1.1 节流控制器添加RTP数据包(PacingController::EnqueuePacket())
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
// ...
// 探测器检测到packet
prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
const Timestamp now = CurrentTime();
if (packet_queue_.Empty()) {
// If queue is empty, we need to "fast-forward" the last process time,
// so that we don't use passed time as budget for sending the first new
// packet.
Timestamp target_process_time = now;
Timestamp next_send_time = NextSendTime();
if (next_send_time.IsFinite()) {
// There was already a valid planned send time, such as a keep-alive.
// Use that as last process time only if it's prior to now.
target_process_time = std::min(now, next_send_time);
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
}
// 将packet送入到队列当中
packet_queue_.Push(now, std::move(packet));
seen_first_packet_ = true;
// Queue length has increased, check if we need to change the pacing rate.
MaybeUpdateMediaRateDueToLongQueue(now);
}
1.2 监测是否要处理Packet(PacingController::MaybeProcessPackets())
函数的作用是检查是否要处理当前队列中的packets,其中最核心的函数是ProcessPackets(),表示处理packets
void TaskQueuePacedSender::MaybeProcessPackets(
Timestamp scheduled_process_time) {
// ...
// Process packets and update stats.
while (next_send_time <= now + early_execute_margin) {
// 处理packets
pacing_controller_.ProcessPackets();
next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
// Probing state could change. Get margin after process packets.
early_execute_margin = pacing_controller_.IsProbing()
? PacingController::kMaxEarlyProbeProcessing
: TimeDelta::Zero();
}
UpdateStats();
// ...
}
pacing_controller_.ProcessPackets()的实现方式如下,主要使用packet_sender_->SendPacket()来发送数据
void PacingController::ProcessPackets() {
// ...
if (ShouldSendKeepalive(now)) {
DataSize keepalive_data_sent = DataSize::Zero();
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
if (seen_first_packet_) {
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_sender_->GeneratePadding(DataSize::Bytes(1));
for (auto& packet : keepalive_packets) {
keepalive_data_sent +=
DataSize::Bytes(packet->payload_size() + packet->padding_size());
packet_sender_->SendPacket(std::move(packet), PacedPacketInfo());
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
}
}
OnPacketSent(RtpPacketMediaType::kPadding, keepalive_data_sent, now);
}
// ...
DataSize data_sent = DataSize::Zero();
int iteration = 0;
int packets_sent = 0;
int padding_packets_generated = 0;
for (; iteration < circuit_breaker_threshold_; ++iteration) {
// ...
if (rtp_packet == nullptr) {
// ...
} else {
// ...
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
// ...
}
}
}
这里的packet_sender_->SendPacket()的实现由PacketRouter给出
2.数据包路由(PacketRouter)
PacketRouter的声明位于modules/pacing/packet_router.h中,这是一个路由器的类,主要的功能是接收RTP数据包,将其送入到下一模块中。另外,PacketRouter能够发送一些反馈消息
// PacketRouter keeps track of rtp send modules to support the pacer.
// In addition, it handles feedback messages, which are sent on a send
// module if possible (sender report), otherwise on receive module
// (receiver report). For the latter case, we also keep track of the
// receive modules.
/*
PacketRouter 负责跟踪 RTP 发送模块,以支持节奏发送器(pacer)。此外,它还处理反馈消息,
这些消息如果可能会在发送模块上发送(发送报告),否则则在接收模块上发送(接收报告)。
对于后者情况,我们还会跟踪接收模块。
*/
class PacketRouter : public PacingController::PacketSender {
public:
PacketRouter();
~PacketRouter() override;
PacketRouter(const PacketRouter&) = delete;
PacketRouter& operator=(const PacketRouter&) = delete;
// Callback is invoked after pacing, before a packet is forwarded to the
// sending rtp module.
// 添加一些回调函数,可以在发送数据包之前做一些定义,例如监控带宽等
void RegisterNotifyBweCallback(
absl::AnyInvocable<void(const RtpPacketToSend& packet,
const PacedPacketInfo& pacing_info)> callback);
// 添加一个Rtp发送模块(PacketRouter能够管理多个Rtp模块),REMB:Receiver Estimated Max Bitrate
void AddSendRtpModule(RtpRtcpInterface* rtp_module, bool remb_candidate);
// 移除一个Rtp发送模块
void RemoveSendRtpModule(RtpRtcpInterface* rtp_module);
// 检查当前的 PacketRouter 是否支持RTX 载填充,如果支持则具备更好的鲁棒性
bool SupportsRtxPayloadPadding() const;
// 添加一个Rtp接收模块
void AddReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender,
bool remb_candidate);
// 移除一个Rtp接收模块
void RemoveReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender);
// 核心函数,发送数据包到下一模块
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) override;
// 获取前向纠错
std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() override;
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) override;
// 终止重传请求
void OnAbortedRetransmissions(
uint32_t ssrc,
rtc::ArrayView<const uint16_t> sequence_numbers) override;
// 查询与给定媒体 SSRC 相关联的 RTX(重传扩展)SSRC
std::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const override;
// 在一批数据包处理完成后被调用
void OnBatchComplete() override;
// Send REMB feedback.
// 发送 REMB 反馈
void SendRemb(int64_t bitrate_bps, std::vector<uint32_t> ssrcs);
// Sends `packets` in one or more IP packets.
// 将传入的 RTCP 数据包发送出去。这些数据包可能会被组合成一个或多个 IP 数据包进行发送
void SendCombinedRtcpPacket(
std::vector<std::unique_ptr<rtcp::RtcpPacket>> packets);
private:
// 添加一个候选的 REMB 模块
void AddRembModuleCandidate(RtcpFeedbackSenderInterface* candidate_module,
bool media_sender);
// 条件性地移除一个候选的 REMB 模块
void MaybeRemoveRembModuleCandidate(
RtcpFeedbackSenderInterface* candidate_module,
bool media_sender);
// 取消当前活动的 REMB 模块,在需要切换或停止使用当前的 REMB 模块时,可以调用此方法
void UnsetActiveRembModule();
// 确定当前的活动 REMB 模块,可能会根据某些条件(如模块的状态、能力等)来选择合适的模块
void DetermineActiveRembModule();
// 将一个 RTP 发送模块添加到映射中,PacketRouter 可以管理多个 RTP 模块,并根据 SSRC 跟踪它们
void AddSendRtpModuleToMap(RtpRtcpInterface* rtp_module, uint32_t ssrc);
// 从映射中移除一个 RTP 发送模块
void RemoveSendRtpModuleFromMap(uint32_t ssrc);
// 确保某些操作(如对其他成员变量的访问)只在特定线程中执行,防止数据竞争和不一致的状态
SequenceChecker thread_checker_;
// Ssrc to RtpRtcpInterface module;
// 哈希表,映射 SSRC(同步源标识符)到对应的 RTP 模块
// 通过 SSRC,系统可以快速查找和访问与特定媒体流相关的 RTP 发送模块。这种映射支持高效的数据包路由和管理
std::unordered_map<uint32_t, RtpRtcpInterface*> send_modules_map_
RTC_GUARDED_BY(thread_checker_);
// 存储指向 RTP 模块的指针
// 用于维护添加的 RTP 模块的顺序,便于迭代和管理。这种数据结构在插入和删除操作上效率较高
std::list<RtpRtcpInterface*> send_modules_list_
RTC_GUARDED_BY(thread_checker_);
// The last module used to send media.
// 指向最后一个用于发送媒体的 RTP 模块的指针
// 记录最近使用的 RTP 模块,可以在需要时快速访问。这对于优化发送过程和管理状态非常有用
RtpRtcpInterface* last_send_module_ RTC_GUARDED_BY(thread_checker_);
// Rtcp modules of the rtp receivers.
// 存储指向 RTCP 反馈发送接口的指针
// 这个向量可以包含多个 RTCP 反馈发送模块,用于处理反馈消息的发送,如带宽估计等。
// 通过管理这些发送者,系统可以灵活地发送反馈信息
std::vector<RtcpFeedbackSenderInterface*> rtcp_feedback_senders_
RTC_GUARDED_BY(thread_checker_);
// Candidates for the REMB module can be RTP sender/receiver modules, with
// the sender modules taking precedence.
// 发送REMB候选模块
std::vector<RtcpFeedbackSenderInterface*> sender_remb_candidates_
RTC_GUARDED_BY(thread_checker_);
// 接收REMB候选模块
std::vector<RtcpFeedbackSenderInterface*> receiver_remb_candidates_
RTC_GUARDED_BY(thread_checker_);
// 当前活动的REMB模块
RtcpFeedbackSenderInterface* active_remb_module_
RTC_GUARDED_BY(thread_checker_);
// 用于跟踪传输序列
uint64_t transport_seq_ RTC_GUARDED_BY(thread_checker_);
// 用于在带宽估计变化时通知
absl::AnyInvocable<void(RtpPacketToSend& packet,
const PacedPacketInfo& pacing_info)>
notify_bwe_callback_ RTC_GUARDED_BY(thread_checker_) = nullptr;
// 待发送的前向纠错(FEC)数据包
std::vector<std::unique_ptr<RtpPacketToSend>> pending_fec_packets_
RTC_GUARDED_BY(thread_checker_);
// 当前批处理中使用的 RTP 模块
std::set<RtpRtcpInterface*> modules_used_in_current_batch_
RTC_GUARDED_BY(thread_checker_);
};
PacketRouter中的核心函数SendPacket()的实现方式如下,其中调用了rtp_module->SendPacket()将RTP数据包送入RtpRtcpInterface中,rtp_module的数据类型为RtpRtcpInterface
void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
// ...
rtp_module->SendPacket(std::move(packet), cluster_info);
modules_used_in_current_batch_.insert(rtp_module);
// Sending succeeded.
if (rtp_module->SupportsRtxPayloadPadding()) {
// This is now the last module to send media, and has the desired
// properties needed for payload based padding. Cache it for later use.
last_send_module_ = rtp_module;
}
for (auto& packet : rtp_module->FetchFecPackets()) {
pending_fec_packets_.push_back(std::move(packet));
}
}
实际上,RtpRtcpInterface中并没有实现SendPacket()这个函数,而是声明为了纯虚函数,位于modules/rtp_rtcp/source/rtp_rtcp_interface.h
class RtpRtcpInterface : public RtcpFeedbackSenderInterface {
public:
// Try to send the provided packet. Returns true iff packet matches any of
// the SSRCs for this module (media/rtx/fec etc) and was forwarded to the
// transport.
virtual bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) = 0;
// Returns true if the module can send media packets and the module is ready
// so send `packet` A RTP Sequence numbers may or may not have been assigned
// to the packet.
virtual bool CanSendPacket(const RtpPacketToSend& packet) const = 0;
// Assigns continuous RTP sequence number to packet.
virtual void AssignSequenceNumber(RtpPacketToSend& packet) = 0;
// Send the packet to transport. Before using this method, a caller must
// ensure the packet can be sent by first checking if the packet can be sent
// using CanSendPacket and the packet must be assigned a sequence number using
// AssignSequenceNumber.
virtual void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) = 0;
}
RtpRtcpInterface中SendPacket()的实现由ModuleRtpRtcpImpl2给出,这个类以public的方式继承了RtpRtcpInterface
3.模块RtpRtcp的实现(ModuleRtpRtcpImpl2)
RtpRtcp模块负责处理 RTP数据包的发送和接收,而ModuleRtpRtcpImpl2是RtpRtcp模块的具体实现,其中实现了RtpRtcpInterface中最重要的函数SendPacket(),声明位于modules/rtp_rtcp/source/rtp_rtcp_impl2.h
class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
public RTCPReceiver::ModuleRtpRtcp {
public:
ModuleRtpRtcpImpl2(const Environment& env,
const RtpRtcpInterface::Configuration& configuration);
~ModuleRtpRtcpImpl2() override;
// Receiver part.
// Called when we receive an RTCP packet.
// 接收到Rtcp数据包
void IncomingRtcpPacket(
rtc::ArrayView<const uint8_t> incoming_packet) override;
// 设置远端SSRC
void SetRemoteSSRC(uint32_t ssrc) override;
// 设置本地SSRC
void SetLocalSsrc(uint32_t local_ssrc) override;
// Sender part.
// 设置发送负载频率
void RegisterSendPayloadFrequency(int payload_type,
int payload_frequency) override;
// 注销一个发送载荷类型
int32_t DeRegisterSendPayload(int8_t payload_type) override;
// 是否允许在RTP头部扩展中使用混合的extmap配置
void SetExtmapAllowMixed(bool extmap_allow_mixed) override;
// 注册一个RTP头部扩展
void RegisterRtpHeaderExtension(absl::string_view uri, int id) override;
// 注销一个RTP头部扩展
void DeregisterSendRtpHeaderExtension(absl::string_view uri) override;
bool SupportsPadding() const override;
bool SupportsRtxPayloadPadding() const override;
// Get start timestamp.
// 获取开始时间戳
uint32_t StartTimestamp() const override;
// Configure start timestamp, default is a random number.
// 设置开始时间戳
void SetStartTimestamp(uint32_t timestamp) override;
uint16_t SequenceNumber() const override;
// Set SequenceNumber, default is a random number.
void SetSequenceNumber(uint16_t seq) override;
void SetRtpState(const RtpState& rtp_state) override;
void SetRtxState(const RtpState& rtp_state) override;
RtpState GetRtpState() const override;
RtpState GetRtxState() const override;
void SetNonSenderRttMeasurement(bool enabled) override;
uint32_t SSRC() const override { return rtcp_sender_.SSRC(); }
// Semantically identical to `SSRC()` but must be called on the packet
// delivery thread/tq and returns the ssrc that maps to
// RtpRtcpInterface::Configuration::local_media_ssrc.
uint32_t local_media_ssrc() const;
void SetMid(absl::string_view mid) override;
RTCPSender::FeedbackState GetFeedbackState();
void SetRtxSendStatus(int mode) override;
int RtxSendStatus() const override;
std::optional<uint32_t> RtxSsrc() const override;
void SetRtxSendPayloadType(int payload_type,
int associated_payload_type) override;
std::optional<uint32_t> FlexfecSsrc() const override;
// Sends kRtcpByeCode when going from true to false.
// 设置发送状态
int32_t SetSendingStatus(bool sending) override;
// 检查当前是否在发送
bool Sending() const override;
// Drops or relays media packets.
// 设置发送媒体状态
void SetSendingMediaStatus(bool sending) override;
// 检查是否处于发送媒体状态
bool SendingMedia() const override;
bool IsAudioConfigured() const override;
void SetAsPartOfAllocation(bool part_of_allocation) override;
// 发送RTP帧之前被调用。它提供了RTP帧的时间戳、捕获时间、载荷类型,以及一个标志来强制发送一个发送者报告
bool OnSendingRtpFrame(uint32_t timestamp,
int64_t capture_time_ms,
int payload_type,
bool force_sender_report) override;
// 检查是否可以发送给定的RTP包。它根据当前的网络条件和发送策略来决定是否可以发送这个包
bool CanSendPacket(const RtpPacketToSend& packet) const override;
// 为RTP包分配一个序列号。RTP序列号是一个递增的整数,用于在接收端正确地排序和重组RTP流
void AssignSequenceNumber(RtpPacketToSend& packet) override;
// 发送数据包
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) override;
// 尝试发送数据包
bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) override;
// 在一批RTP包发送完成后被调用。它可以用来执行一些清理工作,或者更新发送器的状态
void OnBatchComplete() override;
// 设置前向纠错(FEC)保护参数
void SetFecProtectionParams(const FecProtectionParams& delta_params,
const FecProtectionParams& key_params) override;
std::vector<std::unique_ptr<RtpPacketToSend>> FetchFecPackets() override;
// 放弃重传操作
void OnAbortedRetransmissions(
rtc::ArrayView<const uint16_t> sequence_numbers) override;
// RTP包被确认收
void OnPacketsAcknowledged(
rtc::ArrayView<const uint16_t> sequence_numbers) override;
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
size_t target_size_bytes) override;
// 获取发送的RTP数据包信息
std::vector<RtpSequenceNumberMap::Info> GetSentRtpPacketInfos(
rtc::ArrayView<const uint16_t> sequence_numbers) const override;
// ...
// Force a send of an RTCP packet.
// Normal SR and RR are triggered via the task queue that's current when this
// object is created.
// 发送RTCP数据包
int32_t SendRTCP(RTCPPacketType rtcpPacketType) override;
// 获取发送流的数据计数器
void GetSendStreamDataCounters(
StreamDataCounters* rtp_counters,
StreamDataCounters* rtx_counters) const override;
// A snapshot of the most recent Report Block with additional data of
// interest to statistics. Used to implement RTCRemoteInboundRtpStreamStats.
// Within this list, the `ReportBlockData::source_ssrc()`, which is the SSRC
// of the corresponding outbound RTP stream, is unique.
// 获取最新的接收报告块数据
std::vector<ReportBlockData> GetLatestReportBlockData() const override;
// 获取发送者报告统计信息
std::optional<SenderReportStats> GetSenderReportStats() const override;
// 获取非发送者往返时间(RTT)统计信息
std::optional<NonSenderRttStats> GetNonSenderRttStats() const override;
// (REMB) Receiver Estimated Max Bitrate.
void SetRemb(int64_t bitrate_bps, std::vector<uint32_t> ssrcs) override;
void UnsetRemb() override;
void SetTmmbn(std::vector<rtcp::TmmbItem> bounding_set) override;
// 获取Rtp最大Packet大小
size_t MaxRtpPacketSize() const override;
// 设置Rtp最大Packet大小
void SetMaxRtpPacketSize(size_t max_packet_size) override;
// (NACK) Negative acknowledgment part.
// Send a Negative acknowledgment packet.
// TODO(philipel): Deprecate SendNACK and use SendNack instead.
int32_t SendNACK(const uint16_t* nack_list, uint16_t size) override;
void SendNack(const std::vector<uint16_t>& sequence_numbers) override;
// Store the sent packets, needed to answer to a negative acknowledgment
// requests.
// 设置是否存储已发送的RTP包,以及存储多少个包
void SetStorePacketsStatus(bool enable, uint16_t number_to_store) override;
// 发送一个组合的RTCP包
void SendCombinedRtcpPacket(
std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets) override;
// Video part.
// 发送丢包通知
int32_t SendLossNotification(uint16_t last_decoded_seq_num,
uint16_t last_received_seq_num,
bool decodability_flag,
bool buffering_allowed) override;
// 获取发送速率
RtpSendRates GetSendRates() const override;
// 收到负确认(NACK)
void OnReceivedNack(
const std::vector<uint16_t>& nack_sequence_numbers) override;
// 收到RTCP报告块
void OnReceivedRtcpReportBlocks(
rtc::ArrayView<const ReportBlockData> report_blocks) override;
// 请求发送报告
void OnRequestSendReport() override;
// 设置视频比特率分配
void SetVideoBitrateAllocation(
const VideoBitrateAllocation& bitrate) override;
RTPSender* RtpSender() override;
const RTPSender* RtpSender() const override;
private:
FRIEND_TEST_ALL_PREFIXES(RtpRtcpImpl2Test, Rtt);
FRIEND_TEST_ALL_PREFIXES(RtpRtcpImpl2Test, RttForReceiverOnly);
// Rtp发送器上下文结构体
struct RtpSenderContext {
explicit RtpSenderContext(const Environment& env,
TaskQueueBase& worker_queue,
const RtpRtcpInterface::Configuration& config);
// Storage of packets, for retransmissions and padding, if applicable.
// packets的历史记录
RtpPacketHistory packet_history;
SequenceChecker sequencing_checker;
// Handles sequence number assignment and padding timestamp generation.
// 个负责RTP包序列号分配和填充时间戳生成的组件。它确保RTP包按照正确的顺序发送
PacketSequencer sequencer RTC_GUARDED_BY(sequencing_checker);
// Handles final time timestamping/stats/etc and handover to Transport.
// 负责最终时间戳标记、统计信息收集等处理,并将RTP包交给传输层(Transport)的组件
RtpSenderEgress packet_sender;
// If no paced sender configured, this class will be used to pass packets
// from `packet_generator_` to `packet_sender_`.
/*
如果没有配置节流发送器(paced sender),则non_paced_sender将被用来将packet_generator_
生成的包直接传递给packet_sender_。NonPacedPacketSender是一个不进行节流控制的发送器,
它简单地将包从一个生成器传递到另一个发送器
*/
RtpSenderEgress::NonPacedPacketSender non_paced_sender;
// Handles creation of RTP packets to be sent.
// 负责创建要发送的RTP包的组件。它根据输入的媒体流(如音频或视频)生成RTP包,
// 包括添加RTP头、有效载荷和其他必要的RTP信息
RTPSender packet_generator;
};
void set_rtt_ms(int64_t rtt_ms);
int64_t rtt_ms() const;
bool TimeToSendFullNackList(int64_t now) const;
// Called on a timer, once a second, on the worker_queue_, to update the RTT,
// check if we need to send RTCP report, send TMMBR updates and fire events.
// 周期性更新RTT,检查是否需要发送RTCP报告
void PeriodicUpdate();
// Returns true if the module is configured to store packets.
// 当前的配置下是否会存储packets
bool StorePackets() const;
// ...
};
这些函数和成员中,最重要的是结构体RtpSenderContext中的RtpSenderEgress packet_sender,这个成员调用了TrySendPacket()函数,将当前模块的packet送入到传输层进行传输。实现的方式如下,其中TrySendPacket()又调用了SendPacket()实现具体的发送任务
bool ModuleRtpRtcpImpl2::TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) {
if (!packet || !CanSendPacket(*packet)) {
return false;
}
AssignSequenceNumber(*packet);
// 调用SendPacket()执行具体的发送任务
SendPacket(std::move(packet), pacing_info);
return true;
}
void ModuleRtpRtcpImpl2::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
RTC_DCHECK(CanSendPacket(*packet));
// 将packet送入到传输层,这里的rtp_sender_数据类型为RtpSenderEgress
rtp_sender_->packet_sender.SendPacket(std::move(packet), pacing_info);
}
4.RTP出站发送器(RTPSenderEgress)
RTPSenderEgrees是RTP层面最后一个模块,其中的"Egress"翻译为”出站“或”出口“。这个模块的功能是将之前处理好的网络协议层信息(RTP和RTCP的信息)发送给传输层(Transport),交给其执行。这个类的声明位于modules/rtp_rtcp/source/rtp_sender_egress.h中
class RtpSenderEgress {
public:
// Helper class that redirects packets directly to the send part of this class
// without passing through an actual paced sender.
// 不进行节流的Packet发送器
class NonPacedPacketSender : public RtpPacketSender {
public:
NonPacedPacketSender(TaskQueueBase& worker_queue,
RtpSenderEgress* sender,
PacketSequencer* sequencer);
virtual ~NonPacedPacketSender();
void EnqueuePackets(
// ...
};
RtpSenderEgress(const Environment& env,
const RtpRtcpInterface::Configuration& config,
RtpPacketHistory* packet_history);
~RtpSenderEgress();
// 发送数据包到下一层级
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info);
void OnBatchComplete();
uint32_t Ssrc() const { return ssrc_; }
std::optional<uint32_t> RtxSsrc() const { return rtx_ssrc_; }
std::optional<uint32_t> FlexFecSsrc() const { return flexfec_ssrc_; }
RtpSendRates GetSendRates(Timestamp now) const;
void GetDataCounters(StreamDataCounters* rtp_stats,
StreamDataCounters* rtx_stats) const;
// 强制将发送数据包包含在比特率分配中
void ForceIncludeSendPacketsInAllocation(bool part_of_allocation);
// 媒体是否已被发送
bool MediaHasBeenSent() const;
// 设置媒体是否已被发送
void SetMediaHasBeenSent(bool media_sent);
// 设置时间戳偏移量
void SetTimestampOffset(uint32_t timestamp);
// For each sequence number in `sequence_number`, recall the last RTP packet
// which bore it - its timestamp and whether it was the first and/or last
// packet in that frame. If all of the given sequence numbers could be
// recalled, return a vector with all of them (in corresponding order).
// If any could not be recalled, return an empty vector.
// 获取Rtp数据包信息
std::vector<RtpSequenceNumberMap::Info> GetSentRtpPacketInfos(
rtc::ArrayView<const uint16_t> sequence_numbers) const;
void SetFecProtectionParameters(const FecProtectionParams& delta_params,
const FecProtectionParams& key_params);
std::vector<std::unique_ptr<RtpPacketToSend>> FetchFecPackets();
// Clears pending status for these sequence numbers in the packet history.
// 终止重传
void OnAbortedRetransmissions(
rtc::ArrayView<const uint16_t> sequence_numbers);
private:
// ...
// Sends packet on to `transport_`, leaving the RTP module.
// 将packet送入到transport模块,数据包离开RTP模块
bool SendPacketToNetwork(const RtpPacketToSend& packet,
const PacketOptions& options,
const PacedPacketInfo& pacing_info);
// 更新Rtp状态
void UpdateRtpStats(Timestamp now,
uint32_t packet_ssrc,
RtpPacketMediaType packet_type,
RtpPacketCounter counter,
size_t packet_size);
// ...
};
其中的核心函数为SendPacket()、CompleteSendPacket()和SendPacketToNetwork(),它们的调用关系是
SendPakcet()->CompleteSendPacket()->SendPacketToNetwork()
前面两个函数还会处理一些其他的信息,下面直接看最终出站的函数SendPacketToNetwork(),调用了transport_->SendRtp()将packet送入到transport模块中
bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
const PacketOptions& options,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK_RUN_ON(worker_queue_);
// SendRtp()将packet送入到transport模块中
if (transport_ == nullptr || !transport_->SendRtp(packet, options)) {
RTC_LOG(LS_WARNING) << "Transport failed to send packet.";
return false;
}
env_.event_log().Log(std::make_unique<RtcEventRtpPacketOutgoing>(
packet, pacing_info.probe_cluster_id));
return true;
}
Transport的声明位于api/call/transport.h中,其中包括了发送Rtp和Rtcp两个数据包的纯虚函数
class Transport {
public:
virtual bool SendRtp(rtc::ArrayView<const uint8_t> packet,
const PacketOptions& options) = 0;
virtual bool SendRtcp(rtc::ArrayView<const uint8_t> packet) = 0;
protected:
virtual ~Transport() {}
};
这里的SendRtp()由MediaChannelUtil::TransportForMediaChannels实现
5.媒体通道的实现(MediaChannelUtil)
MediaChannelUtil当中提供了TransportForMediaChannels类,该类封装了媒体通道的发送和接收功能,使得媒体通道可以通过统一的接口与不同的传输层(如RTP/RTCP传输)进行交互。最核心的函数为SendRtp(),能够用于发送RTP数据包,
除了发送RTP包,MediaChannelUtil还提供了接收RTCP包的功能。这包括处理RTCP报告块、发送器报告等,以支持媒体通道的监控和质量控制。另外,可能涉及将媒体通道适配到不同的传输协议和配置中,以确保媒体数据能够正确、高效地传输。
MediaChannelUtil的声明位于media/base/media_channel_impl.h中
/
/ The `MediaChannelUtil` class provides functionality that is used by
// multiple MediaChannel-like objects, of both sending and receiving
// types.
class MediaChannelUtil {
public:
MediaChannelUtil(webrtc::TaskQueueBase* network_thread,
bool enable_dscp = false);
virtual ~MediaChannelUtil();
// Returns the absolute sendtime extension id value from media channel.
virtual int GetRtpSendTimeExtnId() const;
// webrtc命名空间的transport类
webrtc::Transport* transport() { return &transport_; }
// Base methods to send packet using MediaChannelNetworkInterface.
// These methods are used by some tests only.
bool SendPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options);
bool SendRtcp(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options);
int SetOption(MediaChannelNetworkInterface::SocketType type,
rtc::Socket::Option opt,
int option);
// Functions that form part of one or more interface classes.
// Not marked override, since this class does not inherit from the
// interfaces.
// Corresponds to the SDP attribute extmap-allow-mixed, see RFC8285.
// Set to true if it's allowed to mix one- and two-byte RTP header extensions
// in the same stream. The setter and getter must only be called from
// worker_thread.
void SetExtmapAllowMixed(bool extmap_allow_mixed);
bool ExtmapAllowMixed() const;
void SetInterface(MediaChannelNetworkInterface* iface);
// Returns `true` if a non-null MediaChannelNetworkInterface pointer is held.
// Must be called on the network thread.
bool HasNetworkInterface() const;
protected:
bool DscpEnabled() const;
void SetPreferredDscp(rtc::DiffServCodePoint new_dscp);
private:
// Implementation of the webrtc::Transport interface required
// by Call().
// webrtc::Transport的具体实现
class TransportForMediaChannels : public webrtc::Transport {
public:
TransportForMediaChannels(webrtc::TaskQueueBase* network_thread,
bool enable_dscp);
virtual ~TransportForMediaChannels();
// Implementation of webrtc::Transport
// webrtc::Transport的具体实现,发送Rtp数据
bool SendRtp(rtc::ArrayView<const uint8_t> packet,
const webrtc::PacketOptions& options) override;
// 发送Rtcp数据
bool SendRtcp(rtc::ArrayView<const uint8_t> packet) override;
// Not implementation of webrtc::Transport
void SetInterface(MediaChannelNetworkInterface* iface);
int SetOption(MediaChannelNetworkInterface::SocketType type,
rtc::Socket::Option opt,
int option);
// 执行packet的发送
bool DoSendPacket(rtc::CopyOnWriteBuffer* packet,
bool rtcp,
const rtc::PacketOptions& options);
// 检查网络接口
bool HasNetworkInterface() const {
RTC_DCHECK_RUN_ON(network_thread_);
return network_interface_ != nullptr;
}
// 查询DSCP是否被启用
bool DscpEnabled() const { return enable_dscp_; }
void SetPreferredDscp(rtc::DiffServCodePoint new_dscp);
private:
// This is the DSCP value used for both RTP and RTCP channels if DSCP is
// enabled. It can be changed at any time via `SetPreferredDscp`.
rtc::DiffServCodePoint PreferredDscp() const {
RTC_DCHECK_RUN_ON(network_thread_);
return preferred_dscp_;
}
// Apply the preferred DSCP setting to the underlying network interface RTP
// and RTCP channels. If DSCP is disabled, then apply the default DSCP
// value.
void UpdateDscp() RTC_RUN_ON(network_thread_);
int SetOptionLocked(MediaChannelNetworkInterface::SocketType type,
rtc::Socket::Option opt,
int option) RTC_RUN_ON(network_thread_);
const rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> network_safety_
RTC_PT_GUARDED_BY(network_thread_);
webrtc::TaskQueueBase* const network_thread_;
const bool enable_dscp_;
MediaChannelNetworkInterface* network_interface_
RTC_GUARDED_BY(network_thread_) = nullptr;
rtc::DiffServCodePoint preferred_dscp_ RTC_GUARDED_BY(network_thread_) =
rtc::DSCP_DEFAULT;
};
bool extmap_allow_mixed_ = false;
TransportForMediaChannels transport_;
};
MediaChannelUtil::TransportForMediaChannels::SendRtp()的定义位于media/base/media_channel_impl.cc中,调用DoSendPacket()执行具体的packet发送任务
bool MediaChannelUtil::TransportForMediaChannels::SendRtp(
rtc::ArrayView<const uint8_t> packet,
const webrtc::PacketOptions& options) {
auto send =
[this, packet_id = options.packet_id,
included_in_feedback = options.included_in_feedback,
included_in_allocation = options.included_in_allocation,
batchable = options.batchable,
last_packet_in_batch = options.last_packet_in_batch,
is_media = options.is_media,
packet = rtc::CopyOnWriteBuffer(packet, kMaxRtpPacketLen)]() mutable {
rtc::PacketOptions rtc_options;
rtc_options.packet_id = packet_id;
if (DscpEnabled()) {
rtc_options.dscp = PreferredDscp();
}
rtc_options.info_signaled_after_sent.included_in_feedback =
included_in_feedback;
rtc_options.info_signaled_after_sent.included_in_allocation =
included_in_allocation;
rtc_options.info_signaled_after_sent.is_media = is_media;
rtc_options.batchable = batchable;
rtc_options.last_packet_in_batch = last_packet_in_batch;
// 执行发送packet
DoSendPacket(&packet, false, rtc_options);
};
// ...
}
DoSendPacket()的定义如下
bool MediaChannelUtil::TransportForMediaChannels::DoSendPacket(
rtc::CopyOnWriteBuffer* packet,
bool rtcp,
const rtc::PacketOptions& options) {
// ...
// 如果不是Rtcp,则使用SendPacket()发送数据包
return (!rtcp) ? network_interface_->SendPacket(packet, options)
: network_interface_->SendRtcp(packet, options);
}
这里的network_interface_的数据类型为MediaChannelNetworkInterface,这个类中只定义了一些纯虚函数,没有实现SendPacket(),SendPacket()的实现由BaseChannel给出
6.基础通道(BaseChannel)
BaseChannel是WebRTC中的通道层核心,为不同类型的媒体通道(如音频、视频和数据)提供了一个共同的接口和实现基础。另外,BaseChannel与PeerConnection和Transport层进行对接,管理底层的网络连接和数据传输。从RTP数据包传输的角度来说,BaseChannel中实现了将Transport模块中的packet送入到PeerConnection通道上。BaseChannel的声明位于pc/channel.h中,其中的pc表示”Peer Connection“。
由于BaseChannel的声明很长,这里截取部分
class BaseChannel : public ChannelInterface,
// TODO(tommi): Consider implementing these interfaces
// via composition.
public MediaChannelNetworkInterface,
public webrtc::RtpPacketSinkInterface {
// ...
// NetworkInterface implementation, called by MediaEngine
// 发送packet
bool SendPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) override;
// 发送Rtcp数据包
bool SendRtcp(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) override;
// From RtpTransportInternal
void OnWritableState(bool writable);
// 检测到网络路由案发生变化
void OnNetworkRouteChanged(std::optional<rtc::NetworkRoute> network_route);
// 带有选项的Packet数据包发送
bool SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options);
}
其中最重要的是SendPacket()函数
bool BaseChannel::SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) {
// ...
if (!srtp_active()) {
if (srtp_required_) {
// The audio/video engines may attempt to send RTCP packets as soon as the
// streams are created, so don't treat this as an error for RTCP.
// See: https://bugs.chromium.org/p/webrtc/issues/detail?id=6809
// However, there shouldn't be any RTP packets sent before SRTP is set
// up (and SetSend(true) is called).
RTC_DCHECK(rtcp) << "Can't send outgoing RTP packet for " << ToString()
<< " when SRTP is inactive and crypto is required";
return false;
}
RTC_DLOG(LS_WARNING) << "Sending an " << (rtcp ? "RTCP" : "RTP")
<< " packet without encryption for " << ToString()
<< ".";
}
// ...
// 如果不是rtcp,使用SendRtpPacket()传递packet
return rtcp ? rtp_transport_->SendRtcpPacket(packet, options, PF_SRTP_BYPASS)
: rtp_transport_->SendRtpPacket(packet, options, PF_SRTP_BYPASS);
}
rtp_transport_的数据类型为RtpTransportInternal,这是一个基础类,其中只定义了SendRtpPacket()的纯虚函数
// TODO(zhihuang): Pass the `packet` by copy so that the original data
// wouldn't be modified.
virtual bool SendRtpPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options,
int flags) = 0;
virtual bool SendRtcpPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options,
int flags) = 0;
从BaseChannel::SendPacket()中看,会先使用srtp_active(),考虑了Srtp的情况,所以这里SendRtpPacket()应该是由SrtpTransport实现
6.安全RTP传输(SrtpTransport)
SrtpTransport继承自RtpTransport,主要保障RTP的安全性,声明位于pc/srtp_transpory.h中
class SrtpTransport : public RtpTransport {
public:
SrtpTransport(bool rtcp_mux_enabled, const FieldTrialsView& field_trials);
virtual ~SrtpTransport() = default;
// 发送Rtp数据包
bool SendRtpPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options,
int flags) override;
// 发送Rtcp数据包
bool SendRtcpPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options,
int flags) override;
// The transport becomes active if the send_session_ and recv_session_ are
// created.
// 如果创建了发送和接收会话,则transport会被启用
bool IsSrtpActive() const override;
bool IsWritable(bool rtcp) const override;
// Create new send/recv sessions and set the negotiated crypto keys for RTP
// packet encryption. The keys can either come from SDES negotiation or DTLS
// handshake.
// 设置Rtp参数
bool SetRtpParams(int send_crypto_suite,
const rtc::ZeroOnFreeBuffer<uint8_t>& send_key,
const std::vector<int>& send_extension_ids,
int recv_crypto_suite,
const rtc::ZeroOnFreeBuffer<uint8_t>& recv_key,
const std::vector<int>& recv_extension_ids);
// Create new send/recv sessions and set the negotiated crypto keys for RTCP
// packet encryption. The keys can either come from SDES negotiation or DTLS
// handshake.
// 设置Rtcp参数
bool SetRtcpParams(int send_crypto_suite,
const rtc::ZeroOnFreeBuffer<uint8_t>& send_key,
const std::vector<int>& send_extension_ids,
int recv_crypto_suite,
const rtc::ZeroOnFreeBuffer<uint8_t>& recv_key,
const std::vector<int>& recv_extension_ids);
// ...
}
SrtpTransport::SendRtpPacket()的定义位于pc/srtp_transport.cc中,其中最后会调用SendPacket()发送packet
bool SrtpTransport::SendRtpPacket(rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options,
int flags) {
// ...
return SendPacket(/*rtcp=*/false, packet, updated_options, flags);
}
值得注意的是,SrtpTransport中并没有实现SendPacket()函数,而是继承自其父类RtpTransport中的SendPacket(),定义位于pc/rtp_transport.cc中
bool RtpTransport::SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options,
int flags) {
rtc::PacketTransportInternal* transport = rtcp && !rtcp_mux_enabled_
? rtcp_packet_transport_
: rtp_packet_transport_;
// 发送packet
int ret = transport->SendPacket(packet->cdata<char>(), packet->size(),
options, flags);
if (ret != static_cast<int>(packet->size())) {
if (set_ready_to_send_false_if_send_fail_) {
// TODO: webrtc:361124449 - Remove SetReadyToSend if field trial
// WebRTC-SetReadyToSendFalseIfSendFail succeed 2024-12-01.
if (transport->GetError() == ENOTCONN) {
RTC_LOG(LS_WARNING) << "Got ENOTCONN from transport.";
SetReadyToSend(rtcp, false);
}
}
return false;
}
return true;
}
在上面的调用之中,rtp_packet_transport_由RtpTransport::SetRtpPacketTransport()确定,这个函数会在DtlsSrtpTransport::SetDtlsTransports()中调用,所以这里的transport使用的SendPacket()函数由DtlsSrtpTransport这个类来实现
7.DTLS-SRTP 传输(DtlsSrtpTransport)
DtlsSrtpTransport结合了DTLS(Datagram Transport Layer Security)的密钥交换功能和 SRTP(Secure Real-time Transport Protocol)的媒体加密功能,为WebRTC提供安全保障。DTLS 用于在通信双方之间安全地协商加密密钥,而 SRTP 则使用这些密钥来加密实时媒体流(如音频和视频),一旦 DTLS 握手完成,DtlsSrtpTransport 确保通过该通道发送的 RTP 或 RTCP 数据被安全地加密和保护。这意味着数据在传输过程中不会被窃听或篡改,从而保护通信的隐私和完整性。这个类声明在pc/dtls_srtp_transport.h中
// The subclass of SrtpTransport is used for DTLS-SRTP. When the DTLS handshake
// is finished, it extracts the keying materials from DtlsTransport and
// configures the SrtpSessions in the base class.
/*
SrtpTransport 的子类用于 DTLS-SRTP。当 DTLS 握手完成后,它从 DtlsTransport 中提取
密钥材料,并在基类中配置 SrtpSessions
*/
class DtlsSrtpTransport : public SrtpTransport {
public:
DtlsSrtpTransport(bool rtcp_mux_enabled, const FieldTrialsView& field_trials);
// Set P2P layer RTP/RTCP DtlsTransports. When using RTCP-muxing,
// `rtcp_dtls_transport` is null.
// 设置 P2P 层的 RTP/RTCP DtlsTransports。当使用 RTCP-muxing 时,`rtcp_dtls_transport` 为 null
void SetDtlsTransports(cricket::DtlsTransportInternal* rtp_dtls_transport,
cricket::DtlsTransportInternal* rtcp_dtls_transport);
// 启用或禁用RTCP多路复用
void SetRtcpMuxEnabled(bool enable) override;
// Set the header extension ids that should be encrypted.
// 更新应该被加密的发送方RTP头部扩展ID列表
void UpdateSendEncryptedHeaderExtensionIds(
const std::vector<int>& send_extension_ids);
// 更新应该被加密的接收方RTP头部扩展ID列表
void UpdateRecvEncryptedHeaderExtensionIds(
const std::vector<int>& recv_extension_ids);
// 检测到DTLS状态发生变化
void SetOnDtlsStateChange(std::function<void(void)> callback);
// If `active_reset_srtp_params_` is set to be true, the SRTP parameters will
// be reset whenever the DtlsTransports are reset.
// 设置是否在DTLS传输被重置时主动重置SRTP参数
void SetActiveResetSrtpParams(bool active_reset_srtp_params) {
active_reset_srtp_params_ = active_reset_srtp_params;
}
private:
// ...
// Owned by the TransportController.
cricket::DtlsTransportInternal* rtp_dtls_transport_ = nullptr;
cricket::DtlsTransportInternal* rtcp_dtls_transport_ = nullptr;
// The encrypted header extension IDs.
std::optional<std::vector<int>> send_extension_ids_;
std::optional<std::vector<int>> recv_extension_ids_;
bool active_reset_srtp_params_ = false;
std::function<void(void)> on_dtls_state_change_;
};
使用SetDtlsTransport()来配置RtpTransport::SendPacket()中的transport,如下所示
void DtlsSrtpTransport::SetDtlsTransports(
cricket::DtlsTransportInternal* rtp_dtls_transport,
cricket::DtlsTransportInternal* rtcp_dtls_transport) {
// ...
SetRtcpDtlsTransport(rtcp_dtls_transport);
SetRtcpPacketTransport(rtcp_dtls_transport);
RTC_LOG(LS_INFO) << "Setting RTP Transport on " << transport_name
<< " transport " << rtp_dtls_transport;
SetRtpDtlsTransport(rtp_dtls_transport);
// 设置rtp_dtls_transport为RtpTransport::SendPacket()中的的transport
// rtp_dtls_transport的数据类型为cricket::DtlsTransportInternal
// SetRtpPacketTransport()的定义位于RtpTransport当中
SetRtpPacketTransport(rtp_dtls_transport);
MaybeSetupDtlsSrtp();
}
从代码中看,使用rtp_dtls_transport初始化,rtp_dtls_transport的数据类型为cricket::DtlsTransportInternal*,这是一个基础类,其实现由DtlsTransport给出。
在WebRTC中,存在两个DtlsTransport的定义,分别位于pc文件夹和p2p/base两个文件夹,这两者的作用有所区别,p2p/base下的dtls_transport.h提供了底层的DTLS传输功能,包括数据包的发送和接收,而pc下的 dtls_transport.h提供了与高层API交互的接口,用于管理和配置DTLS传输。这里需要使用p2p/base,执行RTP数据包的发送
8.DTLS传输(DtlsTransport)
DTLS传输定义位于p2p/base/dtls_transport.h中,声明了数据包发送和接收的函数
// This class provides a DTLS SSLStreamAdapter inside a TransportChannel-style
// packet-based interface, wrapping an existing TransportChannel instance
// (e.g a P2PTransportChannel)
/*
这个类在TransportChannel风格的基于数据包的接口内提供了一个DTLS SSLStreamAdapter,
包装了一个现有的TransportChannel实例(例如P2PTransportChannel)。
*/
// Here's the way this works:
//
// DtlsTransport {
// SSLStreamAdapter* dtls_ {
// StreamInterfaceChannel downward_ {
// IceTransportInternal* ice_transport_;
// }
// }
// }
//
// - Data which comes into DtlsTransport from the underlying
// ice_transport_ via OnReadPacket() is checked for whether it is DTLS
// or not, and if it is, is passed to DtlsTransport::HandleDtlsPacket,
// which pushes it into to downward_. dtls_ is listening for events on
// downward_, so it immediately calls downward_->Read().
//
/*
通过OnReadPacket()从底层ice_transport_传入DtlsTransport的数据会被检查是否为DTLS数据,
如果是,会被传递给DtlsTransport::HandleDtlsPacket,该函数将其推送到downward_。dtls_
监听downward_上的事件,因此它会立即调用downward_->Read()。
*/
// - Data written to DtlsTransport is passed either to downward_ or directly
// to ice_transport_, depending on whether DTLS is negotiated and whether
// the flags include PF_SRTP_BYPASS
//
/*
写入DtlsTransport的数据会被传递给downward_或者直接传递给ice_transport_,
这取决于是否协商了DTLS以及标志是否包含PF_SRTP_BYPASS。
*/
// - The SSLStreamAdapter writes to downward_->Write() which translates it
// into packet writes on ice_transport_.
//
/*
SSLStreamAdapter 写入到 downward_->Write(),这将其转换为在 ice_transport_ 上的数据包写入
*/
// This class is not thread safe; all methods must be called on the same thread
// as the constructor.
// 这个类不是线程安全的;所有方法都必须在与构造函数相同的线程上调用
class DtlsTransport : public DtlsTransportInternal {
public:
// `ice_transport` is the ICE transport this DTLS transport is wrapping. It
// must outlive this DTLS transport.
//
// `crypto_options` are the options used for the DTLS handshake. This affects
// whether GCM crypto suites are negotiated.
//
// `event_log` is an optional RtcEventLog for logging state changes. It should
// outlive the DtlsTransport.
DtlsTransport(
IceTransportInternal* ice_transport,
const webrtc::CryptoOptions& crypto_options,
webrtc::RtcEventLog* event_log,
rtc::SSLProtocolVersion max_version = rtc::SSL_PROTOCOL_DTLS_12);
~DtlsTransport() override;
DtlsTransport(const DtlsTransport&) = delete;
DtlsTransport& operator=(const DtlsTransport&) = delete;
// ...
// Called to send a packet (via DTLS, if turned on).
int SendPacket(const char* data,
size_t size,
const rtc::PacketOptions& options,
int flags) override;
// ...
IceTransportInternal* ice_transport() override;
// Underlying ice_transport, not owned by this class.
IceTransportInternal* const ice_transport_;
std::unique_ptr<rtc::SSLStreamAdapter> dtls_; // The DTLS stream
// ...
};
其中最核心的发送函数SendPacket()的定义如下
// Called from upper layers to send a media packet.
int DtlsTransport::SendPacket(const char* data,
size_t size,
const rtc::PacketOptions& options,
int flags) {
if (!dtls_active_) {
// Not doing DTLS.
// 不进行dtls,直接发送packet
return ice_transport_->SendPacket(data, size, options);
}
switch (dtls_state()) {
// ...
case webrtc::DtlsTransportState::kConnected:
if (flags & PF_SRTP_BYPASS) {
RTC_DCHECK(!srtp_ciphers_.empty());
if (!IsRtpPacket(rtc::MakeArrayView(
reinterpret_cast<const uint8_t*>(data), size))) {
return -1;
}
// ICE接口来发送packet
return ice_transport_->SendPacket(data, size, options);
} else {
// ...
}
}
}
这里进一步使用ice_transport_来发送数据包,这是一个IceTransportInternal数据类型,其SendPacket()的实现由P2PTransportChannel给出
9.P2P传输通道(P2PTransportChannel)
// P2PTransportChannel manages the candidates and connection process to keep
// two P2P clients connected to each other.
class RTC_EXPORT P2PTransportChannel : public IceTransportInternal,
public IceAgentInterface {
public:
static std::unique_ptr<P2PTransportChannel> Create(
absl::string_view transport_name,
int component,
webrtc::IceTransportInit init);
// For testing only.
// TODO(zstein): Remove once AsyncDnsResolverFactory is required.
P2PTransportChannel(absl::string_view transport_name,
int component,
PortAllocator* allocator,
const webrtc::FieldTrialsView* field_trials = nullptr);
~P2PTransportChannel() override;
P2PTransportChannel(const P2PTransportChannel&) = delete;
P2PTransportChannel& operator=(const P2PTransportChannel&) = delete;
// ...
// From TransportChannel:
int SendPacket(const char* data,
size_t len,
const rtc::PacketOptions& options,
int flags) override;
// ...
}
SendPacket()的定义如下
/ Send data to the other side, using our selected connection.
int P2PTransportChannel::SendPacket(const char* data,
size_t len,
const rtc::PacketOptions& options,
int flags) {
// ...
// 发送数据包,送入到网络中进行传输
int sent = selected_connection_->Send(data, len, modified_options);
// ...
}
selected_connection_->Send()中使用SendTo()发送数据包到给定的IP地址
int ProxyConnection::Send(const void* data,
size_t size,
const rtc::PacketOptions& options) {
// ...
int sent =
port_->SendTo(data, size, remote_candidate_.address(), options, true);
// ...
}
SendTo()函数根据具体的协议有所不同,例如UDP协议和TCP协议
int UDPPort::SendTo(const void* data,
size_t size,
const rtc::SocketAddress& addr,
const rtc::PacketOptions& options,
bool payload);
int TCPPort::SendTo(const void* data,
size_t size,
const rtc::SocketAddress& addr,
const rtc::PacketOptions& options,
bool payload);
SendTo()之后就是具体的网络部分了,这样视频流发送过程中类的简单分析就结束了