当前位置: 首页 > article >正文

(面试经典问题之无锁队列篇)无锁队列的基本原理及其作用

一、什么是无锁队列

由名字可知,无锁队列首先是个队列,并且不能使用互斥锁。

核心概念:

  • 无锁(Lock-Free):无锁编程意味着在多个线程访问共享资源时,不会使用互斥锁来避免竞态条件(race condition)。而是通过原子操作(atomic operations)来保证数据的一致性和线程安全。即使在多个线程同时访问队列时,队列的操作依然是线程安全的。
  • 队列:队列是一种数据结构,遵循“先进先出”(FIFO,First In, First Out)的原则。队列的操作通常包括插入元素(入队)和移除元素(出队)。

二、无锁队列的作用

1、避免了多线程环境下锁的开销:当多个线程尝试获取锁并操作队列时,没有获得锁的线程将进入阻塞态,当锁被释放后,线程又会从阻塞态切换为就绪态,这些过程涉及线程切换以及上下文切换,是比较大的开销,并且有cpu核心cache损坏的风险。

2、存在不能使用锁的场景:如信号处理程序(会打断程序运行),若信号处理程序使用了锁,若某线程持有了锁,当信号触发强制打断该线程后,该锁无法得到正常释放,会影响其他线程获取锁的操作,甚至造成死锁。

什么时候使用有锁队列,什么时候使用无锁队列?

当任务为耗时任务时,使用有锁队列。因为是耗时任务,所以如果使用无锁队列,则其他线程会一直在就绪态自旋,会导致cpu空转,因此可以使用自旋锁让其他线程阻塞,从而解放cpu去执行其他线程。

总的来说:

  • 有锁队列 适用于任务较为耗时、长时间的场景,其中阻塞线程反而能更有效地利用 CPU,避免无谓的自旋。
  • 无锁队列 更适合于任务较轻、对并发性要求较高的场景,能够避免线程阻塞和高开销的上下文切换,但对于耗时任务的情况,可能会引发 CPU 自旋浪费资源。

 三、无锁队列的基本原理

构成队列的数据结构主要分为数组和链表:

数组:优点:不会频繁分配空间;缺点:固定队列的数量(有边界),可能造成空间浪费;

链表:优点:实现简单,并且可以动态扩容(无边界);缺点:频繁分配空间,降低进程的性能;

如何优化?

可以使用块状链表:

块状链表 结合了数组和链表的优点,并试图克服它们的缺点。块状链表将 链表 中的元素按块(通常是一个固定大小的数组)组织起来,形成一个链式结构。每个块内部使用数组来存储数据,而块与块之间通过指针链接。

块状链表的优点:
  1. 动态扩展且高效:

    • 块状链表通过预先分配较大的块,避免了频繁的内存分配和释放。每次需要扩容时,添加一个新的块就足够了,避免了链表节点的频繁分配。
    • 在内存分配方面,块的大小通常为固定的内存块或一个较大的数组,这使得内存分配的次数减少了,降低了内存碎片问题。
  2. 避免空间浪费:

    • 每个块内部的数组长度固定,可以根据实际需求调整块的大小。如果某个块内的数据较少,也不会产生大量的空闲空间。相对来说,比起完全使用链表,块状链表的空间使用更高效。
  3. 提高性能:

    • 因为每个块内部使用的是数组,数组的内存地址是连续的,所以可以提高内存的 缓存局部性。这使得队列操作的速度大大提高。
    • 插入与删除:在一个块内部进行插入和删除操作非常高效,因为它本质上是在一个数组内操作,而不是在链表中操作指针。

四、使用原子操作实现无锁队列示例 

#include <atomic>
#include <iostream>

template <typename T>
class LockFreeQueue {
private:
    struct Node {
        T data;
        std::atomic<Node*> next;
        Node(const T& value) : data(value), next(nullptr) {}
    };

    std::atomic<Node*> head;
    std::atomic<Node*> tail;

public:
    LockFreeQueue() {
        // 创建一个哨兵节点,避免复杂的边界条件
        Node* dummy = new Node(T());
        head.store(dummy);
        tail.store(dummy);
    }

    ~LockFreeQueue() {
        // 清理队列中的所有节点
        while (head.load()) {
            Node* tmp = head.load();
            head.store(tmp->next);
            delete tmp;
        }
    }

    // **无锁入队(Enqueue)**
    void enqueue(const T& value) {
        Node* newNode = new Node(value);
        Node* oldTail = nullptr;
        Node* nullNode = nullptr;

        while (true) {
            oldTail = tail.load();
            Node* next = oldTail->next.load();

            if (oldTail == tail.load()) {
                if (next == nullptr) {
                    // 尝试将 tail->next 设置为 newNode
                    if (oldTail->next.compare_exchange_weak(nullNode, newNode)) {
                        // 入队成功,更新 tail 指针
                        tail.compare_exchange_weak(oldTail, newNode);
                        return;
                    }
                } else {
                    // tail 落后了,推进 tail 指针
                    tail.compare_exchange_weak(oldTail, next);
                }
            }
        }
    }

    // **无锁出队(Dequeue)**
    bool dequeue(T& value) {
        Node* oldHead = nullptr;

        while (true) {
            oldHead = head.load();
            Node* oldTail = tail.load();
            Node* next = oldHead->next.load();

            if (oldHead == head.load()) {
                // 队列为空(head 和 tail 指向同一个哨兵节点)
                if (oldHead == oldTail) {
                    if (next == nullptr) {
                        return false;  // 队列为空
                    }
                    // 推进 tail 指针
                    tail.compare_exchange_weak(oldTail, next);
                } else {
                    // 获取数据,并尝试移动 head 指针
                    value = next->data;
                    if (head.compare_exchange_weak(oldHead, next)) {
                        delete oldHead; // 释放旧的 head 节点
                        return true;
                    }
                }
            }
        }
    }
};
#include <thread>
#include <vector>

int main() {
    LockFreeQueue<int> queue;

    // 生产者线程(入队操作)
    std::thread producer([&queue]() {
        for (int i = 1; i <= 10; ++i) {
            queue.enqueue(i);
            std::cout << "Enqueued: " << i << std::endl;
        }
    });

    // 消费者线程(出队操作)
    std::thread consumer([&queue]() {
        int value;
        for (int i = 1; i <= 10; ++i) {
            while (!queue.dequeue(value));  // 直到成功出队
            std::cout << "Dequeued: " << value << std::endl;
        }
    });

    producer.join();
    consumer.join();

    return 0;
}


http://www.kler.cn/a/564697.html

相关文章:

  • 【easy视频 | day01】项目了解 + 登录注册 + 使用 token 作为客户端请求令牌
  • 2.4 自动化评测答疑机器人的表现-大模型ACP模拟题-真题
  • 日志分析集群安装部署(ELK) 保姆级教程
  • LLVM - 编译器前端 - 将源文件转换为抽象语法树
  • 大中型虚拟化园区网络设计
  • IDEA入门及常用快捷键
  • 【02】Cocos游戏开发引擎从0开发一款游戏-cocos项目目录结构熟悉-调试运行项目-最重要的assets资源文件认识-场景sense了解-优雅草卓伊凡
  • 3DM转换成STL
  • 解决npm run dev报错
  • JavaScript 作用域与作用域链深度解析
  • 服务器为什么会禁止 Ping?服务器禁止 Ping 的好处
  • nz-upload 手动上传 PDF预览
  • DeepSeek R1 + 飞书机器人实现AI智能助手
  • Linux中子线程会继承父线程对相关变量的可见性
  • docker-compose部署onlyoffice8.3.0并支持ssl,且支持通过nginx代理,关闭JWT配置
  • 【MySQL学习】关系数据库标准语言SQL
  • SC95F8767的学习——新工程的建立
  • WPF学习之Prism(二)
  • 11特殊函数
  • 【数据结构】二叉树(门槛极低的系统理解)