FastDDS之进程内通信
目录
本文主要记录Fast DDS中进程内通信的原理和流程。
Fast DDS 允许在同一进程内的实体之间加快通信,避免了传输层带来的任何开销。进程内通信通过发布者(Publisher)直接调用订阅者(Subscriber)的接收函数,不仅避免了传输过程中的数据复制或发送操作,还确保了消息能够被订阅者接收到,从而绕过了确认机制。
进程内的通信默认是开启的,也可以通过配置XML开启。有三个值:
INTRAPROCESS_OFF
: 关闭该功能INTRAPROCESS_USER_DATA_ONLY
: 发现的元数据仍然使用普通的传输方式。INTRAPROCESS_FULL
: 默认值,业务数据和服务发现数据都使用进程内传输(TODO:服务发现怎么使用进程内通信?)
xml配置格式如下:
<library_settings>
<intraprocess_delivery>FULL</intraprocess_delivery> <!-- OFF | USER_DATA_ONLY | FULL -->
</library_settings>
Fast DDS 使用DomainParticipant的GuidPrefix_t
来识别在同一进程中运行的对等方。如果两个参与者的GuidPrefix_t
的前8个字节相同,则认为它们运行在同一进程中,因此会使用进程内传递。API:is_on_same_process_as()
可以判断是否在同一进程内。
待确认:Fast DDS根据主机的多个参数来分配GUID前缀,其中一个参数就是当前启用的网络接口。如果在网络运行过程中网络接口有所更改,那么新创建的DomainParticipant将会得到一个新的GUID前缀,并且系统会认为它是在不同的主机上运行。
一个使用进程内通信的subscriber接收堆栈如下:
#0 learning_dds::DDSSubscriber::SubListener::on_data_available (this=0x7ffeaee77308, reader=0x6088ac321dc0) at /FastDDS/examples/cpp/dds/HelloFastDDS/DDSSubscriber.cpp:335
#1 0x0000763e9b9cea05 in eprosima::fastdds::dds::DataReaderImpl::InnerDataReaderListener::on_data_available (this=0x6088ac322730, writer_guid=..., first_sequence=..., last_sequence=...,
should_notify_individual_changes=@0x763e90bfe73e: false) at /FastDDS/src/cpp/fastdds/subscriber/DataReaderImpl.cpp:918
#2 0x0000763e9b883641 in eprosima::fastrtps::rtps::StatefulReader::NotifyChanges (this=0x6088ac331f30, prox=0x763e68001480) at /FastDDS/src/cpp/rtps/reader/StatefulReader.cpp:1231
#3 0x0000763e9b883199 in eprosima::fastrtps::rtps::StatefulReader::change_received (this=0x6088ac331f30, a_change=0x6088ac338d50, prox=0x763e68001480, unknown_missing_changes_up_to=0)
at /FastDDS/src/cpp/rtps/reader/StatefulReader.cpp:1162
#4 0x0000763e9b880daa in eprosima::fastrtps::rtps::StatefulReader::processDataMsg (this=0x6088ac331f30, change=0x6088ac1f0270) at /FastDDS/src/cpp/rtps/reader/StatefulReader.cpp:651
#5 0x0000763e9b7be99b in eprosima::fastrtps::rtps::StatefulWriter::intraprocess_delivery (this=0x6088ac1f0a40, change=0x6088ac1f0270, reader_proxy=0x763e400017d0)
at /FastDDS/src/cpp/rtps/writer/StatefulWriter.cpp:475
#6 0x0000763e9b7bf6c5 in eprosima::fastrtps::rtps::StatefulWriter::deliver_sample_to_intraprocesses (this=0x6088ac1f0a40, change=0x6088ac1f0270)
at /FastDDS/src/cpp/rtps/writer/StatefulWriter.cpp:634
#7 0x0000763e9b7c6d4f in eprosima::fastrtps::rtps::StatefulWriter::deliver_sample_nts (this=0x6088ac1f0a40, cache_change=0x6088ac1f0270, group=..., locator_selector=..., max_blocking_time=...)
at /FastDDS/src/cpp/rtps/writer/StatefulWriter.cpp:2186
#8 0x0000763e9bceba56 in eprosima::fastdds::rtps::FlowControllerImpl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode, eprosima::fastdds::rtps::FlowControllerFifoSchedule>::add_new_sample_impl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode> (this=0x6088ac16bb40, writer=0x6088ac1f0a40, change=0x6088ac1f0270, max_blocking_time=...)
at /FastDDS/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp:1190
#9 0x0000763e9bce8a44 in eprosima::fastdds::rtps::FlowControllerImpl<eprosima::fastdds::rtps::FlowControllerSyncPublishMode, eprosima::fastdds::rtps::FlowControllerFifoSchedule>::add_new_sample (
this=0x6088ac16bb40, writer=0x6088ac1f0a40, change=0x6088ac1f0270, max_blocking_time=...) at /FastDDS/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp:1011
#10 0x0000763e9b7be743 in eprosima::fastrtps::rtps::StatefulWriter::unsent_change_added_to_history (this=0x6088ac1f0a40, change=0x6088ac1f0270, max_blocking_time=...)
at /FastDDS/src/cpp/rtps/writer/StatefulWriter.cpp:447
#11 0x0000763e9b801d0e in eprosima::fastrtps::rtps::WriterHistory::notify_writer (this=0x6088ac1ecd90, a_change=0x6088ac1f0270, max_blocking_time=...)
at /FastDDS/src/cpp/rtps/history/WriterHistory.cpp:123
#12 0x0000763e9b975283 in eprosima::fastrtps::rtps::WriterHistory::add_change_with_commit_hook<eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change(eprosima::fastrtps::rtps::ChangeKind_t, void*, eprosima::fastrtps::rtps::WriteParams&, const InstanceHandle_t&)::<lambda(eprosima::fastdds::dds::DataWriterImpl::CacheChange_t&)> >(eprosima::fastrtps::rtps::CacheChange_t *, eprosima::fastrtps::rtps::WriteParams &, struct {...}, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1, 1000000000> > >) (this=0x6088ac1ecd90,
a_change=0x6088ac1f0270, wparams=..., pre_commit=..., max_blocking_time=...) at /FastDDS/include/fastdds/rtps/history/WriterHistory.h:208
#13 0x0000763e9b974ae0 in eprosima::fastdds::dds::DataWriterHistory::add_pub_change_with_commit_hook<eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change(eprosima::fastrtps::rtps::ChangeKind_t, void*, eprosima::fastrtps::rtps::WriteParams&, const InstanceHandle_t&)::<lambda(eprosima::fastdds::dds::DataWriterImpl::CacheChange_t&)> >(eprosima::fastrtps::rtps::CacheChange_t *, eprosima::fastrtps::rtps::WriteParams &, struct {...}, std::unique_lock<std::recursive_timed_mutex> &, const std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1, 1000000000> > > &) (this=0x6088ac1ecd90, change=0x6088ac1f0270, wparams=..., pre_commit=..., lock=..., max_blocking_time=...) at /FastDDS/src/cpp/fastdds/publisher/DataWriterHistory.hpp:141
#14 0x0000763e9b96da02 in eprosima::fastdds::dds::DataWriterImpl::perform_create_new_change (this=0x6088ac1ec8a0, change_kind=eprosima::fastrtps::rtps::ALIVE, data=0x7ffeaef0a0a8, wparams=...,
handle=...) at /FastDDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:993
#15 0x0000763e9b96e002 in eprosima::fastdds::dds::DataWriterImpl::create_new_change_with_params (this=0x6088ac1ec8a0, changeKind=eprosima::fastrtps::rtps::ALIVE, data=0x7ffeaef0a0a8, wparams=...)
at /FastDDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:1067
#16 0x0000763e9b96d28d in eprosima::fastdds::dds::DataWriterImpl::create_new_change (this=0x6088ac1ec8a0, changeKind=eprosima::fastrtps::rtps::ALIVE, data=0x7ffeaef0a0a8)
at /FastDDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:914
#17 0x0000763e9b96b78a in eprosima::fastdds::dds::DataWriterImpl::write (this=0x6088ac1ec8a0, data=0x7ffeaef0a0a8) at /FastDDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp:594
#18 0x0000763e9b964d63 in eprosima::fastdds::dds::DataWriter::write (this=0x6088ac1ec840, data=0x7ffeaef0a0a8) at /FastDDS/src/cpp/fastdds/publisher/DataWriter.cpp:84
#19 0x00006088ab856185 in learning_dds::DDSPublisher::SimpleInfo_value (this=0x7ffeaef09b10) at /FastDDS/examples/cpp/dds/HelloFastDDS/DDSPublisher.cpp:302
#20 0x00006088ab855f7a in learning_dds::DDSPublisher::publish (this=0x7ffeaef09b10, waitForListener=false) at /FastDDS/examples/cpp/dds/HelloFastDDS/DDSPublisher.cpp:275
#21 0x00006088ab855e2f in learning_dds::DDSPublisher::runThread (this=0x7ffeaef09b10, sleep=1000) at /FastDDS/examples/cpp/dds/HelloFastDDS/DDSPublisher.cpp:250
#22 0x00006088ab8711ac in std::__invoke_impl<void, void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> (
__f=@0x6088ac2539e8: (void (learning_dds::DDSPublisher::*)(learning_dds::DDSPublisher * const, unsigned int)) 0x6088ab855dfc <learning_dds::DDSPublisher::runThread(unsigned int)>,
__t=@0x6088ac2539e0: 0x7ffeaef09b10) at /usr/include/c++/11/bits/invoke.h:74
#23 0x00006088ab8710c6 in std::__invoke<void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> (
__fn=@0x6088ac2539e8: (void (learning_dds::DDSPublisher::*)(learning_dds::DDSPublisher * const, unsigned int)) 0x6088ab855dfc <learning_dds::DDSPublisher::runThread(unsigned int)>)
at /usr/include/c++/11/bits/invoke.h:96
#24 0x00006088ab870fe5 in std::thread::_Invoker<std::tuple<void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> >::_M_invoke<0ul, 1ul, 2ul> (
this=0x6088ac2539d8) at /usr/include/c++/11/bits/std_thread.h:259
#25 0x00006088ab870f7e in std::thread::_Invoker<std::tuple<void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> >::operator() (this=0x6088ac2539d8)
at /usr/include/c++/11/bits/std_thread.h:266
#26 0x00006088ab870f5e in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (learning_dds::DDSPublisher::*)(unsigned int), learning_dds::DDSPublisher*, unsigned int> > >::_M_run (
this=0x6088ac2539d0) at /usr/include/c++/11/bits/std_thread.h:211
#27 0x0000763e9aab0253 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
--Type <RET> for more, q to quit, c to continue without paging--
#28 0x0000763e9a83fac3 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#29 0x0000763e9a8d0a04 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:100