Redis后台任务有哪些
Redis后台任务
为了有更好的性能表现,redis对于一些比较耗时的操作会异步执行,不阻塞线上请求。文章从源码(redis7.0)来看,aof、rdb文件的关闭,aof文件的刷盘以及部分内存释放会采用异步方式,在后台线程中执行。接下来我们看下这些具体的任务以及后台任务的具体实现。
Redis后台任务类型
在源码中的bio.h bio.c文件是redis后台线程任务的实现逻辑,包含以下三类任务:
/* Background job opcodes */
#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
#define BIO_NUM_OPS 3
- 关闭文件,包括socket、数据文件的关闭。redis中主要是aof和rdb文件的close。
- 刷盘,内容由内存到磁盘的过程,通常是异步写的,首先会写入page cahe,刷盘就是将修改的数据从page cache同步到磁盘。
- 释放内存。
实现思路
三类异步任务对应三个任务队列,分别不同的函数进行任务提交,任务队列是一个list,每个list对应一个消费线程。
void bioCreateCloseJob(int fd, int need_fsync);
提交异步关闭任务void bioCreateFsyncJob(int fd);
提交异步刷盘任务void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
提交异步释放任务
源码分析
初始化
在启动初始化完成后,main函数会调用InitServerLast
做一些系统准备完毕后的其他初始化工作,其中就包括后台线程任务初始化初void bioInit(void);
。
- 初始化每个线程用到的互斥锁、信号量、消费队列(bio_jobs)和队列长度(bio_pending)
- 计算线程栈大小
- 初始化每个线程,线程句柄函数
bioProcessBackgroundJobs
,每个线程会先进行设置名称(redis_set_thread_title)、绑核(redisSetCpuAffinity)、设置信号(pthread_sigmask),然后进入while(1)循环执行。
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* Initialization of state vars and objects */
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_newjob_cond[j],NULL);
pthread_cond_init(&bio_step_cond[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of. */
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
线程任务
每个线程的执行的函数都是同一个,在这个函数中根据不同的函数入参进行区别。
- 首先是等待该线程对应的队列非空
- 非空时从任务队列中取出头节点任务 ln->value
- 根据不同的type类型分别执行刷盘、关闭文件、异步释放等操作
- 执行完毕后删除节点。这里有个细节是广播bio_step_cond,是为了释放查询待执行任务数时加的锁(查询进来时,刚好有任务在执行,此时等待这个任务执行完毕)。
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
/* Pop the job from the queue. */
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);
/* Process the job accordingly to its type. */
if (type == BIO_CLOSE_FILE) {
if (job->fd_args.need_fsync) {
redis_fsync(job->fd_args.fd);
}
close(job->fd_args.fd);
} else if (type == BIO_AOF_FSYNC) {
/* The fd may be closed by main thread and reused for another
* socket, pipe, or file. We just ignore these errno because
* aof fsync did not really fail. */
if (redis_fsync(job->fd_args.fd) == -1 &&
errno != EBADF && errno != EINVAL)
{
int last_status;
atomicGet(server.aof_bio_fsync_status,last_status);
atomicSet(server.aof_bio_fsync_status,C_ERR);
atomicSet(server.aof_bio_fsync_errno,errno);
if (last_status == C_OK) {
serverLog(LL_WARNING,
"Fail to fsync the AOF file: %s",strerror(errno));
}
} else {
atomicSet(server.aof_bio_fsync_status,C_OK);
}
} else if (type == BIO_LAZY_FREE) {
job->free_args.free_fn(job->free_args.free_args);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
/* Unblock threads blocked on bioWaitStepOfType() if any. */
pthread_cond_broadcast(&bio_step_cond[type]);
}
提交任务
任务提交比较简单,通过上述三个Create接口实例化一个bio_job 然后追加到对应的list尾部。
bio_job使用了union,对于close和刷盘来说使用fd_args, 对于异步释放来说使用free_args。
typedef union bio_job {
/* Job specific arguments.*/
struct {
int fd; /* Fd for file based background jobs */
unsigned need_fsync:1; /* A flag to indicate that a fsync is required before
* the file is closed. */
} fd_args;
struct {
lazy_free_fn *free_fn; /* Function that will free the provided arguments */
void *free_args[]; /* List of arguments to be passed to the free function */
} free_args;
} bio_job;
总结
redis 针对大文件关闭、大内存释放、刷盘这些操作,分别使用对应的后台线程防止其阻塞线上请求,保证线上请求的高性能。其实现方式比较清晰,每种后台任务对应一个链表实现的消费队列和一个后台线程作为消费者,前台请求只需要通过提交函数向队列中追加待执行任务即可。实现清晰、简洁,也有一些细节,例如后台线程要屏蔽 watchdog, 这里不再深究。