多线程程序的测试和调试_第11章_《C++并发编程实战》笔记
多线程程序的测试和调试
- 1. 并发相关Bug的核心类型
- 1.1 数据竞争(Data Race)
- 1.2 死锁(Deadlock)
- 1.3 活锁(Livelock)
- 2. 定位并发Bug的技巧
- 3. 代码优化与修复示例
- 3.1 修复数据竞争(使用原子操作)
- 3.2 避免死锁(统一锁顺序)
- 4. 总结
- 5. 多选题目及答案
- 6. 设计题目
- 7. 设计题目参考答案
1. 并发相关Bug的核心类型
1.1 数据竞争(Data Race)
- 定义:多线程同时访问共享数据,且至少有一个线程进行写操作,且未正确同步。
- 示例代码:
#include<bits/stdc++.h>
int counter = 0; // 共享变量
void increment() {
for (int i = 0; i < 100000; ++i) {
++counter; // 未同步的写操作
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << counter; // 结果不确定!
}
1.2 死锁(Deadlock)
定义:多个线程相互等待对方释放锁,导致永久阻塞。
// 经典场景:锁顺序不一致
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex m1, m2;
void thread_a() {
std::cout << "Thread A is trying to lock mutex m1." << std::endl;
std::lock_guard<std::mutex> lk1(m1);
std::cout << "Thread A has locked mutex m1." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Thread A is trying to lock mutex m2." << std::endl;
std::lock_guard<std::mutex> lk2(m2); // 可能在此处死锁
std::cout << "Thread A has locked mutex m2." << std::endl;
std::cout << "Thread A is doing some work with both mutexes." << std::endl;
}
void thread_b() {
std::cout << "Thread B is trying to lock mutex m2." << std::endl;
std::lock_guard<std::mutex> lk1(m2);
std::cout << "Thread B has locked mutex m2." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Thread B is trying to lock mutex m1." << std::endl;
std::lock_guard<std::mutex> lk2(m1); // 顺序与thread_a相反
std::cout << "Thread B has locked mutex m1." << std::endl;
std::cout << "Thread B is doing some work with both mutexes." << std::endl;
}
int main() {
std::cout << "Main thread starts." << std::endl;
// 创建两个线程
std::thread t1(thread_a);
std::thread t2(thread_b);
// 等待线程完成(由于可能死锁,这里可能不会正常完成)
t1.join();
t2.join();
std::cout << "Main thread ends." << std::endl;
return 0;
}
// 解决:1. 固定顺序 2. 原子操作 3. 使用 std::adopt_lock
解决方法:
- 固定顺序: 两个线程都先获取 m1 再获取 m2,避免了循环等待,从而防止死锁。
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex m1, m2;
void thread_a() {
std::cout << "Thread A is trying to lock mutex m1." << std::endl;
std::lock_guard<std::mutex> lk1(m1);
std::cout << "Thread A has locked mutex m1." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Thread A is trying to lock mutex m2." <<< std::endl;
std::lock_guard<std::mutex> lk2(m2);
std::cout << "Thread A has locked mutex m2." << std::endl;
std::cout << "Thread A is doing some work with both mutexes." << std::endl;
}
void thread_b() {
// 与 thread_a 保持相同的锁获取顺序
std::cout << "Thread B is trying to lock mutex m1." << std::endl;
std::lock_guard<std::mutex> lk1(m1);
std::cout << "Thread B has locked mutex m1." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Thread B is trying to lock mutex m2." << std::endl;
std::lock_guard<std::mutex> lk2(m2);
std::cout << "Thread B has locked mutex m2." << std::endl;
std::cout << "Thread B is doing some work with both mutexes." << std::endl;
}
int main() {
std::cout << "Main thread starts." << std::endl;
std::thread t1(thread_a);
std::thread t2(thread_b);
t1.join();
t2.join();
std::cout << "Main thread ends." << std::endl;
return 0;
}
- 原子操作: 使用 std::atomic 类型的变量模拟锁的状态,通过 exchange 和 store 操作来进行加锁和解锁。线程会不断尝试获取锁,直到成功。
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
std::atomic<bool> m1_locked(false);
std::atomic<bool> m2_locked(false);
void thread_a() {
while (m1_locked.exchange(true, std::memory_order_acquire)) {
std::this_thread::yield();
}
std::cout << "Thread A has locked m1." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
while (m2_locked.exchange(true, std::memory_order_acquire)) {
std::this_thread::yield();
}
std::cout << "Thread A has locked m2." << std::endl;
std::cout << "Thread A is doing some work with both mutexes." << std::endl;
m2_locked.store(false, std::memory_order_release);
m1_locked.store(false, std::memory_order_release);
}
void thread_b() {
// 与 thread_a 保持相同的锁获取顺序
while (m1_locked.exchange(true, std::memory_order_acquire)) {
std::this_thread::yield();
}
std::cout << "Thread B has locked m1." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
while (m2_locked.exchange(true, std::memory_order_acquire)) {
std::this_thread::yield();
}
std::cout << "Thread B has locked m2." << std::endl;
std::cout << "Thread B is doing some work with both mutexes." << std::endl;
m2_locked.store(false, std::memory_order_release);
m1_locked.store(false, std::memory_order_release);
}
int main() {
std::cout << "Main thread starts." << std::endl;
std::thread t1(thread_a);
std::thread t2(thread_b);
t1.join();
t2.join();
std::cout << "Main thread ends." << std::endl;
return 0;
}
- 使用 std::adopt_lock: 手动锁定互斥量,然后使用 std::lock_guard 结合 std::adopt_lock 接管已锁定的互斥量,确保在作用域结束时自动解锁。同时保持锁的获取顺序一致,避免死锁。
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex m1, m2;
void thread_a() {
m1.lock();
std::cout << "Thread A has locked mutex m1." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
m2.lock();
std::cout << "Thread A has locked mutex m2." << std::endl;
{
std::lock_guard<std::mutex> lk1(m1, std::adopt_lock);
std::lock_guard<std::mutex> lk2(m2, std::adopt_lock);
std::cout << "Thread A is doing some work with both mutexes." << std::endl;
}
}
void thread_b() {
// 与 thread_a 保持相同的锁获取顺序
m1.lock();
std::cout << "Thread B has locked mutex m1." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
m2.lock();
std::cout << "Thread B has locked mutex m2." << std::endl;
{
std::lock_guard<std::mutex> lk1(m1, std::adopt_lock);
std::lock_guard<std::mutex> lk2(m2, std::adopt_lock);
std::cout << "Thread B is doing some work with both mutexes." << std::endl;
}
}
int main() {
std::cout << "Main thread starts." << std::endl;
std::thread t1(thread_a);
std::thread t2(thread_b);
t1.join();
t2.join();
std::cout << "Main thread ends." << std::endl;
return 0;
}
1.3 活锁(Livelock)
定义:线程不断重试某个操作,但始终无法推进。
示例:两个线程互相“礼让”:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex m1, m2;
void worker1() {
while (true) {
if (m1.try_lock()) {
std::cout << "Worker1 locked m1." << std::endl;
if (m2.try_lock()) {
std::cout << "Worker1 locked m2." << std::endl;
m2.unlock();
std::cout << "Worker1 unlocked m2." << std::endl;
}
m1.unlock();
std::cout << "Worker1 unlocked m1." << std::endl;
}
std::this_thread::yield();
}
}
void worker2() {
while (true) {
if (m2.try_lock()) {
std::cout << "Worker2 locked m2." << std::endl;
if (m1.try_lock()) {
std::cout << "Worker2 locked m1." << std::endl;
m1.unlock();
std::cout << "Worker2 unlocked m1." << std::endl;
}
m2.unlock();
std::cout << "Worker2 unlocked m2." << std::endl;
}
std::this_thread::yield();
}
}
int main() {
std::thread t1(worker1);
std::thread t2(worker2);
t1.join();
t2.join();
return 0;
}
解决方法:引入随机退避(backoff)机制。
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
#include <random>
std::mutex m1, m2;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 500); // 生成 1 到 500 毫秒的随机数
void worker1() {
while (true) {
if (m1.try_lock()) {
std::cout << "Worker1 acquired m1." << std::endl;
if (m2.try_lock()) {
std::cout << "Worker1 acquired m2. Doing work..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟工作
m2.unlock();
std::cout << "Worker1 released m2." << std::endl;
m1.unlock();
std::cout << "Worker1 released m1. Work done." << std::endl;
break;
} else {
m1.unlock();
std::cout << "Worker1 couldn't acquire m2. Backing off..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen))); // 随机退避
}
} else {
std::cout << "Worker1 couldn't acquire m1. Backing off..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen))); // 随机退避
}
}
}
void worker2() {
while (true) {
if (m2.try_lock()) {
std::cout << "Worker2 acquired m2." << std::endl;
if (m1.try_lock()) {
std::cout << "Worker2 acquired m1. Doing work..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟工作
m1.unlock();
std::cout << "Worker2 released m1." << std::endl;
m2.unlock();
std::cout << "Worker2 released m2. Work done." << std::endl;
break;
} else {
m2.unlock();
std::cout << "Worker2 couldn't acquire m1. Backing off..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen))); // 随机退避
}
} else {
std::cout << "Worker2 couldn't acquire m2. Backing off..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen))); // 随机退避
}
}
}
int main() {
std::thread t1(worker1);
std::thread t2(worker2);
t1.join();
t2.join();
std::cout << "Both threads finished." << std::endl;
return 0;
}
2. 定位并发Bug的技巧
- 代码审查要点
- 共享数据:识别所有跨线程访问的数据,检查同步措施。
- 锁的范围:确保临界区最小化,避免持有锁时阻塞(如I/O)。
- 工具辅助
动态分析工具:
- Valgrind (Helgrind):检测数据竞争、锁顺序问题。
valgrind --tool=helgrind ./your_program
静态分析工具:
- Clang线程安全注解:标记变量为GUARDED_BY(mutex)。
- 日志与断点调试
- 记录线程ID:
std::cout << "Thread ID: " << std::this_thread::get_id() << " counter=" << counter << std::endl;
- 条件断点(GDB):
break file.cpp:10 if counter == 42
- 单元测试策略
- 并发压力测试:重复运行并发代码,增加竞态暴露概率
TEST(ConcurrencyTest, DataRace) {
for (int i = 0; i < 1000; ++i) {
int counter = 0;
std::thread t1([&]{ ++counter; });
std::thread t2([&]{ ++counter; });
t1.join();
t2.join();
ASSERT_EQ(counter, 2); // 可能失败
}
}
3. 代码优化与修复示例
3.1 修复数据竞争(使用原子操作)
- 修改后代码:
#include <atomic>
std::atomic<int> counter{0}; // 原子变量
void increment() {
for (int i = 0; i < 100000; ++i) {
++counter; // 原子操作,无竞争
}
}
// 结果:counter最终准确为200000。
3.2 避免死锁(统一锁顺序)
void safe_thread() {
std::lock(m1, m2); // 同时获取两把锁
std::lock_guard<std::mutex> lk1(m1, std::adopt_lock);
std::lock_guard<std::mutex> lk2(m2, std::adopt_lock);
// 安全操作
}
4. 总结
- 核心原则:最小化共享数据、优先使用高层抽象(如std::async)、利用RAII管理锁。
- 调试心法:怀疑一切共享状态,工具验证结合代码审查。
- 测试策略:确定性测试(如检查不变式)结合随机压力测试。
5. 多选题目及答案
-
关于多线程数据竞争的描述,以下哪些说法正确?
A. 所有不加锁的并发写操作必然引发数据竞争
B. 数据竞争发生的条件是多个线程访问同一共享资源且至少一个线程执行写操作
C. 使用volatile关键字可以完全消除数据竞争
D. 原子操作的内存顺序选择不当可能导致数据竞争 -
死锁的产生需要满足以下哪些必要条件?
A. 线程优先级不同
B. 互斥条件(资源独占)
C. 持有并等待(部分占用+请求新资源)
D. 系统存在多个CPU核心 -
以下哪些方法是定位死锁的有效手段?
A. 用Valgrind的Helgrind工具检测
B. 在代码中增加sleep(1)调试
C. 将多线程逻辑全部改为单线程执行观察结果
D. 分析代码中是否存在"锁的顺序不一致" -
以下关于多线程测试策略的描述,正确的有?
A. 静态代码分析工具可以100%发现所有竞态条件
B. 压力测试能增加线程调度的随机性,更容易暴露竞态问题
C. 为稳定复现问题,应在测试代码中全程固定线程执行顺序
D. 增加sleep()的时间参数可以彻底解决时序敏感型Bug -
关于活锁(Livelock)的描述,正确的有?
A. 活锁的直接表现是线程长时间无法获取所需资源
B. 活锁可以通过引入随机退避机制来缓解
C. 活锁的线程实际处于运行状态但无法推进任务
D. 修改线程优先级是解决活锁的最佳方法 -
B、D
解析:
B正确(数据竞争的条件是并发访问共享变量+至少一个写操作,见11.1节)。
D正确(原子操作若内存序选择错误,如memory_order_relaxed可能导致其他线程看到不一致的状态)。
A错误(并非所有无锁写都会竞争,如原子操作);C错误(volatile不保证原子性,无法消除竞争)。 -
B、C
解析:
死锁必要条件是:互斥、持有并等待、不可抢占、循环等待(书中11.1节)。BD非必要条件。 -
A、D
解析:
A正确(Helgrind是工具示例)。D正确(锁顺序不一致是常见死锁原因)。
B错误(sleep增加不确定性),C错误(单线程无法触发死锁)。 -
B
解析:
B正确(压力测试提高随机性,利于发现竞态,见11.2节);
A错误(静态分析无法覆盖所有情况);CD均错误(固定顺序掩盖问题,sleep无法根治Bug)。 -
B、C
解析:
B正确(随机退避可打破活锁循环,如11.2节建议);C正确(活锁线程仍在运行但无进展)。
A描述的是死锁;D错误(优先级无关活锁本质)。
重点回顾
- 数据竞争要求 并发写+无同步,原子操作需注意内存序。
- 死锁的必要条件需完整满足,调试时关注锁顺序和使用工具。
- 活锁需通过行为观察(线程活跃但无进展),引入随机性可解。
- 多线程测试需依赖压力测试和代码分析,而非强制时序控制。
6. 设计题目
- 数据竞争检测与修复
场景描述:
以下代码存在数据竞争问题,多个线程同时修改共享计数器。请修复该问题并解释原理。
#include <iostream>
#include <thread>
#include <vector>
int counter = 0;
void increment() {
for (int i = 0; i < 1000; ++i) {
++counter;
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(increment);
}
for (auto& t : threads) t.join();
std::cout << "Final counter: " << counter << "\n";
}
- 死锁场景分析与解决
问题描述:
以下代码可能产生死锁,请分析原因并提供修复方案。
#include <mutex>
#include <thread>
std::mutex mtx1, mtx2;
void threadA() {
std::lock_guard<std::mutex> lock1(mtx1);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lock2(mtx2);
}
void threadB() {
std::lock_guard<std::mutex> lock2(mtx2);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lock1(mtx1);
}
int main() {
std::thread t1(threadA), t2(threadB);
t1.join(); t2.join();
std::cout << "Program completed\n";
}
- 条件变量虚假唤醒实战
问题场景:
以下代码使用条件变量存在虚假唤醒风险,请修复并说明如何验证。
#include <condition_variable>
#include <mutex>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void consumer() {
std::unique_lock<std::mutex> lock(mtx);
if (!ready) cv.wait(lock); // 错误使用if判断
std::cout << "Consumed\n";
}
void producer() {
std::lock_guard<std::mutex> lock(mtx);
ready = true;
cv.notify_all();
}
// 测试函数
void test() {
std::thread c(consumer), p(producer);
c.join(); p.join();
}
- 题目4:原子操作实战
问题描述:
以下非原子操作导致数据竞争,请改为原子操作并验证。
#include <thread>
int non_atomic = 0; // 普通变量
void increment() {
for (int i = 0; i < 10000; ++i) {
++non_atomic;
}
}
// main创建10个线程调用increment
- 活锁问题解决
问题场景:
两个线程不断尝试获取资源导致活锁,请修改退让策略。
#include <thread>
#include <atomic>
#include <iostream>
#include <chrono>
using namespace std;
void worker(atomic<bool>& my_flag, atomic<bool>& other_flag) {
while (true) {
while (other_flag.load()) { // 如果对方标志有效
my_flag.store(false); // 重置自己的标志
this_thread::yield(); // 让出CPU
}
my_flag.store(true); // 尝试获取资源
// 临界区开始
if (!other_flag.load()) { // 再次检查确保资源可用
cout << "Thread " << this_thread::get_id() << " entered critical section\n";
this_thread::sleep_for(100ms); // 模拟工作
my_flag.store(false); // 释放资源
return;
}
my_flag.store(false); // 获取失败,重试
}
}
int main() {
atomic<bool> flag1{false};
atomic<bool> flag2{false};
thread t1([&]() { worker(flag1, flag2); });
thread t2([&]() { worker(flag2, flag1); });
t1.join();
t2.join();
}
/*
导致活锁的原因:两个线程同时检查到对方标志为false,都将自己标志设为true,随后立即互相让步,导致无限循环而无法进入临界区。
*/
7. 设计题目参考答案
- 答案:
#include <mutex>
std::mutex mtx; // 添加互斥锁
void increment() {
for (int i = 0; i < 1000; ++i) {
std::lock_guard<std::mutex> lock(mtx);
++counter;
}
}
/*
原问题发生在++counter操作的非原子性。多线程直接修改共享变量会导致数据竞争。通过互斥锁确保操作的原子性,每次修改前获取锁。测试结果从随机值变为稳定的10000。
*/
- 答案:
void threadA() {
std::lock(mtx1, mtx2); // 同时锁定多个互斥量
std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
}
void threadB() {
std::lock(mtx1, mtx2);
std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
}
/*
原代码因获取锁的顺序不一致导致死锁可能。解决方案使用std::lock()原子化锁定多个互斥量,保证获取顺序一致。测试显示程序能正常完成执行。
*/
- 答案
void consumer() {
std::unique_lock<std::mutex> lock(mtx);
while (!ready) cv.wait(lock); // 改为循环检查
// ...后续操作
}
/*
条件变量可能因系统原因虚假唤醒,使用while循环能重复检查条件保证正确性。添加多个消费者线程测试可验证修复效果。
*/
- 原子解决方案:
#include <atomic>
std::atomic<int> atomic_counter{0}; // 原子变量
void increment() {
for (int i = 0; i < 10000; ++i) {
atomic_counter.fetch_add(1, std::memory_order_relaxed);
}
}
/*
将普通int改为std::atomic<int>保证操作的原子性。memory_order_relaxed适用于无需严格顺序的场景。测试结果稳定输出100000。
*/
- 解决方案:
#include <thread>
#include <atomic>
#include <iostream>
#include <chrono>
#include <random>
using namespace std;
void worker(atomic<bool>& my_flag, atomic<bool>& other_flag) {
random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> dist(1, 100);
while (true) {
my_flag.store(true);
while (other_flag.load()) {
my_flag.store(false);
// 退让策略:随机延迟打破对称性
this_thread::sleep_for(dist(gen) * 1ms);
my_flag.store(true);
}
// 临界区
cout << "Thread " << this_thread::get_id() << " entered critical section\n";
this_thread::sleep_for(100ms); // 模拟工作
my_flag.store(false);
break;
}
}
int main() {
atomic<bool> flag1{false};
atomic<bool> flag2{false};
thread t1([&]() { worker(flag1, flag2); });
thread t2([&]() { worker(flag2, flag1); });
t1.join();
t2.join();
}
/*
1.随机退避:通过随机延迟破坏两个线程的同步让步
2.标志设置顺序:先设置自己的标志为true,再检查对方标志
3.引入概率:让线程的等待时间出现差异化,大大降低活锁概率
*/