当前位置: 首页 > article >正文

FastDDS之进程内通信

目录


本文主要记录Fast DDS中进程内通信的原理和流程。

Fast DDS 允许在同一进程内的实体之间加快通信,避免了传输层带来的任何开销。进程内通信通过发布者(Publisher)直接调用订阅者(Subscriber)的接收函数,不仅避免了传输过程中的数据复制或发送操作,还确保了消息能够被订阅者接收到,从而绕过了确认机制。
进程内的通信默认是开启的,也可以通过配置XML开启。有三个值:

  • INTRAPROCESS_OFF: 关闭该功能
  • INTRAPROCESS_USER_DATA_ONLY: 发现的元数据仍然使用普通的传输方式。
  • INTRAPROCESS_FULL: 默认值,业务数据和服务发现数据都使用进程内传输(TODO:服务发现怎么使用进程内通信?)

xml配置格式如下:

<library_settings>
    <intraprocess_delivery>FULL</intraprocess_delivery> <!-- OFF | USER_DATA_ONLY | FULL -->
</library_settings>

Fast DDS 使用DomainParticipant的GuidPrefix_t来识别在同一进程中运行的对等方。如果两个参与者的GuidPrefix_t前8个字节相同,则认为它们运行在同一进程中,因此会使用进程内传递。API:is_on_same_process_as()可以判断是否在同一进程内。

待确认:Fast DDS根据主机的多个参数来分配GUID前缀,其中一个参数就是当前启用的网络接口。如果在网络运行过程中网络接口有所更改,那么新创建的DomainParticipant将会得到一个新的GUID前缀,并且系统会认为它是在不同的主机上运行。

一个使用进程内通信的subscriber接收堆栈如下:

#0  learning_dds::DDSSubscriber::SubListener::on_data_available (this=0x7ffeaee77308, reader=0x6088ac321dc0) at /FastDDS/examples/cpp/dds/HelloFastDDS/DDSSubscriber.cpp:335
#1  0x0000763e9b9cea05 in eprosima::fastdds::dds::DataReaderImpl::InnerDataReaderListener::on_data_available (this=0x6088ac322730, writer_guid=..., first_sequence=..., last_sequence=...,
    should_notify_individual_changes=@0x763e90bfe73e: false) at /FastDDS/src/cpp/fastdds/subscriber/DataReaderImpl.cpp:918
#2  0x0000763e9b883641 in eprosima::fastrtps::rtps::StatefulReader::NotifyChanges (this=0x6088ac331f30, prox=0x763e68001480) at /FastDDS/src/cpp/rtps/reader/StatefulReader.cpp:1231
#3  0x0000763e9b883199 in eprosima::fastrtps::rtps::StatefulReader::change_received (this=0x6088ac331f30, a_change=0x6088ac338d50, prox=0x763e68001480, unknown_missing_changes_up_to=0)
    at /FastDDS/src/cpp/rtps/reader/StatefulReader.cpp:1162
#4  0x0000763e9b880daa in eprosima::fastrtps::rtps::StatefulReader::processDataMsg (this=0x6088ac331f30, change=0x6088ac1f0270) at /FastDDS/src/cpp/rtps/reader/StatefulReader.cpp:651
#5  0x0000763e9b7be99b in eprosima::fastrtps::rtps::StatefulWriter::intraprocess_delivery (this=0x6088ac1f0a40, change=0x6088ac1f0270, reader_proxy=0x763e400017d0)
    at /FastDDS/src/cpp/rtps/writer/StatefulWriter.cpp:475
#6  0x0000763e9b7bf6c5 in eprosima::fastrtps::rtps::StatefulWriter::deliver_sample_to_intraprocesses (this=0x6088ac1f0a40, change=0x6088ac1f0270)
    at /FastDDS/src/cpp/rtps/writer/StatefulWriter.cpp:634
#7  0x0000763e9b7c6d4f in eprosima::fastrtps::rtps::StatefulWriter::deliver_sample_nts (this=0x6088ac1f0a40, cache_change=0x6088ac1f0270, group=..., locator_selector=..., max_blocking_time=...)
    at /FastDDS/src/cpp/rtps/writer/StatefulWriter.cpp:2186
#8  0x0000763e9bceba56 in eprosima::fastdds::rtps::FlowControllerImpl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode, eprosima::fastdds::rtps::FlowControllerFifoSchedule>::add_new_sample_impl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode> (this=0x6088ac16bb40, writer=0x6088ac1f0a40, change=0x6088ac1f0270, max_blocking_time=...)
    at /FastDDS/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp:1190
#9  0x0000763e9bce8a44 in eprosima::fastdds::rtps::FlowControllerImpl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode, eprosima::fastdds::rtps::FlowControllerFifoSchedule>::add_new_sample (
    this=0x6088ac16bb40, writer=0x6088ac1f0a40, change=0x6088ac1f0270, max_blocking_time=...) at /FastDDS/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp:1011
#10 0x0000763e9b7be743 in eprosima::fastrtps::rtps::StatefulWriter::unsent_change_added_to_history (this=0x6088ac1f0a40, change=0x6088ac1f0270, max_blocking_time=...)
    at /FastDDS/src/cpp/rtps/writer/StatefulWriter.cpp:447
#11 0x0000763e9b801d0e in eprosima::fastrtps::rtps::WriterHistory::notify_writer (this=0x6088ac1ecd90, a_change=0x6088ac1f0270, max_blocking_time=...)
    at /FastDDS/src/cpp/rtps/history/WriterHistory.cpp:123
#12 0x0000763e9b975283 in eprosima::fastrtps::rtps::WriterHistory::add_change_with_commit_hook<eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change(eprosima::fastrtps::rtps::ChangeKind_t, void*, eprosima::fastrtps::rtps::WriteParams&, const InstanceHandle_t&)::<lambda(eprosima::fastdds::dds::DataWriterImpl::CacheChange_t&)> >(eprosima::fastrtps::rtps::CacheChange_t *, eprosima::fastrtps::rtps::WriteParams &, struct {...}, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1, 1000000000> > >) (this=0x6088ac1ecd90,
    a_change=0x6088ac1f0270, wparams=..., pre_commit=..., max_blocking_time=...) at /FastDDS/include/fastdds/rtps/history/WriterHistory.h:208
#13 0x0000763e9b974ae0 in eprosima::fastdds::dds::DataWriterHistory::add_pub_change_with_commit_hook<eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change(eprosima::fastrtps::rtps::ChangeKind_t, void*, eprosima::fastrtps::rtps::WriteParams&, const InstanceHandle_t&)::<lambda(eprosima::fastdds::dds::DataWriterImpl::CacheChange_t&)> >(eprosima::fastrtps::rtps::CacheChange_t *, eprosima::fastrtps::rtps::WriteParams &, struct {...}, std::unique_lock<std::recursive_timed_mutex> &, const std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1, 1000000000> > > &) (this=0x6088ac1ecd90, change=0x6088ac1f0270, wparams=..., pre_commit=..., lock=..., max_blocking_time=...) at /FastDDS/src/cpp/fastdds/publisher/DataWriterHistory.hpp:141
#14 0x0000763e9b96da02 in eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change (this=0x6088ac1ec8a0, change_kind=eprosima::fastrtps::rtps::ALIVE, data=0x7ffeaef0a0a8, wparams=...,
    handle=...) at /FastDDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:993
#15 0x0000763e9b96e002 in eprosima::fastdds::dds::DataWriterImpl::create_new_change_with_params (this=0x6088ac1ec8a0, changeKind=eprosima::fastrtps::rtps::ALIVE, data=0x7ffeaef0a0a8, wparams=...)
    at /FastDDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:1067
#16 0x0000763e9b96d28d in eprosima::fastdds::dds::DataWriterImpl::create_new_change (this=0x6088ac1ec8a0, changeKind=eprosima::fastrtps::rtps::ALIVE, data=0x7ffeaef0a0a8)
    at /FastDDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:914
#17 0x0000763e9b96b78a in eprosima::fastdds::dds::DataWriterImpl::write (this=0x6088ac1ec8a0, data=0x7ffeaef0a0a8) at /FastDDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:594
#18 0x0000763e9b964d63 in eprosima::fastdds::dds::DataWriter::write (this=0x6088ac1ec840, data=0x7ffeaef0a0a8) at /FastDDS/src/cpp/fastdds/publisher/DataWriter.cpp:84
#19 0x00006088ab856185 in learning_dds::DDSPublisher::SimpleInfo_value (this=0x7ffeaef09b10) at /FastDDS/examples/cpp/dds/HelloFastDDS/DDSPublisher.cpp:302
#20 0x00006088ab855f7a in learning_dds::DDSPublisher::publish (this=0x7ffeaef09b10, waitForListener=false) at /FastDDS/examples/cpp/dds/HelloFastDDS/DDSPublisher.cpp:275
#21 0x00006088ab855e2f in learning_dds::DDSPublisher::runThread (this=0x7ffeaef09b10, sleep=1000) at /FastDDS/examples/cpp/dds/HelloFastDDS/DDSPublisher.cpp:250
#22 0x00006088ab8711ac in std::__invoke_impl<void, void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> (
    __f=@0x6088ac2539e8: (void (learning_dds::DDSPublisher::*)(learning_dds::DDSPublisher * const, unsigned int)) 0x6088ab855dfc <learning_dds::DDSPublisher::runThread(unsigned int)>,
    __t=@0x6088ac2539e0: 0x7ffeaef09b10) at /usr/include/c++/11/bits/invoke.h:74
#23 0x00006088ab8710c6 in std::__invoke<void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> (
    __fn=@0x6088ac2539e8: (void (learning_dds::DDSPublisher::*)(learning_dds::DDSPublisher * const, unsigned int)) 0x6088ab855dfc <learning_dds::DDSPublisher::runThread(unsigned int)>)
    at /usr/include/c++/11/bits/invoke.h:96
#24 0x00006088ab870fe5 in std::thread::_Invoker<std::tuple<void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> >::_M_invoke<0ul, 1ul, 2ul> (
    this=0x6088ac2539d8) at /usr/include/c++/11/bits/std_thread.h:259
#25 0x00006088ab870f7e in std::thread::_Invoker<std::tuple<void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> >::operator() (this=0x6088ac2539d8)
    at /usr/include/c++/11/bits/std_thread.h:266
#26 0x00006088ab870f5e in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> > >::_M_run (
    this=0x6088ac2539d0) at /usr/include/c++/11/bits/std_thread.h:211
#27 0x0000763e9aab0253 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
--Type <RET> for more, q to quit, c to continue without paging--
#28 0x0000763e9a83fac3 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#29 0x0000763e9a8d0a04 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:100

http://www.kler.cn/a/401413.html

相关文章:

  • RHCE——系统的延迟任务及定时任务
  • linux004.在ubuntu中smb.conf配置文件中配置内容详解
  • 独立开发:一人公司模式下副业产品的全流程
  • 运维面试题.云计算面试题集锦之二
  • 游戏引擎学习第12天
  • 【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法
  • 统计学习模型相关知识简记
  • 基于springboot健康医院门诊在线挂号系统源码和论文
  • 2024山西省网络建设运维第十八届职业院校技能大赛解析答案(3. ansible 服务)
  • 计算机网络 (2)计算机网络的类别
  • Java-04 深入浅出 MyBatis - SqlSessionFactory 与 SqlSession DAO与Mapper 代理模式
  • Kubernetes部署Grafana详细教程
  • SpringBoot线程池的使用
  • H.265流媒体播放器EasyPlayer.js H5流媒体播放器如何验证视频播放是否走硬解
  • MyBatis-Plus中使用JSON 类型字段
  • 11.15机器学习_线性回归
  • Android开发实战班 - 现代 UI 开发之 Jetpack Compose 基础
  • ssm137基于SSM框架的微博系统+vue(论文+源码)_kaic
  • 学习记录:js算法(九十九):冗余连接
  • 嵌入式硬件实战基础篇(二)-稳定输出3.3V的太阳能电池-无限充放电
  • vue 常用特性 ( 计算属性 | 侦听器 | 过滤器 )
  • 每日一练:【优先算法】双指针之移动零(easy)
  • springboot 配置文件中 multipart.max-file-size 各个版本的写法
  • 网络安全之信息收集-实战-1
  • Vue3 动态获取 assets 文件夹图片
  • 下一代以区域为导向的电子/电气架构