2.5 io_uring
io_uring的相关函数接口介绍
`io_uring` 是 Linux 内核中一种高效的异步 I/O 接口,最早引入于 **Linux 内核 5.1** 版本。它是由 Jens Axboe 开发的,目的是提供更高效的异步 I/O 操作,尤其是相比 `epoll` 和 `aio`,`io_uring` 减少了系统调用的开销。
`io_uring` 相关的函数和系统调用主要包括以下几类:
### 1. **系统调用(syscalls)**
- **`io_uring_setup()`**: 创建并初始化一个 `io_uring` 实例。这个系统调用分配内存,并返回一个文件描述符,用于后续的 `io_uring` 操作。
- **`io_uring_enter()`**: 用于提交请求并等待完成。如果需要等待某些事件,可以通过此调用完成相应的操作。
- **`io_uring_register()`**: 用于注册缓冲区、文件描述符等信息,以便在之后的 I/O 操作中复用,减少系统调用开销。
### 2. **`liburing` 用户态库**
为了更方便的使用 `io_uring`,开发了 `liburing` 库,这个库对系统调用进行了一层封装。它的函数更易于使用,常用的函数有:
- **`io_uring_queue_init()`**: 初始化一个 `io_uring` 实例,封装了 `io_uring_setup()`。
- **`io_uring_queue_exit()`**: 清理和关闭 `io_uring` 实例。
- **`io_uring_get_sqe()`**: 获取一个用于提交 I/O 请求的 `submission queue entry (SQE)`。
- **`io_uring_submit()`**: 提交所有已经排队的 I/O 请求。
- **`io_uring_wait_cqe()`**: 等待一个 I/O 请求完成,并返回一个 `completion queue entry (CQE)`。
- **`io_uring_peek_cqe()`**: 检查是否有已完成的 I/O 请求,而不阻塞当前线程。
- **`io_uring_register_buffers()`**: 将缓冲区注册到 `io_uring` 实例,以便复用这些缓冲区。
- **`io_uring_register_files()`**: 注册文件描述符,使得后续 I/O 请求可以直接使用这些描述符。
- **`io_uring_prep_readv()`**: 准备一个向量化读取请求。
- **`io_uring_prep_writev()`**: 准备一个向量化写入请求。
- **`io_uring_prep_poll_add()`**: 准备一个 `poll` 事件,用于监视文件描述符的变化。
### 3. **从 Linux 5.1 到现在的演进**
虽然 `io_uring` 在 Linux 5.1 中首次引入,但它在后续的内核版本中得到了大量的改进和新功能的添加。每个新版本都可能引入一些新特性和优化。以下是一些关键的内核版本和新增的功能:
- **Linux 5.4**: 添加了超时处理和文件同步功能。
- **Linux 5.6**: 引入了支持 `splice()` 操作和批量处理功能,进一步提升了性能。
- **Linux 5.7**: 添加了对 I/O 超时 (`timeout`) 的支持。
- **Linux 5.9**: 增加了通知机制,可以减少系统调用开销。
- **Linux 5.11**: 引入了更丰富的 `poll` 和缓冲区管理功能。
随着每个新版本的发布,`io_uring` 提供的功能越来越丰富,适用的场景也越来越广泛。特别是在处理高并发和低延迟需求的应用场景中,`io_uring` 展现了其优势。
历史版本的时间点
以下是 `io_uring` 相关的关键 Linux 内核版本及其发布时间:
1. **Linux 5.1** - 引入 `io_uring`:
- **发布时间:2019年5月5日**
- `io_uring` 在这个版本中首次引入,提供了一个高效的异步 I/O 接口。
2. **Linux 5.4** - 添加超时处理和文件同步功能:
- **发布时间:2019年11月24日**
- 增加了对 `io_uring` 操作的超时处理,以及文件同步操作的支持。
3. **Linux 5.6** - 引入 `splice()` 操作和批量处理功能:
- **发布时间:2020年3月29日**
- 支持 `splice()` 操作,允许批量处理多个 I/O 请求,进一步提升性能。
4. **Linux 5.7** - 添加对 I/O 超时 (`timeout`) 的支持:
- **发布时间:2020年5月31日**
- 提供了 I/O 请求的超时支持,使得某些 I/O 操作可以在设定的时间内超时。
5. **Linux 5.9** - 增加通知机制,减少系统调用开销:
- **发布时间:2020年10月11日**
- 增加了 `io_uring` 的通知机制,减少了用户态和内核态之间的系统调用开销。
6. **Linux 5.11** - 引入更丰富的 `poll` 和缓冲区管理功能:
- **发布时间:2021年2月14日**
- 为 `io_uring` 引入了更高级的 `poll` 事件处理以及改进的缓冲区管理功能。
随着每个新版本的发布,`io_uring` 不断优化其性能和扩展功能,尤其适用于高性能、低延迟的 I/O 场景。
是否可移植到低内核版本
移植 `io_uring` 到较低版本的内核是有可能的,但相当复杂,涉及到修改和集成内核中大量的底层组件。以下是一些关键步骤和注意事项:
### 1. **代码依赖性和兼容性检查**
- `io_uring` 引入了很多新机制和优化(如 ring buffer、异步 I/O 提交与完成队列等),依赖于较新内核中的底层基础设施。
- 首先,你需要确保较低版本的内核是否有实现这些依赖组件,如果没有,你需要将它们一起移植。
- 例如,`io_uring` 依赖于 `epoll`、`aio`、`splice` 等功能,并与内核调度、I/O 子系统、锁机制等深度结合。
### 2. **移植步骤**
- **获取 `io_uring` 代码**:你可以从较新版本的内核(如 5.1 或之后的版本)中提取与 `io_uring` 相关的代码,通常位于 `fs/io_uring.c`。
- **移植必要的依赖模块**:查看较低版本内核是否缺少对 `io_uring` 所需的依赖模块。如果缺少(例如 `aio` 改进、ring buffer 支持),你需要从新版本中一起移植。
- **修改内核中的接口**:内核 API 在不同版本中可能有所变化,较低版本的内核中可能没有 `io_uring` 所需的一些 API。你需要根据新旧内核之间的 API 差异做适配。
- 例如,`io_uring` 使用了一些高效的内存管理和调度机制,如果这些机制在低版本内核中不存在,可能需要将其相关的代码一并移植。
- **编译和测试**:在代码移植完成后,你需要编译内核,验证移植的 `io_uring` 能否正确工作。通过用户态程序发起异步 I/O 请求,检查其行为是否与较新内核中的 `io_uring` 相同。
### 3. **移植的挑战**
- **内核 API 差异**:随着内核的升级,内核 API 和子系统实现会发生较大的变化。在低版本内核中,很多 `io_uring` 所依赖的特性和优化可能不存在,这意味着你可能不仅需要移植 `io_uring`,还要移植这些相关功能。
- **性能问题**:即使移植成功,性能可能无法达到较新内核中的水准,因为较低版本内核缺乏许多对 `io_uring` 优化过的底层支持。
- **维护难度**:移植意味着要维护一套和内核不同步的代码,将来可能面临维护和更新的额外工作量。
### 4. **替代方案**
如果完整移植 `io_uring` 过于复杂,可以考虑以下替代方案:
- **升级内核**:如果可能,考虑将系统内核升级到 5.1 或更高版本,直接使用官方的 `io_uring` 支持。这是最简单和稳定的方式。
- **使用现有的异步 I/O 接口**:如果不能升级内核,可以考虑使用较低版本内核支持的其他异步 I/O 接口,例如 `epoll` 或 `aio`,虽然性能上不如 `io_uring`,但移植和实现的难度要小得多。
### 结论
虽然理论上可以将 `io_uring` 移植到低版本内核,但难度相当大,特别是在 API 兼容性、依赖组件、性能优化等方面。如果没有非常强的需求,建议考虑升级内核或者使用其他异步 I/O 方案。如果你决定进行移植,需要对内核开发有深入的了解,并准备充分的时间进行测试和调试。
实例
一个tcp server 在监听 可以连接多个client,用io_uring的方式写个例子,把收到的消息异步打印 后回复给客户端
为了使用 `io_uring` 实现一个 TCP 服务器,可以通过异步方式接收和发送消息。本示例中,服务器将接收客户端发送的消息,异步打印,并回复相同的消息给客户端。我们将使用 `io_uring` 的 `accept`、`recv` 和 `send` 操作来处理多个客户端的连接。
### 示例代码说明
1. **服务器端**使用 `io_uring` 进行异步 `accept` 连接、接收消息 (`recv`),然后通过 `send` 异步回复。
2. **多客户端处理**:服务器可以同时处理多个客户端的连接。
3. **异步消息处理**:消息的接收和发送都是通过 `io_uring` 异步完成。
### 环境依赖
- `liburing`:你需要安装该库来使用 `io_uring`。可以通过以下命令在系统上安装:
```bash
sudo apt-get install liburing-dev
```
### 示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <liburing.h>
/*
结合了 多方例程的优点
*/
#define PORT 8080
#define QUEUE_DEPTH 256
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 1024 // client的最多数量
#define EV_ACCEPT 0
#define EV_READ 1
#define EV_SEND 2
#define log(cnt, ...) fprintf(stderr, "[%s][%d]: " cnt "\n", __func__, __LINE__, ##__VA_ARGS__);
struct io_data {
int fd;
char buffer[BUFFER_SIZE];
struct sockaddr_in client_addr;
socklen_t client_len;
int event;
};
static struct io_data **p_clients = NULL;
static int server_fd;
static struct io_uring ring;
// 信号处理函数
void handle_signal(int sig) {
if (sig == SIGINT) {
log("Caught SIGINT (Ctrl+C)!, %d", sig);
// 执行一些清理操作或退出程序
if (server_fd > 0) close(server_fd);
int i = 0;
for (i = 0; i < MAX_CLIENTS; i++) {
if (p_clients[i] != NULL) {
close(p_clients[i]->fd);
free(p_clients[i]);
p_clients[i] = NULL;
}
}
free(p_clients);
io_uring_queue_exit(&ring);
exit(0);
}
}
void add_accept(struct io_uring *ring, int server_fd, struct io_data *data) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_accept(sqe, server_fd, (struct sockaddr *)&data->client_addr, &data->client_len, 0);
data->event = EV_ACCEPT;
io_uring_sqe_set_data(sqe, data);
}
void add_recv(struct io_uring *ring, struct io_data *data) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
memset(data->buffer, 0, sizeof(data->buffer));
io_uring_prep_recv(sqe, data->fd, data->buffer, BUFFER_SIZE, 0);
data->event = EV_READ;
io_uring_sqe_set_data(sqe, data);
}
void add_send(struct io_uring *ring, struct io_data *data, int len) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_send(sqe, data->fd, data->buffer, len, 0);
data->event = EV_SEND;
io_uring_sqe_set_data(sqe, data);
}
int init_server()
{
int server_fd, opt = 1;
struct sockaddr_in server_addr;
// 创建服务器 socket
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置 socket 选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("setsockopt failed");
close(server_fd);
exit(EXIT_FAILURE);
}
// 绑定地址和端口
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);
if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
close(server_fd);
exit(EXIT_FAILURE);
}
// 开始监听
if (listen(server_fd, 5) < 0) {
perror("listen failed");
close(server_fd);
exit(EXIT_FAILURE);
}
return server_fd;
}
int get_cient_pos()
{
int i;
for (i = 0; i < MAX_CLIENTS; i++) {
if (p_clients[i] == NULL) break;
}
if (i == MAX_CLIENTS) {
log("no client pos support");
return -1;
}
p_clients[i] = malloc(sizeof(struct io_data));
if (p_clients[i] == NULL) {
log("malloc err");
return -1;
}
return i;
}
void free_client_source(struct io_data *p)
{
int i;
for (i = 0; i < MAX_CLIENTS; i++) {
if (p_clients[i] == p && p != NULL) {
close(p->fd);
free(p);
p_clients[i] = NULL;
break;
}
}
}
int main(int argc, char *aegv[]) {
server_fd = init_server();
signal(SIGINT, handle_signal);
p_clients = calloc(sizeof(struct io_data (*)[]), MAX_CLIENTS);
if (p_clients == NULL) {
log("calloc err");
close(server_fd);
return -1;
}
// 初始化 io_uring
io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
int pos = get_cient_pos();
if (pos == -1) {
io_uring_queue_exit(&ring);
close(server_fd);
return -1;
}
p_clients[pos]->fd = server_fd;
add_accept(&ring, server_fd, p_clients[pos]);
log("server_fd: %d", server_fd);
while (1) {
struct io_uring_cqe *cqe;
io_uring_submit(&ring); // 提交 SQE
io_uring_wait_cqe(&ring, &cqe); // 等待完成队列的事件 -- 阻塞等待一个事件
(void)cqe;
//struct io_data *data = (struct io_data *)(cqe->user_data);
struct io_uring_cqe *cqes[QUEUE_DEPTH];
int nready = io_uring_peek_batch_cqe(&ring, cqes, QUEUE_DEPTH); // 非阻塞拿出所有的完成事件
int n;
for (n = 0; n < nready; n++) {
struct io_data *data = (struct io_data *)(cqes[n]->user_data);
log("res: %d, data->fd: %d", cqes[n]->res, data->fd);
if (cqes[n]->res < 0) {
log("Async operation failed: %s", strerror(-cqes[n]->res));
free_client_source(data);
io_uring_cqe_seen(&ring, cqes[n]);
continue;
}
if (data->fd == server_fd) {
// 处理新的客户端连接
log("New client connected");
pos = get_cient_pos();
if (pos == -1) continue;
p_clients[pos]->fd = cqes[n]->res; // 客户端文件描述符
p_clients[pos]->client_len = sizeof(p_clients[pos]->client_addr);
add_recv(&ring, p_clients[pos]); // 异步接收数据
add_accept(&ring, server_fd, data); // 准备处理下一个客户端
} else {
if (EV_READ == data->event) {
int nread = cqes[n]->res;
if (nread == 0) {
log("close client: %d", data->fd);
free_client_source(data);
} else {
log("recv client %d: %s", data->fd, data->buffer);
add_send(&ring, data, nread);
}
} else if (EV_SEND == data->event) {
int nsend = cqes[n]->res;
log("send: %d bytes", nsend);
add_recv(&ring, data);
}
}
}
//io_uring_cqe_seen(&ring, cqe); // 标记 CQE 为已处理 和 io_uring_wait_cqe 搭配使用
io_uring_cq_advance(&ring, nready); // 通知内核处理了 nready 个 CQE 和 io_uring_peek_batch_cqe 搭配使用
}
// 清理资源
io_uring_queue_exit(&ring);
close(server_fd);
return 0;
}
epool_server和io_uring server的性能对比
说明手写一个client测试程序创建20个线程,每个线程发送10万包数据,统计总时长。
结果显示io_uring 要好15%左右。
Sign in · GitLab