C++ 并发专题 - 实现一个线程安全的队列
一:概述
本文利用 C++ 标准库中的多线程、条件变量、互斥锁等工具来实现一个线程安全的队列,并且使用多个线程来向队列中添加和获取数据。
二:实现过程:
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <vector>
template <typename T>
class ThreadSafeQueue {
public:
// 向队列中添加元素
void push(const T& value) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(value);
cond_var_.notify_one(); // 通知一个等待的线程
}
// 从队列中取出元素,如果队列为空,阻塞等待
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
cond_var_.wait(lock, [this] { return !queue_.empty(); }); // 等待直到队列非空
T value = queue_.front();
queue_.pop();
return value;
}
// 判断队列是否为空
bool empty() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
private:
mutable std::mutex mutex_; // 互斥锁,保护队列
std::queue<T> queue_; // 基础队列
std::condition_variable cond_var_; // 条件变量,用于队列为空时的等待
};
// 示例:使用线程安全队列
void producer(ThreadSafeQueue<int>& queue, int numItems) {
for (int i = 0; i < numItems; ++i) {
queue.push(i);
std::cout << "Produced: " << i << std::endl;
}
}
void consumer(ThreadSafeQueue<int>& queue, int numItems) {
for (int i = 0; i < numItems; ++i) {
int item = queue.pop();
std::cout << "Consumed: " << item << std::endl;
}
}
int main() {
ThreadSafeQueue<int> queue;
const int numItems = 10;
const int numProducers = 2;
const int numConsumers = 2;
std::vector<std::thread> threads;
// 启动生产者线程
for (int i = 0; i < numProducers; ++i) {
threads.push_back(std::thread(producer, std::ref(queue), numItems / numProducers));
}
// 启动消费者线程
for (int i = 0; i < numConsumers; ++i) {
threads.push_back(std::thread(consumer, std::ref(queue), numItems / numConsumers));
}
// 等待所有线程完成
for (auto& thr : threads) {
thr.join();
}
return 0;
}