当前位置: 首页 > article >正文

并发编程(10)——内存模型和原子操作

文章目录

  • 十、day10
  • 1. 内存模型基础
    • 1.1 对象和内存区域
    • 1.2 改动序列
  • 2. 原子操作及其类型
    • 2.1 原子操作
    • 2.2 原子类型
    • 2.3 内存次序
    • 2.4 std::atomic_flag
      • 2.4.1 自旋锁
    • 2.5 std::atomic<bool>
    • 2.6 std::atomic<T*>
    • 2.7 标准整数原子类型
    • 2.8 std::atomic<std::shared_ptr>

十、day10

今天学习有关内存模型和原子操作的知识。包含:

  1. 内存模型基础 ⭐⭐
  2. 原子操作、原子类型,以及一些常见的基本原子类型及其成员函数 ⭐⭐⭐⭐⭐
  3. 智能指针shared_ptr是否是线程安全的?原子类型的shared_ptr和基本类型的shared_ptr有什么区别?⭐⭐⭐⭐

什么是原子操作?什么又是内存模型?

简单来说,

  • 内存模型定义了多线程程序中,读写操作如何在不同线程之间可见,以及这些操作在何种顺序下执行。内存模型确保程序的行为在并发环境下是可预测的。
  • 原子操作即不可分割的操作。系统的所有线程,不可能观察到原子操作完成了一半。

参考:

  1. 博主恋恋风辰的个人博客
  2. up主mq白cpp的个人仓库

1. 内存模型基础

内存模型牵涉两个方面:基本结构并发。而基本结构其实就是对象和内存区域。

1.1 对象和内存区域

对象有很多类型,比如C++的内建基本类型(int、float、char等)和用户自定义类型(类、命名的lambda函数等)。但不论对象属于什么类型,它都会存储在一个或多个内存区域中,或是对象/子对象(类、int等等),或是一连串的位域(虽然相邻的位域分属不同对象,但仍然算作同一内存区域)。

比如,整个结构体就是一个对象,它由几个子对象组成,每个数据成员为一个子对象。如下图所示,结构体被分解为对象和内存区域。

在这里插入图片描述

其中,bf1、bf2、bf3、bf4是位域,但是bf3是0宽度位域(其变量名会被注释掉,因为0宽度位域必须匿名),所以bf1、bf2共用同一个内存区域,bf3、bf4各自用一个内存区域,因为bf3是0宽度,所以会将bf4排除在bf3的内存区域之外,但是bf3实际上并不占有任何内存区域。string类型的s由几块内存区域构成。

那么这又和并发有何关系?

其实,任何与多线程相关的事项都牵涉到内存区域。在多线程中,任一线程改动同一内存中的数据,都可能引发条件竞争,要避免条件竞争,可以使用之前学习的方法:通过互斥量加锁,保证某一访问总是先于另一个(也可变动,即随应用软件的运行而间隔轮换访问次序,但总要保证顺序访问)。

还有另一种方法:利用原子操作的同步性质,在目标内存区域采取原子操作,从而强制多个线程遵从一定的访问次序。

1.2 改动序列

在一个C++程序中,每个对象都具有一个改动序列,它由所有线程在对象上的全部写操作构成,其中第一个写操作即为对象的初始化。大部分情况下,这个序列会随程序的多次运行而发生变化,但是在程序的任意一次运行过程中,所含的全部线程都必须形成相同的改动序列。

若多个线程共同操作某一对象,但它不属于原子类型,我们就需要自己对这些线程进行互斥加锁,保证各个线程是按一定顺序访问操作该对象,进而确保对于一个变量,所有线程就其达成一致的改动序列(所有线程对变量的修改顺序相同,要么从头,要么从尾开始)。变量的值会随时间推移形成一个序列,在不同的线程上观察属于同一个变量的序列,如果所见各异,就说明出现了数据竞争和未定义行为.

改动序列基本要求如下

  1. 只要某线程看到过某个对象,则该线程的后续读操作必须获得相对新近的值,并且,该线程就同一对象的后续写操作,必然出现在改动序列后方(每一次写都基于上一次的改动);
  2. 如果某线程先向一个对象写数据,过后再读取它,那么必须能读取到前面写的值;
  3. 若在改动序列中,上述读写操作之间还有别的写操作,则必须读取最后写的值;
  4. 在程序内部,对于同一个对象,全部线程都必须就其形成相同的改动序列,并且在所有对象上都要求如此;
  5. 多个对象上的改动序列只是相对关系,线程之间不必达成一致。

2. 原子操作及其类型

  • 原子操作是不可分割的操作。在系统的任一线程中,我们不会观察到这种操作处于半完成状态;它或者完全做好,或者完全没做。
  • 非原子操作在完成到一半的时候,有可能为另一线程所见。假定由原子操作组合出非原子操作,例如向结构体的原子数据成员赋值,那么别的线程有可能观察到其中的某些原子操作已完成,而某些却还没开始,若多个线程同时赋值,而底层操作相交进行,本来意图完整存入的数据就会彼此错乱。

很明显,在任何情况下访问非原子变量却欠缺同步保护(锁),会造成简单的条件竞争,进而诱发问题。或者说,这种级别的访问可能造成数据竞争,并导致未定义行为。

当多个线程或进程并发访问同一内存位置,并且至少一个线程在写入数据,其他线程在读取或写入数据,而没有适当的同步机制来保护该内存位置时,称这些表达式冲突拥有两个冲突的求值的程序就有数据竞争,除非

  • 两个求值都在同一线程上,或者在同一信号处理函数中执行,或
  • 两个冲突的求值都是原子操作(见 std::atomic),或
  • 一个冲突的求值发生早于 另一个(见 std::memory_order)

如果出现数据竞争,那么程序的行为未定义。

如果一个线程写入原子对象,同时另一线程从它读取,那么行为有良好定义原子操作都是线程安全的。

2.1 原子操作

int a = 0;
void f(){
    ++a;
}
std::thread B(f);
std::thread A(f);

显然,++a 是非原子操作,也就是说在多线程中可能会被另一个线程观察到只完成一半。

  1. 线程 A 和线程 B 同时开始修改变量 a 的值。
  2. 线程 A 对 a 执行递增操作,但还未完成。
  3. 在线程 A 完成递增操作之前,线程 B 也执行了递增操作。
  4. 线程 C 读取 a 的值。

线程 C 到底读取到多少不确定,a 的值是多少也不确定。显然,这构成了数据竞争,出现了未定义行为。

在之前学习的内容中,我们可以通过互斥量来保护共享资源:

std::mutex m;
void f() {
    std::lock_guard<std::mutex> lc{ m };
    ++a;
}

通过互斥量的保护,即使 ++a 本身不是原子操作,逻辑上也可视为原子操作。互斥量确保了对共享资源的读写是线程安全的,避免了数据竞争问题。

不过这显然不是我们的重点。我们想要的是一种原子类型,它的所有操作都直接是原子的,不需要额外的同步设施进行保护。C++11 引入了原子类型 std::atomic

2.2 原子类型

标准原子类型定义在头文件<atomic>中,这些类型的操作都是原子的(如atomic<bool>atomic<int>等等),语言定义中只有这些类型的操作是原子的(注意,原子类型的初始化不是原子的,不能保证线程安全),但我们也可以用互斥量来模拟原子操作(见上文)。

因为可以通过互斥量来模拟原子操作,那么原子操作本身在内部也可能通过互斥进行模拟。但原子操作的关键用途是取代需要互斥的同步方式,那么如果原子操作内部使用了互斥,就没办法达到取代互斥的目的。为了判断一个原子操作内部是通过互斥模拟的,还是通过原子指令实现的,几乎所有标准原子类型都实现了成员函数**is_lock_free()**,允许用户查询特定原子类型的内部操作是否是通过直接的原子指令实现(返回 true),还是通过锁来实现(返回 false)。

只有一个原子类型不提供is_lock_free()成员函数:std::atomic_flag 。它是简单的布尔标志,所以必须采取无锁操作。可以利用这种简单的无锁布尔标志,实现一个简易的锁,进而基于该锁实现其他所有原子类型。

类型std::atomic_flag的对象在初始化时清零(置为false),随后即可通过成员函数test_and_set()查值并设置成立,或者由clear()清零。整个过程只有这两个操作。没有赋值,没有拷贝构造,没有“查值并清零”,也没有其他操作。

从C++17开始,所有的原子类型都包含一个静态常量表达式成员变量,std::atomic::is_always_lock_free。这个成员变量的值表示在任意给定的目标硬件上,原子类型X是否始终以无锁结构形式实现。如果在所有支持该程序运行的硬件上,原子类型X都以无锁结构形式实现,那么这个成员变量的值就为true;否则为false。


标准库还提供了一组宏 ATOMIC_xxx_LOCK_FREE ,在编译时对各种整数原子类型是否无锁进行判断。

// (C++11 起)
#define ATOMIC_BOOL_LOCK_FREE     /* 未指定 */
#define ATOMIC_CHAR_LOCK_FREE     /* 未指定 */
#define ATOMIC_CHAR16_T_LOCK_FREE /* 未指定 */
#define ATOMIC_CHAR32_T_LOCK_FREE /* 未指定 */
#define ATOMIC_WCHAR_T_LOCK_FREE  /* 未指定 */
#define ATOMIC_SHORT_LOCK_FREE    /* 未指定 */
#define ATOMIC_INT_LOCK_FREE      /* 未指定 */
#define ATOMIC_LONG_LOCK_FREE     /* 未指定 */
#define ATOMIC_LLONG_LOCK_FREE    /* 未指定 */
#define ATOMIC_POINTER_LOCK_FREE  /* 未指定 */
// (C++20 起)
#define ATOMIC_CHAR8_T_LOCK_FREE  /* 未指定 */
  • 对于一定有锁的内建原子类型是 0;
  • 对于有时无锁的内建原子类型是 1;
  • 对于一定无锁的内建原子类型是 2。

我们可以使用这些宏来对代码进行编译时的优化和检查,以确保在特定平台上原子操作的性能。例如,如果我们知道某些操作在目标平台上是无锁的,那么我们可以利用这一点进行性能优化。如果这些操作在目标平台上是有锁的,我们可能会选择其它同步机制。

// 检查 std::atomic<int> 是否总是无锁
if constexpr(std::atomic<int>::is_always_lock_free) {
    std::cout << "当前环境 std::atomic<int> 始终是无锁" << std::endl;
}
else {
    std::cout << "当前环境 std::atomic<int> 并不总是无锁" << std::endl;
}

// 使用 ATOMIC_INT_LOCK_FREE 宏进行编译时检查
#if ATOMIC_INT_LOCK_FREE == 2
    std::cout << "int 类型的原子操作一定无锁的。" << std::endl;
#elif ATOMIC_INT_LOCK_FREE == 1
    std::cout << "int 类型的原子操作有时是无锁的。" << std::endl;
#else
    std::cout << "int 类型的原子操作一定有锁的。" << std::endl;
#endif

上述代码使用 C++17 的静态数据成员 is_always_lock_free 和预处理宏来让程序执行不同的代码。因为 is_always_lock_free 是编译期常量(静态常量),所以可以使用 C++17 引入的 constexpr if ,它可以在编译阶段进行决策,避免了运行时的判断开销,提高了性能。

如果一个类型的原子操作总是无锁的,我们可以更放心地在性能关键的代码路径中使用它,无锁的原子操作可以显著减少锁的开销和竞争,提高系统的吞吐量和响应时间。

另一方面,如果发现某些原子类型在目标平台上是有锁的,我们可以考虑以下优化策略:

  1. 使用不同的数据结构:有时可以通过改变数据结构来避免对原子操作的依赖。
  2. 减少原子操作的频率:通过批处理等技术,减少对原子操作的调用次数。
  3. 使用更高效的同步机制:在一些情况下,其它同步机制(如读写锁)可能比原子操作更高效。

当然,其实很多时候根本没这种性能的担忧,我们很多时候使用原子对象只是为了简单方便,比如 std::atomic<bool> 表示状态、std::atomic<int> 进行计数等。即使它们是用了锁,那也是封装好了的,起码用着方便,而不需要在代码中引入额外的互斥量来保护,更加简洁。这也是很正常的需求,各位不但要考虑程序的性能,同时也要考虑代码的简洁性、易用性。即使使用原子类型无法带来效率的提升,那也没有负提升。


std::atomic 对象不可进行复制、赋值,因为它们的复制构造与复制赋值运算符被定义为弃置的。不过可以接受内建类型赋值,也支持隐式转换成对应的内建类型,因为它有转换函数。

atomic(const atomic&) = delete;
atomic& operator=(const atomic&) = delete;
// 转换函数
operator T() const noexcept;

转换函数的作用等价于load()原子地加载并返回原子变量的当前值

说到load(),我们其实也可以通过 load()store()exchange()compare_exchange_weak()compare_exchange_strong() 等成员函数对 std::atomic 原子类型进行操作。如果是整数类型的特化,还支持 ++--+=-=&=|=^=fetch_addfetch_sub 等操作方式。

最后强调一下:任何 std::atomic 类型,初始化不是原子操作。当我们在多个线程中同时对一个 std::atomic 对象进行初始化时,并不会自动保证线程安全。

这是因为std::atomic 类型提供了对共享数据的原子操作,但这仅仅是指对该对象进行修改(如读、写、加法、减法等)时,操作本身是原子的,即操作是不可分割的。但对于初始化操作来说,它仍然是普通的内存操作。具体来说,初始化是对象创建的一部分,而对象的创建与内存分配过程(如内存的分配和指针的设置)并没有任何与原子性相关的保障。

不过我们可以通过以下三种方式保证 std::atomic 类型有线程安全的初始化:

  1. 在主线程中初始化
  2. 使用同步机制(如互斥锁、std::call_once 等)
  3. 可以使用单例模式的初始化,确保初始化只发生一次,比如使用 std::once_flagstd::call_once 来保证在多线程环境下初始化只执行一次

2.3 内存次序

内存次序指的是在多线程环境中,线程之间的内存操作顺序。由于现代处理器和编译器通常会进行优化(如指令重排序、缓存等),线程的内存操作可能不是按程序代码中的顺序执行的。内存次序的概念就是为了控制和保证不同线程间对共享数据的访问顺序,以确保程序行为的一致性和正确性

在多线程编程中,常见的内存次序操作包括顺序一致性(sequential consistency)、**强制顺序(strong order)弱顺序(weak order)**等。

  • 顺序一致性(Sequential Consistency):要求所有线程看到的操作顺序是全局一致的,程序的执行行为按线程间的顺序一致。即每个线程中的操作执行顺序是按程序代码顺序进行的,不允许重排序
  • 强顺序(Strong Ordering):对于某些特定的内存操作(如读取、写入),强顺序要求操作顺序严格按照代码中的顺序执行。
  • 弱顺序(Weak Ordering):允许内存操作在一定程度上进行重排序,但要求特定的同步操作(如锁)保证共享数据的正确性。

而对于原子类型上的每一种操作,我们都可以提供额外的参数(这个参数可以用来指定执行顺序),从枚举类std::memory_order取值,用于设定所需的内存次序语义。枚举类std::memory_order具有6个可能的值,包括std::memory_order_relaxedstd:: memory_order_acquirestd::memory_order_consumestd::memory_order_acq_relstd::memory_order_releasestd::memory_order_seq_cst

  • std::memory_order_relaxed:不保证任何内存顺序,允许最大程度的重排序。
  • std::memory_order_consume:用于读取依赖于先前写入的值。大多数情况下和memory_order_acquire相同。
  • std::memory_order_acquire:确保当前线程的所有读取和写入操作在当前原子操作之前完成。
  • std::memory_order_release:确保当前线程的所有读取和写入操作在当前原子操作之后完成。
  • std::memory_order_acq_rel:同时拥有acquirerelease语义,适用于读写操作都涉及共享数据的情况。
  • std::memory_order_seq_cst:保证所有原子操作的顺序一致性,是最强的内存顺序保证。

原子类型的操作被划分为以下三类

  • 存储store)操作,可选用的内存次序有std::memory_order_relaxedstd::memory_order_releasestd::memory_order_seq_cst
  • 载入load)操作,可选用的内存次序有std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_seq_cst
  • “读-改-写”read-modify-write)操作,可选用的内存次序有std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_releasestd::memory_order_acq_relstd::memory_order_seq_cst

操作的类别决定了内存次序所准许的取值,若我们没有把内存次序显式设定成上面的值,则默认采用最严格的内存次序,即std::memory_order_seq_cst

这六种内存顺序相互组合可以实现三种顺序模型 (ordering model):

  • Sequencial consistent ordering :实现同步, 且保证全局顺序一致 (single total order) 的模型. 是一致性最强的模型, 也是默认的顺序模型
  • Acquire-release ordering: 实现同步, 但不保证保证全局顺序一致的模型
  • Relaxed ordering :不能实现同步, 只保证原子性的模型

在后面,我会专门写一篇文章对六种内存次序和三种内存模型进行分析,这里只做简单了解

2.4 std::atomic_flag

上面说过,几乎所有原子类型都提供is_lock_free()成员函数,但是只有一个原子类型不提供is_lock_free()成员函数:std::atomic_flag 。它用于表示一个布尔标志,所有只有两种状态:成立或置零(二者必居其一)。

这个类型的对象可以在两个状态间切换:设置(true)和清除(false)

在 C++20 之前,std::atomic_flag 类型的对象需要以 ATOMIC_FLAG_INIT 初始化,可以确保此时对象处于 “清除”(false)状态。

std::atomic_flag f = ATOMIC_FLAG_INIT;

C++20std::atomic_flag 的默认构造函数保证对象为“清除”(false)状态,就不再需要使用 ATOMIC_FLAG_INIT

ATOMIC_FLAG_INIT 其实并不是什么复杂的东西,它在不同的标准库实现中只是简单的初始化:在 MSVC STL 它只是一个 {},在 libstdc++libc++ 它只是一个 { 0 }。也就是说我们可以这样初始化:

std::atomic_flag f ATOMIC_FLAG_INIT;
std::atomic_flag f2 = {};
std::atomic_flag f3{};
std::atomic_flag f4{ 0 };

若标志对象已初始化,它只能做三件事情:销毁、清除、设置。这些操作对应的函数分别是:

  1. clear() (清除):将标志对象的状态原子地更改为清除(false),是一个“读-改-写”操作,默认的内存顺序是 memory_order_seq_cst
  2. test_and_set(测试并设置):将标志对象的状态原子地更改为设置(true),并返回它先前保有的值。
  3. 销毁:对象的生命周期结束时,自动调用析构函数进行销毁操作。
f.clear(std::memory_order_release);
bool r = f.test_and_set();
  1. f 的状态原子地更改为清除(false),指明 memory_order_release 内存序。
  2. f 的状态原子地更改为设置(true),并返回它先前保有的值给 r。使用默认的 memory_order_seq_cst 内存序。

std::atomic_flag 不可复制不可赋值。这不是 std::atomic_flag 特有的,而是所有原子类型共有的属性。原子类型的所有操作都是原子的,而赋值和复制涉及两个对象,破坏了操作的原子性(复制构造和复制赋值操作不具备原子性)。复制构造和复制赋值会先读取第一个对象的值,然后再写入另一个对象。对于两个独立的对象,这里实际上有两个独立的操作,合并这两个操作无法保证其原子性。因此,这些操作是不被允许的。详细说明:

复制构造和复制赋值操作涉及两个对象,这实际上是两个操作:

  1. 读取第一个对象的值(对于复制构造或赋值的目标对象);
  2. 写入到另一个对象(即目标对象)。

这两个操作并不是在一个单一的原子步骤中完成的,而是需要两个独立的步骤。这会导致以下问题:

  • 先读后写:在读第一个对象值并写入第二个对象之间,其他线程可能会修改第一个对象的值或第二个对象的值。这就破坏了操作的原子性,可能会导致数据不一致。
  • 竞态条件:这两个步骤之间如果没有正确同步(如加锁或其他同步机制),就会出现竞态条件,多个线程同时进行赋值或复制操作时,会导致结果无法预测,发生未定义行为。

2.4.1 自旋锁

自旋锁可以理解为一种忙等锁,它的基本思想是,当一个线程尝试获取锁时,如果锁已经被其他线程持有,那么该线程就会不断地循环检查锁的状态,直到成功获取到锁为止。与此相对,std::mutex 互斥量是一种睡眠锁。当线程请求锁(lock())而未能获取时,它会放弃 CPU 时间片,让其他线程得以执行,从而有效利用系统资源。

从性能上看,自旋锁的响应更快,但是睡眠锁更加节省资源,高效。

我们可以利用std::atomic_flag实现一个自旋锁:

#include <iostream>
#include <atomic>
#include <thread>

class SpinLock {
public:
    void lock() {
        //1 处
        while (flag.test_and_set(std::memory_order_acquire)); // 自旋等待,直到成功获取到锁
    }
    void unlock() {
        //2 处
        flag.clear(std::memory_order_release); // 释放锁
    }
private:
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
};
  • 通过lock()函数,我们可以将flag通过test_and_set函数设为true,然后返回上一次flag的值。如果返回的为false(未持有锁),那就退出lock()函数,上锁完毕;如果返回的为true(持有锁),说明其他线程已持有该锁,无法继续上锁,通过循环调用test_and_set函数,可以实现循环的判断锁的状态,一旦其他线程解锁,当前线程便上锁;
  • 通过unlock()函数,我们可以将flag设为false,表示释放锁;
  • ATOMIC_FLAG_INIT默认设flagfalse

测试函数:

void TestSpinLock() {
    SpinLock spinlock;
    std::thread t1([&spinlock]() {
        spinlock.lock(); // 设置自旋锁
        for (int i = 0; i < 3; i++) {
            std::cout << "*";
            }
        std::cout << std::endl;
        spinlock.unlock();
        });
    
    std::thread t2([&spinlock]() {
        spinlock.lock(); // 设置自旋锁
        for (int i = 0; i < 3; i++) {
            std::cout << "?";
        }
        std::cout << std::endl;
        spinlock.unlock();
        });
    
    t1.join();
    t2.join();
}

我们的 SpinLock 对象中存储的 flag 对象在默认构造时是清除 (false) 状态。在 lock() 函数中调用 test_and_set 函数,它是原子的,只有一个线程能成功调用并将 flag 的状态原子地更改为设置 (true),并返回它先前的值 (false)。此时,该线程成功获取了锁,退出循环。

flag 对象的状态为设置 (true) 时,其它线程调用 test_and_set 函数会返回 true,导致它们继续在循环中自旋,无法退出。直到先前持有锁的线程调用 unlock() 函数,将 flag 对象的状态原子地更改为清除 (false) 状态。此时,等待的线程中会有一个线程成功调用 test_and_set 返回 false,然后退出循环,成功获取锁。

我们看到在设置时使用memory_order_acquire内存次序,在清除时使用了memory_order_release内存次序。这是为什么?

其实memory_order_acquirememory_order_release都是六种内存次序的一种,我们这节并不对内存次序进行详细的学习,这里简单的介绍二者的作用。六种内存次序的原理以及使用,在之后我会写一篇文章进行详细分析。

1.memory_order_acquire (在 lock() 中使用)

memory_order_acquire 保证的是:

  • 获取锁之后,所有在获取锁操作之前的操作都不会被重新排序到获取锁之后
  • 具体来说,当线程调用 flag.test_and_set(std::memory_order_acquire) 时,它实际上会等待直到成功获取到锁(即 flag 被设置为 true),并确保在它成功获取到锁之前,所有在这之前的内存操作(读写)不会被重排序到锁的获取之后。

为什么需要 memory_order_acquire

  • 假设在其他线程中,有对共享数据的修改操作,并且这些修改操作必须在当前线程获得锁之前完成。通过使用 memory_order_acquire,我们保证了当前线程在获取锁之后,才能读取到其它线程通过锁更新的数据,防止数据重排序导致读取到过时的或不一致的数据。
  • 确保在获取锁之前的操作顺序:它防止了内存操作的重排序,确保了线程获取锁后,它能看到所有更新的共享数据。

2. memory_order_release (在 unlock() 中使用)

memory_order_release 保证的是:

  • 释放锁时,所有在释放锁之前的操作都会对其他线程可见
  • 具体来说,当一个线程调用 flag.clear(std::memory_order_release) 来释放锁时,它确保该线程在清除 flag 并释放锁之前,所有的操作(例如对共享资源的修改)都已经完成,并且对其他线程是可见的。

为什么需要 memory_order_release

  • 在释放锁之前,线程可能已经修改了一些共享变量,或者执行了一些计算。如果没有 memory_order_release,这些修改可能在锁释放后才对其他线程可见,导致其他线程读取到不一致的数据。
  • 使用 memory_order_release 确保释放锁之前的所有内存操作都已经完成,并且会对等待获取锁的其他线程可见。这样,当其他线程在 lock() 方法中获得锁时,它们能确保看到释放锁线程在修改共享资源时的最新值。

std::atomic_flag 的局限性太强,甚至不能当普通的 bool 标志那样使用。一般最好使用std::atomic<bool>

2.5 std::atomic<bool>

std::atomic<bool> 是最基本的整数原子类型 ,它相较于 std::atomic_flag 提供了更加完善的布尔标志。虽然同样不可复制不可移动,但可以使用非原子的 bool 类型进行构造,初始化为 true 或 false(而std::atomic_flag只能初始化为false),并且能从非原子的 bool 对象赋值给 std::atomic<bool>

注意,如果不给 std::atomic 对象的变量指定值,它默认初始化为 false

// 将 std::atomic<bool> 类型的对象初始化为 true
std::atomic<bool> b{ true }; 
// 可以将非原子的bool类型 'false' 赋值给 std::atomic<bool> 类型的对象 b
b = false; 

在这里,std::atomic<bool> 类型的赋值操作 b = false 和普通的 bool 赋值操作有所不同。关键在于std::atomic 类型的赋值操作通常会确保原子性和同步问题,所以 std::atomic<bool> 的赋值操作(如 b = false返回一个普通的 bool(返回的是普通类型的bool而不是原子类型的bool)。而普通布尔类型的赋值操作并没有返回值。

std::atomic<bool> 的赋值操作(operator=)会执行原子操作,将 false 赋值给原子布尔变量 b,并返回赋值后的 bool 值(即 std::atomic<bool>当前持有的值)。上面代码的操作可以理解为:

std::atomic<bool> b{ true }; // 初始化为 true
bool old_value = (b = false);  // 赋值后,old_value 将为 false, b 为 false

这种赋值行为不仅仅适用于std::atomic<bool>,而是适用于所有std::atomic类型。


如果原子变量的赋值操作返回了一个引用,那么依赖这个结果的代码需要显式地进行加载(load,该函数用于原子的加载并返回当前变量的持有值),以确保数据的正确性。例如:

std::atomic<bool>b {true}; // 初始化为 true
auto& ref = (b = false);  // 返回 atomic 引用
bool flag = ref.load();   // 那就必须显式调用 load() 加载

赋值操作 b = false 是原子操作,它会将 false 赋值给 std::atomic<bool> 对象 b,确保线程安全。如果返回的是一个引用,即 auto& ref = (b = false);,那么 ref 就会是一个对 b 的引用。

因为std::atomic<bool> 是一个原子类型,它有专门的方法来保证内存操作的原子性。例如,load() 方法会返回原子对象的当前值,并且可以指定内存次序来控制内存访问的顺序。因此,调用 load() 来显式地获取原子变量的值是确保正确同步的方式。如果直接使用 ref,它是 std::atomic<bool> 的引用类型,并不自动解包(load)出原子值。在这时需要显式调用 load() 方法以保证返回正确的值(也就是通过 load函数将普通布尔类型的值解包出来)。

我们可以通过返回非原子值进行赋值,从而避免多余的加载(load)过程,得到实际存储的值:

std::atomic<bool> b{ true };
bool new_value = (b = false);  // new_value 将是 false

我们可以使用 store 函数来原子地替换当前对象的值,远好于 std::atomic_flagclear()test_and_set() 也可以换为更加通用常见的 exchange,它可以原子地使用新的值替换已经存储的值,并返回旧值

获取 std::atomic<bool> 的值有两种方式,调用 load() 函数,或者隐式转换(我们在原子类型中提到的转换函数,作用和load相同)。

store 是一个存储操作load 是一个加载操作exchange 是一个“读-改-写”操作:

std::atomic<bool> b; // 默认初始化为false
bool x = b.load(std::memory_order_acquire); // 获取 b 的当前持有值
b.store(true); // 将 b 的值修改为 true
x = b.exchange(false, std::memory_order_acq_rel); // 将 b 修改为false,并返回 b 持有的旧值

上面代码中各个变量的变化为:

0   // b 首先初始化为 false
0   // 使用 load() 函数原子地将 b 的当前持有值提取出来,并通过 '=' 赋值给普通布尔类型 x
1   // 使用 store() 函数原子地将 b 的值修改为true
1   // 使用 exchange() 函数原子地将 b 的值修改为 false,并返回 b 的旧值,最后通过 '=' 赋值给普通布尔类型 x

std::atomic<bool> 提供多个“读-改-写”的操作,exchange 只是其中之一。它还提供了另外一种存储方式:当前值与预期一致时,存储新值。

这种操作叫做“比较/交换”,它的形式表现为compare_exchange_weak()compare_exchang_strong()

  • compare_exchange_weak:尝试将原子对象的当前值与预期值进行比较1,如果相等则将其更新为新值(不是将expected的值赋给flag,而是有另外一个设定值)并返回 true;否则,将原子对象的值加载进 expected(进行加载操作)并返回 false此操作可能会由于某些硬件的特性而出现假失败,需要在循环中重试

    std::atomic<bool> flag{ false }; // 初始化为 false
    bool expected = false; // 比较值
    
    while (!flag.compare_exchange_weak(expected, true));
    
    • 它比较原子对象的当前持有值与预期值 expected 是否相等

    • 如果相等,则将原子对象的值更新为新值(此例为 true),并返回 true

    • 如果不相等,则不会更新原子对象的值,并将原子对象的当前值加载到 expected 中,返回 false

    返回 false 即代表出现了假失败,因此需要在循环中重试。。

  • compare_exchange_strong:类似于 compare_exchange_weak但不会出现假失败,因此不需要重试。适用于需要确保操作成功的场合。

    std::atomic<bool> flag{ false }; // 初始化为 false
    bool expected = false; // 比较值
    
    void try_set_flag() {
        // 判断 flag 的值与 expected 是否相同,如果相同,将 flag 修改为我们设定的值,并返回 true
        if (flag.compare_exchange_strong(expected, true)) {
            std::cout << "flag 为 false,设为 true。\n";
        }
        else { // 如果不相同,将 expected 的值修改为我们设定的值,并返回false
            std::cout << "flag 为 true, expected 设为 true。\n";
        }
    }
    

    假设有两个线程运行 try_set_flag 函数,那么第一个线程调用 compare_exchange_strong 将原子对象 flag 设置为 true。第二个线程调用 compare_exchange_strong,当前原子对象的值为 true,而 expectedfalse,不相等,将原子对象的值设置给 expected。此时 flagexpected 均为 true

    std::thread t1{ try_set_flag };
    std::thread t2{ try_set_flag };
    t1.join();
    t2.join();
    std::cout << "flag: " << std::boolalpha << flag << '\n';
    std::cout << "expected: " << std::boolalpha << expected << '\n';
    

    输出为:

    flag 为 false,flag 设为 true。
    flag 为 true, expected 设为 true。
    flag: true
    expected: true
    

exchange 的另一个不同是,compare_exchange_weakcompare_exchange_strong 允许指定成功和失败情况下的内存次序。这意味着可以根据成功或失败的情况,为原子操作指定不同的内存次序。

std::atomic<bool> data{ false };
bool expected = false;

// 成功时的内存序为 memory_order_release,失败时的内存序为 memory_order_acquire
if (data.compare_exchange_weak(expected, true,
    std::memory_order_release, std::memory_order_acquire)) {
    // 操作成功
}
else {
    // 操作失败
}

另一个简单的原子类型是特化的原子指针,即:std::atomic<T>,下一节我们来看看它是如何工作的。

2.6 std::atomic<T*>

std::atomic<T*> 是一个原子指针类型T 是指针所指向的对象类型。操作是针对 T 类型的指针进行的。虽然 std::atomic<T*> 不能被拷贝和移动,但它可以通过符合类型的指针进行构造和赋值

std::atomic<T*> 拥有以下成员函数:

  • load():以原子方式读取指针值。
  • store():以原子方式存储指针值。
  • exchange():以原子方式交换指针值。
  • compare_exchange_weak()compare_exchange_strong():以原子方式比较并交换指针值。

这些函数接受并返回的类型都是 T*。此外,std::atomic<T*> 还提供了以下操作:

  • fetch_add:以原子方式增加指针的值。(p.fetch_add(1) 会将指针 p 向前移动一个元素,并返回操作的指针值)
  • fetch_sub:以原子方式减少指针的值。返回操作的指针值。
  • operator+=operator-=operator++operator--:以原子方式增加或减少指针的值。返回操作的指针值。

这些操作确保在多线程环境下进行安全的指针操作,避免数据竞争和并发问题。

使用示例如下:

struct Foo {}; // 定义一个无任何成员变量的结构体

Foo array[5]{}; // 定义一个数组,元素类型为 Foo 结构体
std::atomic<Foo*> p{ array }; // 定义一个原子指针 p,指向 array 数组的第一个元素

// p 加 2,并返回原始值
Foo* x = p.fetch_add(2);
assert(x == array); // 验证 x 是原始值,即指向 array[0]
assert(p.load() == &array[2]); // 验证操作完成后,p 确实指向数组的第 3 个元素

// p 减 1,并返回操作后的值
x = (p -= 1);
assert(x == &array[1]); // 验证 x 的值确实是原始值 &array[1]
assert(p.load() == &array[1]); // p 应该指向 array[1]

// 函数也允许内存序作为给定函数的参数
p.fetch_add(3, std::memory_order_release);

2.7 标准整数原子类型

std::atomic<int>std::atomic<unsigned long long>这样的整数原子类型(有很多,char、short、int、size_t等等都是)上,我们可以执行很多操作:既包括常用的原子操作(load、store、exchange、compare_exchange_weak、compare_exchange_strong),也报价原子运算(fecth_add、fetch_sub、fetch_and、fetch——xor),以及运算的复合赋值形式(+=、-=、&=、|=、^=),还有前后缀形式的自增和自减(++x、–x、x++、x–)。

虽然std::atomic也是整数原子类型,但是它不能使用原子运算,它只能使用原子操作。

2.8 std::atomic<std::shared_ptr>

多个线程能在不同的 shared_ptr 对象上调用所有成员函数(包含复制构造函数与复制赋值)而不附加同步,即使这些实例是同一对象的副本且共享所有权也是如此。若多个执行线程访问同一 shared_ptr 对象而不同步,且任一线程使用 shared_ptr 的非 const 成员函数,则将出现数据竞争;而std::atomic<shared_ptr> 能用于避免数据竞争。

那么,std::shared_ptr 是不是线程安全的?

我直接说结论std::shared_ptr 本身对于 引用计数的修改 是线程安全的,但如果多个线程同时访问同一个 shared_ptr 管理的对象,并且至少一个线程修改了对象,那么在这种情况下 shared_ptr 并不保证线程安全,需要额外的同步。

详细解释可以参考这一篇文章,说的很清晰:当我们谈论shared_ptr的线程安全性时,我们在谈论什么

而在 C++20 中,原子模板 std::atomic 引入了一个偏特化版本 std::atomic允许用户原子地操纵 shared_ptr 对象。因为它是 std::atomic 的特化版本,即使我们还没有深入讲述它,也能知道它是原子类型,这意味着它的所有操作都是原子操作,肯定是线程安全的(即使多个执行线程不同步地同时访问同一 std::shared_ptr 对象,且任何这些访问使用了 shared_ptr 的非 const 成员函数)。

下面我分别使用std::shared_ptr和**std::atomic<std::shared_ptr>**来说明二者的区别:

class Data {
public:
    Data(int value = 0) : value_(value) {}
    int get_value() const { return value_; }
    void set_value(int new_value) { value_ = new_value; }
private:
    int value_;
};

auto data = std::make_shared<Data>();

void writer(){
    for (int i = 0; i < 10; ++i) {
        std::shared_ptr<Data> new_data = std::make_shared<Data>(i);
        data.swap(new_data); // 调用非 const 成员函数
        std::this_thread::sleep_for(100ms);
    }
}

void reader(){
    for (int i = 0; i < 10; ++i) {
        if (data) {
            std::cout << "读取线程值: " << data->get_value() << std::endl;
        }
        else {
            std::cout << "没有读取到数据" << std::endl;
        }
        std::this_thread::sleep_for(100ms);
    }
}

int main(){
    std::thread writer_thread{ writer };
    std::thread reader_thread{ reader };

    writer_thread.join();
    reader_thread.join();
}

以上这段代码是典型的线程不安全,它满足:

  1. 多个线程不同步地同时访问同一 std::shared_ptr 对象
  2. 任一线程使用 shared_ptr 的非 const 成员函数

那么为什么呢?为什么满足这些概念就是线程不安全呢?为了理解这些概念,首先需要了解 shared_ptr 的内部实现:

shared_ptr 的通常实现只保有两个指针

  • 指向底层元素的指针(get()) 所返回的指针)
  • 指向控制块 的指针

控制块是一个动态分配的对象,其中包含:

  • 指向被管理对象的指针或被管理对象本身
  • 删除器(类型擦除)
  • 分配器(类型擦除)
  • 持有被管理对象的 shared_ptr 的数量
  • 涉及被管理对象的 weak_ptr 的数量

控制块是线程安全的,这意味着多个线程可以安全地操作引用计数和访问管理对象,即使这些 shared_ptr 实例是同一对象的副本且共享所有权也是如此。因此,多个线程可以安全地创建、销毁和复制 shared_ptr 对象,因为这些操作仅影响控制块中的引用计数。也就是说对于引用计数这一变量的存储,是在堆上的,多个shared_ptr的对象都指向同一个堆地址,对引用计数的加减过程是一个原子过程,是线程安全的。

然而,shared_ptr 对象实例本身并不是线程安全的。shared_ptr 对象实例包含一个指向控制块的指针和一个指向底层元素的指针。这两个指针的操作在多个线程中并没有同步机制。因此,如果多个线程同时访问同一个 shared_ptr 对象实例并调用非 const 成员函数(如 resetoperator=),这些操作会导致对这些指针的并发修改,进而引发数据竞争。

如果不是同一 shared_ptr 对象,每个线程读写的指针也不是同一个,控制块又是线程安全的,那么自然不存在数据竞争,可以安全的调用所有成员函数。


使用 std::atomic<shared_ptr> 修改:

std::atomic<std::shared_ptr<Data>> data = std::make_shared<Data>();

void writer() {
    for (int i = 0; i < 10; ++i) {
        std::shared_ptr<Data> new_data = std::make_shared<Data>(i);
        data.store(new_data); // 原子地替换所保有的值
        std::this_thread::sleep_for(10ms);
    }
}

void reader() {
    for (int i = 0; i < 10; ++i) {
        if (auto sp = data.load()) {
            std::cout << "读取线程值: " << sp->get_value() << std::endl;
        }
        else {
            std::cout << "没有读取到数据" << std::endl;
        }
        std::this_thread::sleep_for(10ms);
    }
}

很显然,这是线程安全的,store 是原子操作,而 sp->get_value() 只是个读取操作,并会对数据进行修改,所以读操作不需要调用原子操作。

能不能调用 load() 成员函数原子地返回底层的 std::shared_ptr 再调用 swap 成员函数?

可以,但是没有意义,因为 load() 成员函数返回的是底层 std::shared_ptr副本,也就是一个临时对象。对这个临时对象调用 swap 并不会改变 data 本身的值,因此这种操作没有实际意义,尽管这不会引发数据竞争(因为是副本)。

由于我们没有对读写操作进行同步,只是确保了操作的线程安全,所以多次运行时可能会看到一些无序的打印,这是正常的。

不过事实上 std::atomic<std::shared_ptr> 的功能相当有限,单看它提供的修改接口(=storeloadexchang)就能明白。如果要操作其保护的共享指针指向的资源还是得 load() 获取底层共享指针的副本。此时再进行操作时就得考虑 std::shared_ptr 本身在多线程的支持了。


在使用 std::atomic<std::shared_ptr> 的时候,并发编程(3)中关于共享数据的一句话:

切勿将受保护数据的指针或引用传递到互斥量作用域之外,不然保护将形同虚设

原子类型也有类似的问题,以下是一个例子:

std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>(10);
*ptr.load() = 100;
  1. 调用 load() 成员函数,原子地返回底层共享指针的副本 std::shared_ptr
  2. 解引用,等价 *get(),返回了 int&
  3. 直接修改这个引用所指向的资源。

在第一步时,已经脱离了 std::atomic 的保护,第二步就获取了被保护的数据的引用,第三步进行了修改,这导致了数据竞争。当然了,这种做法非常的愚蠢,只是为了表示,所谓的线程安全,也是要靠开发者的正确使用

正确的用法如下:

std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>(10);
std::atomic_ref<int> ref{ *ptr.load() };
ref = 100; // 原子地赋 100 给被引用的对象

通过使用 std::atomic_ref 我们得以确保在修改共享资源时保持操作的原子性,从而避免了数据竞争。


waitnotify_onenotify_all 也是 atomic 特化的成员函数,在C++20以后,任何 atomic 的特化都拥有这些成员函数。使用过程类似于条件变量。

std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>();

void wait_for_wake_up(){
    std::osyncstream{ std::cout }
        << "线程 "
        << std::this_thread::get_id()
        << " 阻塞,等待更新唤醒\n";

    // 等待 ptr 变为其它值
    ptr.wait(ptr.load());

    std::osyncstream{ std::cout }
        << "线程 "
        << std::this_thread::get_id()
        << " 已被唤醒\n";
}

void wake_up(){
    std::this_thread::sleep_for(5s);

    // 更新值并唤醒
    ptr.store(std::make_shared<int>(10));
    ptr.notify_one();
}

http://www.kler.cn/a/393085.html

相关文章:

  • 【安全通信】告别信息泄露:搭建你的开源视频聊天系统briefing
  • UniApp 应用、页面与组件的生命周期详解
  • 【OceanBase 诊断调优】—— ocp上针对OB租户CPU消耗计算逻辑
  • 虚拟机安装Ubuntu 24.04服务器版(命令行版)
  • mac终端使用pytest执行iOS UI自动化测试方法
  • Openstack7--安装消息队列服务RabbitMQ
  • 【故障解决】麒麟系统右下角网络图标取消显示叹号
  • 基于Java人力资源管理系统
  • session的工作原理
  • OpenCV DNN
  • 【图文】【DIY便签】如何自行编译OPENCV使用动态库
  • GitLab 降级安装出现 500 错误,如何解决?
  • CSS教程(二)- CSS选择器
  • 【stable diffusion模型】Stable diffusion模型分几种?一文详解,入门必看!
  • 达梦清空表数据,锁超时TRUNCATE TABLE EM;说明有上一个对话框没有提交
  • 【SQL】双层嵌套< exists not exists >
  • 超文本协议和内外网的划分(详见B站泷羽sec)
  • PostCSS 介绍
  • fastapi 查询参数支持 Pydantic Model:参数校验与配置技巧
  • 用户管理【MySQL】
  • 社区物资交易互助平台(程序+数据库+报告)
  • opencv(c++)图像的灰度转换
  • 【JVM】关于JVM的内部原理你到底了解多少(八股文面经知识点)
  • 推荐一款好用的postman替代工具2024
  • php 字符串与变量
  • web浏览器环境下使用window.open()打开PDF文件不是预览,而是下载文件?