当前位置: 首页 > article >正文

高并发场景下,如何用无锁实现高性能LRU缓存?

《百万人高并发场景下,我如何用无锁实现高性能LRU缓存?》

LRU算法核心原理

LRU(Least Recently Used)算法是缓存系统的核心淘汰策略,其核心逻辑可以用一张流程图描述:

(图:访问数据时触发链表重组,新增数据时触发淘汰检测)


一、分段锁设计思路

  • 分段缓存(Segment):
    将整个缓存按 key 的 hash 值划分为多个 Segment,每个 Segment 内部维护一个小型 LRU 缓存(HashMap + 双向链表)。

  • 独立锁:
    每个 Segment 都持有自己的 ReentrantLock,操作各自区域的数据时只锁定当前 Segment,而不同 Segment 的操作可以并发执行。

  • 整体映射:
    外层类根据 key 的 hash 值决定使用哪个 Segment,从而在降低锁竞争的同时仍能保证整体缓存的线程安全。


二、分段锁 LRU 缓存代码实现

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
 * SegmentedLRUCache 是一个基于分段锁的线程安全 LRU 缓存实现。
 * 通过将缓存划分为多个 Segment,每个 Segment 内部维护自己的 HashMap 和双向链表,
 * 有效降低了全局锁的竞争,提高了并发性能。
 *
 * @param <K> 键的类型
 * @param <V> 值的类型
 */
public class SegmentedLRUCache<K, V> {
    
    /**
     * 每个 Segment 内部的缓存实现,包含一个 HashMap 和双向链表,并用 ReentrantLock 保护操作。
     */
    private static class Segment<K, V> {
        private final int capacity;                    // 当前 Segment 的容量
        private final Map<K, Node<K, V>> map;          // 快速查找 key 对应节点
        private Node<K, V> head;                       // 双向链表头,最近使用的数据
        private Node<K, V> tail;                       // 双向链表尾,最久未使用的数据
        private final ReentrantLock lock = new ReentrantLock();
        
        public Segment(int capacity) {
            this.capacity = capacity;
            this.map = new HashMap<>();
        }
        
        public V get(K key) {
            lock.lock();
            try {
                Node<K, V> node = map.get(key);
                if (node == null) {
                    return null;
                }
                // 访问后将节点移动到链表头部
                moveToHead(node);
                return node.value;
            } finally {
                lock.unlock();
            }
        }
        
        public void put(K key, V value) {
            lock.lock();
            try {
                Node<K, V> node = map.get(key);
                if (node != null) {
                    // 更新值并移动到头部
                    node.value = value;
                    moveToHead(node);
                } else {
                    // 如果超出容量,则移除链表尾部节点(最久未使用)
                    if (map.size() >= capacity) {
                        if (tail != null) {
                            map.remove(tail.key);
                            removeNode(tail);
                        }
                    }
                    Node<K, V> newNode = new Node<>(key, value);
                    addNodeAtHead(newNode);
                    map.put(key, newNode);
                }
            } finally {
                lock.unlock();
            }
        }
        
        /**
         * 将节点移动到链表头部。
         */
        private void moveToHead(Node<K, V> node) {
            if (node == head) {
                return;
            }
            removeNode(node);
            addNodeAtHead(node);
        }
        
        /**
         * 从链表中移除节点。
         */
        private void removeNode(Node<K, V> node) {
            if (node.prev != null) {
                node.prev.next = node.next;
            } else {
                head = node.next;
            }
            if (node.next != null) {
                node.next.prev = node.prev;
            } else {
                tail = node.prev;
            }
            node.prev = null;
            node.next = null;
        }
        
        /**
         * 将节点添加到链表头部。
         */
        private void addNodeAtHead(Node<K, V> node) {
            node.next = head;
            node.prev = null;
            if (head != null) {
                head.prev = node;
            }
            head = node;
            if (tail == null) {
                tail = node;
            }
        }
        
        /**
         * 调试使用:打印当前 Segment 内缓存的状态(从头到尾)。
         */
        public void printSegment() {
            lock.lock();
            try {
                Node<K, V> current = head;
                while (current != null) {
                    System.out.print(current.key + ":" + current.value + "  ");
                    current = current.next;
                }
                System.out.println();
            } finally {
                lock.unlock();
            }
        }
    }
    
    /**
     * 双向链表节点,保存缓存中的键值对及前后指针。
     */
    private static class Node<K, V> {
        K key;
        V value;
        Node<K, V> prev;
        Node<K, V> next;
        
        public Node(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }
    
    private final int segmentCount;         // Segment 数量
    private final Segment<K, V>[] segments;   // 分段数组
    
    /**
     * 构造方法
     *
     * @param capacity     整个缓存的总容量
     * @param segmentCount 分段数量,建议取 16 或 32
     */
    @SuppressWarnings("unchecked")
    public SegmentedLRUCache(int capacity, int segmentCount) {
        if (capacity < segmentCount) {
            throw new IllegalArgumentException("容量必须大于或等于分段数量");
        }
        this.segmentCount = segmentCount;
        segments = new Segment[segmentCount];
        // 将总容量均摊到各个 Segment 上
        int capacityPerSegment = capacity / segmentCount;
        for (int i = 0; i < segmentCount; i++) {
            segments[i] = new Segment<>(capacityPerSegment);
        }
    }
    
    /**
     * 根据 key 的 hash 值定位到对应的 Segment。
     */
    private Segment<K, V> segmentFor(K key) {
        int h = key.hashCode();
        // 使用 & 运算保证非负,再取模
        int index = (h & 0x7fffffff) % segmentCount;
        return segments[index];
    }
    
    /**
     * 获取指定 key 对应的值。
     */
    public V get(K key) {
        if (key == null) throw new NullPointerException();
        return segmentFor(key).get(key);
    }
    
    /**
     * 插入或更新一个键值对。
     */
    public void put(K key, V value) {
        if (key == null) throw new NullPointerException();
        segmentFor(key).put(key, value);
    }
    
    /**
     * 调试方法:打印所有 Segment 中缓存的状态。
     */
    public void printCache() {
        for (int i = 0; i < segmentCount; i++) {
            System.out.print("Segment " + i + ": ");
            segments[i].printSegment();
        }
    }
    
    /**
     * 测试入口:演示 SegmentedLRUCache 在多线程环境下的表现。
     */
    public static void main(String[] args) {
        // 总容量为 16,分成 4 个 Segment
        final SegmentedLRUCache<Integer, String> cache = new SegmentedLRUCache<>(16, 4);
        
        // 写线程:不断插入数据
        Runnable writer = () -> {
            for (int i = 0; i < 50; i++) {
                cache.put(i, "Value" + i);
                System.out.println(Thread.currentThread().getName() + " put: " + i + " => Value" + i);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        
        // 读线程:随机读取数据
        Runnable reader = () -> {
            for (int i = 0; i < 50; i++) {
                int key = (int) (Math.random() * 50);
                String value = cache.get(key);
                System.out.println(Thread.currentThread().getName() + " get: " + key + " => " + value);
                try {
                    Thread.sleep(30);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        
        Thread writerThread1 = new Thread(writer, "Writer-1");
        Thread writerThread2 = new Thread(writer, "Writer-2");
        Thread readerThread1 = new Thread(reader, "Reader-1");
        Thread readerThread2 = new Thread(reader, "Reader-2");
        
        writerThread1.start();
        writerThread2.start();
        readerThread1.start();
        readerThread2.start();
        
        try {
            writerThread1.join();
            writerThread2.join();
            readerThread1.join();
            readerThread2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("Final cache state:");
        cache.printCache();
    }
}

三、代码详解

  1. 分段划分:

    • 构造方法中,我们将总容量均摊到各个 Segment,每个 Segment 内部维护一个 HashMap 和双向链表。
    • 方法 segmentFor(key) 根据 key 的 hash 值计算出所属的 Segment,从而确保不同 key 分布到不同的 Segment。
  2. 细粒度锁定:

    • 每个 Segment 内部用 ReentrantLock 对操作进行保护,保证在同一 Segment 内的并发操作安全。
    • 不同 Segment 之间互不干扰,极大减少了锁竞争。
  3. LRU 实现细节:

    • 每个 Segment 内的双向链表用于记录访问顺序。
    • 访问(get)或更新(put)时,都会将对应节点移到链表头部;当容量超限时,移除尾部节点(最久未使用的数据)。
  4. 多线程测试:

    • main 方法中,我们启动多个读写线程,模拟高并发访问场景,并最终打印每个 Segment 内的缓存状态以验证正确性。

四、总结

通过分段锁设计,我们将单一全局锁的竞争问题转化为多个独立锁的局部竞争,从而在高并发场景下显著提升 LRU 缓存的性能。

兼顾了线程安全和高效性,非常适用于对缓存响应时间要求较高的业务场景。源码经过测试,拿走即用,你可以根据实际情况调整分段数及容量,进一步优化性能。

最后


欢迎关注gzh:加瓦点灯,每天推送干货知识!


http://www.kler.cn/a/546093.html

相关文章:

  • MySQL SQL优化策略:全面提升查询性能的实用技巧与深入分析
  • 数据分析——动态分配内存、结构体
  • STM32单片机芯片与内部85 RS232 RS485 UART ISP下载硬件选择 电路设计 IO分配
  • python学opencv|读取图像(六十八)使用cv2.Canny()函数实现图像边缘检测
  • 3dtiles——Cesium ion for Autodesk Revit Add-In插件
  • Linux 文件系统:恢复已删除文件的挑战
  • HTTP/2 ddos攻击脚本
  • Pytorch深度学习教程_1_Python基础快速掌握
  • Python用PyMC3马尔可夫链蒙特卡罗MCMC对疾病症状数据贝叶斯推断
  • wps配置deepseek
  • Texas Moves to Regulate AI 德克萨斯州着手规范人工智能
  • 用户管理中心--注册登录功能的设计
  • 【弹性计算】弹性计算的技术架构
  • 单调队列与栈
  • 分享一些处理复杂HTML结构的经验
  • 闭源大语言模型的怎么增强:提示工程 检索增强生成 智能体
  • 如何在 ONLYOFFICE 编辑器中使用 DeepSeek
  • python class详解
  • 51单片机09 DS1302时钟
  • HTN77A0F:拥有强制脉宽调制的0.7A同步降压降压变换器资料参数