C++并发编程之普通无锁队列与单生成者单消费者队列
在C++并发编程中,无锁队列是一种常见的并发数据结构,可以用于多线程环境下的高效数据传递。下面将分别介绍一种普通无锁队列和一种单生产者单消费者(SPSC, Single Producer Single Consumer)无锁队列的设计思路,并提供相应的代码实现。
普通无锁队列
普通无锁队列的设计要点包括:
- 使用原子操作:利用
std::atomic
和CAS(Compare-And-Swap)操作来确保线程安全。 - 链表结构:使用链表来存储元素,以便在队列的两端进行插入和删除操作。
- 指针原子更新:队列的头尾指针必须使用原子操作进行更新,以确保多线程环境下的一致性。
普通无锁队列的实现
#include <atomic>
#include <memory>
template <typename T>
class LockFreeQueue {
private:
struct Node {
std::shared_ptr<T> data;
std::atomic<Node*> next;
Node(T value) : data(std::make_shared<T>(std::move(value))), next(nullptr) {}
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
Node* pop_head() {
Node* const old_head = head.load();
if (old_head == tail.load()) {
return nullptr;
}
head.store(old_head->next);
return old_head;
}
public:
LockFreeQueue() : head(new Node(T())), tail(head.load()) {}
~LockFreeQueue() {
while (Node* const old_head = head.load()) {
head.store(old_head->next);
delete old_head;
}
}
void push(T value) {
std::shared_ptr<T> new_data = std::make_shared<T>(std::move(value));
Node* new_node = new Node(T());
Node* const old_tail = tail.load();
while (!old_tail->next.compare_exchange_weak(nullptr, new_node)) {
// Busy wait
}
old_tail->data = new_data;
tail.store(new_node);
}
std::shared_ptr<T> pop() {
Node* old_head = pop_head();
if (old_head == nullptr) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> res = old_head->data;
delete old_head;
return res;
}
};
设计思维要点
head
和tail
的原子性:head
和tail
指针使用std::atomic
来确保多个线程可以安全地访问和更新它们。- CAS操作:在
push
操作中,使用compare_exchange_weak
来确保只有一个线程能够更新tail
指针的next
指针。 - 虚拟节点:队列初始化时创建一个虚拟节点,简化了队列的初始化和边界条件的处理。
单生产者单消费者无锁队列(SPSC)
单生产者单消费者无锁队列的设计要点包括:
- 环形缓冲区:使用固定大小的环形缓冲区来存储元素,减少内存分配的开销。
- 指针校准:在生产者和消费者之间使用两个指针(
head
和tail
)来管理队列的访问。 - 避免使用原子操作:由于只有一个生产者和一个消费者,可以在不使用原子操作的情况下实现线程安全。
单生产者单消费者无锁队列的实现
#include <vector>
#include <memory>
#include <atomic>
template <typename T, std::size_t Capacity>
class SpscLockFreeQueue {
private:
std::vector<std::shared_ptr<T>> buffer;
std::atomic<std::size_t> head;
std::atomic<std::size_t> tail;
public:
SpscLockFreeQueue() : buffer(Capacity), head(0), tail(0) {}
bool push(T value) {
std::size_t current_tail = tail.load(std::memory_order_relaxed);
std::size_t next_tail = (current_tail + 1) % Capacity;
if (next_tail == head.load(std::memory_order_acquire)) {
return false; // Queue is full
}
buffer[current_tail] = std::make_shared<T>(std::move(value));
tail.store(next_tail, std::memory_order_release);
return true;
}
std::shared_ptr<T> pop() {
std::size_t current_head = head.load(std::memory_order_relaxed);
if (current_head == tail.load(std::memory_order_acquire)) {
return std::shared_ptr<T>(); // Queue is empty
}
std::shared_ptr<T> res = buffer[current_head];
head.store((current_head + 1) % Capacity, std::memory_order_release);
return res;
}
};
设计思维要点
- 环形缓冲区:使用固定大小的
std::vector
作为环形缓冲区,减少了动态内存分配的开销。 - 指针校准:使用
head
和tail
指针来管理队列的访问,确保生产者和消费者可以正确地访问和更新队列。 - 内存顺序控制:使用
std::memory_order_relaxed
、std::memory_order_acquire
和std::memory_order_release
来控制内存顺序,确保操作的正确性和性能。
总结
- 普通无锁队列:适用于多生产者和多消费者的场景,需要使用原子操作和CAS来确保线程安全。
- 单生产者单消费者无锁队列:适用于单生产者和单消费者的场景,可以使用环形缓冲区和简单的指针操作来实现高效的无锁队列。
在这两种实现中,选择适合具体应用场景的队列类型可以显著提高并发程序的性能和可维护性。