redis7.x源码分析:(3) dict字典
dict字典采用经典hash表数据结构实现,由键值对组成,类似于C++中的unordered_map。两者在代码实现层面存在一些差异,比如gnustl的unordered_map分配的桶数组个数是(质数n),而dict分配的桶数组个数是(2^n);另外,dict对hash值相同的key采用了常规的开链法存储,而unordered_map在采用开链法的前提下,又使用了_M_before_begin将不同桶中的链表串联成了一个大链表,从而将遍历算法复杂度优化为O(n);还有就是,dict为应对服务器性能上的特殊要求,设计成了双hash表的形式,这也使得它在rehash,各种操作存在一些特殊性,我在下面的代码分析中会说到。
dict在redis里面的用途十分广泛,几乎所有的模块都会用到,其中的两大核心用途是:
1. 16个数据库空间
2. hash和zset类型数据的存储
dict相关结构定义:
// 节点
typedef struct dictEntry {
// 任意类型键
void *key;
// 存储的值
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 同一个桶中链表的下一个元素
struct dictEntry *next; /* Next entry in the same hash bucket. */
void *metadata[]; /* An arbitrary number of bytes (starting at a
* pointer-aligned address) of size as returned
* by dictType's dictEntryMetadataBytes(). */
} dictEntry;
typedef struct dict dict;
// 存储不同类型数据的字典,设置不同的处理函数
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(dict *d, const void *key);
void *(*valDup)(dict *d, const void *obj);
int (*keyCompare)(dict *d, const void *key1, const void *key2);
void (*keyDestructor)(dict *d, void *key);
void (*valDestructor)(dict *d, void *obj);
int (*expandAllowed)(size_t moreMem, double usedRatio);
/* Allow a dictEntry to carry extra caller-defined metadata. The
* extra memory is initialized to 0 when a dictEntry is allocated. */
size_t (*dictEntryMetadataBytes)(dict *d);
} dictType;
#define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1<<(exp))
#define DICTHT_SIZE_MASK(exp) ((exp) == -1 ? 0 : (DICTHT_SIZE(exp))-1)
struct dict {
// 字典类型,不同的类型有不同的hash函数,dup函数
dictType *type;
// 双哈希表指针数组
dictEntry **ht_table[2];
// 存放的节点数
unsigned long ht_used[2];
// rehash索引(哈希表的下标),大于 -1 表示正在rehash
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
/* Keep small vars at end for optimal (minimal) struct padding */
// rehash暂停标志
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
// 表示2的多少次幂,哈希表的大小=2^ht_size_exp
signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
};
hash表的创建比较简单直接略过,先看下 _dictExpand 的实现,它在hash表扩容缩容和创建时都会用到。当添加dictAdd时,存储的节点数 used / size >= 1,就需要调用它扩容。
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
if (malloc_failed) *malloc_failed = 0;
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
// 正在rehash或者 used / size > 1直接退出
if (dictIsRehashing(d) || d->ht_used[0] > size)
return DICT_ERR;
/* the new hash table */
dictEntry **new_ht_table;
unsigned long new_ht_used;
// 获取第一个 2^N > size 的N的大小
signed char new_ht_size_exp = _dictNextExp(size);
/* Detect overflows */
// 2^N 作为hash数组的长度, 另外判断分配的大小是否合法
size_t newsize = 1ul<<new_ht_size_exp;
if (newsize < size || newsize * sizeof(dictEntry*) < newsize)
return DICT_ERR;
/* Rehashing to the same table size is not useful. */
if (new_ht_size_exp == d->ht_size_exp[0]) return DICT_ERR;
/* Allocate the new hash table and initialize all pointers to NULL */
if (malloc_failed) {
new_ht_table = ztrycalloc(newsize*sizeof(dictEntry*));
*malloc_failed = new_ht_table == NULL;
if (*malloc_failed)
return DICT_ERR;
} else
new_ht_table = zcalloc(newsize*sizeof(dictEntry*));
new_ht_used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
// 如果是第一次创建hash表,则设置完第一个表后直接退出
if (d->ht_table[0] == NULL) {
d->ht_size_exp[0] = new_ht_size_exp;
d->ht_used[0] = new_ht_used;
d->ht_table[0] = new_ht_table;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
// 设置第二个表后退出,并且开始rehash
d->ht_size_exp[1] = new_ht_size_exp;
d->ht_used[1] = new_ht_used;
d->ht_table[1] = new_ht_table;
d->rehashidx = 0;
return DICT_OK;
}
// 检查是否需要缩容
int htNeedsResize(dict *dict) {
long long size, used;
// 哈希表大小
size = dictSlots(dict);
// 哈希表已用节点数量
used = dictSize(dict);
// 当哈希表的大小大于 > 4 并且用量小于 10%时缩容
return (size && used && size > DICT_HT_INITIAL_SIZE &&
(used*100/size < REDIS_HT_MINFILL));
}
在执行databasesCron时,如果数据库满足 htNeedsResize 会进行缩容,另外hash和zset类型数据在执行删除操作时,也会判断是否需要缩容。
扩容缩容都会触发开启rehash,最终调用 dictRehash 实现rehash的动作,以下是它的代码:
int dictRehash(dict *d, int n) {
// 累计访问多少个空桶后退出rehash
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
// 进行n次rehash
while(n-- && d->ht_used[0] != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx);
while(d->ht_table[0][d->rehashidx] == NULL) {
// 遇到空桶, 索引后移
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
// 得到桶中链表第一个节点
de = d->ht_table[0][d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
// 遍历链表上所有的节点, 添加到hash表2中
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
// 计算hash值对应的索引 = hash & (2^exp - 1)
h = dictHashKey(d, de->key) & DICTHT_SIZE_MASK(d->ht_size_exp[1]);
// 添加到hash表2中
de->next = d->ht_table[1][h];
d->ht_table[1][h] = de;
d->ht_used[0]--;
d->ht_used[1]++;
de = nextde;
}
// 清空hash表1的桶
d->ht_table[0][d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
// 如果rehash全部完成了, 则用表2替换表1, 并且释放原来的表1
if (d->ht_used[0] == 0) {
zfree(d->ht_table[0]);
/* Copy the new ht onto the old one */
d->ht_table[0] = d->ht_table[1];
d->ht_used[0] = d->ht_used[1];
d->ht_size_exp[0] = d->ht_size_exp[1];
_dictReset(d, 1);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
// 执行多少毫秒的rehash操作
int dictRehashMilliseconds(dict *d, int ms) {
if (d->pauserehash > 0) return 0;
long long start = timeInMilliseconds();
int rehashes = 0;
// 还没超时的情况下,一次尝试执行100个桶的rehash
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
再分别看一下 dictAddRaw、dictGenericDelete、dictFind的代码:
/* Low level add or find:
* This function adds the entry but instead of setting a value returns the
* dictEntry structure to the user, that will make sure to fill the value
* field as they wish.
*
* This function is also directly exposed to the user API to be called
* mainly in order to store non-pointers inside the hash value, example:
*
* entry = dictAddRaw(dict,mykey,NULL);
* if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
*
* Return values:
*
* If key already exists NULL is returned, and "*existing" is populated
* with the existing entry if existing is not NULL.
*
* If key was added, the hash entry is returned to be manipulated by the caller.
*/
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
int htidx;
// 正在rehash过程中,则执行一次rehash操作(只把老表中一个桶对应链表内的数据节点重新hash到新表的不同桶中)
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
// 获取新节点用的数组索引
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
// 如果在rehash的话,新节点只会加到第二个哈希表中
htidx = dictIsRehashing(d) ? 1 : 0;
size_t metasize = dictMetadataSize(d);
entry = zmalloc(sizeof(*entry) + metasize);
if (metasize > 0) {
memset(dictMetadata(entry), 0, metasize);
}
// 新节点添加到链表头
entry->next = d->ht_table[htidx][index];
d->ht_table[htidx][index] = entry;
d->ht_used[htidx]++;
/* Set the hash entry fields. */
// 设置新节点的key, 如果设置了 type->KeyDup 函数, 则复制出一个key的副本保存到节点中
dictSetKey(d, entry, key);
return entry;
}
/* Search and remove an element. This is a helper function for
* dictDelete() and dictUnlink(), please check the top comment
* of those functions. */
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
uint64_t h, idx;
dictEntry *he, *prevHe;
int table;
/* dict is empty */
if (dictSize(d) == 0) return NULL;
// 正在rehash过程中,则执行一次rehash操作
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
// 在hash表1和2中查找
for (table = 0; table <= 1; table++) {
// 计算索引获取桶中链表节点
idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
he = d->ht_table[table][idx];
prevHe = NULL;
while(he) {
// 遍历链表找到对应key的节点并删除
if (key==he->key || dictCompareKeys(d, key, he->key)) {
/* Unlink the element from the list */
if (prevHe)
prevHe->next = he->next;
else
d->ht_table[table][idx] = he->next;
if (!nofree) {
// 释放节点中key/value
dictFreeUnlinkedEntry(d, he);
}
d->ht_used[table]--;
return he;
}
prevHe = he;
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return NULL; /* not found */
}
dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
uint64_t h, idx, table;
if (dictSize(d) == 0) return NULL; /* dict is empty */
// 执行一次rehash
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
// 在hash表1和2中查找
for (table = 0; table <= 1; table++) {
idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
he = d->ht_table[table][idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key))
return he;
he = he->next;
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}
dict的rehash并不是一次完成的,这个是基于性能、响应时间上的考虑。会执行rehash有以下几个函数:
- databasesCron函数,它会定时执行,对16个db中的dict和expires进行rehash,每次调用dictRehashMilliseconds执行1ms时间
- dictAddRaw、dictGenericDelete、dictFind、dictGetRandomKey、dictGetSomeKeys函数,它们每次rehash只处理一个桶
最后看下迭代器的定义:
/* If safe is set to 1 this is a safe iterator, that means, you can call
* dictAdd, dictFind, and other functions against the dictionary even while
* iterating. Otherwise it is a non safe iterator, and only dictNext()
* should be called while iterating. */
typedef struct dictIterator {
// 迭代器对应的字典
dict *d;
// 正在迭代的hash索引
long index;
// table表示迭代的表是两张表中的哪一张;safe表示是否是安全迭代器
int table, safe;
// entry表示迭代器当前的节点;nextEntry表示下个节点
dictEntry *entry, *nextEntry;
/* unsafe iterator fingerprint for misuse detection. */
// 指纹,判断迭代过程中有没执行被禁止的操作(dictNext以外的函数)
unsigned long long fingerprint;
} dictIterator;
迭代器分为安全和不安全迭代器:
- 安全迭代器,针对字典可以执行 dictAdd 等有可能修改的函数
- 不安全迭代器,针对字典只能执行 dictNext
迭代的实现:
dictEntry *dictNext(dictIterator *iter)
{
while (1) {
if (iter->entry == NULL) {
// 第一个迭代或者桶中的链表遍历完了
if (iter->index == -1 && iter->table == 0) {
// 安全模式暂停rehash
if (iter->safe)
dictPauseRehashing(iter->d);
else
iter->fingerprint = dictFingerprint(iter->d);
}
// 依次轮询两张hash表,查找非空节点
iter->index++;
if (iter->index >= (long) DICTHT_SIZE(iter->d->ht_size_exp[iter->table])) {
if (dictIsRehashing(iter->d) && iter->table == 0) {
iter->table++;
iter->index = 0;
} else {
break;
}
}
iter->entry = iter->d->ht_table[iter->table][iter->index];
} else {
// 取当前节点的下一个节点
iter->entry = iter->nextEntry;
}
if (iter->entry) {
// 如果不为空,返回当前节点,保存下个节点
/* We need to save the 'next' here, the iterator user
* may delete the entry we are returning. */
iter->nextEntry = iter->entry->next;
return iter->entry;
}
}
return NULL;
}