深入理解Linux网络随笔(四):内核是如何与用户进程协作的(下篇:多路I/O复用模型epoll)
3 多路I/O复用模型(epoll)
epoll是一种I/O事件通知机制,是linux 内核实现IO多路复用的一个实现。I/O多路复用是指一个进程或线程同时监听多个 I/O 事件(如网络连接、文件操作等),并能够在其中一个或多个 I/O 事件就绪时进行读写操作,而不会因为某个 I/O 操作阻塞而停滞不前。
I/O解释:在Linux中均用文件描述符fd表示,输入输出的对象可以是文件(file), 网络(socket),进程之间的管道(pipe)。
事件解释:I/O 操作中某个特定条件的触发,例如:读、写等操作。
通知机制解释:当事件发生时,系统或程序通过某种方式告知进程或线程,通常是通过回调函数、信号或状态变更等手段。
epoll用户态调用函数如下:
int epfd = epoll_create(EPOLL_SIZE);
内核会产生一个epoll 实例数据结构并返回一个文件描述符,EPOLL_SIZE
告诉内核这个监听的数目一共有多大。
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
将被监听的描述符fd添加到红黑树或从红黑树中删除或者对监听事件进行修改。
nfds = epoll_wait(kdpfd, events, maxevents, -1);
阻塞等待注册的事件发生,返回事件的数目并将触发的事件写入events数组中。
3.1epoll_create–创建epoll对象
当我们在用户态使用epoll_create
创建一个epool对象epfd时,进入系统调用后,内核会创建一个struct eventpoll
结构体对象,由于一个红黑树rbr、两个双向链表rdllist、wait组成。
struct eventpoll {
struct rb_root rbr; // 红黑树的根节点,用于管理注册的事件
struct list_head rdllist; // 就绪队列,存储准备好的事件
wait_queue_head_t wait; // 等待队列,用于等待事件的进程可以在这里等待
......
};
调用ep_alloc
函数进行初始化,完成内容如下:
(1)kzalloc申请eventpoll内存
(2)初始化等待队列头
(3)初始化就绪列表
(4)初始化红黑树指针
static int ep_alloc(struct eventpoll **pep)
{
int error;
struct user_struct *user;
struct eventpoll *ep;
......
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
init_waitqueue_head(&ep->wq);
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);
ep->rbr = RB_ROOT_CACHED;
}
3.2 epoll_ctl–向epoll对象添加socket
使用epoll_ctl注册socket时,通过SYSCALL_DEFINE4
系统调用进入内核。接收的参数如下:
- epfd:epoll实例的文件描述符,op:操作类型,fd:需要操作的文件描述符,event:epoll事件结构体
- op操作类型:EPOLL_CTL_ADD:添加一个事件。EPOLL_CTL_MOD:修改一个事件。EPOLL_CTL_DEL:删除一个事件。
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
struct epoll_event epds;
//将用户空间数据event复制到内核空间epds中
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
return -EFAULT;
//实际操作
return do_epoll_ctl(epfd, op, fd, &epds, false);
}
do_epoll_ctl
是epoll_ctl
添加socket的核心处理函数,根据不同的op操作类型进行不同的操作,EPOLL_CTL_ADD
添加事件调用ep_insert
,EPOLL_CTL_DEL
删除事件调用ep_remove
,EPOLL_CTL_MOD
修改事件调用ep_modify
。
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
bool nonblock)
{
.......
switch (op) {
//添加事件
case EPOLL_CTL_ADD:
if (!epi) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, epds, tf.file, fd, full_check);
} else
error = -EEXIST;
break;
//删除事件
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
//修改事件
case EPOLL_CTL_MOD:
if (epi) {
if (!(epi->event.events & EPOLLEXCLUSIVE)) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, epds);
}
} else
error = -ENOENT;
break;
}
.....
}
调用ep_insert
函数注册socket。完成内容如下:
(1)kmem_cache_zalloc
分配一个epi
对象进行初始化
(2)对epi初始化,设置epoll实例ep、句柄号以及struct file地址
(3)设置socket等待队列epq,定义回调函数ep_ptable_queue_proc
(4)epi插入红黑树
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, pwake = 0; // error 用于存储错误码,pwake 用于控制唤醒的次数
__poll_t revents; // 存储从 poll 回调中获取的事件标志
struct epitem *epi; // 用于存储插入的 epoll 事件项
struct ep_pqueue epq; // 用于事件队列处理
struct eventpoll *tep = NULL; // 临时的 epoll 结构体
// 分配新的 epitem 结构体用于存储事件信息
if (!(epi = kmem_cache_zalloc(epi_cache, GFP_KERNEL))) {
percpu_counter_dec(&ep->user->epoll_watches); // 失败时减少计数
return -ENOMEM; // 内存分配失败
}
// 初始化 epitem 结构体
INIT_LIST_HEAD(&epi->rdllink); // 初始化链表
epi->ep = ep; // 设置所属的 epoll 实例
ep_set_ffd(&epi->ffd, tfile, fd); // 设置文件描述符和文件结构
ep_rbtree_insert(ep, epi);
// 初始化 poll 表,设置回调函数
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); // 设置 poll 回调函数
/*
* 将事件项挂到 poll 队列,并获取当前的事件位。
* 使用的文件描述符已经由调用方增加了引用计数,所以这里可以安全使用。
*/
revents = ep_item_poll(epi, &epq.pt, 1); // 获取事件的 poll 状态
return 0; // 成功
}
调用epoll_ctl分配一个epitem对象,如下:红黑树节点rbn、等待队列pwqlist、socket文件描述符ffd、eventpoll。
struct epitem {
union {
/* RB tree node links this structure to the eventpoll RB tree */
struct rb_node rbn;
/* Used to free the struct epitem */
struct rcu_head rcu;
};
/* The file descriptor information this item refers to */
struct epoll_filefd ffd;
/* List containing poll wait queues */
struct eppoll_entry *pwqlist;
/* The structure that describe the interested events and the source fd */
struct epoll_event event;
};
函数调用ep_item_poll-->sock_poll-->tcp_poll-->sock_poll_wait-->poll_wait
,主要作用是在等待队列中挂起进程,等待I/O事件发生。_qproc
指针指向的回调函数是ep_ptable_queue_proc
。
static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}
调用sock_poll_wait
之前会先调用sleep_wait
函数,返回sk关联的等待队列头wait_queue_head_t
地址。
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
return &rcu_dereference_raw(sk->sk_wq)->wait;
}
ep_ptable_queue_proc
通过init_waitqueue_func_entry
新建并初始化一个等待项,注册其回调函数ep_poll_callback
,调用add_wait_queue
函数将注册的等待项放入socket的等待队列whead。软中断收到数据放入socket接收队列,调用这个回调函数通知epoll。
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct ep_pqueue *epq = container_of(pt, struct ep_pqueue, pt);
struct epitem *epi = epq->epi;
struct eppoll_entry *pwq;
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
if (epi->event.events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(whead, &pwq->wait);
else
add_wait_queue(whead, &pwq->wait);
pwq->next = epi->pwqlist;
epi->pwqlist = pwq;
}
分配完epitem对象将其插入红黑树,这里使用红黑树是为了让epoll在查找、插入效率、内存开销等方面均衡。
3.2 epoll_wait–等待其管理的连接上的I/O事件
调用epoll_wait时,主要观察 eventpoll->rdllist 就绪链表里有没有数据,如果有数据就返回,没有数据就创建一个等待队列项,将其添加到 eventpoll 的等待队列上,阻塞自己。系统调用如下:
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
...
error = ep_poll(ep, events, maxevents, timeout);
}
ep_poll
函数完成实际的操作,主要工作如下:
(1)ep_events_available
判断就绪队列是否存在就绪事件
(2)init_waitqueue_entry
等待事件关联当前进程
(3)__add_wait_queue_exclusive
将进程加入等待队列
(4)设置进程的状态 TASK_INTERRUPTIBLE
,让出CPU,主动进入睡眠状态
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, struct timespec64 *timeout)
{
int res, eavail, timed_out = 0;
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
eavail = ep_events_available(ep);
while (1) {
if (eavail) {
res = ep_send_events(ep, events, maxevents);
if (res)
return res;
}
init_wait(&wait);
wait.func = ep_autoremove_wake_function;
__add_wait_queue_exclusive(&ep->wq, &wait);
__set_current_state(TASK_RUNNING);
}
ep_events_available
负责判断就绪链表中是否有可处理的事件。
static inline int ep_events_available(struct eventpoll *ep)
{
//判断就绪链表是否为空,不为空则存在就绪事件
return !list_empty_careful(&ep->rdllist) ||
READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
}
如果不存在就绪连接,调用init_waitqueue_entry
保存当前进程上下文状态信息,并设置唤醒进程的回调函数。
static inline void init_waitqueue_entry(struct wait_queue_entry *wq_entry, struct task_struct *p)
{
wq_entry->flags = 0;
wq_entry->private = p;//保存当前任务状态信息
wq_entry->func = default_wake_function;//设置唤醒回调函数
}
在__add_wait_queue_exclusive
中将wait_queue_entry
等待队列项加入等待队列。
static inline void
__add_wait_queue_exclusive(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
//设置进程状态标志,如果某个等待队列项具有这个标志,它将在等待队列中拥有独占访问权
wq_entry->flags |= WQ_FLAG_EXCLUSIVE;
//将进程添加到等待队列中
__add_wait_queue(wq_head, wq_entry);
}
小结部分:
epoll整体流程图如下:
epoll_create负责初始化,创建eventpoll结构体,包含红黑树、就绪列表、等待队列等,并在 struct file
里挂载 private_data
指针,用于存储 epoll 相关的数据结构。
epoll_ctl负责将要监听的socket挂载到epoll的红黑树中,并给内核中断处理注册了一个回调函数default_wake_function
,当内核检测到fd上有可读/可写事件时,调用回调函数将fd从红黑树移动到就绪列表中,唤醒被阻塞epoll_wait
的进程,修改任务状态为可运行状态。
epoll_wait负责检查就绪列表,就绪列表没有数据会修改任务状态为可中断状态,并加入到epoll 的接收队列中,让出CPU。
那么在多路I/O复用中,方案select、poll以及epoll为什么说epoll性能高呢?
(1)避免了O(n) 轮询
传统的 select
/ poll
每次调用都要 遍历所有监听的 fd(O(n)),而 epoll 采用 事件驱动 方式,只有真正发生事件的 fd 才会被处理(O(1))。
(2)减少了用户态和内核态的切换
epoll_wait
可以一次返回多个事件,用户进程可以一次性处理多个I/O事件,并非1对1关系。例如在 高并发 场景下,监听的 socket fd 很多,可能在 epoll_wait
还未进入休眠前,新的数据就已经到达,导致 epoll_wait
立即返回,所以只要就绪列表不为空,那么epoll_wait就会一直处于工作状态。而 select
/ poll
需要用户进程 反复轮询 并调用 read/write
进行 I/O 操作,造成大量的 系统调用开销。
(3)支持 EPOLLEXCLUSIVE
解决惊群
在多线程/多进程监听相同 fd 时,epoll 允许 只有一个线程 被唤醒,而不是所有线程都唤醒,减少了 惊群(Thundering Herd)问题。