FastDDS Service Discovery: Sending and Receiving PDP

Contents

  • PDP Sending
  • PDP Receiving
  • EDP Sending
  • EDP Receiving

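The earlier article "FastDDS Service Discovery: Creating PDP and EDP" covers the concepts and mechanism of service discovery and how the various PDP/EDP objects are created. This article walks through how Simple PDP sends data, receives data, and processes the received messages.

PDP Sending

PDP announcements start when RTPSParticipantImpl::enable calls BuiltinProtocols::enable: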

void BuiltinProtocols::enable()
{
    if (nullptr != mp_PDP)
    {
        mp_PDP->enable();
        mp_PDP->announceParticipantState(true);
        mp_PDP->resetParticipantAnnouncement();
    }
}

mp_PDP->enable() does three main things:

  1. Create a timer that periodically sends PDP messages
bool PDP::enable()
{
    ...
    resend_participant_info_event_ = new TimedEvent(mp_RTPSParticipant->getEventResource(),
                    [&]() -> bool
                    {
                        announceParticipantState(false);
                        set_next_announcement_interval();
                        return true;
                    },
                    0);

    set_initial_announcement_interval();
    ...
}

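set_initial_announcement_interval() sets the initial sending period to 100 ms. After initial_announcements_.count messages have been sent at this period, the period is reset to m_discovery.discovery_config.leaseDuration_announcementperiod, which defaults to 3 s.
(TODO: packet capture example)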
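
The two-phase schedule described above can be sketched in isolation. This is a minimal illustration, not the Fast DDS implementation: AnnouncementConfig and AnnouncementSchedule are hypothetical names, and the exact point at which the real code decrements its counter is not reproduced here.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

using std::chrono::milliseconds;

// Hypothetical stand-in for the announcement settings described in the text.
struct AnnouncementConfig
{
    std::uint32_t initial_count;   // how many fast initial announcements to send
    milliseconds initial_period;   // period while initial announcements remain
    milliseconds steady_period;    // period used once they are exhausted
};

// Hypothetical helper: tracks the remaining initial announcements and
// yields the delay to program into the timer after each announcement.
class AnnouncementSchedule
{
public:
    explicit AnnouncementSchedule(const AnnouncementConfig& cfg)
        : cfg_(cfg), remaining_initial_(cfg.initial_count)
    {
        assert(cfg.initial_period.count() > 0 && cfg.steady_period.count() > 0);
    }

    // Returns the delay before the next announcement.
    milliseconds next_interval()
    {
        if (remaining_initial_ > 0)
        {
            --remaining_initial_;
            return cfg_.initial_period;
        }
        return cfg_.steady_period;
    }

private:
    AnnouncementConfig cfg_;
    std::uint32_t remaining_initial_;
};
```

With an initial period of 100 ms and a steady period of 3000 ms, the helper returns 100 ms until the configured count is used up and 3000 ms from then on, which matches the behaviour the timer callback is described as having.
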
  2. Self-discovery

bool PDP::enable()
{
    ...
    // Notify "self-discovery"
    getRTPSParticipant()->on_entity_discovery(mp_RTPSParticipant->getGuid(),
            get_participant_proxy_data(mp_RTPSParticipant->getGuid().guidPrefix)->m_properties);
    ...
}

This call ends up in:

bool StatisticsParticipantImpl::are_statistics_writers_enabled(
        uint32_t checked_enabled_writers)
{
    return (enabled_writers_mask_ & checked_enabled_writers);
}

By default enabled_writers_mask_ is 0, so in practice nothing is done here (TODO).
  3. Assign multicast and unicast addresses: mp_PDP->enable() goes on to call builtin_endpoints_->enable_pdp_readers, which eventually reaches RTPSParticipantImpl::assignEndpointListenResources

bool RTPSParticipantImpl::assignEndpointListenResources(
        Endpoint* endp)
{
    //Tag the endpoint with the ReceiverResources
    bool valid = true;
    //UNICAST
    assignEndpoint2LocatorList(endp, endp->getAttributes().unicastLocatorList);
    //MULTICAST
    assignEndpoint2LocatorList(endp, endp->getAttributes().multicastLocatorList);
    return valid;
}

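By default the unicast address is "local IP:7410" and the multicast address is "239.255.0.1:7400". assignEndpoint2LocatorList then calls MessageReceiver::associateEndpoint, which adds the EntityId of the PDP reader to the set of readers that receive messages: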

void MessageReceiver::associateEndpoint(
        Endpoint* to_add)
{
    ...
    const auto reader = dynamic_cast<RTPSReader*>(to_add);
    const auto entityId = reader->getGuid().entityId;
    // search for set of readers by entity ID
    const auto readers = associated_readers_.find(entityId);
    if (readers == associated_readers_.end())
    {
        auto vec = std::vector<RTPSReader*>();
        vec.push_back(reader);
        associated_readers_.emplace(entityId, vec);
    }
    ...
}

The {entityId: PDP reader} pair is added to associated_readers_ so that service discovery messages from other participants can be received.
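
The registration step can be sketched as a map from entity id to the readers listening on it. Reader and ReaderRegistry below are hypothetical simplifications (a real EntityId_t is not a plain integer), and the handling of an already existing entry is an assumption, since the listing above elides that branch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical minimal reader: only carries the entity id it listens on.
struct Reader
{
    std::uint32_t entity_id;
};

// Hypothetical registry mapping an entity id to every reader registered under it.
class ReaderRegistry
{
public:
    // Registers a reader; returns false if it is null or already registered.
    bool associate(Reader* reader)
    {
        if (reader == nullptr)
        {
            return false;
        }
        // operator[] creates an empty vector the first time an id is seen.
        std::vector<Reader*>& readers = by_entity_[reader->entity_id];
        if (std::find(readers.begin(), readers.end(), reader) != readers.end())
        {
            return false;
        }
        readers.push_back(reader);
        return true;
    }

    // Returns the readers that should receive a message addressed to entity_id.
    std::vector<Reader*> readers_for(std::uint32_t entity_id) const
    {
        auto it = by_entity_.find(entity_id);
        if (it == by_entity_.end())
        {
            return std::vector<Reader*>();
        }
        return it->second;
    }

private:
    std::unordered_map<std::uint32_t, std::vector<Reader*>> by_entity_;
};
```

When a message arrives, the receiver only needs the destination entity id to find the readers to notify, which is why the PDP reader has to be registered before any remote announcement can be processed.
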

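mp_PDP->announceParticipantState(true): once the three steps of mp_PDP->enable() are complete, PDPSimple::announceParticipantState is called to send the first PDP data packet. The message itself is assembled and sent in PDP::announceParticipantState.
mp_PDP->resetParticipantAnnouncement() is then called to start the timer, and periodic sending begins.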

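PDP Receiving

PDPListener listens for and receives PDP messages. The receive path is the same as for any other DataReader (see "FastDDS UDP Communication" for details). Once the UDP layer has received a PDP message, PDPListener::onNewCacheChangeAdded is called to process it: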

void PDPListener::onNewCacheChangeAdded(
        RTPSReader* reader,
        const CacheChange_t* const change_in)
{
    CacheChange_t* change = const_cast<CacheChange_t*>(change_in);
    GUID_t writer_guid = change->writerGUID;
    EPROSIMA_LOG_INFO(RTPS_PDP, "SPDP Message received from: " << writer_guid);

    // Make sure we have an instance handle (i.e GUID)
    if (change->instanceHandle == c_InstanceHandle_Unknown)
    {
        if (!this->get_key(change))
        {
            EPROSIMA_LOG_WARNING(RTPS_PDP, "Problem getting the key of the change, removing");
            parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change);
            return;
        }
    }

    // Take GUID from instance handle
    GUID_t guid;
    iHandle2GUID(guid, change->instanceHandle);

    if (change->kind == ALIVE)
    {
        // Ignore announcement from own RTPSParticipant
        if (guid == parent_pdp_->getRTPSParticipant()->getGuid())
        {
            EPROSIMA_LOG_INFO(RTPS_PDP, "Message from own RTPSParticipant, removing");
            parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change);
            return;
        }

        // Release reader lock to avoid ABBA lock. PDP mutex should always be first.
        // Keep change information on local variables to check consistency later
        SequenceNumber_t seq_num = change->sequenceNumber;
        reader->getMutex().unlock();
        std::unique_lock<std::recursive_mutex> lock(*parent_pdp_->getMutex());
        reader->getMutex().lock();

        // If change is not consistent, it will be processed on the thread that has overwritten it
        if ((ALIVE != change->kind) || (seq_num != change->sequenceNumber) || (writer_guid != change->writerGUID))
        {
            return;
        }

        // Access to temp_participant_data_ is protected by reader lock

        // Load information on temp_participant_data_
        CDRMessage_t msg(change->serializedPayload);
        temp_participant_data_.clear();
        if (temp_participant_data_.readFromCDRMessage(&msg, true, parent_pdp_->getRTPSParticipant()->network_factory(),
                parent_pdp_->getRTPSParticipant()->has_shm_transport(), true, change_in->vendor_id))
        {
            // After correctly reading it
            change->instanceHandle = temp_participant_data_.m_key;
            guid = temp_participant_data_.m_guid;

            if (parent_pdp_->getRTPSParticipant()->is_participant_ignored(guid.guidPrefix))
            {
                return;
            }

            // Filter locators
            const auto& pattr = parent_pdp_->getRTPSParticipant()->getAttributes();
            fastdds::rtps::network::external_locators::filter_remote_locators(temp_participant_data_,
                    pattr.builtin.metatraffic_external_unicast_locators, pattr.default_external_unicast_locators,
                    pattr.ignore_non_matching_locators);

            // Check if participant already exists (updated info)
            ParticipantProxyData* pdata = nullptr;
            bool already_processed = false;
            for (ParticipantProxyData* it : parent_pdp_->participant_proxies_)
            {
                if (guid == it->m_guid)
                {
                    pdata = it;

                    // This means this is the same DATA(p) that we have already processed.
                    // We do not compare sample_identity directly because it is not properly filled
                    // in the change during deserialization.
                    if (it->m_sample_identity.writer_guid() == change->writerGUID &&
                            it->m_sample_identity.sequence_number() == change->sequenceNumber)
                    {
                        already_processed = true;
                    }

                    break;
                }
            }

            // Only process the DATA(p) if it is not a repeated one
            if (!already_processed)
            {
                temp_participant_data_.m_sample_identity.writer_guid(change->writerGUID);
                temp_participant_data_.m_sample_identity.sequence_number(change->sequenceNumber);
                process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock);
            }
        }
    }
    ...

    //Remove change from history.
    parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change);
}

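There are three main cases:

  1. If the message comes from the participant itself (detected by comparing GUIDs: guid == parent_pdp_->getRTPSParticipant()->getGuid()), the function returns without further processing, because its own ParticipantProxyData has already been added to participant_proxies_. See "FastDDS Service Discovery: Creating PDP and EDP" for details.
  2. If another participant is discovered for the first time, the PDP data is stored in a ParticipantProxyData object and PDPListener::process_alive_data is called for the next step.
  3. If this participant has already been discovered and this message has already been processed (decided by already_processed, which is set to true only when the guid and the SampleIdentity's writer_guid and sequence_number all match; TODO: why is it checked this way), nothing more is done.

After any of these three cases has been handled, remove_from_pdp_reader_history is called to delete the CacheChange from the DataReader's history.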
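
The decision made by the listener can be condensed into a small classifier. This is only a sketch: Guid is reduced to an integer, and KnownParticipant, PdpAction and classify are hypothetical names. The extra UpdateProxy outcome corresponds to a known participant whose announcement carries a different sample identity, which the listing still forwards to process_alive_data.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical simplified GUID; real GUIDs are 16-byte structures.
using Guid = std::uint64_t;

struct SampleIdentity
{
    Guid writer_guid;
    std::uint64_t sequence_number;
};

struct KnownParticipant
{
    Guid guid;
    SampleIdentity last_sample;  // identity of the last DATA(p) processed for it
};

enum class PdpAction { IgnoreSelf, CreateProxy, UpdateProxy, SkipDuplicate };

// Decides what to do with an incoming ALIVE DATA(p).
PdpAction classify(
        Guid own_guid,
        const std::vector<KnownParticipant>& known,
        Guid remote_guid,
        const SampleIdentity& sample)
{
    if (remote_guid == own_guid)
    {
        return PdpAction::IgnoreSelf;
    }
    for (const KnownParticipant& p : known)
    {
        if (p.guid != remote_guid)
        {
            continue;
        }
        // Same writer and same sequence number: this exact sample was already handled.
        const bool same_sample = p.last_sample.writer_guid == sample.writer_guid &&
                p.last_sample.sequence_number == sample.sequence_number;
        return same_sample ? PdpAction::SkipDuplicate : PdpAction::UpdateProxy;
    }
    return PdpAction::CreateProxy;
}
```

The CreateProxy and UpdateProxy outcomes map to the two branches of PDPListener::process_alive_data, whose library source follows:
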
void PDPListener::process_alive_data(
        ParticipantProxyData* old_data,
        ParticipantProxyData& new_data,
        GUID_t& writer_guid,
        RTPSReader* reader,
        std::unique_lock<std::recursive_mutex>& lock)
{
    GUID_t participant_guid = new_data.m_guid;

    if (old_data == nullptr)
    {
        // Create a new one when not found
        old_data = parent_pdp_->createParticipantProxyData(new_data, writer_guid);

        if (old_data != nullptr)
        {
            // Copy proxy to be passed forward before releasing PDP mutex
            ParticipantProxyData old_data_copy(*old_data);

            reader->getMutex().unlock();
            lock.unlock();
            parent_pdp_->assignRemoteEndpoints(&old_data_copy);
        }
        else
        {
            reader->getMutex().unlock();
            lock.unlock();
        }
    }
    else
    {
        old_data->updateData(new_data);
        old_data->isAlive = true;

        reader->getMutex().unlock();

        EPROSIMA_LOG_INFO(RTPS_PDP_DISCOVERY, "Update participant "
                << old_data->m_guid << " at "
                << "MTTLoc: " << old_data->metatraffic_locators
                << " DefLoc:" << old_data->default_locators);

        if (parent_pdp_->updateInfoMatchesEDP())
        {
            parent_pdp_->mp_EDP->assignRemoteEndpoints(*old_data, true);
        }

        // Copy proxy to be passed forward before releasing PDP mutex
        ParticipantProxyData old_data_copy(*old_data);

        lock.unlock();

        RTPSParticipantListener* listener = parent_pdp_->getRTPSParticipant()->getListener();
        if (listener != nullptr)
        {
            bool should_be_ignored = false;

            {
                std::lock_guard<std::mutex> cb_lock(parent_pdp_->callback_mtx_);
                ParticipantDiscoveryInfo info(old_data_copy);
                info.status = ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT;

                listener->onParticipantDiscovery(
                    parent_pdp_->getRTPSParticipant()->getUserRTPSParticipant(),
                    std::move(info),
                    should_be_ignored);
            }
            if (should_be_ignored)
            {
                parent_pdp_->getRTPSParticipant()->ignore_participant(participant_guid.guidPrefix);
            }
        }
    }

#ifdef FASTDDS_STATISTICS
    //! Addition or update of a participant proxy should trigger
    //! a connections update on the local participant connection list
    if (nullptr != parent_pdp_->getRTPSParticipant()->get_connections_observer())
    {
        parent_pdp_->getRTPSParticipant()->get_connections_observer()->on_local_entity_connections_change(
            parent_pdp_->getRTPSParticipant()->getGuid());
    }
#endif //FASTDDS_STATISTICS

    // Take again the reader lock
    reader->getMutex().lock();
}

PDPListener::process_alive_data handles two situations: either parent_pdp_->participant_proxies_ contains no ParticipantProxyData whose guid matches the received PDP message (the if (old_data == nullptr) branch), or it does contain one.
In the first case, parent_pdp_->createParticipantProxyData creates a ParticipantProxyData object, and then PDPSimple::assignRemoteEndpoints is called. PDPSimple::assignRemoteEndpoints in turn calls PDPSimple::match_pdp_remote_endpoints and PDPSimple::assign_low_level_remote_endpoints.

void PDPSimple::match_pdp_remote_endpoints(
        const ParticipantProxyData& pdata,
        bool notify_secure_endpoints)
{
#if !HAVE_SECURITY
    static_cast<void>(notify_secure_endpoints);
#endif // !HAVE_SECURITY

    auto endpoints = static_cast<fastdds::rtps::SimplePDPEndpoints*>(builtin_endpoints_.get());

    const NetworkFactory& network = mp_RTPSParticipant->network_factory();
    bool use_multicast_locators = !mp_RTPSParticipant->getAttributes().builtin.avoid_builtin_multicast ||
            pdata.metatraffic_locators.unicast.empty();
    const uint32_t endp = pdata.m_availableBuiltinEndpoints;

    // Default to values for non-secure endpoints
    auto reliability_kind = BEST_EFFORT_RELIABILITY_QOS;
    uint32_t pdp_reader_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR;
    uint32_t pdp_writer_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER;
    EntityId_t reader_entity_id = c_EntityId_SPDPReader;
    EntityId_t writer_entity_id = c_EntityId_SPDPWriter;
    RTPSReader* reader = endpoints->reader.reader_;
    RTPSWriter* writer = endpoints->writer.writer_;

#if HAVE_SECURITY
    // If the other participant has been authenticated, use values for secure endpoints
    if (notify_secure_endpoints)
    {
        auto secure_endpoints = static_cast<fastdds::rtps::SimplePDPEndpointsSecure*>(builtin_endpoints_.get());
        reliability_kind = RELIABLE_RELIABILITY_QOS;
        pdp_reader_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR;
        pdp_writer_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_ANNOUNCER;
        reader_entity_id = c_EntityId_spdp_reliable_participant_secure_reader;
        writer_entity_id = c_EntityId_spdp_reliable_participant_secure_writer;
        reader = secure_endpoints->secure_reader.reader_;
        writer = secure_endpoints->secure_writer.writer_;
    }
#endif // HAVE_SECURITY

    if (0 != (endp & pdp_writer_mask))
    {
        auto temp_writer_data = get_temporary_writer_proxies_pool().get();

        temp_writer_data->clear();
        temp_writer_data->guid().guidPrefix = pdata.m_guid.guidPrefix;
        temp_writer_data->guid().entityId = writer_entity_id;
        temp_writer_data->persistence_guid(pdata.get_persistence_guid());
        temp_writer_data->set_persistence_entity_id(writer_entity_id);
        temp_writer_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);
        temp_writer_data->m_qos.m_reliability.kind = reliability_kind;
        temp_writer_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
#if HAVE_SECURITY
        if (notify_secure_endpoints)
        {
            if (!mp_RTPSParticipant->security_manager().discovered_builtin_writer(
                        reader->getGuid(), pdata.m_guid, *temp_writer_data,
                        reader->getAttributes().security_attributes()))
            {
                EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<
                        temp_writer_data->guid());
            }
        }
        else
#endif // HAVE_SECURITY
        {
            reader->matched_writer_add(*temp_writer_data);
        }
    }

    if (0 != (endp & pdp_reader_mask))
    {
        auto temp_reader_data = get_temporary_reader_proxies_pool().get();

        temp_reader_data->clear();
        temp_reader_data->m_expectsInlineQos = false;
        temp_reader_data->guid().guidPrefix = pdata.m_guid.guidPrefix;
        temp_reader_data->guid().entityId = reader_entity_id;
        temp_reader_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);
        temp_reader_data->m_qos.m_reliability.kind = reliability_kind;
        temp_reader_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
#if HAVE_SECURITY
        if (notify_secure_endpoints)
        {
            if (!mp_RTPSParticipant->security_manager().discovered_builtin_reader(
                        writer->getGuid(), pdata.m_guid, *temp_reader_data,
                        writer->getAttributes().security_attributes()))
            {
                EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for reader " <<
                        temp_reader_data->guid());
            }
        }
        else
#endif // HAVE_SECURITY
        {
            writer->matched_reader_add(*temp_reader_data);
        }

        if (BEST_EFFORT_RELIABILITY_QOS == reliability_kind)
        {
            endpoints->writer.writer_->unsent_changes_reset();
        }
    }
}

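This code mainly fills in two objects taken from ProxyPool<WriterProxyData> and ProxyPool<ReaderProxyData>. The guidPrefix, persistence_guid, and metatraffic_locators come from the values in the PDP message; everything else is assigned fixed values. It then calls StatelessReader::matched_writer_add / StatelessWriter::matched_reader_add to update the reader/writer of the current PDP.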
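
Whether a remote writer or reader proxy is built at all depends on bit tests against m_availableBuiltinEndpoints. The sketch below shows that test together with the use_multicast_locators expression. The constant names are shortened, and their bit positions are an assumption based on the RTPS BuiltinEndpointSet_t definition, so check them against your Fast DDS headers before relying on them.

```cpp
#include <cassert>
#include <cstdint>

// Assumed bit positions (RTPS BuiltinEndpointSet_t); verify against your headers.
const std::uint32_t PARTICIPANT_ANNOUNCER = 1u << 0;
const std::uint32_t PARTICIPANT_DETECTOR  = 1u << 1;
const std::uint32_t PUBLICATION_ANNOUNCER = 1u << 2;
const std::uint32_t PUBLICATION_DETECTOR  = 1u << 3;

// True when the remote participant advertises the given builtin endpoint.
inline bool has_endpoint(std::uint32_t available, std::uint32_t mask)
{
    assert(mask != 0);
    return (available & mask) != 0;
}

// Same shape as the use_multicast_locators expression in the listing:
// multicast is used unless it is explicitly avoided and the remote
// participant announced at least one unicast metatraffic locator.
inline bool use_multicast(bool avoid_builtin_multicast, bool remote_unicast_empty)
{
    return !avoid_builtin_multicast || remote_unicast_empty;
}
```

A remote participant that advertises only an announcer therefore gets matched to the local reader but not to the local writer. The listing that follows, PDPSimple::assignRemoteEndpoints, applies the same mask tests to the SPDP announcer and detector bits.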

void PDPSimple::assignRemoteEndpoints(
        ParticipantProxyData* pdata)
{
    EPROSIMA_LOG_INFO(RTPS_PDP, "For RTPSParticipant: " << pdata->m_guid.guidPrefix);

    auto endpoints = static_cast<fastdds::rtps::SimplePDPEndpoints*>(builtin_endpoints_.get());

    const NetworkFactory& network = mp_RTPSParticipant->network_factory();
    uint32_t endp = pdata->m_availableBuiltinEndpoints;
    uint32_t auxendp = endp;
    bool use_multicast_locators = !mp_RTPSParticipant->getAttributes().builtin.avoid_builtin_multicast ||
            pdata->metatraffic_locators.unicast.empty();
    auxendp &= DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER;
    if (auxendp != 0)
    {
        auto temp_writer_data = get_temporary_writer_proxies_pool().get();

        temp_writer_data->clear();
        temp_writer_data->guid().guidPrefix = pdata->m_guid.guidPrefix;
        temp_writer_data->guid().entityId = c_EntityId_SPDPWriter;
        temp_writer_data->persistence_guid(pdata->get_persistence_guid());
        temp_writer_data->set_persistence_entity_id(c_EntityId_SPDPWriter);
        temp_writer_data->set_remote_locators(pdata->metatraffic_locators, network, use_multicast_locators);
        temp_writer_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
        temp_writer_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
        endpoints->reader.reader_->matched_writer_add(*temp_writer_data);
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR;
    if (auxendp != 0)
    {
        auto temp_reader_data = get_temporary_reader_proxies_pool().get();

        temp_reader_data->clear();
        temp_reader_data->m_expectsInlineQos = false;
        temp_reader_data->guid().guidPrefix = pdata->m_guid.guidPrefix;
        temp_reader_data->guid().entityId = c_EntityId_SPDPReader;
        temp_reader_data->set_remote_locators(pdata->metatraffic_locators, network, use_multicast_locators);
        temp_reader_data->m_qos.m_reliability.kind = BEST_EFFORT_RELIABILITY_QOS;
        temp_reader_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
        endpoints->writer.writer_->matched_reader_add(*temp_reader_data);

        StatelessWriter* pW = endpoints->writer.writer_;

        if (pW != nullptr)
        {
            pW->unsent_changes_reset();
        }
        else
        {
            EPROSIMA_LOG_ERROR(RTPS_PDP, "Using PDPSimple protocol with a reliable writer");
        }
    }

#if HAVE_SECURITY
    // Validate remote participant
    mp_RTPSParticipant->security_manager().discovered_participant(*pdata);
#else
    //Inform EDP of new RTPSParticipant data:
    notifyAboveRemoteEndpoints(*pdata, true);
#endif // if HAVE_SECURITY
}

First, let's look at StatelessReader::matched_writer_add:

bool StatelessReader::matched_writer_add(
        const WriterProxyData& wdata)
{
    ReaderListener* listener = nullptr;

    {
        std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
        listener = mp_listener;

        for (RemoteWriterInfo_t& writer : matched_writers_)
        {
            if (writer.guid == wdata.guid())
            {
                EPROSIMA_LOG_INFO(RTPS_READER, "Attempting to add existing writer, updating information");

                if (EXCLUSIVE_OWNERSHIP_QOS == m_att.ownershipKind &&
                        writer.ownership_strength != wdata.m_qos.m_ownershipStrength.value)
                {
                    mp_history->writer_update_its_ownership_strength_nts(
                        writer.guid, wdata.m_qos.m_ownershipStrength.value);
                }
                writer.ownership_strength = wdata.m_qos.m_ownershipStrength.value;

                if (nullptr != listener)
                {
                    // call the listener without the lock taken
                    guard.unlock();
                    listener->on_writer_discovery(this, WriterDiscoveryInfo::CHANGED_QOS_WRITER, wdata.guid(),
                            &wdata);
                }

#ifdef FASTDDS_STATISTICS
                // notify monitor service so that the connectionlist for this entity
                // could be updated
                if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
                {
                    mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
                }
#endif //FASTDDS_STATISTICS

                return false;
            }
        }

        bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());
        bool is_datasharing = is_datasharing_compatible_with(wdata);

        RemoteWriterInfo_t info;
        info.guid = wdata.guid();
        info.persistence_guid = wdata.persistence_guid();
        info.has_manual_topic_liveliness = (MANUAL_BY_TOPIC_LIVELINESS_QOS == wdata.m_qos.m_liveliness.kind);
        info.is_datasharing = is_datasharing;
        info.ownership_strength = wdata.m_qos.m_ownershipStrength.value;

        if (is_datasharing)
        {
            if (datasharing_listener_->add_datasharing_writer(wdata.guid(),
                    m_att.durabilityKind == VOLATILE,
                    mp_history->m_att.maximumReservedCaches))
            {
                EPROSIMA_LOG_INFO(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId
                                                               << " with data sharing");
            }
            else
            {
                EPROSIMA_LOG_ERROR(RTPS_READER, "Failed to add Writer Proxy " << wdata.guid()
                                                                              << " to " << this->m_guid.entityId
                                                                              << " with data sharing.");
                return false;
            }

        }

        if (matched_writers_.emplace_back(info) == nullptr)
        {
            EPROSIMA_LOG_WARNING(RTPS_READER, "No space to add writer " << wdata.guid() << " to reader " << m_guid);
            if (is_datasharing)
            {
                datasharing_listener_->remove_datasharing_writer(wdata.guid());
            }
            return false;
        }
        EPROSIMA_LOG_INFO(RTPS_READER, "Writer " << wdata.guid() << " added to reader " << m_guid);

        add_persistence_guid(info.guid, info.persistence_guid);

        m_acceptMessagesFromUnkownWriters = false;

        // Intraprocess manages durability itself
        if (is_datasharing && !is_same_process && m_att.durabilityKind != VOLATILE)
        {
            // simulate a notification to force reading of transient changes
            // this has to be done after the writer is added to the matched_writers or the processing may fail
            datasharing_listener_->notify(false);
        }
    }

    if (liveliness_lease_duration_ < c_TimeInfinite)
    {
        auto wlp = mp_RTPSParticipant->wlp();
        if ( wlp != nullptr)
        {
            wlp->sub_liveliness_manager_->add_writer(
                wdata.guid(),
                liveliness_kind_,
                liveliness_lease_duration_);
        }
        else
        {
            EPROSIMA_LOG_ERROR(RTPS_LIVELINESS, "Finite liveliness lease duration but WLP not enabled");
        }
    }

    if (nullptr != listener)
    {
        listener->on_writer_discovery(this, WriterDiscoveryInfo::DISCOVERED_WRITER, wdata.guid(), &wdata);
    }

#ifdef FASTDDS_STATISTICS
    // notify monitor service so that the connectionlist for this entity
    // could be updated
    if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
    {
        mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
    }
#endif //FASTDDS_STATISTICS

    return true;
}
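
Stripped of locking, data sharing, liveliness and listener callbacks, the listing above is an add-or-update on a bounded list. The sketch below shows only that core. MatchedWriters and MatchResult are hypothetical names, and the real function reports both the "already matched" and "no space" cases by returning false rather than through an enum.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical simplified GUID; real GUIDs are 16-byte structures.
using Guid = std::uint64_t;

struct RemoteWriterInfo
{
    Guid guid;
    std::uint32_t ownership_strength;
};

enum class MatchResult { Added, UpdatedExisting, NoSpace };

// Bounded list of matched writers, standing in for the reader's matched_writers_.
class MatchedWriters
{
public:
    explicit MatchedWriters(std::size_t max_writers)
        : max_writers_(max_writers)
    {
        assert(max_writers > 0);
    }

    MatchResult add(const RemoteWriterInfo& info)
    {
        for (RemoteWriterInfo& w : writers_)
        {
            if (w.guid == info.guid)
            {
                // Known writer: refresh the stored data instead of adding a duplicate.
                w.ownership_strength = info.ownership_strength;
                return MatchResult::UpdatedExisting;
            }
        }
        if (writers_.size() >= max_writers_)
        {
            return MatchResult::NoSpace;
        }
        writers_.push_back(info);
        return MatchResult::Added;
    }

    std::size_t size() const
    {
        return writers_.size();
    }

private:
    std::size_t max_writers_;
    std::vector<RemoteWriterInfo> writers_;
};
```

Separating the three outcomes makes it easier to see why a repeated announcement from the same participant does not grow the matched list.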

Next, StatelessWriter::matched_reader_add:

bool StatelessWriter::matched_reader_add(
        const ReaderProxyData& data)
{
    using fastdds::rtps::network::external_locators::filter_remote_locators;

    std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
    std::unique_lock<LocatorSelectorSender> locator_selector_guard(locator_selector_);

    assert(data.guid() != c_Guid_Unknown);

    if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
            [this, &data](ReaderLocator& reader)
            {
                if (reader.remote_guid() == data.guid())
                {
                    EPROSIMA_LOG_WARNING(RTPS_WRITER, "Attempting to add existing reader, updating information.");
                    if (reader.update(data.remote_locators().unicast,
                    data.remote_locators().multicast,
                    data.m_expectsInlineQos))
                    {
                        filter_remote_locators(*reader.general_locator_selector_entry(),
                        m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
                        update_reader_info(true);
                    }
                    return true;
                }
                return false;
            }
            ))
    {
        if (nullptr != mp_listener)
        {
            // call the listener without locks taken
            locator_selector_guard.unlock();
            guard.unlock();
            mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::CHANGED_QOS_READER, data.guid(), &data);
        }

#ifdef FASTDDS_STATISTICS
        // notify monitor service so that the connectionlist for this entity
        // could be updated
        if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
        {
            mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
        }
#endif //FASTDDS_STATISTICS

        return false;
    }

    // Get a locator from the inactive pool (or create a new one if necessary and allowed)
    std::unique_ptr<ReaderLocator> new_reader;
    if (matched_readers_pool_.empty())
    {
        size_t max_readers = matched_readers_pool_.max_size();
        if (getMatchedReadersSize() + matched_readers_pool_.size() < max_readers)
        {
            const RemoteLocatorsAllocationAttributes& loc_alloc =
                    mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators;

            new_reader.reset(new ReaderLocator(
                        this,
                        loc_alloc.max_unicast_locators,
                        loc_alloc.max_multicast_locators));
        }
        else
        {
            EPROSIMA_LOG_WARNING(RTPS_WRITER, "Couldn't add matched reader due to resource limits");
            return false;
        }
    }
    else
    {
        new_reader = std::move(matched_readers_pool_.back());
        matched_readers_pool_.pop_back();
    }

    // Add info of new datareader.
    new_reader->start(data.guid(),
            data.remote_locators().unicast,
            data.remote_locators().multicast,
            data.m_expectsInlineQos,
            is_datasharing_compatible_with(data));
    filter_remote_locators(*new_reader->general_locator_selector_entry(),
            m_att.external_unicast_locators, m_att.ignore_non_matching_locators);

    locator_selector_.locator_selector.add_entry(new_reader->general_locator_selector_entry());

    if (new_reader->is_local_reader())
    {
        matched_local_readers_.push_back(std::move(new_reader));
        EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId
                                                        << " as local reader");
    }
    else if (new_reader->is_datasharing_reader())
    {
        matched_datasharing_readers_.push_back(std::move(new_reader));
        EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId
                                                        << " as data sharing");
    }
    else
    {
        matched_remote_readers_.push_back(std::move(new_reader));
        EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId
                                                        << " as remote reader");
    }

    update_reader_info(true);

    if (nullptr != mp_listener)
    {
        // call the listener without locks taken
        locator_selector_guard.unlock();
        guard.unlock();
        mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::DISCOVERED_READER, data.guid(), &data);
    }

#ifdef FASTDDS_STATISTICS
    // notify monitor service so that the connectionlist for this entity
    // could be updated
    if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
    {
        mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
    }
#endif //FASTDDS_STATISTICS

    return true;
}
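
The way the listing obtains a ReaderLocator (reuse an idle one, otherwise allocate if the limit allows, otherwise fail) is a common object-pool pattern. A minimal sketch under simplified assumptions: Locator and LocatorPool are hypothetical, and the release() step is added for illustration, since the listing above only shows the acquiring side.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical placeholder for ReaderLocator.
struct Locator
{
    int remote_id = 0;
};

class LocatorPool
{
public:
    explicit LocatorPool(std::size_t max_total)
        : max_total_(max_total)
    {
    }

    // Returns a recycled object if one is idle, a new one if the limit
    // allows it, and nullptr when the resource limit has been reached.
    std::unique_ptr<Locator> acquire()
    {
        if (!idle_.empty())
        {
            std::unique_ptr<Locator> loc = std::move(idle_.back());
            idle_.pop_back();
            ++in_use_;
            return loc;
        }
        if (in_use_ + idle_.size() < max_total_)
        {
            ++in_use_;
            return std::unique_ptr<Locator>(new Locator());
        }
        return nullptr;
    }

    // Hands an object back so that a later acquire() can reuse it.
    void release(std::unique_ptr<Locator> loc)
    {
        assert(loc != nullptr && in_use_ > 0);
        --in_use_;
        idle_.push_back(std::move(loc));
    }

    std::size_t in_use() const
    {
        return in_use_;
    }

private:
    std::size_t max_total_;
    std::size_t in_use_ = 0;
    std::vector<std::unique_ptr<Locator>> idle_;
};
```

Recycling objects this way avoids a heap allocation on every match once the pool has warmed up, and the size check gives a single place where the resource limit is enforced.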

PDPSimple::assign_low_level_remote_endpoints calls EDPSimple::assignRemoteEndpoints to update the EDP objects:

void EDPSimple::assignRemoteEndpoints(
        const ParticipantProxyData& pdata,
        bool assign_secure_endpoints)
{
    EPROSIMA_LOG_INFO(RTPS_EDP, "New DPD received, adding remote endpoints to our SimpleEDP endpoints");
    const NetworkFactory& network = mp_RTPSParticipant->network_factory();
    uint32_t endp = pdata.m_availableBuiltinEndpoints;
    uint32_t auxendp;
    bool use_multicast_locators = !mp_PDP->getRTPSParticipant()->getAttributes().builtin.avoid_builtin_multicast ||
            pdata.metatraffic_locators.unicast.empty();

    auto temp_reader_proxy_data = get_temporary_reader_proxies_pool().get();

    temp_reader_proxy_data->clear();
    temp_reader_proxy_data->m_expectsInlineQos = false;
    temp_reader_proxy_data->guid().guidPrefix = pdata.m_guid.guidPrefix;
    temp_reader_proxy_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);
    temp_reader_proxy_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
    temp_reader_proxy_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;

    auto temp_writer_proxy_data = get_temporary_writer_proxies_pool().get();

    temp_writer_proxy_data->clear();
    temp_writer_proxy_data->guid().guidPrefix = pdata.m_guid.guidPrefix;
    temp_writer_proxy_data->persistence_guid(pdata.get_persistence_guid());
    temp_writer_proxy_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);
    temp_writer_proxy_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
    temp_writer_proxy_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;

    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER;
    if (auxendp != 0 && publications_reader_.first != nullptr) //Exist Pub Writer and i have pub reader
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Pub Writer to my Pub Reader");
        temp_writer_proxy_data->guid().entityId = c_EntityId_SEDPPubWriter;
        temp_writer_proxy_data->set_persistence_entity_id(c_EntityId_SEDPPubWriter);
        publications_reader_.first->matched_writer_add(*temp_writer_proxy_data);
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR;
    if (auxendp != 0 && publications_writer_.first != nullptr) //Exist Pub Detector
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Pub Reader to my Pub Writer");
        temp_reader_proxy_data->guid().entityId = c_EntityId_SEDPPubReader;
        publications_writer_.first->matched_reader_add(*temp_reader_proxy_data);
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER;
    if (auxendp != 0 && subscriptions_reader_.first != nullptr) //Exist Sub Announcer
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Sub Writer to my Sub Reader");
        temp_writer_proxy_data->guid().entityId = c_EntityId_SEDPSubWriter;
        temp_writer_proxy_data->set_persistence_entity_id(c_EntityId_SEDPSubWriter);
        subscriptions_reader_.first->matched_writer_add(*temp_writer_proxy_data);
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR;
    if (auxendp != 0 && subscriptions_writer_.first != nullptr) // Remote has a Sub Reader and I have a Sub Writer
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Sub Reader to my Sub Writer");
        temp_reader_proxy_data->guid().entityId = c_EntityId_SEDPSubReader;
        subscriptions_writer_.first->matched_reader_add(*temp_reader_proxy_data);
    }

#if HAVE_SECURITY
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_ANNOUNCER;
    if (auxendp != 0 && publications_secure_reader_.first != nullptr && assign_secure_endpoints)
    {
        temp_writer_proxy_data->guid().entityId = sedp_builtin_publications_secure_writer;
        temp_writer_proxy_data->set_persistence_entity_id(sedp_builtin_publications_secure_writer);

        if (!mp_RTPSParticipant->security_manager().discovered_builtin_writer(
                    publications_secure_reader_.first->getGuid(), pdata.m_guid, *temp_writer_proxy_data,
                    publications_secure_reader_.first->getAttributes().security_attributes()))
        {
            EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<
                    publications_secure_reader_.first->getGuid());
        }
    }

    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_DETECTOR;
    if (auxendp != 0 && publications_secure_writer_.first != nullptr && assign_secure_endpoints)
    {
        temp_reader_proxy_data->guid().entityId = sedp_builtin_publications_secure_reader;
        if (!mp_RTPSParticipant->security_manager().discovered_builtin_reader(
                    publications_secure_writer_.first->getGuid(), pdata.m_guid, *temp_reader_proxy_data,
                    publications_secure_writer_.first->getAttributes().security_attributes()))
        {
            EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<
                    publications_secure_writer_.first->getGuid());
        }
    }

    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_ANNOUNCER;
    if (auxendp != 0 && subscriptions_secure_reader_.first != nullptr && assign_secure_endpoints)
    {
        temp_writer_proxy_data->guid().entityId = sedp_builtin_subscriptions_secure_writer;
        temp_writer_proxy_data->set_persistence_entity_id(sedp_builtin_subscriptions_secure_writer);

        if (!mp_RTPSParticipant->security_manager().discovered_builtin_writer(
                    subscriptions_secure_reader_.first->getGuid(), pdata.m_guid, *temp_writer_proxy_data,
                    subscriptions_secure_reader_.first->getAttributes().security_attributes()))
        {
            EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<
                    subscriptions_secure_reader_.first->getGuid());
        }
    }

    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_DETECTOR;
    if (auxendp != 0 && subscriptions_secure_writer_.first != nullptr && assign_secure_endpoints)
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Sub Reader to my Sub Writer");
        temp_reader_proxy_data->guid().entityId = sedp_builtin_subscriptions_secure_reader;
        if (!mp_RTPSParticipant->security_manager().discovered_builtin_reader(
                    subscriptions_secure_writer_.first->getGuid(), pdata.m_guid, *temp_reader_proxy_data,
                    subscriptions_secure_writer_.first->getAttributes().security_attributes()))
        {
            EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<
                    subscriptions_secure_writer_.first->getGuid());
        }
    }
#else
    static_cast<void>(assign_secure_endpoints);
#endif // if HAVE_SECURITY

}
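
The function above tests the remote participant's advertised builtin-endpoint set one bit at a time. The following is a minimal, self-contained sketch of that test and not Fast DDS code: the bit positions follow the values the RTPS specification assigns to the SEDP flags, while the alias `BuiltinEndpointSet`, the `k...` constant names and the helper `has_endpoint` are hypothetical names introduced only for illustration.

```cpp
#include <cstdint>

// Hypothetical alias for this sketch; Fast DDS uses its own typedef.
using BuiltinEndpointSet = std::uint32_t;

// Bit positions of the SEDP flags as listed in the RTPS specification.
constexpr BuiltinEndpointSet kPublicationAnnouncer  = 0x1u << 2;  // remote SEDP Pub Writer
constexpr BuiltinEndpointSet kPublicationDetector   = 0x1u << 3;  // remote SEDP Pub Reader
constexpr BuiltinEndpointSet kSubscriptionAnnouncer = 0x1u << 4;  // remote SEDP Sub Writer
constexpr BuiltinEndpointSet kSubscriptionDetector  = 0x1u << 5;  // remote SEDP Sub Reader

// Hypothetical helper: true when the remote set advertises the given endpoint.
// Equivalent to the "auxendp = endp; auxendp &= FLAG; auxendp != 0" pattern.
constexpr bool has_endpoint(BuiltinEndpointSet remote, BuiltinEndpointSet flag)
{
    return (remote & flag) != 0;
}

// Example set: a participant that only creates DataWriters advertises a
// publications writer and a subscriptions reader.
constexpr BuiltinEndpointSet kWriterOnlyParticipant =
        kPublicationAnnouncer | kSubscriptionDetector;

static_assert(has_endpoint(kWriterOnlyParticipant, kPublicationAnnouncer),
        "publications writer is advertised");
static_assert(!has_endpoint(kWriterOnlyParticipant, kPublicationDetector),
        "publications reader is not advertised");
```

With such a set, only the first and fourth branches of the function above would add a match, provided the local counterpart endpoints exist.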

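In addition, the EntityIds used for discovery are fixed by the standard, and the receive callbacks use these fixed EntityIds to dispatch to the corresponding listener objects. The fixed EntityIds are defined in EntityId_t.hpp; the main ones are: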

const EntityId_t c_EntityId_Unknown = ENTITYID_UNKNOWN;
const EntityId_t c_EntityId_SPDPReader = ENTITYID_SPDP_BUILTIN_RTPSParticipant_READER;
const EntityId_t c_EntityId_SPDPWriter = ENTITYID_SPDP_BUILTIN_RTPSParticipant_WRITER;

const EntityId_t c_EntityId_SEDPPubWriter = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
const EntityId_t c_EntityId_SEDPPubReader = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
const EntityId_t c_EntityId_SEDPSubWriter = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
const EntityId_t c_EntityId_SEDPSubReader = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;

const EntityId_t c_EntityId_RTPSParticipant = ENTITYID_RTPSParticipant;

const EntityId_t c_EntityId_WriterLiveliness = ENTITYID_P2P_BUILTIN_RTPSParticipant_MESSAGE_WRITER;
const EntityId_t c_EntityId_ReaderLiveliness = ENTITYID_P2P_BUILTIN_RTPSParticipant_MESSAGE_READER;

const EntityId_t participant_stateless_message_writer_entity_id = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
const EntityId_t participant_stateless_message_reader_entity_id = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;

const EntityId_t c_EntityId_TypeLookup_request_writer = ENTITYID_TL_SVC_REQ_WRITER;
const EntityId_t c_EntityId_TypeLookup_request_reader = ENTITYID_TL_SVC_REQ_READER;
const EntityId_t c_EntityId_TypeLookup_reply_writer = ENTITYID_TL_SVC_REPLY_WRITER;
const EntityId_t c_EntityId_TypeLookup_reply_reader = ENTITYID_TL_SVC_REPLY_READER;
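
To illustrate dispatching on a fixed EntityId, here is a minimal sketch. It is not the lookup Fast DDS actually performs. The numeric values are the well-known builtin EntityIds from the RTPS specification, written as 32-bit integers. The function `listener_for_reader` is hypothetical, and the listener names it returns are only illustrative labels for the PDP and EDP listeners.

```cpp
#include <cstdint>
#include <string>

// Well-known builtin EntityIds from the RTPS specification
// (3-byte entityKey followed by a 1-byte entityKind).
constexpr std::uint32_t kSpdpWriter    = 0x000100c2;
constexpr std::uint32_t kSpdpReader    = 0x000100c7;
constexpr std::uint32_t kSedpPubWriter = 0x000003c2;
constexpr std::uint32_t kSedpPubReader = 0x000003c7;
constexpr std::uint32_t kSedpSubWriter = 0x000004c2;
constexpr std::uint32_t kSedpSubReader = 0x000004c7;

// Hypothetical dispatcher: names the listener attached to a builtin reader.
inline std::string listener_for_reader(std::uint32_t reader_entity_id)
{
    switch (reader_entity_id)
    {
        case kSpdpReader:    return "PDPListener";           // participant discovery
        case kSedpPubReader: return "EDPSimplePUBListener";  // remote DataWriter info
        case kSedpSubReader: return "EDPSimpleSUBListener";  // remote DataReader info
        default:             return "unknown";
    }
}
```

Because the ids are the same in every participant, a receiver can route a discovery message to the right builtin reader without any prior negotiation.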

EDP Sending

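EDP (Endpoint Discovery Protocol) matches the Readers and Writers of Participants by comparing their Topics and data types.

In Fast DDS, Readers and Writers are the endpoints of communication, and they interact through a Topic and its data type. A Writer publishes data on a specific Topic, and a Reader subscribes to that Topic.

When EDP starts, each Participant announces information about its Readers and Writers (including Topic and data type) and receives the same information from the other Participants. EDP then compares this information: if a Writer's Topic and data type match those of a Reader, the two are matched and can exchange data.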
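
The comparison described above can be sketched as follows. This is a simplified illustration under stated assumptions: the struct `EndpointInfo` and the functions `topic_and_type_match` and `match_endpoints` are hypothetical, and the sketch checks only Topic name and type name, as the text describes, without any further compatibility checks.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical, reduced view of the data an endpoint announces during EDP.
struct EndpointInfo
{
    std::string guid;
    std::string topic_name;
    std::string type_name;
};

// A Writer and a Reader match when both Topic name and type name are equal.
inline bool topic_and_type_match(const EndpointInfo& writer, const EndpointInfo& reader)
{
    return writer.topic_name == reader.topic_name &&
           writer.type_name == reader.type_name;
}

// Returns the (writer guid, reader guid) pairs that would be matched.
inline std::vector<std::pair<std::string, std::string>> match_endpoints(
        const std::vector<EndpointInfo>& writers,
        const std::vector<EndpointInfo>& readers)
{
    std::vector<std::pair<std::string, std::string>> matches;
    for (const auto& w : writers)
    {
        for (const auto& r : readers)
        {
            if (topic_and_type_match(w, r))
            {
                matches.emplace_back(w.guid, r.guid);
            }
        }
    }
    return matches;
}
```

For example, a Writer on Topic "HelloTopic" with type "HelloType" pairs with a Reader that has the same Topic and type, but not with a Reader on the same Topic that expects a different type.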

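SEDP (Simple Endpoint Discovery Protocol) is the mechanism by which DDS entities that belong to different DomainParticipants within a domain, such as DataWriters and DataReaders, discover each other.

SEDP discovery takes place when a DataWriter or DataReader is created.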

  1. Sending phase
    The sending phase begins when a DataWriter is created (code outside the scope of this discussion is omitted):
DataWriter* PublisherImpl::create_datawriter(
        Topic* topic,
        DataWriterImpl* impl,
        const StatusMask& mask)
{
    topic->get_impl()->reference();
    DataWriter* writer = new DataWriter(impl, mask);
    impl->user_datawriter_ = writer;
    // ...
    if (user_publisher_->is_enabled() && qos_.entity_factory().autoenable_created_entities)  
    {
        if (ReturnCode_t::RETCODE_OK != writer->enable())  // SEDP starts here
        // ...
    }
    return writer;
}

Next, BuiltinProtocols::addLocalWriter calls EDP::newLocalWriterProxyData, and the SEDP data is sent in processLocalWriterProxyData:

bool EDPSimple::processLocalWriterProxyData(
        RTPSWriter* local_writer,
        WriterProxyData* wdata)
{
    auto* writer = &publications_writer_;
    // ...
    CacheChange_t* change = nullptr;
    bool ret_val = serialize_writer_proxy_data(*wdata, *writer, true, &change);
    if (change != nullptr) {
        writer->second->add_change(change);
    }
    return ret_val;
}

Simple EDP Attributes

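| Name | Description | Type | Default |
|---|---|---|---|
| SIMPLE EDP | Defines the use of the SIMPLE protocol as the discovery protocol for the EDP phase. A DomainParticipant may create DataWriters, DataReaders, both, or neither. | bool | true |
| Publication writer and Subscription reader | Intended for DomainParticipants that implement only one or more DataWriters and no DataReaders, so only the EDP endpoints related to DataReader discovery need to be created. | bool | true |
| Publication reader and Subscription writer | Intended for DomainParticipants that implement only one or more DataReaders and no DataWriters, so only the EDP endpoints related to DataWriter discovery need to be created. | bool | true |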
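  1. Initial peers
    According to the RTPS standard, each RTPSParticipant must listen for PDP traffic on two different ports, one bound to a multicast address and one bound to a unicast address. Fast DDS allows the IP-port pairs of remote peers to be configured.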
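
As a rough illustration of the two listening ports, the sketch below computes the default discovery ports from the well-known port formula of the RTPS specification. The constant and function names are hypothetical. The parameter values (PB = 7400, DG = 250, PG = 2, d0 = 0, d1 = 10) are the specification defaults, and a given deployment may override them.

```cpp
#include <cstdint>

// Default port parameters from the RTPS specification.
constexpr std::uint32_t kPortBase        = 7400;  // PB
constexpr std::uint32_t kDomainGain      = 250;   // DG
constexpr std::uint32_t kParticipantGain = 2;     // PG
constexpr std::uint32_t kOffsetD0        = 0;     // d0: discovery multicast offset
constexpr std::uint32_t kOffsetD1        = 10;    // d1: discovery unicast offset

// Port on which every participant of a domain listens for multicast PDP traffic.
constexpr std::uint32_t discovery_multicast_port(std::uint32_t domain_id)
{
    return kPortBase + kDomainGain * domain_id + kOffsetD0;
}

// Port on which one specific participant listens for unicast PDP traffic.
constexpr std::uint32_t discovery_unicast_port(
        std::uint32_t domain_id,
        std::uint32_t participant_id)
{
    return kPortBase + kDomainGain * domain_id + kOffsetD1 +
           kParticipantGain * participant_id;
}

static_assert(discovery_multicast_port(0) == 7400, "domain 0 multicast port");
static_assert(discovery_unicast_port(0, 0) == 7410, "domain 0, participant 0");
static_assert(discovery_unicast_port(0, 1) == 7412, "domain 0, participant 1");
```

An initial-peers entry that targets the second participant of domain 0 on a remote host would therefore, under these defaults, use that host's IP address together with port 7412.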

EDP Receiving


