当前位置: 首页 > article >正文

并发编程(19)——引用计数型无锁栈

文章目录

  • 十九、day19
  • 1. 引用计数
  • 2. 代码实现
    • 2.1 单引用计数器无锁栈
    • 2.2 双引用计数器无锁栈
  • 3. 本节的一些理解

十九、day19

上一节我们学习通过侯删链表以及风险指针与侯删链表的组合两种方式实现了并发无锁栈,但是这两种方式有以下缺点:

第一种方式:如果 pop 操作频繁被多个线程调用,待删除的节点将不断累积,导致待删除列表无法及时回收,进而占用大量内存。此时我们需要一种机制,每次调用 pop 函数结束后都需要判断侯删链表中的节点是否可以被 delete,即风险指针。

第二种方式

  1. 如何正确且高效地分配这块内存是一个难点。

  2. 为了删除一个节点,需要遍历风险指针数组,检查该节点是否被某个风险指针引用。同时,在处理待删除节点时,也需要从风险数组中选取合适的节点记录其地址,这进一步增加了开销。

  3. 而且因为该方法受专利保护,所以在一些场合可能无法使用

在这一节中,我们学习如何通过引用计数来实现无锁栈以及安全回收机制。

参考:

恋恋风辰官方博客

《c++并发编程》中无锁栈的实现为什么要用双引用计数器_无锁栈 双重引用技术-CSDN博客

C++实现“无锁”双buffer - 知乎


1. 引用计数

本节的引用计数通过两个计数器实现:一个是存储在节点内部的 内部计数器,另一个是与指针绑定在一起的 外部计数器。外部计数器和指针被封装在一个结构体 counted_node_ptr 中,因此栈顶指针的类型从原生指针(void⭐、int⭐、char⭐、node⭐…)修改为 counted_node_ptr

struct counted_node_ptr {
    int external_count; // 外部计数器
    node* ptr;          // 原始指针,指向节点
};

具体机制如下:

  • 当线程加载 head 指针时,外部计数器会加 1,表示有新的线程在引用这个节点。
  • 当线程不再引用某个节点时,内部计数器会减 1。

通过将 内部计数器外部计数器 相加,就可以得到该节点的实际引用计数。当引用计数归零时,说明没有线程再访问这个节点,此时可以安全地删除它。这种设计确保了节点的引用计数在并发环境下能够准确管理,同时避免了资源泄露的问题。数据结构如下图所示

在这里插入图片描述

图片来源:https://blog.csdn.net/ld_long/article/details/137692475

当线程尝试从栈中弹出数据时,会按照以下步骤操作:

  1. 读取栈顶指针
    线程首先读取 head 指针(栈顶指针),将其值存入线程的局部变量 local_head 中,同时将 head 指针的外部计数器加 1。这个过程是原子的,因此能够确保线程安全。
  2. 同步计数器
    local_head 的外部计数器更新为与 head 的外部计数器相同。随后,线程可以通过 local_head 访问节点。
  3. 判断能否弹出节点
    • 如果在 local_head 载入后,发现 local_head 的指针值与当前 head 不一致,说明有其他线程在此期间修改了 head。此时,当前线程无法弹出节点,因为它载入的 local_head 已作废。
    • 线程会将节点的内部计数器减 1(因为当前线程不再指向该节点)。如果内部计数器变为 0(说明没有其他线程再引用这个节点),当前线程负责删除节点;否则,什么也不做。
  4. 成功弹出节点
    • 如果 local_head 的指针值仍与 head 相同,说明没有其他线程修改 head,当前线程可以成功弹出节点。
    • 线程将 head 指针更新为当前节点的下一个节点(即 head.ptr->next),这个更新操作是原子的。
  5. 更新计数器
    • 弹出节点后,将 local_head 的外部计数器减 1,因为 head 指针已经不再指向这个节点。
    • 再次将 local_head 的外部计数器减 1,因为 local_head 也准备不再指向这个节点(实际上,可以一次性减 2,这里分开解释是为了说明两个来源)。
    • 然后,将 local_head 的外部计数器值加到节点的内部计数器上。
  6. 判断节点是否需要删除
    • 如果内部计数器因此变为 0(即它原本是外部计数器的相反数),说明没有线程再引用该节点,当前线程负责删除节点。
    • 如果内部计数器不为 0,则无需删除节点。

通过这样的流程,可以确保节点在无人引用时才被安全回收,同时减少不必要的内存操作,提升了效率和线程安全性。

这段文字光看很难看懂,我们通过代码进行解释:

2. 代码实现

2.1 单引用计数器无锁栈

在此之前,我们需要知道:该方式必须通过两个计数器才能实现,如果只用一个,会有一些问题。我们先通过只用一个计数器实现引用计数,进而说明为什么只能使用两个计数器实现。

template<typename T>
class single_ref_stack {
public:
    single_ref_stack():head(nullptr) {
    }
    ~single_ref_stack() {
        //循环出栈
        while (pop());
    }
    
    void push(T const& data);
    std::shared_ptr<T> pop();   
private:
    struct ref_node {
        //1 数据域智能指针
        std::shared_ptr<T>  _data;
        //2 引用计数
        std::atomic<int> _ref_count;
        //3  下一个节点
        ref_node* _next;
        ref_node(T const& data_) : _data(std::make_shared<T>(data_)),
            _ref_count(1), _next(nullptr) {}
    };
    //头部节点
    std::atomic<ref_node*> head;
};

上段代码是一个单引用的无锁栈实现,内容很简单不多于解释,我们只需要注意一点:计数器引用计数必须要使用原子类型,为了保证多线程修改下仍是线程安全的。

接下来实现 pop() 和 push():

void push(T const& data) {
    auto new_node = new ref_node(data);
    new_node->next = head.load();
    while (!head.compare_exchange_weak(new_node->next, new_node));
}

push() 函数很简单,就是将数据插入至栈链表的表头,并且调用原子操作 compare_exchange_weakhead 修改为 new_node,即将 head 指针指向新插入的表头节点 new_node

std::shared_ptr<T> pop() {
    ref_node* old_head = head.load();
    for (;;) {
        if (!old_head) { // 空栈检查
            return std::shared_ptr<T>();
        }
        //1 只要执行pop就对引用计数+1
        ++(old_head->_ref_count);
        //2 比较head和old_head想等则交换否则说明head已经被其他线程更新
        if (head.compare_exchange_strong(old_head, old_head->_next)) {
            auto cur_count = old_head->_ref_count.load();
            auto new_count;
            //3  循环重试保证引用计数安全更新
            do {
                //4 减去本线程增加的1次和初始的1次
                new_count = cur_count - 2;
            } while (!old_head->_ref_count.compare_exchange_weak(cur_count, new_count));
            //返回头部数据
            std::shared_ptr<T> res;
            //5  交换数据
            res.swap(old_head->_data);
            //6
            if (old_head->_ref_count == 0) {
                delete old_head;
            }
            return res;
        }
        else {
            //7 
            if (old_head->_ref_count.fetch_sub(1) == 1) {
                delete old_head;
            }
        }
    }
}

该函数仅有两种返回值:空栈返回一个空的智能指针;非空栈返回一个指向弹出数据的智能指针。执行流程如下:

  1. 进行空栈检查,如果是空栈,直接返回空的 std::shared_ptr<T>(),否则继续执行
  2. 因为节点结构体的成员增加了一个原子类型的引用计数,这里增加当前线程对节点(old_head)的引用计数(因为计数器是 std::atomic<int> 类型的整型原子变量,所以 ++ 是原子操作),避免该节点(old_head)在操作过程中被其他线程删除
  3. 使用 CAS(Compare-And-Swap) 操作尝试将 headold_head 更新为 old_head->_next。如果成功说明当前线程成功弹出头节点,否则说明其他线程在此期间修改了 head,需要重新尝试。
  4. 如果判断成功,则 head 被修改为 old_head->_next。按照上一节的做法,这里需要将 old_head 加入至侯删链表,但是在本节中我们需要在这里更新引用计数
    1. 首先,读取 old_head 的引用计数(我们在 pop 函数最开始将其 +1,保证其他线程不会删除它),并将其赋值给 cur_count
    2. 然后将引用计数减去 2(pop 函数一开始加了1,每个节点初始化的时候默认构造函数将引用计数设置为 1,一共增加了两次 1)
    3. 使用 CAS(Weak) 重试,保证引用计数能被安全更新到 new_count
    4. 引用计数修改成功之后,将弹出节点 old_head 的数据存储至 std::shared_ptr<T> 类型的智能指针中,并判断 old_head 的引用计数是否为 0,如果为 0 则直接将 old_head 删除,否则保留
    5. 最后返回存有 old_head->_data 的智能指针 res
  5. 如果判断失败,说明其他线程在此期间修改了 head。判断失败的线程可以理解为抢占失败的线程,因为 pop 函数的一开始会将栈表头节点的引用计数 +1,所以如果抢占失败,需要在 pop 函数结束前将引用计数 -1。如果引用 -1 之后的值为 1,则表示其他对该节点 pop 的线程已经结束,当前线程是最后引用此指针的线程,我们需要在该线程中将 old_head 删除。

但是该函数存在隐含的风险

  • 风险1:假设线程 1 和线程 2 同时调用pop函数,并已经读取了head 的值,现在准备将 old_head 的引用计数 +1,但线程 2 率先完成了pop函数操作,将引用计数 _ref_count 更新为 0 并删除了 old_head。此时,线程 1 恰好继续执行第 1 步代码,却尝试访问已经被删除的 old_head,因此导致程序崩溃。
  • 风险2:假设线程 1 和线程 2 同时调用 pop函数,并先后将 old_head 的引用计数 +1(此时 old_head 的引用计数总计为 3),假如线程 1 率先通过 if 的判断,将 head 成功更新为 old_head->next;而线程 2 会因为 CAS 会读取到 head 最新修改的值,即 old_head->next,因为 head 最新的值和 old_head 不同,所以会将 old_head 修改为 old_head->next,线程 2 走进 else 分支之后判断的 old_head->ref_count 其实是 old_head 修改后的引用计数,即 old_head->next->ref_count,为1(只有初始化的时候才 +1,其他线程没有对它的引用计数进行操作),从而会将其删除。其他线程如果要弹出 old_head->next 的时候会发现节点已经不存在,报错。流程如下图所示:

在这里插入图片描述

  • 假设线程 1 和线程 2 同时调用 pop函数,并先后将 old_headnode1)的引用计数 +1(此时 old_head 的引用计数总计为 3),如上图中的流程1
  • 假如线程 1 率先通过 if 的判断,将 head 成功更新为 old_head->next,即node2,如上图的流程2
  • 线程 2 会因为 CAS 会读取到 head 最新修改的值,即 old_head->nextnode2),因为 head 最新的值和 old_head 不同,所以会将 old_head 修改为和 head 相同的值 old_head->next,如上图的流程3
  • 因为线程 2 没通过 if 的判断条件,从而走进了 else 分支,else分支中会执行 old_head->_ref_count.fetch_sub(1) == 1操作,此时 old_head 指向节点的引用计数是 1。注意,fetch_sub_ref_count 减 1 后返回的是减之前的值,即1,而不是减之后的值 0,所以这里虽然将 _ref_count 从 1 减为 0 ,但是 if 条件仍然会判断成功,从而将 old_head 删除
  • 假如线程 3 在线程 2 执行完之后调用了 pop 函数想要将 node2 弹出,按正常流程来走的话,会读取 head 指向的节点,即 node2,但因为 node2 已经被线程 2删除,所以这里在读取 head.load() 的时候会报错,此时 head 指向空指针 nullptr,如下图所示:

在这里插入图片描述

解决方法就是将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合

将原本数据结构体 ref_node 分解为 ref_nodenode 两个结构体,前者用于存储引用计数,后者用于存储数据。

struct node {
    //1 数据域智能指针
    std::shared_ptr<T>  _data;
    //2  下一个节点
    ref_node _next;
    node(T const& data_) : _data(std::make_shared<T>(data_)) {}
};
struct ref_node {
    // 引用计数
    std::atomic<int> _ref_count;
    node* _node_ptr;
    ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
    ref_node():_node_ptr(nullptr),_ref_count(0){}
};

如果将 head 存储为指针类型而不是副本类型

std::atomic<ref_node*> head; // 头节点

注意:这里的头节点存储的类型是 ref_node* 而不是 node*

那么 pop() 函数需修改如下:

std::shared_ptr<T> pop() {
    //0 处
    ref_node* old_head = head.load();
    for (;;) {
        //1 只要执行pop就对引用计数+1并更新到head中
        ref_node* new_head;
        do {
            new_head = old_head;
            new_head->_ref_count += 1; // 7
        } while (!head.compare_exchange_weak(old_head, new_head));
        //4 确保 head == old_head == new_head 
        old_head = new_head;
        
        auto* node_ptr = old_head->_node_ptr; // 获取当前节点的 node(存储数据的结构体)
        if (node_ptr == nullptr) { // 判断空栈
            return  std::shared_ptr<T>();
        }
        
        //2 比较head和old_head想等则交换否则说明head已经被其他线程更新
        if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
            // 要返回的值
            std::shared_ptr<T> res;
            //5 交换智能指针
            res.swap(node_ptr->_data);
            //6 增加的数量
            int increase_count = old_head->_ref_count.fetch_sub(2);
            //3 判断仅有当前线程持有指针则删除
            if (increase_count == 2) {
                delete node_ptr;
            }
            return res;
        }else {
            if (old_head->_ref_count.fetch_sub(1) == 1) {
                delete node_ptr;
            }
        }
    }
}
  • 第一个 while 是为了在多线程下安全地更新头节点的引用计数
  • 因为 headold_headnew_head 都是 ref_node 类型的结构体,这里必须使用 auto* node_ptr = old_head->node_ptr 获取和当前引用计数匹配的数据节点,使用该节点判断是否存在内容;而 head 的下一个节点并不是 head ->node_ptr,而是 head ->node_ptr->next
  • 第二个 while 是为了更新 head 的值,将表头节点从链表中删除(从链表中删除而不是从内存中删除)。
  • 如果 head 更新成功,则将 old_head ->node_ptr 弹出,而不是将 old_head 弹出,old_head 表示的仅仅只是引用计数而不是数据。
  • int increase_count = old_head->_ref_count.fetch_sub(2)是为了获取引用计数增加的数量(ref_count.fetch_sub(2) 会将 ref_count 的值减去 2 并返回 _ref_count 的旧值,而不是减去 2 后的新值),如果确实只增加了 2,那么表示当前线程是弹出节点的唯一持有者,可以直接删除。

如果我们将 head存储为指针类型而不是副本类型,那么上面的代码有以下风险:

  • 风险1:和上面引用计数和数据耦合为一个结构体时的风险2相同,会造成误删:

    假设线程 1 比线程 2 先执行,线程 1 在步骤 2 进行比较并交换操作(CAS)后,会将 head 更新为新的值。而此时线程 2 也尝试执行同样的 CAS 操作,但由于 head 已被线程 1 修改,线程 2 的 CAS 操作会失败。

    此时,线程 2 会进入 else 分支进行处理,并更新 old_head 为当前的 head 值。然而,此时线程 2 中的 old_head 的引用计数为 1,而 old_head 指向的数据实际上是新的 head 所指向的数据节点。如果线程 2 随后误将 old_head 指向的数据节点删除,就会导致问题。因为线程 2 删除的实际上是当前 head 所指向的节点,而它并未真正弹出节点或修改 head 的值。这会导致其他线程在尝试弹出栈顶元素时访问到已被删除的节点,从而引发程序崩溃。

  • 风险2

    假设线程 1 和线程 2 都执行完了第 0 步的代码,此时它们读取到的 old_head 值是相同的。线程 1 抢先一步执行,完成了第 5 步操作,并准备执行第 3 步判断逻辑时,线程 2 抢占了 CPU 开始执行第 7 步的代码。

    虽然线程 2 的 while 循环会检测到 old_head 和当前 head 不同,并因此进行重试,更新了 old_head,但在 do 循环的第一次迭代中,线程 2 的 old_head 和线程 1 的 old_head 仍然指向同一个节点。此时,线程 2 修改了 old_head 的引用计数。这会导致线程 1 在执行第 3 步时,引用计数不满足删除条件(因为线程 2 将old_head的计数加一,因为head、old_head、new_head的类型都是指针类型,指向的是同一块内存(指针),所以会反映到线程1中。将此时为3,线程 1 加一,线程 2 加一,初始化加一),因此不会进入 if 逻辑删除节点。

    与此同时,因为线程2在2处之后while会不断重试,线程2的head已经和old_head指向不同了,导致线程2也不会回收old_head内部节点指向的数据。最终,old_head 节点中的数据既没有被线程 1 删除,也没有被线程 2 回收,导致内存泄漏。

    问题根源在于,多个线程对 old_head 的引用计数存在竞争,但删除逻辑对引用计数的操作未能同步,导致某些节点被遗漏回收。

  • 一种和风险2相似但是正常的情况

    假设线程 1 和线程 2 都执行完了第 0 步的代码,此时两者读取的 old_head 值是相同的。接下来,线程 1 比线程 2 先获得 CPU 时间片并继续执行,而线程 2 因未抢占到 CPU 时间片停在了第 1 步。

    线程 1 按照顺序继续执行,最终到达第 3 步并将 node_ptr 所指向的节点删除。同时,head 已更新为新的栈顶元素,也就是 old_head 的下一个节点。

    这时,线程 2 抢占到了 CPU 时间片,继续执行第 1 步的代码,并在第一个 while 循环中因为 CAS 失败将 old_head 更新为当前 head 的值。此时,因为 new_head 是指针,所以对 new_head 的修改能反映到old_head 本身,old_head的引用计数也增加了 1,变为 2,但它实际指向的是下一个节点(即新的栈顶)。

    在这种情况下,线程 2 仍会进入 if 条件(不是进入 else 分支),对新的 old_head 节点执行删除操作。由于此时的 old_head 和引用计数逻辑匹配,这种情况是正常的,不会引发错误。

我们总结如下:

  1. 在实现引用计数内存回收机制时,应尽量避免直接存储指针,因为存储指针会带来多个线程同时操作同一块内存的风险。我们这里需要将 head 的类型从 std::atomic<ref_node*> 修改为 std::atomic<ref_node>,使用副本类型而不是指针类型。

  2. 引入一个专门用于记录减少引用计数的计数器。为了确保在多个线程中修改该计数器的安全性,可以将其设计为 node 类的成员变量。由于 node 类的实例是通过指针存储在栈的节点中的,多个线程能够安全地访问和修改它。

通过将引用计数分为“增加”和“减少”两部分,并将减少的计数器放入 node 类中,我们能够更清晰地追踪各线程对节点的引用情况,从而避免因引用计数不准确导致的内存泄漏或竞争问题。即双引用计数器版本。

  1. 一个节点是否可以被回收,取决于其整体引用计数(增加的引用计数加减少的引用计数)是否为 0。只有当引用计数为 0 时,节点才可以被安全地删除。

2.2 双引用计数器无锁栈

有两种双引用计数器无锁栈的实现方式,一种是博主恋恋风辰的实现方式:将增加计数器放在 ref_node 结构体中,将减少计数器放在 node 结构体中;另一种是C++并发编程书中的实现方式,使用了内存序进行了优化,具体实现相同,只不过后者将一些功能分离成了单独的函数,并通过内存序增加了效率。后者我们在下一节学习,本节只学习第一种。

#pragma once
#include <atomic>

template<typename T>
class single_ref_stack {
public:
	single_ref_stack(){}

	~single_ref_stack() {
		//循环出栈
		while (pop());
	}

	void push(T const& data) {
		auto new_node =  ref_node(data);
		new_node._node_ptr->_next = head.load();
		while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));
	}

	std::shared_ptr<T> pop() {
		ref_node old_head = head.load();
		for (;;) {
			//1 只要执行pop就对引用计数+1并更新到head中
			ref_node new_head;
			do {
				new_head = old_head;
				new_head._ref_count += 1;
			} while (!head.compare_exchange_weak(old_head, new_head));

			old_head = new_head;

			auto* node_ptr = old_head._node_ptr;
			if (node_ptr == nullptr) {
				return  std::shared_ptr<T>();
			}

			//2 比较head和old_head想等则交换否则说明head已经被其他线程更新
			if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
				
				//要返回的值
				std::shared_ptr<T> res;
				//交换智能指针
				res.swap(node_ptr->_data);

				//增加的数量
				int increase_count = old_head._ref_count - 2;
				
				if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {
					delete node_ptr;
				}

				return res;
			}else {
				if (node_ptr->_dec_count.fetch_sub(1) == 1) {
					delete node_ptr;
				}
			}
		}
	}

private:
	struct ref_node;
	struct node {
		//1 数据域智能指针
		std::shared_ptr<T>  _data;
		//2  下一个节点
		ref_node _next;
		node(T const& data_) : _data(std::make_shared<T>(data_)) {}
		// 减少引用计数器
		std::atomic<int>  _dec_count;
	};

	struct ref_node {
		// 增加引用计数
		int  _ref_count;
		node* _node_ptr;
		ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
		ref_node():_node_ptr(nullptr),_ref_count(0){}
	};

	//头部节点
	std::atomic<ref_node> head;
};

针对单引用计数器提出的结论,我们做如下修改:

  1. 增加一个减少计数器,用于记录引用计数减少的次数,存储至 node 结构体中。因为 head 存储的是 std::atomic<ref_node> 类型,所以 _ref_count 的类型是 int 而不用是 std::atomic<int>ref_node 的数据只需每个线程自己看到就行,不用分享给其他线程。

    因为 _ref_count 的类型是 int,所以其他线程没办法看到,为了让其他线程看到 ref_count 的值,我们在 pop 内部将 _ref_count - 2 的值加到了 _dec_count 上面,dec_count原子变量,其他线程可以看到。而且我们通过CAS可以修改head的值,因为head是原子类型,所以head的引用计数也会被其他线程看到。一共有两种方式。

    struct ref_node;
    struct node {
        //1 数据域智能指针
        std::shared_ptr<T>  _data;
        //2  下一个节点
        ref_node _next;
        node(T const& data_) : _data(std::make_shared<T>(data_)) {}
        //减少的数量
        std::atomic<int>  _dec_count;
    };
    struct ref_node {
        // 引用计数
        int  _ref_count;
        node* _node_ptr;
        ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
        ref_node():_node_ptr(nullptr),_ref_count(0){}
    };
  1. head 的类型从 std::atomic<ref_node*> 修改为 std::atomic<ref_node>
std::atomic<ref_node> head;

push() 函数的实现如下:

void push(T const& data) {
    auto new_node =  ref_node(data);
    new_node._node_ptr->_next = head.load();
    while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));
}

pop() 函数的实现如下:

std::shared_ptr<T> pop() {
    ref_node old_head = head.load();
    for (;;) {
        //1 只要执行pop就对引用计数+1并更新到head中
        ref_node new_head;
        //2
        do {
            new_head = old_head;
            new_head._ref_count += 1;
        } while (!head.compare_exchange_weak(old_head, new_head));
        old_head = new_head;
        //3
        auto* node_ptr = old_head._node_ptr;
        if (node_ptr == nullptr) {
            return  std::shared_ptr<T>();
        }
        //4 比较head和old_head相等则交换否则说明head已经被其他线程更新
        if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
            //要返回的值
            std::shared_ptr<T> res;
            //交换智能指针
            res.swap(node_ptr->_data);
            //5  增加的数量
            int increase_count = old_head._ref_count - 2;
            //6  
            if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {
                delete node_ptr;
            }
            return res;
        }else {
            //7
            if (node_ptr->_dec_count.fetch_sub(1) == 1) {
                delete node_ptr;
            }
        }
    }
}
  1. 当多个线程并发调用 pop 时,可能会有线程在代码的第 2 处不断重试。这种重试的原因是 headold_head 之间的某些字段(如引用计数或节点node地址)可能已经被其他线程更新。但无论如何,head 使用的是副本存储(std::atomic<ref_node>),即使重试增加了引用计数,也不会影响到其他线程,因为每个线程操作的都是自己的 old_head 副本
  2. 在代码的第 3 处,我们从 old_head 中取出 node_ptr,用来保存指向节点的地址。这样,后续操作中可以安全地减少该节点的引用计数(比如 6 和 7 处的 node_ptr->_dec_count.fetch_add(x))。因为多个线程可能同时操作同一个 node_ptr 指向的节点,所以引用计数使用了原子变量,以确保多个线程对其修改时相互可见且线程安全。
  3. 在第 4 处,我们通过比较交换(compare_exchange_strong)操作来判断 old_headhead 是否一致。因为 head 、new_head、old_head 存储的是副本类型,所以多个线程看到的 old_head 的值可能不一样(因为每个线程的 old_head 都是独立的副本,互不干涉),但我们能保证仅有一个线程进入if逻辑,进入的线程就是old_head和head匹配的那个。 如果一致,说明当前线程成功抢占操作权,进入 if 逻辑;如果不一致,说明 head 已被其他线程更新,当前线程无法进入 if,需要进行后续的 else 逻辑处理。值得注意的是,即使多个线程同时看到不同的 old_head,也只有一个线程能进入 if 条件,其它线程都会失败。
  4. 在第 5 处,我们已经确定当前线程需要处理的 node_ptr 节点。此时,我们定义了 res,用来保存弹出的数据。在调整引用计数时,先减去 2,其中 1 表示当前线程自身的操作,另 1 表示 初始化的 1。接着计算其他并发线程操作该节点的数量,存储在 increase_count 中。最终,是否删除节点取决于增加和减少的引用计数之和是否为 0。
  5. 在第 6 处,我们使用 fetch_add 操作来调整 _dec_count,并返回操作前的值。如果返回值为负的 increase_count,说明当前线程是唯一操作该节点的线程(因为没有其他线程增加引用计数),此时可以安全删除该节点。如果返回值不是负的 increase_count,则说明还有其他线程正在操作此节点,不能删除。

示例

假设有两个线程 t1t2 执行 pop() 函数,假如线程 t1 先于线程 t2 执行 2 处代码:

  • head 的类型是std::atomic<ref_node>,而 new_headold_head 的类型是 ref_node,所以在线程 t1t2 中的 old_headnew_head 互相独立互不影响。
  • 线程 t1 将属于自身线程的 old_headnew_head 副本的引用计数 +1,并将 head 中的数据修改为和 old_head 相同的数据内容,head 的增计数器 _ref_count 值此时为 2。
  • 当线程 t1 执行完 2 处代码后,线程 t2 开始执行,此时因为 CAS 机制使得线程 t2 专属的 old_head 中的增引用计数器被修改为和 head 相同的内容,即 2。因为 head 的原子类型,所以线程 t1head 的修改会被线程 t2 看到,此时线程 t2 看到的 head 的增引用计数器的值为 2,在线程 t2 第二次 CAS 重试之后,会将 old_head 的增计数器 +1,从 2 增到 3。(所以虽然 _ref_count 的类型是 int,但是通过这一种方式可以间接的让其他线程看到)。

假如线程 t1 先于线程 t2 执行 4 处代码:

  • 因为 head.load() == old_head,所以线程 t1 会进入 if 分支并将 head 更新到 node_ptr->_next

  • increase_count 表示除本线程对目标节点的引用外,其他线程对该目标节点的引用数量,这里 -2 是将线程 t1 对目标节点的引用减去,再将初始化时对引用的初始化减一,一共减去二。increase_count 此时应该为 1,表示除了线程 t1 在引用目标节点外,还有线程 t2 在引用目标节点(除了将 head 更新这种方式,也可以通过这种方式让其他线程看到引用计数的修改increase_count 表示其他线程对目标节点的引用数量)

线程 t1 在执行 6 处代码之前,一共有两种可能:

  • 第一种可能:

    线程 t2 还未执行 7 处代码,此时 _dec_count 的默认初始值为 0,线程 t1_dec_countincrease_count ,即加 1,并返回 _dec_count 原本的值 0;此时不满足 if 的判断,即 0 != -1,所以不会删除数据节点。

    线程 t2 在线程 t1 执行完 6 处代码之后执行 7 处代码,此时 _dec_count 被线程 t1 增加到了 1,然后在 if 判断中对 _dec_count - 1 并返回 _dec_count 初值,即 1;因为 1 == 1,表示当前仅有线程 t2 引用目标节点,可以直接删除。

  • 第二种可能:

    线程 t2 已经执行了 7 处代码,此时 _dec_count 的默认初始值为 0,线程 t2_dec_count 减 1 ,并返回 0;因为 0 != 1,所以线程 t2 不会将数据内容删除。

    线程 t1 在线程 t2 执行完 7 处代码后执行 6 处代码,此时 _dec_count 的值被线程 t2 修改为 -1,然后在 if 判断中将 _dec_countincrease_count,并返回 -1;因为 -1 == -1,此时仅有线程 t1 引用了目标节点,线程 t1 负责将节点删除。

headstd::atomic<ref_node> 类型,old_headnew_headref_node 类型。head 的修改能及时反映到不同线程上(在局一致性内存序下)

3. 本节的一些理解

为什么需要两个计数器,一个是否足够?

如果只用一个计数器,我们首先要考虑把它放在哪里。直觉上,可能会想到将计数器放在节点内部,但这样是行不通的。因为计数器的目的是避免空悬指针解引用(访问已删除的节点),但如果计数器在节点内部,访问计数器本身就需要先访问节点。如果节点已经被删除,这就会导致问题(所以我们将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合,将一个结构体拆解成了两个)。

举个场景:线程 A 读取了 head 指针,并存到 local_head 中。这时线程 A 还没来得及通过 local_head 访问节点内部的计数器并增加计数,而线程 B 率先操作了节点的计数器,加了 1,然后弹出了节点,并在操作完成后让计数器减 1,发现计数变成了 0,于是删除了节点。此时,线程 A 再试图通过 local_head 访问节点时,节点已经被删除了,出现了空悬指针解引用。

因此,如果只使用一个计数器,它就不能放在节点内部,而需要放在节点外部。计数器需要和 head 指针处于同一“层级”(即数据结构体node和ref_node相互关联,我们通过ref_node->node_ptr即可访问数据内容,通过node->next即可访问下一个节点的计数器),成为所有线程都能直接操作的数据。

为了实现这一点,我们可以把计数器和 head 指针合二为一。这样,读取 head 指针和增加计数器的操作可以原子地进行,避免在这两步之间出现时间差(如果计数器和 head 是两个独立变量,操作之间无法保证原子性)。理想情况下,每当有一个指针指向 head 节点,计数器加 1;当指针不再指向该节点时,计数器减 1。但这种设计在实现时会遇到复杂问题。使用一个计数器无法实现,我们需要使用两个计数器来实现。

为什么一个计数器无法实现?

设想以下过程:

  1. 线程 A 加载 headnew_head(类型为 ref_node),如果 head 没有变化,就增加 new_head 的计数器,并更新 head 的计数器。最后将 new_head 的计数器赋值给 old_head,此时,三者的计数器都变为 2。注意,这些操作发生在线程 A 访问节点之前,因此确保节点不会被删除。
  2. 在线程 A 还没来得及访问节点时,线程 B 也加载了 head 到它的 local_head。线程 B 增加了计数器,此时计数器值变为 3(表明两个线程正在引用该节点,在此之前线程B会经历一次CAS,因为head已经被线程A修改,所以线程B的old_head会读取head最新的值,此时线程B即可读取到线程A对引用计数修改的最新值,再+1会变为3)。接着,线程 B 将 head 修改为指向下一个节点,并让 head 的计数器减 2,计数器变回 1。
  3. 线程 B 完成操作后退出,此时节点只被线程A引用,计数器保持为 1。线程 A 继续操作,但发现 old_head(3) 和当前的 head(1) 已不一致,放弃弹出该节点,并进入 else 分支让减计数器减 1,计数器变为 0。

问题是,上述过程仅仅是在两个计数器的共同配合下才会实现的。如果仅有一个计数器,那么计数器永远不会减到 0,导致节点永远不会被删除。这是因为不同线程对计数器的操作是独立的,无法相互感知。

为什么需要两个计数器?

为了解决这个问题,计数器必须与节点绑定,且每个节点只能有一个全局计数器,就像 shared_ptr 的设计一样。然而,shared_ptr 无法实现对 head 的无锁、原子性操作,因此必须引入第二个计数器。

两个计数器的设计允许:

  • 一个计数器跟随 head,用于线程安全地读取和操作(增计数器)。
  • 另一个计数器绑定到节点,用于判断节点是否可以安全删除(减计数器)。

这样的设计虽然复杂,但能确保计数的正确性和节点的安全性。


http://www.kler.cn/a/450349.html

相关文章:

  • 【C++】B2066救援题目分析和解决讲解
  • 【Web】2024“国城杯”网络安全挑战大赛决赛题解(全)
  • openjdk17 从C++视角看 String的intern的jni方法JVM_InternString方法被gcc编译器连接
  • 【FAQ】HarmonyOS SDK 闭源开放能力 — Vision Kit(2)
  • 重温设计模式--单例模式
  • 华为ensp--BGP路由反射器
  • 【UI自动化】从WebDriver看Selenium与Appium的底层关联
  • 【python 逆向分析某有道翻译】分析有道翻译公开的密文内容,webpack类型,全程扣代码,最后实现接口调用翻译,仅供学习参考
  • SQL面试题——奖金瓜分问题
  • ChatGPT与Postman协作完成接口测试(一)
  • 处理字体图标、js、html及其他资源
  • 精读 84页华为BLM战略规划方法论
  • 概率论得学习和整理32: 用EXCEL描述正态分布,用δ求累计概率,以及已知概率求X的区间
  • css一道光闪过动效
  • 鸿蒙开发-ArkTS的ContainerSpan组件
  • 二进制部署k8s
  • Vite +Vue3打包生产环境去除项目中的console.log
  • Linux C/C++编程-线程退出时的清理机会
  • 易语言 OCR 文字识别
  • LightGBM分类算法在医疗数据挖掘中的深度探索与应用创新(上)
  • 数据结构-串-顺序结构实现
  • 如何使用vscode解决git冲突
  • 【微信小程序】微信小程序中的异步函数是如何实现同步功能的
  • C# 异步编程与多线程简析
  • 【python】装饰器
  • 云端地球模型标注如何添加?