C++ Latch 和 Barrier: 新手指南
随着并发和并行编程的重要性日益增加, 理解像 Latch 和 Barrier 这样的同步原语对于现代 C++ 开发者来说至关重要. 这些工具在 C++20 中引入, 用于高效地协调多线程工作. 本文将概述它们的用法, 好处以及实际示例.
什么是 Latch 和 Barrier?
-
Latch(门闩):
std::latch
是一种一次性使用的同步原语, 允许一个线程(或一组线程)等待, 直到计数器减少到零.- 使用场景: 通常用于确保一组线程在满足某个前置条件(例如初始化资源)之前不会继续执行.
-
Barrier(屏障):
std::barrier
是一种可重复使用的同步原语, 允许一组线程反复等待, 直到所有线程到达某个点(称为阶段或屏障点).- 使用场景: 适用于将工作划分为多个阶段的算法, 确保所有线程完成一个阶段后再进入下一个阶段.
为什么要使用 Latch 和 Barrier?
- 提高代码可读性: 与使用互斥锁或条件变量的手动实现相比, 同步逻辑更易读, 更易维护.
- 性能优化: 这些原语针对特定使用场景进行了优化, 比通用同步工具性能更好.
- 避免死锁: 通过明确定义同步点, 减少代码中引入死锁的可能性.
代码示例
示例 1: 使用 std::latch
在这个示例中, 我们有多个线程模拟不同的任务工作. 主线程需要等待所有工作线程完成任务后, 才继续后续的操作.
#include <iostream>
#include <latch>
#include <random>
#include <syncstream>
#include <thread>
#include <vector>
void worker(std::latch& latch, int id) {
// 使用 osyncstream 同步输出
std::osyncstream(std::cout) << "线程 " << id << " 正在工作...\n";
// 生成一个随机值, 范围为[100, 500]
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(100, 500);
int sleep_duration = dis(gen);
// sleep一段时间, 模拟工作
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_duration));
// 发布完成信号并同步输出
std::osyncstream(std::cout) << "线程 " << id << " 完成了任务. \n";
latch.count_down();
}
int main() {
constexpr int num_threads = 5;
std::latch latch(num_threads);
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(worker, std::ref(latch), i + 1);
}
// 等待所有线程完成任务
latch.wait();
std::cout << "所有线程已完成任务. \n";
for (auto& t : threads) {
t.join();
}
return 0;
}
输出:
线程 1 正在工作...
线程 5 正在工作...
线程 3 正在工作...
线程 4 正在工作...
线程 2 正在工作...
线程 4 完成了任务.
线程 2 完成了任务.
线程 3 完成了任务.
线程 5 完成了任务.
线程 1 完成了任务.
所有线程已完成任务.
输出解释:
- 各线程独立工作, 并在完成后递减 latch 计数器.
main
线程等待计数器归零后再继续执行.
示例 2: 多阶段任务
下面这个例子演示了多阶段任务中不同线程之间的同步. 此时latch
的局限性就显示出来了: 需要声明多个latch
, 这不是好的写法. 下个例子中将展示如何用barrier
更好的解决这个问题.
#include <iostream>
#include <latch>
#include <syncstream>
#include <thread>
#include <vector>
void worker(std::latch& latchA, std::latch& latchB, std::latch& latchC,
int id) {
// 任务A
std::this_thread::sleep_for(std::chrono::milliseconds(100 + id * 100));
std::osyncstream(std::cout) << "线程 " << id << " 完成了任务 A. \n";
latchA.arrive_and_wait();
// 任务B
std::this_thread::sleep_for(std::chrono::milliseconds(100 + id * 100));
std::osyncstream(std::cout) << "线程 " << id << " 完成了任务 B. \n";
latchB.arrive_and_wait();
// 任务C
std::this_thread::sleep_for(std::chrono::milliseconds(100 + id * 100));
std::osyncstream(std::cout) << "线程 " << id << " 完成了任务 C. \n";
latchC.arrive_and_wait();
}
int main() {
constexpr int num_threads = 5;
std::latch latchA(num_threads);
std::latch latchB(num_threads);
std::latch latchC(num_threads);
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(worker, std::ref(latchA), std::ref(latchB),
std::ref(latchC), i + 1);
}
for (auto& t : threads) {
t.join();
}
return 0;
}
输出:
线程 1 完成了任务 A.
线程 2 完成了任务 A.
线程 3 完成了任务 A.
线程 4 完成了任务 A.
线程 5 完成了任务 A.
线程 1 完成了任务 B.
线程 2 完成了任务 B.
线程 3 完成了任务 B.
线程 4 完成了任务 B.
线程 5 完成了任务 B.
线程 1 完成了任务 C.
线程 2 完成了任务 C.
线程 3 完成了任务 C.
线程 4 完成了任务 C.
线程 5 完成了任务 C.
可以看到任务 A,B,C 是依次被完成的. 没有出现任务之间的乱序.
示例 3: 使用 std::barrier
在这个示例中, 我们设计了一个分阶段的任务, 每个线程在每个阶段完成工作后需要等待其他线程同步, 然后再进入下一阶段.
#include <barrier>
#include <iostream>
#include <syncstream>
#include <thread>
#include <vector>
void worker(std::barrier<>& barrier, int id) {
// 任务A
std::this_thread::sleep_for(std::chrono::milliseconds(100 + id * 100));
std::osyncstream(std::cout) << "线程 " << id << " 完成了任务 A. \n";
barrier.arrive_and_wait();
// 任务B
std::this_thread::sleep_for(std::chrono::milliseconds(100 + id * 100));
std::osyncstream(std::cout) << "线程 " << id << " 完成了任务 B. \n";
barrier.arrive_and_wait();
// 任务C
std::this_thread::sleep_for(std::chrono::milliseconds(100 + id * 100));
std::osyncstream(std::cout) << "线程 " << id << " 完成了任务 C. \n";
barrier.arrive_and_wait();
}
int main() {
constexpr int num_threads = 5;
std::barrier barrier(num_threads);
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(worker, std::ref(barrier), i + 1);
}
for (auto& t : threads) {
t.join();
}
return 0;
}
输出:
线程 1 完成了任务 A.
线程 2 完成了任务 A.
线程 3 完成了任务 A.
线程 4 完成了任务 A.
线程 5 完成了任务 A.
线程 1 完成了任务 B.
线程 2 完成了任务 B.
线程 3 完成了任务 B.
线程 4 完成了任务 B.
线程 5 完成了任务 B.
线程 1 完成了任务 C.
线程 2 完成了任务 C.
线程 3 完成了任务 C.
线程 4 完成了任务 C.
线程 5 完成了任务 C.
输出解释:
- 每个线程处理一个阶段, 并在屏障点等待.
- 可选的完成操作在所有线程完成阶段后执行.
何时使用?
-
Latch:
- 当需要一次性的同步点时.
- 示例: 确保所有资源初始化完成后再开始处理.
-
Barrier:
- 当需要多次迭代任务并在每次迭代后同步时.
- 示例: 具有阶段执行的并行算法, 如矩阵乘法.
优势
- 易用性: 相比传统同步机制, 减少了样板代码.
- 可扩展性: 为高性能多线程程序设计.
- 调试友好: 简化线程协调的逻辑, 降低错误概率.
使用时需要注意的事项
-
Latch 的常见问题:
- 计数器不足或过多: 确保
count_down
调用的次数准确. 如果某些线程未正确调用count_down
, 可能导致wait
永远不会返回.- 解决方法: 在代码逻辑中严格控制
count_down
的调用次数.
- 解决方法: 在代码逻辑中严格控制
- 无法重复使用:
std::latch
是一次性的, 如果需要多次使用, 请选择std::barrier
.
- 计数器不足或过多: 确保
-
Barrier 的常见问题:
- 线程不平衡: 某些线程可能比其他线程运行得更慢, 导致其他线程在
arrive_and_wait
阻塞过久.- 解决方法: 优化线程的工作量, 尽量均衡任务分配.
- 未正确完成一个阶段: 如果某个线程在某一阶段抛出异常或终止, 可能会导致整个程序卡在屏障点.
- 解决方法: 确保所有线程在异常情况下也能够安全退出, 或捕获异常并手动调用屏障完成操作.
- 线程不平衡: 某些线程可能比其他线程运行得更慢, 导致其他线程在
-
资源释放问题:
- 如果在
latch
或barrier
的作用范围内提前释放相关资源, 可能会导致未定义行为.- 解决方法: 确保
latch
和barrier
的生命周期覆盖所有线程的操作.
- 解决方法: 确保
- 如果在
-
死锁风险:
- 如果线程逻辑中存在相互依赖关系, 可能会导致死锁.
- 解决方法: 尽量减少线程之间的依赖, 并确保每个线程都能独立完成其任务.
- 如果线程逻辑中存在相互依赖关系, 可能会导致死锁.
通过使用 std::latch
和 std::barrier
, C++ 开发者可以编写出更健壮, 可读性更高, 性能更优的多线程程序. 无论您是并发编程的新手还是经验丰富的开发者, 这些工具都应该成为您的编程工具箱的一部分!
参考链接
- std::latch - cppreference.com
- std::barrier - cppreference.com
- C++20 屏障 std::latch 哔哩哔哩
- C++20 屏障 std::barrier 哔哩哔哩
源码链接
源码链接