当前位置: 首页 > article >正文

Redis后台任务有哪些

Redis后台任务

为了有更好的性能表现,redis对于一些比较耗时的操作会异步执行,不阻塞线上请求。文章从源码(redis7.0)来看,aof、rdb文件的关闭,aof文件的刷盘以及部分内存释放会采用异步方式,在后台线程中执行。接下来我们看下这些具体的任务以及后台任务的具体实现。

Redis后台任务类型

在源码中的bio.h bio.c文件是redis后台线程任务的实现逻辑,包含以下三类任务:

/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
#define BIO_NUM_OPS       3
  1. 关闭文件,包括socket、数据文件的关闭。redis中主要是aof和rdb文件的close。
    异步close
  2. 刷盘,内容由内存到磁盘的过程,通常是异步写的,首先会写入page cahe,刷盘就是将修改的数据从page cache同步到磁盘。
    异步fsync
  3. 释放内存。
    异步释放

实现思路

异步任务实现逻辑
三类异步任务对应三个任务队列,分别不同的函数进行任务提交,任务队列是一个list,每个list对应一个消费线程。

  • void bioCreateCloseJob(int fd, int need_fsync); 提交异步关闭任务
  • void bioCreateFsyncJob(int fd); 提交异步刷盘任务
  • void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); 提交异步释放任务

源码分析

初始化

在启动初始化完成后,main函数会调用InitServerLast做一些系统准备完毕后的其他初始化工作,其中就包括后台线程任务初始化初void bioInit(void);

  • 初始化每个线程用到的互斥锁、信号量、消费队列(bio_jobs)和队列长度(bio_pending)
  • 计算线程栈大小
  • 初始化每个线程,线程句柄函数bioProcessBackgroundJobs,每个线程会先进行设置名称(redis_set_thread_title)、绑核(redisSetCpuAffinity)、设置信号(pthread_sigmask),然后进入while(1)循环执行。
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

线程任务

每个线程的执行的函数都是同一个,在这个函数中根据不同的函数入参进行区别。

  • 首先是等待该线程对应的队列非空
  • 非空时从任务队列中取出头节点任务 ln->value
  • 根据不同的type类型分别执行刷盘、关闭文件、异步释放等操作
  • 执行完毕后删除节点。这里有个细节是广播bio_step_cond,是为了释放查询待执行任务数时加的锁(查询进来时,刚好有任务在执行,此时等待这个任务执行完毕)。
    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            if (job->fd_args.need_fsync) {
                redis_fsync(job->fd_args.fd);
            }
            close(job->fd_args.fd);
        } else if (type == BIO_AOF_FSYNC) {
            /* The fd may be closed by main thread and reused for another
             * socket, pipe, or file. We just ignore these errno because
             * aof fsync did not really fail. */
            if (redis_fsync(job->fd_args.fd) == -1 &&
                errno != EBADF && errno != EINVAL)
            {
                int last_status;
                atomicGet(server.aof_bio_fsync_status,last_status);
                atomicSet(server.aof_bio_fsync_status,C_ERR);
                atomicSet(server.aof_bio_fsync_errno,errno);
                if (last_status == C_OK) {
                    serverLog(LL_WARNING,
                        "Fail to fsync the AOF file: %s",strerror(errno));
                }
            } else {
                atomicSet(server.aof_bio_fsync_status,C_OK);
            }
        } else if (type == BIO_LAZY_FREE) {
            job->free_args.free_fn(job->free_args.free_args);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }

提交任务

任务提交比较简单,通过上述三个Create接口实例化一个bio_job 然后追加到对应的list尾部。
bio_job使用了union,对于close和刷盘来说使用fd_args, 对于异步释放来说使用free_args。

typedef union bio_job {
    /* Job specific arguments.*/
    struct {
        int fd; /* Fd for file based background jobs */
        unsigned need_fsync:1; /* A flag to indicate that a fsync is required before
                                * the file is closed. */
    } fd_args;

    struct {
        lazy_free_fn *free_fn; /* Function that will free the provided arguments */
        void *free_args[]; /* List of arguments to be passed to the free function */
    } free_args;
} bio_job;

总结

redis 针对大文件关闭、大内存释放、刷盘这些操作,分别使用对应的后台线程防止其阻塞线上请求,保证线上请求的高性能。其实现方式比较清晰,每种后台任务对应一个链表实现的消费队列和一个后台线程作为消费者,前台请求只需要通过提交函数向队列中追加待执行任务即可。实现清晰、简洁,也有一些细节,例如后台线程要屏蔽 watchdog, 这里不再深究。


http://www.kler.cn/a/375344.html

相关文章:

  • 【ROS的TF系统】
  • 基于Spring Boot的员工与部门信息管理系统:增删改查全攻略
  • C++ 复习记录(个人记录)
  • [供应链] 邀请招标
  • [CUDA] atomic函数闭坑技巧
  • NPOI 操作详解(操作Excel)
  • Docker:存储原理
  • XtraBackup开源热备工具
  • vite5 打包项目兼容ie和低版本chrome
  • 卡尔曼滤波-应用白话
  • 文件上传漏洞-通过.htaccess文件绕过
  • Python-数据爬取(爬虫)
  • Web大型网站的性能测试要求和工具方法
  • Python数据分析入门知识基础和案例(万字长文)
  • Linux通过ifconfig命令ens33没有显示ip地址解决方式
  • select函数、I/O复用、并发服务器
  • 奥数与C++小学四年级(第十八题 小球重量)
  • 微服务基础-Ribbon
  • UE4安卓打aab包时,同时存在“gradle”、“arm64/gradle”两个Gradle工程的原因
  • Excel 单元格小数点精确位数机制
  • java随记
  • 基于深度学习的社交网络中的社区检测
  • Android启动流程_Zygote阶段
  • 音视频入门基础:FLV专题(18)——Audio Tag简介
  • Android面试八股文
  • 渗透测试练习题解析 7 (CTF web)