Redis Sorted Set 跳表的实现原理和分析
跳表(Skip List)是一种随机化的数据结构,基于有序链表,通过在链表上增加多级索引来提高数据的查找效率。它是由 William Pugh 在 1990 年提出的。
为什么 Redis 中的 Sorted Set 使用跳跃表
Redis 的有序集合(Sorted Set)使用跳跃表(Skip List)作为底层实现,主要是因为跳跃表在性能、实现复杂度和灵活性等方面具有显著优势,可以替代平衡树的数据结构。
跳跃表的原理
跳跃表是一种扩展的有序链表,通过维护多个层级的索引来提高查找效率。每个节点包含一个数据元素和一组指向其他节点的指针,这些指针分布在不同的层级。最底层的链表包含所有元素,而每一层的节点数量逐渐减少。这样,查找操作可以从高层开始,快速跳过不需要的元素,减少遍历的节点数,从而提高查找效率。
-
查找过程:从最高层的头节点开始,沿着索引节点向右查找,如果当前节点的下一个节点的值大于或等于查找的目标值,则向下移动到下一层继续查找;否则,向右移动到下一个索引节点。这个过程一直持续到找到目标节点或到达最底层链表。
-
插入和删除操作:跳跃表支持高效的动态插入和删除,时间复杂度均为 O(log n)。插入时,首先找到插入位置,然后根据随机函数决定新节点的层数,最后在相应的层中插入节点。
跳跃表的优势
-
简单性:跳跃表的实现相对简单,易于理解和维护,而平衡树(如红黑树)的实现较为复杂,容易出错。
-
高效的范围查询:跳跃表在进行范围查询时效率更高,因为它是有序的链表,可以直接遍历后继节点,而平衡树需要中序遍历,复杂度更高。
-
灵活性:跳跃表的层数可以根据需要动态调整,适应不同的查询需求。
-
高并发性能:跳跃表的节点可以支持多个线程同时插入或删除节点,而平衡树和哈希表通常需要复杂的并发控制。
-
空间效率:跳跃表的空间复杂度为 O(n),并且通过调整节点的抽取间隔,可以灵活平衡空间和时间复杂度。
正是因为有这些优势,Redis 选择使用跳跃表来实现有序集合,而不是红黑树或其他数据结构。这使得 Redis 在处理有序集合时能够高效地支持插入、删除和查找操作,同时保持元素的有序性,非常适合实现如排行榜、范围查询等功能。
为了讲明白跳表的原理,现在我们来模拟一个简单的跳表实现。
在 Java 中模拟实现 Redis 的 Sorted Set 跳表,我们需要定义跳表的数据结构,包括节点和跳表本身。以下是一个简单的实现:
import java.util.Random;
public class SkipList {
private static final double P = 0.5; // 节点晋升的概率
private static final int MAX_LEVEL = 16; // 最大层数
private int levelCount; // 当前跳表的层数
private Node head; // 头节点
private int size; // 跳表中元素的数量
private Random random; // 随机数生成器
public SkipList() {
this.levelCount = 0;
this.size = 0;
this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
this.random = new Random();
}
// 节点定义
private class Node {
int value;
int key;
Node[] forward; // 向前指针数组
Node(int value, int key, int level) {
this.value = value;
this.key = key;
this.forward = new Node[level + 1];
}
}
// 随机生成节点的层数
private int randomLevel() {
int level = 0;
while (random.nextFloat() < P && level < MAX_LEVEL) {
level++;
}
return level;
}
// 搜索指定值的节点
public Node search(int key) {
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i]; // 沿着当前层的指针移动
}
}
current = current.forward[0]; // 移动到第0层
return current;
}
// 插入节点
public void insert(int key, int value) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
// 查找插入位置
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
int level = randomLevel(); // 随机生成层数
if (level > levelCount) {
levelCount = level;
for (int i = levelCount - 1; i >= 0; i--) {
update[i] = head;
}
}
current = new Node(value, key, level);
for (int i = 0; i < level; i++) {
current.forward[i] = update[i].forward[i];
update[i].forward[i] = current;
}
size++;
}
// 删除节点
public void delete(int key) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
// 查找要删除的节点
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
current = current.forward[0];
if (current != null && current.key == key) {
for (int i = 0; i < levelCount; i++) {
if (update[i].forward[i] != current) {
break;
}
update[i].forward[i] = current.forward[i];
}
size--;
while (levelCount > 0 && head.forward[levelCount - 1] == null) {
levelCount--;
}
}
}
// 打印跳表
public void printList() {
Node current = head.forward[0];
while (current != null) {
System.out.println(current.key + ":" + current.value);
current = current.forward[0];
}
}
// 主函数测试跳表
public static void main(String[] args) {
SkipList list = new SkipList();
list.insert(3, 100);
list.insert(6, 300);
list.insert(4, 400);
list.insert(7, 700);
list.insert(5, 500);
list.printList();
list.delete(3);
list.printList();
}
}
下面,V 哥来解释一下:
-
跳表节点:每个节点包含一个值、一个键和一个向前指针数组。向前指针数组存储了指向同一层下一个节点的引用。
-
随机层数:每个节点的层数是根据预设的概率
P
随机生成的,这样可以模拟出不同高度的索引层。 -
搜索操作:从最高层开始,沿着当前层的指针移动,直到找到插入点或到达底层。
-
插入操作:首先找到插入位置,然后根据随机生成的层数创建新节点,并更新每一层的指针。
-
删除操作:找到要删除的节点,然后逐层更新指针,最后减少跳表的层数。
-
打印跳表:从底层的头节点开始,遍历打印每个节点的键和值。
从这个简化版的跳表实现可以看到跳表的基本操作。可以帮助我们理解跳表实现的原理。
如何在 Java 中实现跳表的并发操作?
在 Java 中实现跳表的并发操作需要考虑线程安全问题。可以通过以下方法来实现:
-
使用同步块:在每个公共方法中使用
synchronized
关键字来确保同一时间只有一个线程可以执行方法。 -
使用锁:使用
ReentrantLock
或其他并发锁来控制对跳表的并发访问。 -
使用原子引用:使用
AtomicReference
或AtomicReferenceArray
来确保节点的原子更新。 -
使用并发集合:使用
ConcurrentHashMap
等并发集合作为辅助工具来实现线程安全的跳表。
以下是一个使用 synchronized
关键字实现线程安全的跳表的示例:
import java.util.Random;
public class ConcurrentSkipList {
private static final double P = 0.5; // 节点晋升的概率
private static final int MAX_LEVEL = 16; // 最大层数
private int levelCount; // 当前跳表的层数
private Node head; // 头节点
private int size; // 跳表中元素的数量
private Random random; // 随机数生成器
public ConcurrentSkipList() {
this.levelCount = 0;
this.size = 0;
this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
this.random = new Random();
}
private class Node {
int value;
int key;
Node[] forward; // 向前指针数组
Node(int value, int key, int level) {
this.value = value;
this.key = key;
this.forward = new Node[level + 1];
}
}
private int randomLevel() {
int level = 0;
while (random.nextFloat() < P && level < MAX_LEVEL) {
level++;
}
return level;
}
public synchronized Node search(int key) {
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
}
current = current.forward[0];
return current;
}
public synchronized void insert(int key, int value) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
int level = randomLevel();
if (level > levelCount) {
for (int i = levelCount; i < level; i++) {
update[i] = head;
}
levelCount = level;
}
current = new Node(value, key, level);
for (int i = 0; i < level; i++) {
current.forward[i] = update[i].forward[i];
update[i].forward[i] = current;
}
size++;
}
public synchronized void delete(int key) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
current = current.forward[0];
if (current != null && current.key == key) {
for (int i = 0; i < levelCount; i++) {
if (update[i].forward[i] != current) {
break;
}
update[i].forward[i] = current.forward[i];
}
size--;
while (levelCount > 0 && head.forward[levelCount - 1] == null) {
levelCount--;
}
}
}
public synchronized void printList() {
Node current = head.forward[0];
while (current != null) {
System.out.println(current.key + ":" + current.value);
current = current.forward[0];
}
}
public static void main(String[] args) {
ConcurrentSkipList list = new ConcurrentSkipList();
list.insert(3, 100);
list.insert(6, 300);
list.insert(4, 400);
list.insert(7, 700);
list.insert(5, 500);
list.printList();
list.delete(3);
list.printList();
}
}
在这个示例中,我们使用了 synchronized
关键字来确保 search
、insert
和 delete
方法是线程安全的。这会锁定当前对象,确保同一时间只有一个线程可以执行这些方法。
请注意,虽然 synchronized
可以提供线程安全,但它也可能导致性能瓶颈,特别是在高并发环境下。在实际应用中,可以考虑使用更细粒度的锁,如 ReentrantLock
,或者使用原子引用和其他并发工具来提高性能。
使用 ReentrantLock
的实现方式
使用 ReentrantLock
实现跳表的并发操作可以提供比 synchronized
更细粒度的锁定,从而提高并发性能。ReentrantLock
允许您在不同的方法中锁定和解锁,甚至可以在不同的类中使用同一个锁对象。
以下是使用 ReentrantLock
实现线程安全的跳表的示例:
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConcurrentSkipList {
private static final double P = 0.5; // 节点晋升的概率
private static final int MAX_LEVEL = 16; // 最大层数
private final Lock lock = new ReentrantLock(); // 创建一个 ReentrantLock 对象
private int levelCount; // 当前跳表的层数
private Node head; // 头节点
private int size; // 跳表中元素的数量
private Random random; // 随机数生成器
public ConcurrentSkipList() {
this.levelCount = 0;
this.size = 0;
this.head = new Node(Integer.MIN_VALUE, MAX_LEVEL);
this.random = new Random();
}
private class Node {
int value;
int key;
Node[] forward; // 向前指针数组
Node(int key, int level) {
this.key = key;
this.forward = new Node[level + 1];
}
}
private int randomLevel() {
int level = 0;
while (random.nextFloat() < P && level < MAX_LEVEL) {
level++;
}
return level;
}
public void search(int key) {
lock.lock(); // 加锁
try {
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
}
current = current.forward[0];
// ... 处理找到的节点
} finally {
lock.unlock(); // 释放锁
}
}
public void insert(int key, int value) {
lock.lock(); // 加锁
try {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
int level = randomLevel();
if (level > levelCount) {
levelCount = level;
}
current = new Node(value, key, level);
for (int i = 0; i < level; i++) {
current.forward[i] = update[i].forward[i];
update[i].forward[i] = current;
}
size++;
} finally {
lock.unlock(); // 释放锁
}
}
public void delete(int key) {
lock.lock(); // 加锁
try {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 1; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
current = current.forward[0];
if (current != null && current.key == key) {
for (int i = 0; i < levelCount; i++) {
if (update[i].forward[i] != current) {
break;
}
update[i].forward[i] = current.forward[i];
}
size--;
while (levelCount > 0 && head.forward[levelCount - 1] == null) {
levelCount--;
}
}
} finally {
lock.unlock(); // 释放锁
}
}
public void printList() {
lock.lock(); // 加锁
try {
Node current = head.forward[0];
while (current != null) {
System.out.println(current.key + ":" + current.value);
current = current.forward[0];
}
} finally {
lock.unlock(); // 释放锁
}
}
public static void main(String[] args) {
ConcurrentSkipList list = new ConcurrentSkipList();
list.insert(3, 100);
list.insert(6, 300);
list.insert(4, 400);
list.insert(7, 700);
list.insert(5, 500);
list.printList();
list.delete(3);
list.printList();
}
}
在这个示例中,我们使用了 ReentrantLock
对象来控制对跳表的并发访问。每个公共方法在执行之前都会调用 lock.lock()
加锁,并在执行完毕后(包括正常执行和异常退出)调用 lock.unlock()
释放锁。
使用 ReentrantLock
的好处是它提供了比 synchronized
更灵活的锁定机制,例如:
-
可中断的锁定:
ReentrantLock
允许线程在尝试获取锁时被中断。 -
尝试非阻塞锁定:
ReentrantLock
提供了tryLock()
方法,允许线程尝试获取锁而不无限等待。 -
超时获取锁:
ReentrantLock
还提供了tryLock(long timeout, TimeUnit unit)
方法,允许线程在指定时间内等待锁。 -
公平锁:
ReentrantLock
可以选择是否使用公平锁,公平锁会按照线程请求锁的顺序来分配锁。 -
多个条件变量:
ReentrantLock
可以与多个Condition
对象配合使用,而synchronized
只能与一个条件变量配合使用。
理解了以上代码实现原理后,我们再来理解 Redis Sorted Set 就不难了。
Redis 的 Sorted Set 是一种包含元素和关联分数的数据结构,它能够根据分数对元素进行排序。Sorted Set 在 Redis 中的内部实现可以是跳跃表(Skip List)和字典(Hash Table)的组合,或者是压缩列表(Zip List),具体使用哪种实现取决于 Sorted Set 的大小和元素的长度。
跳跃表 + 字典实现
当 Sorted Set 的元素数量较多或者元素长度较长时,Redis 使用跳跃表和字典来实现 Sorted Set。跳跃表是一种概率平衡的数据结构,它通过多级索引来提高搜索效率,类似于二分查找。字典则用于快速查找和更新操作。
跳跃表的每个节点包含以下信息:
- 元素(member)
- 分数(score)
- 后退指针(backward)
- 多层前进指针(forward),每一层都是一个有序链表
字典则存储了元素到分数的映射,以便快速访问。
压缩列表实现
当 Sorted Set 的元素数量较少(默认小于128个),并且所有元素的长度都小于64字节时,Redis 使用压缩列表来存储 Sorted Set。压缩列表是一种连续内存块的顺序存储结构,它将所有的元素和分数紧凑地存储在一起,以节省内存空间。
应用场景
Sorted Set 常用于以下场景:
- 排行榜:例如游戏中的玩家分数排名。
- 范围查询:例如获取一定分数范围内的用户。
- 任务调度:例如按照任务的优先级执行。
- 实时排名:例如股票价格的实时排名。
代码分析
在 Redis 源码中,Sorted Set 的实现主要在 t_zset.c
文件中。插入操作(zaddCommand
)会根据 Sorted Set 的编码类型(跳跃表或压缩列表)来执行不同的逻辑。如果是跳跃表编码,那么插入操作会涉及到字典的更新和跳跃表节点的添加。如果是压缩列表编码,则会检查是否需要转换为跳跃表编码。
总结
Sorted Set 是 Redis 提供的一种强大的有序数据结构,它结合了跳跃表和字典的优点,提供了高效的插入、删除、更新和范围查询操作。通过合理的使用 Sorted Set,可以有效地解决许多实际问题。如何以上内容对你有帮助,恳请点赞转发,V 哥在此感谢各位兄弟的支持。88,洗洗睡了。