(面试经典问题之无锁队列篇)无锁队列的基本原理及其作用
一、什么是无锁队列
由名字可知,无锁队列首先是个队列,并且不能使用互斥锁。
核心概念:
- 无锁(Lock-Free):无锁编程意味着在多个线程访问共享资源时,不会使用互斥锁来避免竞态条件(race condition)。而是通过原子操作(atomic operations)来保证数据的一致性和线程安全。即使在多个线程同时访问队列时,队列的操作依然是线程安全的。
- 队列:队列是一种数据结构,遵循“先进先出”(FIFO,First In, First Out)的原则。队列的操作通常包括插入元素(入队)和移除元素(出队)。
二、无锁队列的作用
1、避免了多线程环境下锁的开销:当多个线程尝试获取锁并操作队列时,没有获得锁的线程将进入阻塞态,当锁被释放后,线程又会从阻塞态切换为就绪态,这些过程涉及线程切换以及上下文切换,是比较大的开销,并且有cpu核心cache损坏的风险。
2、存在不能使用锁的场景:如信号处理程序(会打断程序运行),若信号处理程序使用了锁,若某线程持有了锁,当信号触发强制打断该线程后,该锁无法得到正常释放,会影响其他线程获取锁的操作,甚至造成死锁。
什么时候使用有锁队列,什么时候使用无锁队列?
当任务为耗时任务时,使用有锁队列。因为是耗时任务,所以如果使用无锁队列,则其他线程会一直在就绪态自旋,会导致cpu空转,因此可以使用自旋锁让其他线程阻塞,从而解放cpu去执行其他线程。
总的来说:
- 有锁队列 适用于任务较为耗时、长时间的场景,其中阻塞线程反而能更有效地利用 CPU,避免无谓的自旋。
- 无锁队列 更适合于任务较轻、对并发性要求较高的场景,能够避免线程阻塞和高开销的上下文切换,但对于耗时任务的情况,可能会引发 CPU 自旋浪费资源。
三、无锁队列的基本原理
构成队列的数据结构主要分为数组和链表:
数组:优点:不会频繁分配空间;缺点:固定队列的数量(有边界),可能造成空间浪费;
链表:优点:实现简单,并且可以动态扩容(无边界);缺点:频繁分配空间,降低进程的性能;
如何优化?
可以使用块状链表:
块状链表 结合了数组和链表的优点,并试图克服它们的缺点。块状链表将 链表 中的元素按块(通常是一个固定大小的数组)组织起来,形成一个链式结构。每个块内部使用数组来存储数据,而块与块之间通过指针链接。
块状链表的优点:
-
动态扩展且高效:
- 块状链表通过预先分配较大的块,避免了频繁的内存分配和释放。每次需要扩容时,添加一个新的块就足够了,避免了链表节点的频繁分配。
- 在内存分配方面,块的大小通常为固定的内存块或一个较大的数组,这使得内存分配的次数减少了,降低了内存碎片问题。
-
避免空间浪费:
- 每个块内部的数组长度固定,可以根据实际需求调整块的大小。如果某个块内的数据较少,也不会产生大量的空闲空间。相对来说,比起完全使用链表,块状链表的空间使用更高效。
-
提高性能:
- 因为每个块内部使用的是数组,数组的内存地址是连续的,所以可以提高内存的 缓存局部性。这使得队列操作的速度大大提高。
- 插入与删除:在一个块内部进行插入和删除操作非常高效,因为它本质上是在一个数组内操作,而不是在链表中操作指针。
四、使用原子操作实现无锁队列示例
#include <atomic>
#include <iostream>
template <typename T>
class LockFreeQueue {
private:
struct Node {
T data;
std::atomic<Node*> next;
Node(const T& value) : data(value), next(nullptr) {}
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
public:
LockFreeQueue() {
// 创建一个哨兵节点,避免复杂的边界条件
Node* dummy = new Node(T());
head.store(dummy);
tail.store(dummy);
}
~LockFreeQueue() {
// 清理队列中的所有节点
while (head.load()) {
Node* tmp = head.load();
head.store(tmp->next);
delete tmp;
}
}
// **无锁入队(Enqueue)**
void enqueue(const T& value) {
Node* newNode = new Node(value);
Node* oldTail = nullptr;
Node* nullNode = nullptr;
while (true) {
oldTail = tail.load();
Node* next = oldTail->next.load();
if (oldTail == tail.load()) {
if (next == nullptr) {
// 尝试将 tail->next 设置为 newNode
if (oldTail->next.compare_exchange_weak(nullNode, newNode)) {
// 入队成功,更新 tail 指针
tail.compare_exchange_weak(oldTail, newNode);
return;
}
} else {
// tail 落后了,推进 tail 指针
tail.compare_exchange_weak(oldTail, next);
}
}
}
}
// **无锁出队(Dequeue)**
bool dequeue(T& value) {
Node* oldHead = nullptr;
while (true) {
oldHead = head.load();
Node* oldTail = tail.load();
Node* next = oldHead->next.load();
if (oldHead == head.load()) {
// 队列为空(head 和 tail 指向同一个哨兵节点)
if (oldHead == oldTail) {
if (next == nullptr) {
return false; // 队列为空
}
// 推进 tail 指针
tail.compare_exchange_weak(oldTail, next);
} else {
// 获取数据,并尝试移动 head 指针
value = next->data;
if (head.compare_exchange_weak(oldHead, next)) {
delete oldHead; // 释放旧的 head 节点
return true;
}
}
}
}
}
};
#include <thread>
#include <vector>
int main() {
LockFreeQueue<int> queue;
// 生产者线程(入队操作)
std::thread producer([&queue]() {
for (int i = 1; i <= 10; ++i) {
queue.enqueue(i);
std::cout << "Enqueued: " << i << std::endl;
}
});
// 消费者线程(出队操作)
std::thread consumer([&queue]() {
int value;
for (int i = 1; i <= 10; ++i) {
while (!queue.dequeue(value)); // 直到成功出队
std::cout << "Dequeued: " << value << std::endl;
}
});
producer.join();
consumer.join();
return 0;
}