Redis3——线程模型与数据结构
Redis3——线程模型与数据结构
本文讲述了redis的单线程模型和IO多线程工作原理,以及几个主要数据结构的实现。
1. Redis的单线程模型
redis6.0之前,一个redis进程只有一个io线程,通过reactor模式可以连接大量客户端;redis6.0为了高并发下网络IO的性能,引入了多个IO线程。但是自始至终,redis进程只有一个命令处理线程,因此redis仍然可以看作一个单线程模型。
默认情况下,redis进程中的线程:
1.1 Redis6.0为什么引入多个IO线程
引入多个IO线程以解决高并发场景下的网络IO瓶颈,具体来说,网络IO中与cpu有关的开销包括:
- 系统调用上下文切换。
- 内存数据拷贝虽然受内存频率和带宽限制,但仍需要cpu参与。
- CPU主导的协议栈处理。
因此,即使网络带宽和内存频率固定,采用多线程io仍然能够提高整体网络IO性能。
1.2 Redis命令处理为什么采用单线程模型
- 单线程实现简单,不用考虑线程安全和锁竞争。减少了线程切换开销。
- 天然满足事务的原子性、隔离性。
- 得益于数据结构的设计,redis基于内存的操作通常非常高效,cpu通常不是性能瓶颈。
- redis不同键值对的数据结构不同,多线程场景下,加锁操作复杂,加锁粒度不好控制。
Memcached是一个redis的竞品,采用多线程并发的处理客户请求,其支持的数据类型单一,因此能够方便的对某个局部加锁。
1.3 单线程的耗时操作处理
单线程的局限在于,不能有耗时操作:cpu密集型任务、io密集型任务。
磁盘IO
对于磁盘io任务,redis要么启用子进程来持久化,要么使用其它线程进行异步刷盘。
网络IO
对于网络io任务,redis启用多个io线程,每个线程都采用reactor网络模型,并发处理大量客户端连接。
cpu密集型任务
对于cpu密集型任务,redis需要保证其操作高效:
- 数据结构切换:redis采用的数据结构会根据数据量进行切换。当数据量少的时候,redis更注重空间效率,当数据量大时,redis更注重时间效率。
- 分治:例如间断遍历
- 渐进式的数据迁移:增量式rehash
1.4 单线程如何保证高效
- 基于内存的数据库
- 数据(键值对)的组织方式:hashtable,时间复杂度O(1),渐进式的rehash
- 高效的数据结构与数据结构切换
- 高效的reactor模型(redis6.0之前)
2. Redis的IO多线程工作原理
2.1 理论
对于每一个客户端发送来的命令,redis需要进行read、decode、compute、encode、send这五个步骤。
在redis6.0之前的单线程模式下,主线程通过一个reactor模型串行地完成这5个操作。
加入IO多线程后,主线程会在每次循环时,将IO一部分读写任务分配给IO线程执行。主线程仍然运行一个reactor事件循环,在客户端可读或者可写时将其分配给IO线程。
也就是说现在read、decode以及encode、send都是并行的了。compute操作仍然时串行。
这样设计地合理性在于,一般读写和解析协议操作不会出现竞态条件,至于一个客户端连接相关。而执行命令则需要访问全局数据结构,串行执行可以减少竞态条件。
2.2 业务流程
-
在accept建立新连接时,调用acceptCommonHandler创建客户端上下文
-
在createClient中,设置读事件的回调函数readQueryFromClient
-
读事件发生时,在readQueryFromClient中,读取数据,并用一个sds保存到客户端上下中的querybuf中
... qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); nread = connRead(c->conn, c->querybuf+qblen, readlen); ... processInputBuffer(c);
-
在processInputBuffer中处理读取到的数据
-
分割数据包,如果是单行命令执行processInlineBuffer,否则执行processMultibulkBuffer
if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; ... } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; } else { serverPanic("Unknown request type"); }
-
如果在IO线程中,则标记其为有命令需要执行的状态,否则直接执行
/* If we are in the context of an I/O thread, we can't really execute the command here. All we can do is to flag the client as one that needs to process the command. */ if (c->flags & CLIENT_PENDING_READ) { c->flags |= CLIENT_PENDING_COMMAND; break; } /* We are finally ready to execute the command. */ if (processCommandAndResetClient(c) == C_ERR) { return; }
-
-
在主线程中事件循环中,每次循环会调用
handleClientsWithPendingReadsUsingThreads
函数,它会将仍有数据待读取的客户端分配给IO线程去并行的读取和解析,然后等待这些IO线程执行完毕,之后再调用processCommand处理所有有待处理的命令的客户端。 -
处理完毕后,调用addReply将输出写入到output buffer
-
当写事件发生时,会调用sendReplyToClient,将数据封装成协议然后发送给客户端。
/* Write event handler. Just send data to the client. */ void sendReplyToClient(connection *conn) { client *c = connGetPrivateData(conn); writeToClient(c,1); }
-
另外,主线程也会调用handleClientsWithPendingWritesUsingThreads,将可写的客户端分配给IO线程完成。
2.3 IO线程池
IO线程的工作非常简单,主线程会将待处理的客户端分配个各个io线程,当io线程检测到有客户端分配来时,就根据主线程设置的操作类型取执行。
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(getIOPendingCount(id) != 0);
/* Process: note that the main thread will never touch our list before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
setIOPendingCount(id, 0);
}
3. Redis数据结构
2.1 字典
redis的字典采用hash表实现。
2.1.1 扩容缩容机制
-
哈希冲突:采用拉链法
-
负载因子:1
-
扩容机制:加倍扩容,如果扩容时正在fork(在rdb或aof复写),则阻止扩容,但是若此时负载因子>5,则利用写时复制原理马上扩容。
// redis v=6.2.16 if ((dict_can_resize == DICT_RESIZE_ENABLE && d->ht[0].used >= d->ht[0].size) || (dict_can_resize != DICT_RESIZE_FORBID && d->ht[0].used / d->ht[0].size > dict_force_resize_ratio)) { return dictExpand(d, d->ht[0].used + 1); }
-
缩容机制:如果负载因子<0.1,则恰好大于等于used的2的n次幂为新的容量。例如此时负载因子小于0.1,元素总数为17,则新的容量为2的5次方 = 32 > 17。
// redis v=6.2.16 int htNeedsResize(dict *dict) { long long size, used; size = dictSlots(dict); used = dictSize(dict); return (size > DICT_HT_INITIAL_SIZE && (used*100/size < HASHTABLE_MIN_FILL)); } /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL * we resize the hash table to save memory */ void tryResizeHashTables(int dbid) { if (htNeedsResize(server.db[dbid].dict)) dictResize(server.db[dbid].dict); if (htNeedsResize(server.db[dbid].expires)) dictResize(server.db[dbid].expires); }
2.1.2 渐进式rehash
渐进式rehash:每个dict都有两个hash表,当没有触发扩容时,第二个hash表为NULL,当进行扩容时,则分多次渐进地迁移旧的hash表的每个槽位。有两种迁移策略,这两种策略可以并行。
- 将rehash的操作分摊在之后增删改查操作中。例如当进行查询操作时,就迁移一个槽位。当所有的元素都被迁移完毕,就将旧的删除,用新的hash替换旧的hash表。
- 对于主hash表,使用定时器,每隔一段时间(100ms)就花费1ms时间执行迁移操作。
增删改查时rehash一个:
每次增删改查,rehash一个槽位:
static void _dictRehashStep(dict *d) {
// 判断是否选择这种每次rehash一步的迁移策略
if (d->pauserehash == 0) dictRehash(d,1);
}
// 增
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d);
...
}
// 删
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
uint64_t h, idx;
dictEntry *he, *prevHe;
int table;
if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;
if (dictIsRehashing(d)) _dictRehashStep(d);
...
}
// 查
dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
uint64_t h, idx, table;
if (dictSize(d) == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
...
}
dictEntry *dictGetRandomKey(dict *d)
{
dictEntry *he, *orighe;
unsigned long h;
int listlen, listele;
if (dictSize(d) == 0) return NULL;
if (dictIsRehashing(d)) _dictRehashStep(d);
...
}
unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {
unsigned long j; /* internal hash table id, 0 or 1. */
unsigned long tables; /* 1 or 2 tables? */
unsigned long stored = 0, maxsizemask;
unsigned long maxsteps;
if (dictSize(d) < count) count = dictSize(d);
maxsteps = count*10;
/* Try to do a rehashing work proportional to 'count'. */
for (j = 0; j < count; j++) {
if (dictIsRehashing(d))
_dictRehashStep(d);
else
break;
}
...
}
空闲时,迁移1ms:
// rehash‘ms’毫秒
int dictRehashMilliseconds(dict *d, int ms) {
if (d->pauserehash > 0) return 0;
long long start = timeInMilliseconds();
int rehashes = 0;
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
// 渐进式rehash,每次rehash 1ms
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
redis何时执行增量式rehash
在redis的定时任务中,会判断是否开启了自动rehash,如果开启就执行。
void databasesCron(void) {
...
// 没有fork子进程来持久化的是否进行rehash
if (!hasActiveChildProcess()) {
...
// 如果开启了rehash
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
...
}
...
}
}
在配置文件中,可以选择是否开启:
activerehashing yes
2.1.3 哈希表的遍历
遍历数据库的原则为:①不重复出现数据;②不遗漏任何数据。熟悉Redis命令的读者应该知道,遍历Redis整个数据库主要有两种方式:全遍历 (例如keys命令)、间断遍历 (hscan命令),这两种方式将在下面进行详细讲解。
2.1.3.1 全遍历
全遍历:一次命令执行就遍历完整个数据库。
全遍历使用普通迭代器或者间断迭代器:
typedef struct dictIterator {
dict *d; // 当前遍历的字典
long index; // 当前bucket索引
int table, safe; // hash表id和是否是安全迭代器
dictEntry *entry, *nextEntry; // 当前元素和下一个元素
/* unsafe iterator fingerprint for misuse detection. */
unsigned long long fingerprint; // 由hash表中所有元素生成的指纹,如果遍历过程中元素发生变化,就能够根据此值检查出来
} dictIterator;
普通迭代器与安全迭代器的区别在于,普通迭代器迭代期间允许rehash,但是如果数据元素发生变动导致fingerprint发生变化,就会发出异常。而安全迭代器在迭代期间不允许进行渐进式rehash。
一次迭代的操作如下:
dictEntry *dictNext(dictIterator *iter)
{
while (1) {
if (iter->entry == NULL) {
dictht *ht = &iter->d->ht[iter->table];
if (iter->index == -1 && iter->table == 0) {
// 初始时,如果时安全迭代器,就禁止rehash操作,否则就计算指纹
if (iter->safe)
dictPauseRehashing(iter->d);
else
iter->fingerprint = dictFingerprint(iter->d);
}
iter->index++;
if (iter->index >= (long) ht->size) {
if (dictIsRehashing(iter->d) && iter->table == 0) {
iter->table++;
iter->index = 0;
ht = &iter->d->ht[1];
} else {
break;
}
}
iter->entry = ht->table[iter->index];
} else {
iter->entry = iter->nextEntry;
}
if (iter->entry) {
/* We need to save the 'next' here, the iterator user
* may delete the entry we are returning. */
iter->nextEntry = iter->entry->next;
return iter->entry;
}
}
return NULL;
}
2.1.3.2 间断遍历
间断遍历:每次命令执行只取部分数据,分多次遍历。
间断遍历期间允许进行渐进式rehash,它通过采用reverse binary iteration来保证不重复数据也不遗漏数据。
由于扩容及缩容正好为整数倍增长或减少的原理,根据这个特征,很容易就能推导出同一个节点的数据扩容/缩容后在新的Hash表中的分布位置,例如一个hash表大小为4,索引为0的元素,扩容后新的索引可能为0或者4。一个hash表大小为8,索引为6的元素,缩容后新的索引为2。
根据这个规律,可以设计一种迭代顺序规则,即游标值v变化的规则:
v |= ~mask;
v = rev(v); // 二进制逆转
v++;
v = rev(v); // 二进制逆转
两次遍历间隔期间发生扩容:举例,如果size为4,则mask为二进制011,遍历顺序为00, 10, 01, 11,对应十进制索引值0, 2, 1, 3。假如遍历到1时发生了扩容,则mask变为0111,则遍历顺序变为00, 10, 001, 101, 011, 111,对应十进制索引值为0, 2, 1, 5, 3, 7,少了4和6。正好扩容前的0,2中的元素可能被放入4,6。这样就防止了重复遍历。
同理,如果两个遍历地间隔期间发生缩容,这种算法也可以避免重复护着遗漏元素。
两次遍历间隔期间发送缩容:举例,如果size为8,则mask为二进制0111,遍历顺序为000,100,010,110,001,101,011,111,对应十进制0,4,2,6,1,5,3,7。如果在遍历1时,发生了缩容,则mask为011,遍历顺序就变为000,100,010,110,01,11,对应十进制为0,4,2,6,1,3,少了5和7,因为缩容前5和7的元素在缩容后会被放入1和3。这样就防止了重复和遗漏。
这种高位累加的游标变更算法巧妙地利用扩容缩容前后元素索引值地变化规律,设计出游标变化规则,从而避免了重复遍历或漏遍历。
在redis中可以用scan和hscan进行间断遍历:
127.0.0.1:6379> scan 0 match * count 1
1) "2"
2) 1) "yy"
2) "person"
127.0.0.1:6379> scan 2 match * count 1
1) "1"
2) 1) "dsaa"
127.0.0.1:6379> scan 1 match * count 1
1) "3"
2) 1) "name"
127.0.0.1:6379> scan 3 match * count 1
1) "0"
2) (empty array)
2.2 跳表
请参考我之前的博客高级数据结构——跳表skiplist。
2.3 压缩列表
压缩列表ziplist本质上就是一个字节数组,一段连续的内存,是Redis为了节约内存而设计的一种线性数据结构,可以包含多个元素,每个元素可以是一个字节数组或一个整数。
redis6.0之后,逐渐用listpack取代ziplist。redis7.0之后完全移除了ziplist的支持。
Redis的有序集合、散列和列表都直接或者间接使用了压缩列表。当有序集合或散列表的元素个数比较少,且元素都是短字符串时,Redis便使用压缩列表作为其底层数据存储结构。列表使用快速链表(quicklist)数据结构存储,而快速链表就是双向链表与压缩列表的组合。
2.4 quicklist
quicklist由双向链表adlist和ziplist结合而成。Redis中对quciklist的注释为A doubly linked list of ziplists。顾名思义,quicklist是一个双向链表,链表中的每个节点是一个ziplist结构。
结构体:
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
int fill : QL_FILL_BITS; /* fill factor for individual nodes */
unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl;
unsigned int sz; /* ziplist size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
2.5 动态字符串
2.5.1 对象结构
Redis没有直接使用C语言传统的字符串表示(以空字符结尾的字符数组,以下简称C字符串),而是自己构建了一种名为简单动态字符串(simple dynamic string,SDS)的抽象类型,并将SDS用作Redis的默认字符串表示。
下面是redis6.2中sds对象结构:
// 可以存储小于32字节的字符串
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
// 可以存储小于256字节的字符串
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */
uint8_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
为了节省空间,不同大小的字符串对应的头部大小应该不同,对于一个小字符串,没有必要使用64位整数来保存字符串长度。
sds中的len表示字符串长度,alloc表示总的空间大小(不包含头部和末尾空白字符)。与C风格的字符串相比,这样设计的好处是:
- 内存安全:不依赖末尾空白字符,这样可以防止内存越界等问题。
- 兼容二进制数据
末尾仍然以’\0’结尾,可以兼容C风格字符串。
flags字段用来标识类型,由于一共有5种类型,至少需要3位来存储,于是对于最短的字符串来说,剩余5位可以用来存储字符串的长度,可以存储最长31字节的字符串。
**__attribute__ ((_packed_))**需要注意,一般情况下,结构体的整体大小必须是其成员最大对齐要求的倍数。最大对齐要求通常是结构体中对齐要求最高的成员的对齐数,而用packed修饰后,结构体则变为按1字节对齐。
使用该编译器扩展属性后,sdshdr32减少了3字节的填充。
struct sdshdr32_no_packed {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
int main()
{
// sizeof sdshdr32:9
printf("sizeof sdshdr32:%lld\n", sizeof (struct sdshdr32));
// sizeof sdshdr32_no_packed:12
printf("sizeof sdshdr32_no_packed:%lld\n", sizeof (struct sdshdr32_no_packed));
return 0;
}
2.5.2 柔性数组
buf是一个柔性数组成员(flexible array member),也叫伸缩性数组成员,只能被放在结构体的末尾。包含柔性数组成员的结构体,通过malloc函数为柔性数组动态分配内存。
柔性数组的限制:
- 只能是结构体的最后一个成员
- 不能是结构体的唯一一个成员
- 运行时要手动为包含柔性数组的结构体分配足够的连续内存
使用柔性数组的好处:
-
和sds结构体成员共用一个连续内存,减少动态内存分配和间接访问,效率更高。
-
通过首地址偏移方便得到结构体的首地址,进而获取其它成员变量。
首地址偏移示例:
/* Free an sds string. No operation is performed if 's' is NULL. */ void sdsfree(sds s) { if (s == NULL) return; s_free((char*)s-sdsHdrSize(s[-1])); }
2.5.3 相关面试题
-
为什么sds小于等于44字节时采用embstr编码,大于44字节时采用raw编码?
embstr编码将redisObject和sds存储在一个连续内存中,而raw编码将两者分开存储。redisObject的大小为16字节,44字节对应sdshdr8占用3字节,还有1字节的空白字符,所以44字节的sds实际占用64字节。
首先,cache line大小为64字节,如果整个sds结构体小于等于64字节,就能够利用缓存加快访问速度,此时采用压缩编码效率最高。
其次,大多数内存分配器都是按照2的整数次幂的大小分配内存,选择64字节可以减少内存碎片。
综上,当字符串小于等于44字节时,采用嵌入编码能够利用缓存行和减少内存碎片。
-
为什么其它类型中数据结构切换都有一个元素长度不超过64字节这个分界线?
首先当字符串总长度小于等于64字节时,采用的是embstr编码,将sds嵌入到redisObject中,整个元素占用一段连续的内存空间。这可以节省内存空间,减少内存分配开销。
此外,cache line大小为64字节,元素长度不超过64字节可以更高的利用缓存行加速访问。
数据结构切换之前,为了节约内存,通常采用压缩列表,而元素长度不超过64字节的字符串采用embstr也是为了节约内存。
学习参考
学习更多相关知识请参考零声 github。