redis原理之数据结构
dict
dict,哈希表,redis 所有的 key-value 都存储在里面。如果曾经学过哈希表这种数据结构,那么很容易能写出一个来,但 redis dict 考虑了更多的功能。
// 哈希表(字典)数据结构,redis 的所有键值对都会存储在这里。其中包含两个哈希表。
typedef struct dict {
// 哈希表的类型,包括哈希函数,比较函数,键值的内存释放函数
dictType *type;
// 存储一些额外的数据
void *privdata;
// 两个哈希表
dictht ht[2];
// 哈希表重置下标,指定的是哈希数组的数组下标
int rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 绑定到哈希表的迭代器个数
int iterators; /* number of iterators currently running */
} dict;
redisObject
redisObject,任何 value 都会被包装成一个 redisObject,redisObject 能指定 value 的类型,编码方式等数据属性。
typedef struct redisObject {
// 刚刚好 32 bits
// 对象的类型,字符串/列表/集合/哈希表
unsigned type:4;
// 未使用的两个位
unsigned notused:2; /* Not used */
// 编码的方式,redis 为了节省空间,提供多种方式来保存一个数据
// 譬如:“123456789” 会被存储为整数 123456789
unsigned encoding:4;
// 当内存紧张,淘汰数据的时候用到
unsigned lru:22; /* lru time (relative to server.lruclock) */
// 引用计数
int refcount;
// 数据指针
void *ptr;
} robj;
zset
zset,是一个跳表,插入删除速度非常快。
typedef struct zset {
// 哈希表
dict *dict;
// 跳表
zskiplist *zsl;
} zset;
adlist
typedef struct list {
// 头指针
listNode *head;
// 尾指针
listNode *tail;
// 数据拷贝函数指针
void *(*dup)(void *ptr);
// 析构函数指针
void (*free)(void *ptr);
// 数据比较指针
int (*match)(void *ptr, void *key);
// 链表长度
unsigned long len;
} list;
ziplist
ziplist,是一个压缩的双链表,实现了针对 CPU cache 的优化。ziplist 实际上一个字符串,通过一系列的算法来实现压缩双链表。
intset
intset,整数集合。
typedef struct intset {
// 每个整数的类型
uint32_t encoding;
// intset 长度
uint32_t length;
// 整数数组
int8_t contents[];
} intset;
sds
sds,字符串数据结构,因为经常涉及字符串的操作,redis 做了特殊的实现,文档中将其称为 Hacking String.
typedef char *sds;
redisObject 结构简介
redis 是 key-value 存储系统,其中 key 类型一般为字符串,而 value 类型则为 redis 对象(redis object)。redis 对象可以绑定各种类型的数据,譬如 string、list 和 set。
typedef struct redisObject {
// 刚刚好 32 bits
// 对象的类型,字符串/列表/集合/哈希表
unsigned type:4;
// 未使用的两个位
unsigned notused:2; /* Not used */
// 编码的方式,redis 为了节省空间,提供多种方式来保存一个数据
// 譬如:“123456789” 会被存储为整数 123456789
unsigned encoding:4;
// 当内存紧张,淘汰数据的时候用到
unsigned lru:22; /* lru time (relative to server.lruclock) */
// 引用计数
int refcount;
// 数据指针
void *ptr;
} robj;
其中,{void *ptr}已经给了我们无限的遐想空间了。
redisObject 数据的属性
redis.h 中定义了 struct redisObject,它是一个简单优秀的数据结构,因为在 redisObject 中数据属性和数据分开来了,其中,数据属性包括数据类型,存储编码方式,淘汰时钟,引用计数。下面一一展开:
数据类型
,标记了 redis 对象绑定的是什么类型的数据,有下面几种可能的值;
/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4
存储编码方式
一个数据,可以以多种方式存储。譬如,数据类型为 REDIS_SET 的数据编码方式可能为 REDIS_ENCODING_HT,也可能为 REDIS_ENCODING_INTSET。
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
* is set to one of this fields for this object. */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */
#define REDIS_ENCODING_HT 2 /* Encoded as hash table */
#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */
#define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
淘汰时钟
redis 对数据集占用内存的大小有「实时」的计算,当超出限额时,会淘汰超时的数据。
引用计数
一个 redis 对象可能被多个指针引用。当需要增加或者减少引用的时候,必须调用相应的函数,程序员必须遵守这一准则。
// 增加 redis 对象引用
void incrRefCount(robj *o) {
o->refcount++;
}
// 减少 redis 对象引用。特别的,引用为零的时候会销毁对象
void decrRefCount(robj *o) {
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
// 如果取消的是最后一个引用,则释放资源
if (o->refcount == 1) {
// 不同数据类型,销毁操作不同
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
case REDIS_SET: freeSetObject(o); break;
case REDIS_ZSET: freeZsetObject(o); break;
case REDIS_HASH: freeHashObject(o); break;
default: redisPanic("Unknown object type"); break;
}
zfree(o);
} else {
o->refcount--;
}
}
得益于 redis 是单进程单线程工作的,所以增加/减少引用的操作不必保证原子性,这在 memcache 中是做不到的(memcached 是多线程的工作模式,需要做到互斥)。struct redisObject 把最后一个指针留给了真正的数据。
redis 数据结构 sds
sds 结构简介
sds 被称为是 Hacking String. hack 的地方就在 sds 保存了字符串的长度以及剩余空间。sds 的实现在 sds.c 中。
sds 头部的实现:
struct sdshdr {
int len;
int free;
char buf[];
};
hacking sds
倘若使用指针即 char buf,分配内存需要量两个步骤:一次分配结构体,一次分配 char buf,在是否内存的时候也需要释放两次内存:一次为 char *buf,一次为结构体内存。而用长度为 0 的字符数组可以将分配和释放内存的次数都降低为 1次,从而简化内存的管理。
另外,长度为 0 的数组即 char buf[] 不占用内存:
// char buf[] 的情况
struct sdshdr s;
printf("%d",sizeof(s));
// 8
// char *buf 的情况
struct sdshdr s;
printf("%d",sizeof(s));
// 12
redis 中涉及较多的字符串操作,譬如 APPEND 命令。相比普通的字符串,sds 获取字符串的长度以及剩余空间的复杂度都是 O(1),前者需要 O(N).
// 返回 sdshdr.len
static inline size_t sdslen(const sds s) {
struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
return sh->len;
}
// 返回 sdshdr.free
static inline size_t sdsavail(const sds s) {
struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
return sh->free;
}
sds.c 中还实现了针对 sds 的字符串操作函数,譬如分配,追加,释放等,这些函数具体详细的实现大家可以自行剖析。
Redis 数据结构 dict
Redis 的键值对存储在哪里
在 redis 中有多个数据集,数据集采用的数据结构是哈希表,用以存储键值对。默认所有的客户端都是使用第一个数据集,一个数据集对应一个哈希表。如果客户端有需要可以使用 select 命令来选择不同的数据集。redis 在初始化服务器的时候就会初始化所有的数据集:
void initServer() {
......
// 分配数据集空间
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
......
// 初始化 redis 数据集
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.REDIS_DEFAULT_DBNUM; j++) { // 初始化多个数据库
// 哈希表,用于存储键值对
server.db[j].dict = dictCreate(&dbDictType,NULL);
// 哈希表,用于存储每个键的过期时间
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
......
}
......
}
哈希表 dict
我们来看看哈希表的数据结构是怎么样的:
数据集采用的数据结构是哈希表,数据真正存储在哈希表中,用开链法解决冲突问题,struct dictht 即为一个哈希表。但在 redis 哈希表数据结构 struct dict 中有两个哈希表,下文将两个哈希表分别称为第一个和第二个哈希表,redis 提供两个哈希表是为了能够在不中断服务的情况下扩展(expand)哈希表,这是很有趣的一部分。
// 可以把它认为是一个链表,提示,开链法
typedef struct dictEntry {
void *key;
union {
\\ val 指针可以指向一个 redisObject
void *val;
uint64_t u64;
int64_t s64;
} v;
struct dictEntry *next;
} dictEntry;
// 要存储多种多样的数据结构,势必不同的数据有不同的哈希算法,不同的键值比较算法,
// 不同的析构函数。
typedef struct dictType {
// 哈希函数
unsigned int (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
// 比较函数
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
// 键值析构函数
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
// 一般哈希表数据结构
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
// 两个哈希表
dictEntry **table;
// 哈希表的大小
unsigned long size;
// 哈希表大小掩码
unsigned long sizemask;
// 哈希表中数据项数量
unsigned long used;
} dictht;
// 哈希表(字典)数据结构,redis 的所有键值对都会存储在这里。其中包含两个哈希表。
typedef struct dict {
// 哈希表的类型,包括哈希函数,比较函数,键值的内存释放函数
dictType *type;
// 存储一些额外的数据
void *privdata;
// 两个哈希表
dictht ht[2];
// 哈希表重置下标,指定的是哈希数组的数组下标
int rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 绑定到哈希表的迭代器个数
int iterators; /* number of iterators currently running */
} dict;
扩展哈希表
redis 为每个数据集配备两个哈希表,能在不中断服务的情况下扩展哈希表。平时哈希表扩展的做法是,为新的哈希表另外开辟一个空间,将原哈希表的数据重新计算哈希值,以移动到新哈希表。如果原哈希表数据过多,中间大量的计算过程较好费大量时间,这段时间 redis 将不能提供服务。
redis 扩展哈希表的做法有点小聪明:为第二个哈希表分配新空间,其空间大小为原哈希表键值对数量的两倍(是的,没错),接着逐步将第一个哈希表中的数据移动到第二个哈希表;待移动完毕后,将第二个哈希值赋值给第一个哈希表,第二个哈希表置空。在这个过程中,数据会分布在两个哈希表,这时候就要求在 CURD 时,都要考虑两个哈希表。
而这里,将第一个哈希表中的数据移动到第二个哈希表被称为重置哈希(rehash)。
重置哈希表
在 CURD 的时候会执行一步的重置哈希表操作,在服务器定时程序 serverCorn() 中会执行一定时间的重置哈希表操作。为什么在定时程序中重置哈希表了,还 CURD 的时候还要呢?或者反过来问。一个可能的原因是 redis 做了两手准备:在服务器空闲的时候,定时程序会完成重置哈希表;在服务器过载的时候,更多重置哈希表操作会落在 CURD 的服务上。
下面是重置哈希表的函数,其主要任务就是选择哈希表中的一个位置上的单链表,重新计算哈希值,放到第二个哈希表。
int dictRehash(dict *d, int n) {
// 重置哈希表结束,直接返回
if (!dictIsRehashing(d)) return 0;
while(n--) {
dictEntry *de, *nextde;
// 第一个哈希表为空,证明重置哈希表已经完成,将第二个哈希表赋值给第一个,
// 结束
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned)d->rehashidx);
// 找到哈希表中不为空的位置
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
de = d->ht[0].table[d->rehashidx];
// 此位置的所有数据移动到第二个哈希表
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
unsigned int h;
nextde = de->next;
/* Get the index in the new hash table */
// 计算哈希值
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
// 头插法
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
// 更新哈希表中的数据量
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
// 置空
d->ht[0].table[d->rehashidx] = NULL;
// 指向哈希表的下一个位置
d->rehashidx++;
}
return 1;
}
低效率的哈希表添加、替换操作
在 redis 添加替换的时候,都先要查看数据集中是否已经存在该键,也就是一个查找的过程,如果一个redis 命令导致过多的查找,会导致效率低下。可能是为了扬长避短,即高读性能和低写性能,redis 中数据的添加和替换效率不高,特别是替换效率低的恶心。
在 redis SET 命令的调用链中,添加键值对会导致了 2次的键值对查找;替换键值对最多会导致 4次的键值对查找。在 dict 的实现中,dictFind() 和 _dictIndex() 都会导致键值对的查找,详细可以参看源码。所以,从源码来看,经常在 redis 上写不是一个明智的选择。
哈希表的迭代
在 RDB 和 AOF 持久化操作中,都需要迭代哈希表。哈希表的遍历本身难度不大,但因为每个数据集都有两个哈希表,所以遍历哈希表的时候也需要注意遍历两个哈希表:第一个哈希表遍历完毕的时候,如果发现重置哈希表尚未结束,则需要继续遍历第二个哈希表。
// 迭代器取下一个数据项的入口
dictEntry *dictNext(dictIterator *iter)
{
while (1) {
if (iter->entry == NULL) {
dictht *ht = &iter->d->ht[iter->table];
// 新的迭代器
if (iter->index == -1 && iter->table == 0) {
if (iter->safe)
iter->d->iterators++;
else
iter->fingerprint = dictFingerprint(iter->d);
}
iter->index++;
// 下标超过了哈希表大小,不合法
if (iter->index >= (signed) ht->size) {
// 如果正在重置哈希表,redis 会尝试在第二个哈希表上进行迭代,
// 否则真的就不合法了
if (dictIsRehashing(iter->d) && iter->table == 0) {
// 正在重置哈希表,证明数据正在从第一个哈希表整合到第二个哈希表,
// 则指向第二个哈希表
iter->table++;
iter->index = 0;
ht = &iter->d->ht[1];
} else {
// 否则迭代完毕,这是真正不合法的情况
break;
}
}
// 取得数据项入口
iter->entry = ht->table[iter->index];
} else {
// 取得下一个数据项人口
iter->entry = iter->nextEntry;
}
// 迭代器会保存下一个数据项的入口,因为用户可能会删除此函数返回的数据项
// 入口,如此会导致迭代器失效,找不到下一个数据项入口
if (iter->entry) {
/* We need to save the 'next' here, the iterator user
* may delete the entry we are returning. */
iter->nextEntry = iter->entry->next;
return iter->entry;
}
}
return NULL;
}
Redis 数据结构 ziplist
在 redis 中,list 有两种存储方式:双链表(LinkedList)和压缩双链表(ziplist)。双链表即普通数据结构中遇到的,在 adlist.h 和 adlist.c 中实现。压缩双链表以连续的内存空间来表示双链表,压缩双链表节省前驱和后驱指针的空间(8b),这在小的 list 上,压缩效率是非常明显的,因为一个普通的双链表中,前驱后驱指针在 64位机器上需分别占用 8B;压缩双链表在 ziplist.h 和 ziplist.c 中实现。 压缩列表是一种数据结构,这种数据结构的功能是将一系列数据与其编码信息存储在一块连续的内存区域,这块内存物理上是连续的,逻辑上被分为多个组成部分,其目的是在一定可控的时间复杂读条件下尽可能的减少不必要的内存开销,从而达到节省内存的效果
压缩双链表的具体实现
在压缩双链表中,节省了前驱和后驱指针的空间,在 64位机器上共节省了 8个字节,这让数据在内存中更为紧凑。只要清晰的描述每个数据项的边界,就可以轻易得到前驱后驱数据项的位置,ziplist 就是这么做的。
ziplist 的格式可以表示为:
<zlbytes><zltail><zllen><entry>...<entry><zlend>
zlbytes 是 ziplist 占用的空间;zltail 是最后一个数据项的偏移位置,这方便逆向遍历链表,也是双链表的特性;zllen 是数据项 entry 的个数;zlend 就是 255,占 1B. 下面详细展开 entry 的结构。
entry 的格式即为典型的 type-lenght-value,即 TLV,表述如下:
|<prelen><<encoding+lensize><len>><data>|
|---1----------------2--------------3---|
域 1)是前驱数据项的大小。因为不用描述前驱的数据类型,描述较为简单。
域 2) 是此数据项的的类型和数据大小。为了节省空间,redis 预设定了多种长度的字符串和整数。
域 3)为真正的数据。
透过 ziplist 查找函数 ziplistFind(),来熟悉 ziplist entry 的数据格式:
// 在 ziplist 中查找数据项
/* Find pointer to the entry equal to the specified entry. Skip 'skip' entries
* between every comparison. Returns NULL when the field could not be found. */
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr,
unsigned int vlen, unsigned int skip) {
int skipcnt = 0;
unsigned char vencoding = 0;
long long vll = 0;
while (p[0] != ZIP_END) {
unsigned int prevlensize, encoding, lensize, len;
unsigned char *q;
ZIP_DECODE_PREVLENSIZE(p, prevlensize);
// 跳过前驱数据项大小,解析数据项大小
// len 为 data 大小
// lensize 为 len 所占内存大小
ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
// q 指向 data
q = p + prevlensize + lensize;
if (skipcnt == 0) {
/* Compare current entry with specified entry */
if (ZIP_IS_STR(encoding)) {
// 字符串比较
if (len == vlen && memcmp(q, vstr, vlen) == 0) {
return p;
}
} else {
// 整数比较
/* Find out if the searched field can be encoded. Note that
* we do it only the first time, once done vencoding is set
* to non-zero and vll is set to the integer value. */
if (vencoding == 0) {
// 尝试将 vstr 解析为整数
if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
/* If the entry can't be encoded we set it to
* UCHAR_MAX so that we don't retry again the next
* time. */
// 不能编码为数字!!!会导致当前查找的数据项被跳过
vencoding = UCHAR_MAX;
}
/* Must be non-zero by now */
assert(vencoding);
}
/* Compare current entry with specified entry, do it only
* if vencoding != UCHAR_MAX because if there is no encoding
* possible for the field it can't be a valid integer. */
if (vencoding != UCHAR_MAX) {
// 读取整数
long long ll = zipLoadInteger(q, encoding);
if (ll == vll) {
return p;
}
}
}
/* Reset skip count */
skipcnt = skip;
} else {
/* Skip entry */
skipcnt--;
}
// 移动到 ziplist 的下一个数据项
/* Move to next entry */
p = q + len;
}
// 没有找到
return NULL;
}
为什么要用 ziplist
redis HSET 命令官网的描述是:
Sets field in the hash stored at key to value. If key does not exist, a new key holding a hash is created. If field already exists in the hash, it is overwritten.
实际上,HSET 底层所使用的数据结构正是上面所说的 ziplist,而不是平时所说的 hashtable。
那为什么要使用 ziplist,反对的理由是查找来说,(ziplist O(N))VS(hashtable O(1))?redis 可是为内存节省想破了头。首先 ziplist 比 hashtable 更节省内存,再者,redis 考虑到如果数据紧凑的 ziplist 能够放入 CPU 缓存(hashtable 很难,因为它是非线性的),那么查找算法甚至会比 hashtable 要快!。ziplist 由此有性能和内存空间的优势。
Redis 数据结构 skiplist
跳表(skiplist)是一个特俗的链表,相比一般的链表,有更高的查找效率,其效率可比拟于二叉查找树。
一张关于跳表和跳表搜索过程如下图:
在图中,需要寻找 68,在给出的查找过程中,利用跳表数据结构优势,只比较了 3次,横箭头不比较,竖箭头比较。由此可见,跳表预先间隔地保存了有序链表中的节点,从而在查找过程中能达到类似于二分搜索的效果,而二分搜索思想就是通过比较中点数据放弃另一半的查找,从而节省一半的查找时间。
缺点即浪费了空间,自古空间和时间两难全。
插播一段:跳表在 1990年由 William Pugh 提出,而红黑树早在 1972年由鲁道夫·贝尔发明了。红黑树在空间和时间效率上略胜跳表一筹,但跳表实现相对简单得到程序猿们的青睐。redis 和 leveldb 中都有采用跳表。
这篇文章,借着 redis 的源码了解跳表的实现。
从上图中,总结跳表的性质:
- 由很多层结构组成
- 每一层都是一个有序的链表
- 最底层(Level 1)的链表包含所有元素
- 如果一个元素出现在 Level i 的链表中,则它在 Level i 之下的链表也都会出现。
- 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。
redis 中跳表数据结构定义:
// 跳表节点结构体
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
// 节点数据
robj *obj;
// 分数,游戏分数?按游戏分数排序
double score;
// 后驱指针
struct zskiplistNode *backward;
// 前驱指针数组 TODO
struct zskiplistLevel {
struct zskiplistNode *forward;
// 调到下一个数据项需要走多少步,这在计算 rank 的非常有帮助
unsigned int span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
// 跳表头尾指针
struct zskiplistNode *header, *tail;
// 跳表的长度
unsigned long length;
// 跳表的高度
int level;
} zskiplist;
特别的,在上图中似乎每个数据都被保存了多次,其实只保存了一次。在 struct zskiplistNode 中数据和指针是分开存储的,struct zskiplistLevel 即是一个描述跳表层级的数据结构。我们可以看到,一个节点主要由有一个存储真实数据的指针,一个后驱指针,和多个前驱指针。
跳表的插入
跳表算法描述如下:找出每一层新插入数据位置的前驱并保存,在 redis 中跳表插入是根据 score/member 的大小来决定插入的位置;将新数据插入到指定位置,并调整指针,在 redis 中还会调整 span。
什么是 span?
span 即从两个相邻节点间隔了多少节点。譬如 level 1,-1 的 span 就是 1;level 2,-1 的 span 为 2。
因为新出入数据的层数是随机的,有两种情况:
- 小于等于原有的层数
- 大于原有的层数。需要做特殊处理。
1)小于等于原有的层数
redis 中跳表插入算法的具体实现:
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
// update 是插入节点所在位置的前一个节点。我们在学习链表插入的时候,需要找到插入
// 位置的前一个节点。因为在跳表中一个节点是有多个前驱指针的,所以这里需要保存的
// 是多个节点,而不是一个节点
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
x = zsl->header;
// 遍历 skiplist 中所有的层,找到数据将要插入的位置,并保存在 update 中
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 链表的搜索
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// update[i] 记录了新数据项的前驱
update[i] = x;
}
// random 一个 level,是随机的
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not. */
level = zslRandomLevel();
// random level 比原有的 zsl->level 大,需要增加 skiplist 的 level
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 插入
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
// 新节点项插到 update[i] 的后面
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 更高的 level 尚未调整 span
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 调整新节点的后驱指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
// 调整 skiplist 的长度
zsl->length++;
return x;
}
跳表的删除
跳表的删除算和插入算法步骤类似:找出每一层需删除数据的前驱并保存;接着调整指针,在 redis 中还会调整 span。
redis 中跳表删除算法的具体实现:
// x 是需要删除的节点
// update 是 每一个层 x 的前驱数组
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
// 调整 span 和 forward 指针
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
// update[i]->level[i].forward == NULL,只调整 span
update[i]->level[i].span -= 1;
}
}
// 调整后驱指针
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
// 删除某一个节点后,层数 level 可能降低,调整 level
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
// 调整跳表的长度
zsl->length--;
}
Redis 中的跳表
redis 中结合跳表(skiplist)和哈希表(dict)形成一个新的数据结构 zset。添加 dict 是为了快速定位跳表中是否存在某个 member!
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
Redis 数据结构 intset
intset 和 dict 都是 sadd 命令的底层数据结构,当添加的所有数据都是整数时,会使用前者;否则使用后者。特别的,当遇到添加数据为字符串,即不能表示为整数时,redis 会把数据结构转换为 dict,即把 intset 中的数据全部搬迁到 dict。
intset 结构体
intset 底层本质是一个有序的、不重复的、整型的数组,支持不同类型整数。
typedef struct intset {
// 每个整数的类型
uint32_t encoding;
// intset 长度
uint32_t length;
// 整数数组
int8_t contents[];
} intset;
结构体中 intset.contents 数组没有指定长度,这样是为了方便分配释放内存。
encoding 能下面的三个值:分别是 16,32 和 64位整数:
/* Note that these encodings are ordered, so:
* INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64. */
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))
intset 搜索
intset 是有序的整数数组,可以用二分搜索查找。
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;
/* The value can never be found when the set is empty */
// 集合为空
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
/* Check for the case where we know we cannot find the value,
* but do know the insert position. */
// value 比最大元素还大
if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
// value 比最小元素还小
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}
// 熟悉的二分查找
while(max >= min) {
mid = (min+max)/2;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}
if (value == cur) {
if (pos) *pos = mid;
return 1;
} else {
if (pos) *pos = min;
return 0;
}
}
喜欢的朋友记得点赞、收藏、关注哦!!!