redis7.x源码分析:(4) ae事件处理器(一)
ae模块是redis实现的Reactor模型的封装。它的主要代码实现集中在 ae.c 中,另外还提供了平台相关的io多路复用的封装,它们都实现了一套相同的poll接口,就类似于C++中提供了一个接口基类,由针对不同平台的派生类去实现。
// 创建平台相关的io模型实例
static int aeApiCreate(aeEventLoop *eventLoop)
// 修改可侦听的fd数量
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
// 释放实例
static void aeApiFree(aeEventLoop *eventLoop)
// 添加或修改fd的侦听事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// 删除或修改fd的侦听事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// 侦听所有fd的事件
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// 返回使用的io模型名称
static char *aeApiName(void)
实际使用的io模型会根据编译时定义的宏在 ae.c 的代码头部直接引入。
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
// 根据不同操作系统下不同的宏选择应该使用的io复用模型
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
// 对应linux
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
// 对应BSD(FreeBSD、MacOS等)
#include "ae_kqueue.c"
#else
// 对应Solaris
#include "ae_select.c"
#endif
#endif
#endif
以linux为例先来看一下ae_epoll.c的实现,它的代码也比较简单:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 分配events数组大小
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// 创建epoll, kernal 2.6.8 之后参数被忽略了
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
anetCloexec(state->epfd);
eventLoop->apidata = state;
return 0;
}
......
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
// 没注册过, 认为是添加fd到epoll
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
// 合并新老事件
mask |= eventLoop->events[fd].mask; /* Merge old events */
// 设置需要的事件
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
// 删除指定事件
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
// 设置需要的事件
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// 没有事件执行删除,否则执行修改
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 此处的 (tvp->tv_usec + 999)/1000 是为了避免当 tvp->tv_usec < 1000us 时转换成毫秒为 0ms, 导致epoll_wait不会等待
// 因此这么写之后能保证只要 tvp->tv_usec 不为0,那么至少都能等待 1ms 时长
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
// 保存触发的事件
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
} else if (retval == -1 && errno != EINTR) {
panic("aeApiPoll: epoll_wait, %s", strerror(errno));
}
return numevents;
}
接下来再看一下ae相关接口,先看下fd相关实现。
主要结构体定义如下:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ // 事件类型
aeFileProc *rfileProc; // 读事件回调
aeFileProc *wfileProc; // 写事件回调
void *clientData; // 上下文指针
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */ // 定时器事件id
monotime when; // 触发时间
aeTimeProc *timeProc; // 定时器触发回调
aeEventFinalizerProc *finalizerProc; // 定时器销毁时执行的回调
void *clientData; // 上下文指针
struct aeTimeEvent *prev; // 前一个定时器事件
struct aeTimeEvent *next; // 后一个定时器事件
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */ // 引用计数
} aeTimeEvent;
/* A fired event */
typedef struct aeFiredEvent {
int fd; // 触发事件的定时器
int mask; // 触发的事件类型
} aeFiredEvent;
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */ // 注册的最大fd值
int setsize; /* max number of file descriptors tracked */ // 注册的fd数量
long long timeEventNextId; // 自增用于生成定时器id
aeFileEvent *events; /* Registered events */ // 注册的fd事件
aeFiredEvent *fired; /* Fired events */ // 触发的fd事件
aeTimeEvent *timeEventHead; // 定时器事件链表头
int stop; // 结束标志
void *apidata; /* This is used for polling API specific data */ // 不同poll的上下文信息
aeBeforeSleepProc *beforesleep; // poll前调用
aeBeforeSleepProc *aftersleep; // poll后调用
int flags;
} aeEventLoop;
aeEventLoop中的events是用于保存fd注册事件的数组,它是以fd值作为索引来存取事件的。
创建eventloop事件管理器:
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
// 初始化一下单调时钟使用哪种方式实现
monotonicInit(); /* just in case the calling app didn't initialize */
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// events、fired数组以fd值为下标,直接存取对应fd的相关数据
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
// 根据平台创建io模型实例
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
添加和删除事件:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// 超出events数组有效范围直接报错
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
// 添加或修改fd读写事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
if (fd >= eventLoop->setsize) return;
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;
/* We want to always remove AE_BARRIER if set when AE_WRITABLE
* is removed. */
if (mask & AE_WRITABLE) mask |= AE_BARRIER;
aeApiDelEvent(eventLoop, fd, mask);
fe->mask = fe->mask & (~mask);
// 如果本次删除事件的fd是最大的fd,并且该fd已经没有事件了(可以认为该fd被清除了)
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
int j;
// 从后往前遍历找到第一个有事件的fd赋给maxfd
for (j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}
}
事件处理函数,处理一轮事件:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want to call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
struct timeval tv, *tvp;
int64_t usUntilTimer = -1;
// 获取最近的一个定时器距当前时间的触发时长
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
// 不需要等待,超时时间设为0
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
// 执行poll前的回调(对fd读写以及命令处理都会在beforsleep中执行, 如果开启多线程读写也会在它内部一并处理)
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
// 等待事件
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
// 执行poll后的回调
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 处理poll到的读写事件
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsyncing a file to disk,
* before replying to a client. */
// 是否反转读写事件的顺序(不设置 AE_BARRIER 时,先读后写)
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
// 处理读事件回调
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
// 防止在事件处理函数中 resize 过数组大小,所以需要重新获取下地址
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
// 处理写事件回调(读写回调函数相同时,每轮poll只执行一次读或者写回调)
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
// 反转读写时先写,然后在这边后读(读写回调函数相同时,每轮poll只执行一次读或者写回调)
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
// 处理定时器事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
执行主事件循环:
void aeMain(aeEventLoop *eventLoop) {
// 启动事件循环
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}