高并发场景下,如何用无锁实现高性能LRU缓存?
《百万人高并发场景下,我如何用无锁实现高性能LRU缓存?》
LRU算法核心原理
LRU(Least Recently Used)算法是缓存系统的核心淘汰策略,其核心逻辑可以用一张流程图描述:
(图:访问数据时触发链表重组,新增数据时触发淘汰检测)
一、分段锁设计思路
-
分段缓存(Segment):
将整个缓存按 key 的 hash 值划分为多个 Segment,每个 Segment 内部维护一个小型 LRU 缓存(HashMap + 双向链表)。 -
独立锁:
每个 Segment 都持有自己的 ReentrantLock,操作各自区域的数据时只锁定当前 Segment,而不同 Segment 的操作可以并发执行。 -
整体映射:
外层类根据 key 的 hash 值决定使用哪个 Segment,从而在降低锁竞争的同时仍能保证整体缓存的线程安全。
二、分段锁 LRU 缓存代码实现
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
/**
* SegmentedLRUCache 是一个基于分段锁的线程安全 LRU 缓存实现。
* 通过将缓存划分为多个 Segment,每个 Segment 内部维护自己的 HashMap 和双向链表,
* 有效降低了全局锁的竞争,提高了并发性能。
*
* @param <K> 键的类型
* @param <V> 值的类型
*/
public class SegmentedLRUCache<K, V> {
/**
* 每个 Segment 内部的缓存实现,包含一个 HashMap 和双向链表,并用 ReentrantLock 保护操作。
*/
private static class Segment<K, V> {
private final int capacity; // 当前 Segment 的容量
private final Map<K, Node<K, V>> map; // 快速查找 key 对应节点
private Node<K, V> head; // 双向链表头,最近使用的数据
private Node<K, V> tail; // 双向链表尾,最久未使用的数据
private final ReentrantLock lock = new ReentrantLock();
public Segment(int capacity) {
this.capacity = capacity;
this.map = new HashMap<>();
}
public V get(K key) {
lock.lock();
try {
Node<K, V> node = map.get(key);
if (node == null) {
return null;
}
// 访问后将节点移动到链表头部
moveToHead(node);
return node.value;
} finally {
lock.unlock();
}
}
public void put(K key, V value) {
lock.lock();
try {
Node<K, V> node = map.get(key);
if (node != null) {
// 更新值并移动到头部
node.value = value;
moveToHead(node);
} else {
// 如果超出容量,则移除链表尾部节点(最久未使用)
if (map.size() >= capacity) {
if (tail != null) {
map.remove(tail.key);
removeNode(tail);
}
}
Node<K, V> newNode = new Node<>(key, value);
addNodeAtHead(newNode);
map.put(key, newNode);
}
} finally {
lock.unlock();
}
}
/**
* 将节点移动到链表头部。
*/
private void moveToHead(Node<K, V> node) {
if (node == head) {
return;
}
removeNode(node);
addNodeAtHead(node);
}
/**
* 从链表中移除节点。
*/
private void removeNode(Node<K, V> node) {
if (node.prev != null) {
node.prev.next = node.next;
} else {
head = node.next;
}
if (node.next != null) {
node.next.prev = node.prev;
} else {
tail = node.prev;
}
node.prev = null;
node.next = null;
}
/**
* 将节点添加到链表头部。
*/
private void addNodeAtHead(Node<K, V> node) {
node.next = head;
node.prev = null;
if (head != null) {
head.prev = node;
}
head = node;
if (tail == null) {
tail = node;
}
}
/**
* 调试使用:打印当前 Segment 内缓存的状态(从头到尾)。
*/
public void printSegment() {
lock.lock();
try {
Node<K, V> current = head;
while (current != null) {
System.out.print(current.key + ":" + current.value + " ");
current = current.next;
}
System.out.println();
} finally {
lock.unlock();
}
}
}
/**
* 双向链表节点,保存缓存中的键值对及前后指针。
*/
private static class Node<K, V> {
K key;
V value;
Node<K, V> prev;
Node<K, V> next;
public Node(K key, V value) {
this.key = key;
this.value = value;
}
}
private final int segmentCount; // Segment 数量
private final Segment<K, V>[] segments; // 分段数组
/**
* 构造方法
*
* @param capacity 整个缓存的总容量
* @param segmentCount 分段数量,建议取 16 或 32
*/
@SuppressWarnings("unchecked")
public SegmentedLRUCache(int capacity, int segmentCount) {
if (capacity < segmentCount) {
throw new IllegalArgumentException("容量必须大于或等于分段数量");
}
this.segmentCount = segmentCount;
segments = new Segment[segmentCount];
// 将总容量均摊到各个 Segment 上
int capacityPerSegment = capacity / segmentCount;
for (int i = 0; i < segmentCount; i++) {
segments[i] = new Segment<>(capacityPerSegment);
}
}
/**
* 根据 key 的 hash 值定位到对应的 Segment。
*/
private Segment<K, V> segmentFor(K key) {
int h = key.hashCode();
// 使用 & 运算保证非负,再取模
int index = (h & 0x7fffffff) % segmentCount;
return segments[index];
}
/**
* 获取指定 key 对应的值。
*/
public V get(K key) {
if (key == null) throw new NullPointerException();
return segmentFor(key).get(key);
}
/**
* 插入或更新一个键值对。
*/
public void put(K key, V value) {
if (key == null) throw new NullPointerException();
segmentFor(key).put(key, value);
}
/**
* 调试方法:打印所有 Segment 中缓存的状态。
*/
public void printCache() {
for (int i = 0; i < segmentCount; i++) {
System.out.print("Segment " + i + ": ");
segments[i].printSegment();
}
}
/**
* 测试入口:演示 SegmentedLRUCache 在多线程环境下的表现。
*/
public static void main(String[] args) {
// 总容量为 16,分成 4 个 Segment
final SegmentedLRUCache<Integer, String> cache = new SegmentedLRUCache<>(16, 4);
// 写线程:不断插入数据
Runnable writer = () -> {
for (int i = 0; i < 50; i++) {
cache.put(i, "Value" + i);
System.out.println(Thread.currentThread().getName() + " put: " + i + " => Value" + i);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
// 读线程:随机读取数据
Runnable reader = () -> {
for (int i = 0; i < 50; i++) {
int key = (int) (Math.random() * 50);
String value = cache.get(key);
System.out.println(Thread.currentThread().getName() + " get: " + key + " => " + value);
try {
Thread.sleep(30);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
Thread writerThread1 = new Thread(writer, "Writer-1");
Thread writerThread2 = new Thread(writer, "Writer-2");
Thread readerThread1 = new Thread(reader, "Reader-1");
Thread readerThread2 = new Thread(reader, "Reader-2");
writerThread1.start();
writerThread2.start();
readerThread1.start();
readerThread2.start();
try {
writerThread1.join();
writerThread2.join();
readerThread1.join();
readerThread2.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Final cache state:");
cache.printCache();
}
}
三、代码详解
-
分段划分:
- 构造方法中,我们将总容量均摊到各个 Segment,每个 Segment 内部维护一个 HashMap 和双向链表。
- 方法
segmentFor(key)
根据 key 的 hash 值计算出所属的 Segment,从而确保不同 key 分布到不同的 Segment。
-
细粒度锁定:
- 每个 Segment 内部用
ReentrantLock
对操作进行保护,保证在同一 Segment 内的并发操作安全。 - 不同 Segment 之间互不干扰,极大减少了锁竞争。
- 每个 Segment 内部用
-
LRU 实现细节:
- 每个 Segment 内的双向链表用于记录访问顺序。
- 访问(get)或更新(put)时,都会将对应节点移到链表头部;当容量超限时,移除尾部节点(最久未使用的数据)。
-
多线程测试:
- 在
main
方法中,我们启动多个读写线程,模拟高并发访问场景,并最终打印每个 Segment 内的缓存状态以验证正确性。
- 在
四、总结
通过分段锁设计,我们将单一全局锁的竞争问题转化为多个独立锁的局部竞争,从而在高并发场景下显著提升 LRU 缓存的性能。
兼顾了线程安全和高效性,非常适用于对缓存响应时间要求较高的业务场景。源码经过测试,拿走即用,你可以根据实际情况调整分段数及容量,进一步优化性能。
最后
欢迎关注gzh:加瓦点灯,每天推送干货知识!