当前位置: 首页 > article >正文

redis7.x源码分析:(4) ae事件处理器(一)

ae模块是redis实现的Reactor模型的封装。它的主要代码实现集中在 ae.c 中,另外还提供了平台相关的io多路复用的封装,它们都实现了一套相同的poll接口,就类似于C++中提供了一个接口基类,由针对不同平台的派生类去实现。

// 创建平台相关的io模型实例
static int aeApiCreate(aeEventLoop *eventLoop)
// 修改可侦听的fd数量
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
// 释放实例
static void aeApiFree(aeEventLoop *eventLoop)
// 添加或修改fd的侦听事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// 删除或修改fd的侦听事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// 侦听所有fd的事件
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// 返回使用的io模型名称
static char *aeApiName(void)

实际使用的io模型会根据编译时定义的宏在 ae.c 的代码头部直接引入。

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
// 根据不同操作系统下不同的宏选择应该使用的io复用模型
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    // 对应linux
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        // 对应BSD(FreeBSD、MacOS等)
        #include "ae_kqueue.c"
        #else
        // 对应Solaris
        #include "ae_select.c"
        #endif
    #endif
#endif

以linux为例先来看一下ae_epoll.c的实现,它的代码也比较简单:

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    // 分配events数组大小
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    // 创建epoll, kernal 2.6.8 之后参数被忽略了
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    anetCloexec(state->epfd);
    eventLoop->apidata = state;
    return 0;
}

......

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    // 没注册过, 认为是添加fd到epoll
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    // 合并新老事件
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    // 设置需要的事件
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    // 删除指定事件
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    // 设置需要的事件
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    // 没有事件执行删除,否则执行修改
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    // 此处的 (tvp->tv_usec + 999)/1000 是为了避免当 tvp->tv_usec < 1000us 时转换成毫秒为 0ms, 导致epoll_wait不会等待
    // 因此这么写之后能保证只要 tvp->tv_usec 不为0,那么至少都能等待 1ms 时长
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            // 保存触发的事件
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    } else if (retval == -1 && errno != EINTR) {
        panic("aeApiPoll: epoll_wait, %s", strerror(errno));
    }

    return numevents;
}

接下来再看一下ae相关接口,先看下fd相关实现。

主要结构体定义如下:

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ // 事件类型
    aeFileProc *rfileProc; // 读事件回调
    aeFileProc *wfileProc; // 写事件回调
    void *clientData; // 上下文指针
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */ // 定时器事件id
    monotime when; // 触发时间
    aeTimeProc *timeProc; // 定时器触发回调
    aeEventFinalizerProc *finalizerProc; // 定时器销毁时执行的回调
    void *clientData; // 上下文指针
    struct aeTimeEvent *prev; // 前一个定时器事件
    struct aeTimeEvent *next; // 后一个定时器事件
    int refcount; /* refcount to prevent timer events from being
  		   * freed in recursive time event calls. */ // 引用计数
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd; // 触发事件的定时器
    int mask; // 触发的事件类型
} aeFiredEvent;

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */ // 注册的最大fd值
    int setsize; /* max number of file descriptors tracked */ // 注册的fd数量
    long long timeEventNextId; // 自增用于生成定时器id
    aeFileEvent *events; /* Registered events */ // 注册的fd事件
    aeFiredEvent *fired; /* Fired events */ // 触发的fd事件
    aeTimeEvent *timeEventHead; // 定时器事件链表头
    int stop; // 结束标志
    void *apidata; /* This is used for polling API specific data */ // 不同poll的上下文信息
    aeBeforeSleepProc *beforesleep; // poll前调用
    aeBeforeSleepProc *aftersleep; // poll后调用
    int flags;
} aeEventLoop;

aeEventLoop中的events是用于保存fd注册事件的数组,它是以fd值作为索引来存取事件的。

创建eventloop事件管理器:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    // 初始化一下单调时钟使用哪种方式实现
    monotonicInit();    /* just in case the calling app didn't initialize */

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    // events、fired数组以fd值为下标,直接存取对应fd的相关数据
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    // 根据平台创建io模型实例
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

添加和删除事件:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    // 超出events数组有效范围直接报错
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];
    // 添加或修改fd读写事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    // 如果本次删除事件的fd是最大的fd,并且该fd已经没有事件了(可以认为该fd被清除了)
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        // 从后往前遍历找到第一个有事件的fd赋给maxfd
        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}

事件处理函数,处理一轮事件:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp;
        int64_t usUntilTimer = -1;

        // 获取最近的一个定时器距当前时间的触发时长
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            usUntilTimer = usUntilEarliestTimer(eventLoop);

        if (usUntilTimer >= 0) {
            tv.tv_sec = usUntilTimer / 1000000;
            tv.tv_usec = usUntilTimer % 1000000;
            tvp = &tv;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        // 不需要等待,超时时间设为0
        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        // 执行poll前的回调(对fd读写以及命令处理都会在beforsleep中执行, 如果开启多线程读写也会在它内部一并处理)
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        // 等待事件
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        // 执行poll后的回调
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // 处理poll到的读写事件
        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            // 是否反转读写事件的顺序(不设置 AE_BARRIER 时,先读后写)
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                // 处理读事件回调
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                // 防止在事件处理函数中 resize 过数组大小,所以需要重新获取下地址
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                // 处理写事件回调(读写回调函数相同时,每轮poll只执行一次读或者写回调)
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                // 反转读写时先写,然后在这边后读(读写回调函数相同时,每轮poll只执行一次读或者写回调)
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    // 处理定时器事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

执行主事件循环:

void aeMain(aeEventLoop *eventLoop) {
    // 启动事件循环
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // 处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}


http://www.kler.cn/a/406323.html

相关文章:

  • html5复习一
  • 小红书运营教程10(小红书笔记制作01)
  • Redis ⽀持哪⼏种数据类型?适⽤场景,底层结构
  • 了解Redis(第一篇)
  • 缓存工具类编写
  • 网络安全应急响应及其发展方向
  • 《Django 5 By Example》阅读笔记:p645-p650
  • SQL注入:理解、防范与最佳实践
  • Ubuntu安装Electron环境
  • 学习electron
  • C#实现blob分析——分别基于OpenCvSharp和Emgu实现
  • 力扣 LeetCode 501. 二叉搜索树中的众数(Day10:二叉树)
  • 【vim】vim怎么从指定行到指定行的行首添加内容
  • 真题-桂城2018年六年级
  • OpenCV与AI深度学习|16个含源码和数据集的计算机视觉实战项目(建议收藏!)
  • HarmonyOS . 沉浸状态栏使用
  • Elasticsearch Windows版的安装及启动
  • 14:00面试,14:08就出来了,问的问题有点变态。。。
  • Unreal从入门到精通之如何绘制用于VR的3DUI交互的手柄射线
  • 基于干扰观测器的 PD 控制
  • 高性能存储SIG月度动态:重构和优化fuse,推动containerd社区支持erofs
  • 大模型基本能力评测---知识利用
  • Linux2.6内核进程调度队列
  • Windows 驱动开发中 ExAcquireResourceExclusiveLite 和其他锁的区别:
  • Windows中指定路径安装DockerDesktop
  • 死锁相关习题 10道 附详解