Linux/POSIX 多路IO复用
多路复用IO概念
如何管理 fd 的可读可写事件?
IO 多路:指同时对多个文件进行读写操作。大体有两种方案:
-
阻塞IO模式
-
一个线程只能处理一个流的IO事件
-
缺点:若线程数增多性能会变差
多个线程处理多个IO(浪费CPU资源,效率低)—— 单个线程while循环。
-
while(true) {
select(stream[]);
for (i->stream[]) {
if i has data {
read data until unavailable
}
}
}
-
多路复用IO。复用是指:使用一个线程处理多个文件fd的读写 的模式。
-
让每一个 fd 觉得,当前线程只在给他一个fd跑腿。
-
要尽可能的少的空转,要把所有的时间都用在处理句柄的 IO 上。
-
为了实现上诉的功能,内核提供了 3 种系统调用 :select
,poll
,epoll
。
系统调用
select poll epoll比较
相同:这 3 种系统调用都能够管理 fd 的可读可写事件,
- 所有 fd 不可读不可写时,阻塞线程切走 cpu
- 存在 fd 可读写时,唤醒对应线程。
适用场景:
-
select/poll:适合连接数少、跨平台需求或兼容性要求高的场景。
-
优先选择
poll
:在多数现代场景中,poll
的灵活性(无 fd 数量限制、独立事件设置)更优。 -
仅在特定场景用
select
:如兼容性要求、极低并发且需多事件监控,或依赖剩余超时时间时。
-
-
高并发场景选
epoll
/kqueue
:若在 Linux 下,直接使用epoll
;BSD 系用kqueue
,彻底超越select
/poll
。适合 Linux 下高并发、长连接的服务器(如 Web 服务器、实时通信系统)。短连接频繁的场景可能因频繁调用epoll_ctl
影响性能。
特性 | select | poll | epoll |
---|---|---|---|
文件描述符数量限制 | 有(1024) | 无(基于链表实现) | 无 |
内核实现 | 基于轮询(polling),内核遍历 fd 检查状态。 | 基于轮询:同select | 基于回调(事件驱动) |
遍历效率 | 低(O(n) 遍历)返回后需遍历fd检查状态。 | 低:同select | 高(O(1) 回调) |
拷贝开销 | 开销大:每次调用都需要把fd集合从用户态拷贝到内核态。 | 开销大:同select | 开销小:在内核中维护了一个事件表。 |
数据拷贝 | 每次复制 fd 集合 | 每次复制 fd 数组 | 内核维护事件表,无需全拷贝 |
可移植性 | POSIX 标准,跨平台 | POSIX 标准,跨平台 | Linux 特有 |
触发模式 | 水平触发(LT) | 水平触发(LT) | 水平触发(LT)、边缘触发(ET) |
水平触发(LT)、边缘触发(ET)
- 水平触发是只要fd处于就绪状态,每次调用都会通知;
- 边缘触发只在状态变化时通知一次,需要程序处理完所有数据,否则可能丢失事件。
epoll
简介
总而言之,epoll是
-
linux内核提供的一种系统调用,性能好于另外两个系统调用 poll,select。
-
用于处理IO 多路复用(管理 fd 的可读可写事件)
-
在所有 fd 不可读不可写无所事事的时候,可以阻塞线程,切走 cpu
-
fd 可读写的时候,对应线程会被唤醒
-
使用事件驱动的方式,当某个fd就绪时,内核会采用回调机制直接将该fd加入到就绪列表,不需要每次都扫描所有fd。epoll_wait返回时只提供就绪的事件。会把哪个流发生的什么样的IO事件进行通知,时间复杂度O(1),对流的操作都是有意义的
while(true) {
active_stream[] = epoll_wait();
for i in active_stream[] {
read or write till unavailable
}
}
epoll 的使用
使用 epoll 需要以下三个系统调用:
//头文件
#include <sys/epoll.h>
// 创建一个epoll句柄,size用来告诉内核需要监听的数目
int epoll_creat(int size);
// epoll的事件注册函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 参数events用来从内核得到事件的几个
// // maxevents events的大小,不能大于epoll_creat创建时的size
// // timeout 超时时间
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epollcreate
epollcreate
创建用于监控、管理 句柄 fd
的池子
把池子当作是黑盒,不用纠结其中细节
// 创建一个epoll句柄,size用来告诉内核需要监听的数目
int epoll_creat(int size);
#include <sys/epoll.h>
// 创建一个epoll句柄,size用来告诉内核需要监听的数目
// 拿到句柄 epollfd ,这个句柄唯一代表该 epoll 池
int epollfd = epoll_create(1024);
if (epollfd == -1) {
perror("epoll_create");
exit(EXIT_FAILURE);
}
epollctl
epollctl
负责管理这个池子里的 fd 增、删、改;
// epoll的事件注册函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
#include <sys/epoll.h>
int epollfd = epoll_create(1024);
struct epoll_event {
// events 用于指定我们监听的 fd 事件类型,常见的值有:
// - EPOLLIN 可读事件
// - EPOLLOUT 可写事件
// 可以通过或操作同时生效: event.events = EPOLLIN | EPOLLOUT;
__uint32_t events; /* Epoll 事件 */
epoll_data_t data; /* 用户数据 */
};
// 指定:同时监听可读可写事件
epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT;
ev.data.u32 = 1;
// 往 epoll 池里放 fd
// - EPOLL_CTL_ADD 表明操作是增加 fd
// - 最后一个参数ev是 epoll_event 结构体
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
// 从 epoll 池里删除 fd
for (auto fd : fds) {
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
}
epollwait
epollwait
让出 CPU 调度,但是只要有“事”,立马会从这里唤醒;
- 调用 epoll_wait 进入休眠状态
- 可读或可写事件到来时,休眠中的程序从 epoll_wait 处被唤醒。
// 让出 CPU 调度
// - epollfd 表示epoll池
// - events 用于存储收到的多个事件,是一个数组 epoll_event *
// - maxevents 用于设定最多监听多少个事件,不能大于epoll_creat创建时的size
// - timeout 指定阻塞时间上限,-1:表示调用将一直阻塞
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
#include <sys/epoll.h>
while (true)
{
// 用于取出,放入事件们的数组
epoll_event events[10];
// 进入休眠状态
int count = epoll_wait(epollfd, events, EPOLL_SIZE, -1);
if (count < 0) { perror("epoll failed"); break; }
for (int i=0;i < count;i++) {
//处理可读或可写事件
// uint32_t alarm_idx = events[i].data.u32;
// uint64_t unused;
// ssize_t err = read(fds[alarm_idx], &unused, sizeof(unused));
}
}
例子: 监听timerfd
epoll_create 创建一个,用于监控、管理 句柄 fd
的池子;
// 1.创建一个 timerfd 句柄
int fdTimer = timerfd_create(CLOCK_MONOTONIC, 0 /*flags*/);
// 2.启动定时器
// 2-1 init struct itimerspec
itimerspec timespec {
// 设置超时间隔
.it_interval = {
.tv_sec = 5,
.tv_nsec = 0,
},
//第一次超时时间
.it_value = {
.tv_sec = 5,
.tv_nsec = 0,
},
};
// 2-2 启动 timerfd
int err = timerfd_settime(fdTimer, 0 /*flags*/, ×pec, nullptr);
if (err < 0) { ALOGE("timerfd_settime() failed: %s", strerror(errno)); return 0;}
// 3.初始化epoll fd
epollfd = epoll_create(1024);
if (epollfd == -1) {perror("epoll_create"); exit(EXIT_FAILURE);}
// epoll 监听 timerfd对象 fdTimer
epoll_ctl(epollfd, EPOLL_CTL_Add, fdTimer, &eventItem);
while (true)
{
// 用于 取出&存储 事件们的数组
epoll_event eventItems[10];
// 进入休眠状态
epoll_wait(epollfd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
if (count < 0) {
perror("epoll failed");
break;
}
for (int i=0;i < count;i++)
{
//处理计时器到达事件
// uint32_t alarm_idx = eventItems[i].data.u32;
}
}
常用文件句柄
eventfd
介绍
正因eventfd在计数器 大于/等于 0时,对应的可读状态 会存在 非阻塞/阻塞 的区别,所以是epoll常用的文件句柄。
功能:用来实现多进程/多线程的之间的事件通知的,也可以由内核通知用户空间应用程序。
实现:通过在 linux 内核空间维护一个 64 位计数器,并在进程间共享,从而完成IPC。
操作
eventfd 有三个常用的操作:
- 初始化 init:给内核计数器赋初始值,一般为 0
- 写操作 write:向内核空间写入一个 64 位的值,递增内核计数器
- 读操作 read:
- 当内核计数器为 0 时,读操作阻塞
- 当内核计数器大于 0 时,读操作不会阻塞。取计数器的累加值,将计数器赋 0。
初始化 eventfd
使用 eventfd 函数来进行初始化:
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
// 举个例子
int efd = eventfd(0, 0);
if (efd == -1)
{
handle_error("eventfd");
}
params
initval
:创建eventfd时它所对应的 64 位计数器的初始值;flags
:eventfd文件描述符的标志,用以改变 eventfd 的行为,大多情况设置为 0 即可。0
:如果是2.6.26或之前版本的内核,flags 必须设置为 0EFD_CLOEXEC
(since Linux 2.6.27):文件被设置成 O_CLOEXEC,创建子进程 (fork) 时不继承父进程的文件描述符EFD_NONBLOCK
(since Linux 2.6.27):设置文件描述符为非阻塞的,设置了这个标志后,如果没有数据可读,就返回一个 EAGAIN 错误,不会一直阻塞。EFD_SEMAPHORE
(since Linux 2.6.30):提供类似信号量语义的 read 操作,简单说就是计数值 count 递减 1。可以多次 read
读写 write read
初始化后的 eventfd,在内核中有一个 64 位的整数与之对应。可通过 系统调用read/write 来修改这个数字。
写操作:递增内核计数器,累加计数
uint64_t u = 1;
ssize_t n;
// 写 eventfd,必须是 64 位整数
// 内部计数器 += 1 变为 1
n = write(efd, &u, sizeof(uint64_t));
// 内部计数器 += 2 变为 3
u = 2;
n = write(efd, &u, sizeof(uint64_t));
// 内部计数器 += 3 变为 6
u = 3;
n = write(efd, &u, sizeof(uint64_t));
读操作:读出计数器的值,并将内核中的计数器赋值为 0
- 当内核计数器为 0 时,读操作阻塞
- 当内核计数器大于 0 时,读操作不会阻塞
n = read(efd, &u, sizeof(uint64_t));
timerfd
timerfd 是一个时间相关的 fd,初始化时可以设置一个超时时间。
- 超时之前,句柄不可读
- 超时之后,句柄可读,读出来的是超时的次数。
// 创建一个 timerfd 句柄
int timerfd_create(int clockid, int flags);
// 启动或关闭 timerfd 对应的定时器
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
// 获取指定 timerfd 距离下一次超时还剩的时间
int timerfd_gettime(int fd, struct itimerspec *curr_value);
由于其 超时前后可读/不可读的特性,常与 epoll 系统调用结合使用。