当前位置: 首页 > article >正文

详解WebRTC rtc::Thread实现

rtc::Thread介绍

rtc::Thread类不仅仅实现了线程这个执行器(比如posix底层调用pthread相关接口创建线程,管理线程等),还包括消息队列(message_queue)的实现,rtc::Thread启动后就作为一个永不停止的event loop,没有任务待执行就阻塞等待,添加任务后就唤醒event loop,去执行任务,周而复始,直到调用stop退出event loop,退出线程(线程join)。

在WebRTC内部,可以将消息队列等同于event loop,消息队列为空,就进行阻塞等待。


class RTC_LOCKABLE Thread : public MessageQueue {

Thread关键接口

public:
 // Starts the execution of the thread.
  bool Start(Runnable* runnable = nullptr);

  // Tells the thread to stop and waits until it is joined.
  // Never call Stop on the current thread.  Instead use the inherited Quit
  // function which will exit the base MessageQueue without terminating the
  // underlying OS thread.
  virtual void Stop();
  
  virtual void Send(const Location& posted_from,
                    MessageHandler* phandler,
                    uint32_t id = 0,
                    MessageData* pdata = nullptr);

  // Convenience method to invoke a functor on another thread.  Caller must
  // provide the |ReturnT| template argument, which cannot (easily) be deduced.
  // Uses Send() internally, which blocks the current thread until execution
  // is complete.
  // Ex: bool result = thread.Invoke<bool>(RTC_FROM_HERE,
  // &MyFunctionReturningBool);
  // NOTE: This function can only be called when synchronous calls are allowed.
  // See ScopedDisallowBlockingCalls for details.
  template <class ReturnT, class FunctorT>
  ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {
    FunctorMessageHandler<ReturnT, FunctorT> handler(
        std::forward<FunctorT>(functor));
    InvokeInternal(posted_from, &handler);
    return handler.MoveResult();
  }
    // ProcessMessages will process I/O and dispatch messages until:
  //  1) cms milliseconds have elapsed (returns true)
  //  2) Stop() is called (returns false)
  bool ProcessMessages(int cms);
  
 protected:
  // Blocks the calling thread until this thread has terminated.
  void Join();

MessageQueue关键接口

public:
virtual void Quit();

// Get() will process I/O until:
//  1) A message is available (returns true)
//  2) cmsWait seconds have elapsed (returns false)
//  3) Stop() is called (returns false)
virtual bool Get(Message* pmsg,
                 int cmsWait = kForever,
                 bool process_io = true);

virtual void Post(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id = 0,
                  MessageData* pdata = nullptr,
                  bool time_sensitive = false);
virtual void PostDelayed(const Location& posted_from,
                         int cmsDelay,
                         MessageHandler* phandler,
                         uint32_t id = 0,
                         MessageData* pdata = nullptr);
virtual void PostAt(const Location& posted_from,
                    int64_t tstamp,
                    MessageHandler* phandler,
                    uint32_t id = 0,
                    MessageData* pdata = nullptr);

virtual void Dispatch(Message* pmsg);
virtual void ReceiveSends();

protected:
void WakeUpSocketServer();

MessageList msgq_ RTC_GUARDED_BY(crit_);
PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);

线程启动Start

调用Start接口启动底层线程,同时进入一个永不停止的event loop(除非调用Stop接口)
流程如下:
Start->pthread_create->PreRun->Run

void Thread::Run() {
  ProcessMessages(kForever);
}

在这里插入图片描述
最终通过Get接口获取消息去执行(Dispatch),Get获取不到消息就是进入阻塞状态(wait),等待有消息后被唤醒。
在这里插入图片描述

线程消息队列处理消息的流程ProcessMessage

  • 1、处理从其他线程发送的要在本线程去执行的消息,即同步调用
    在这里插入图片描述

接收者线程处理流程:
在这里插入图片描述在这里插入图片描述

发送者线程流程:
在这里插入图片描述

  • 2、处理延迟消息(存储在优先级队列)
    延迟消息是通过PostDelayed和PostAt接口调用然后push到优先级队列中(dmsgq_,小根堆)
    在这里插入图片描述

  • 3、异步消息(存储在普通队列里)
    延迟消息是通过Pos接口调用然后push到普通队列中(msgq_)
    在这里插入图片描述

任务提交方式(Invoke/Post)

webrtc内部消息其实是对待执行任务的封装,消息和任务可以认为是一个意思

消息要继承MessageHandler,实现OnMessage

class MessageHandler {
 public:
  virtual ~MessageHandler();
  virtual void OnMessage(Message* msg) = 0;

 protected:
  MessageHandler() {}

 private:
  RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};

因为执行消息,实际上就是执行OnMessage(详见Dispatch接口实现)
在这里插入图片描述

上一章节其实已经把三种任务提交方式介绍过了
1、同步阻塞调用(Send,Invoke)
Invoke其实最终也是调用Send,Invoke是个函数模版,可以非常方便在目标执行线程执行函数然后获得返回值,Invoke实现如下:

  // Convenience method to invoke a functor on another thread.  Caller must
  // provide the |ReturnT| template argument, which cannot (easily) be deduced.
  // Uses Send() internally, which blocks the current thread until execution
  // is complete.
  // Ex: bool result = thread.Invoke<bool>(RTC_FROM_HERE,
  // &MyFunctionReturningBool);
  // NOTE: This function can only be called when synchronous calls are allowed.
  // See ScopedDisallowBlockingCalls for details.
  template <class ReturnT, class FunctorT>
  ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {
    FunctorMessageHandler<ReturnT, FunctorT> handler(
        std::forward<FunctorT>(functor));
    InvokeInternal(posted_from, &handler);
    return handler.MoveResult();
  }

void Thread::InvokeInternal(const Location& posted_from,
                            MessageHandler* handler) {
  TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file_and_line",
               posted_from.file_and_line(), "src_func",
               posted_from.function_name());
  Send(posted_from, handler);
}

调用方式举例:

bool result = thread.Invoke<bool>(RTC_FROM_HERE, &MyFunctionReturningBool);

2、异步非阻塞延迟调用
PostDelayed和PostAt

3、异步非阻塞调用
Post

线程退出Stop

void Thread::Stop() {
  MessageQueue::Quit();
  Join();
}

void MessageQueue::Quit() {
  AtomicOps::ReleaseStore(&stop_, 1);
  WakeUpSocketServer();
}

void Thread::Join() {
  if (!IsRunning())
    return;

  RTC_DCHECK(!IsCurrent());
  if (Current() && !Current()->blocking_calls_allowed_) {
    RTC_LOG(LS_WARNING) << "Waiting for the thread to join, "
                        << "but blocking calls have been disallowed";
  }

#if defined(WEBRTC_WIN)
  RTC_DCHECK(thread_ != nullptr);
  WaitForSingleObject(thread_, INFINITE);
  CloseHandle(thread_);
  thread_ = nullptr;
  thread_id_ = 0;
#elif defined(WEBRTC_POSIX)
  pthread_join(thread_, nullptr);
  thread_ = 0;
#endif
}

http://www.kler.cn/a/229950.html

相关文章:

  • 微信小程序实现登录注册
  • OpenCV相机标定与3D重建(53)解决 Perspective-3-Point (P3P) 问题函数solveP3P()的使用
  • MMDetection框架下的常见目标检测与分割模型综述与实践指南
  • Linux 文件的特殊权限—ACL项目练习
  • ArrayList和HashMap区别
  • 如何让用户在网页中填写PDF表格?
  • 全新 鸿蒙系统
  • Leetcode—33. 搜索旋转排序数组【中等】
  • Spring设计模式之单例模式
  • 计算huggingface模型占用硬盘空间的实战代码
  • 电机粘性阻尼系数D
  • 分享springboot框架的一个开源的本地开发部署教程(若依开源项目开发部署过程分享持续更新二开宝藏项目MySQL数据库版)
  • opensuse安装百度Linux输入法
  • 2024.02.05
  • 假期2.5
  • 六轴机器人奇异点
  • C++——stack与queue与容器适配器
  • 基于Vue2用keydown、keyup事件实现长按键盘任意键(或组合键)3秒触发自定义事件(以F1键为例)
  • 小学教师职称等级顺序 申请条件有哪些要求
  • 《C程序设计》上机实验报告(八)之结构体和共用体
  • GNU C和标准C
  • 风控安全产品系统设计
  • 2024年考PMP还有什么用?
  • Leetcode 55. 跳跃游戏
  • 五大架构风格之三:独立构件风格
  • 找城市 - 华为OD统一考试