当前位置: 首页 > article >正文

2.5 io_uring

io_uring的相关函数接口介绍

`io_uring` 是 Linux 内核中一种高效的异步 I/O 接口,最早引入于 **Linux 内核 5.1** 版本。它是由 Jens Axboe 开发的,目的是提供更高效的异步 I/O 操作,尤其是相比 `epoll` 和 `aio`,`io_uring` 减少了系统调用的开销。

`io_uring` 相关的函数和系统调用主要包括以下几类:

### 1. **系统调用(syscalls)**

- **`io_uring_setup()`**: 创建并初始化一个 `io_uring` 实例。这个系统调用分配内存,并返回一个文件描述符,用于后续的 `io_uring` 操作。

- **`io_uring_enter()`**: 用于提交请求并等待完成。如果需要等待某些事件,可以通过此调用完成相应的操作。

- **`io_uring_register()`**: 用于注册缓冲区、文件描述符等信息,以便在之后的 I/O 操作中复用,减少系统调用开销。

### 2. **`liburing` 用户态库**

为了更方便的使用 `io_uring`,开发了 `liburing` 库,这个库对系统调用进行了一层封装。它的函数更易于使用,常用的函数有:

- **`io_uring_queue_init()`**: 初始化一个 `io_uring` 实例,封装了 `io_uring_setup()`。

- **`io_uring_queue_exit()`**: 清理和关闭 `io_uring` 实例。

- **`io_uring_get_sqe()`**: 获取一个用于提交 I/O 请求的 `submission queue entry (SQE)`。

- **`io_uring_submit()`**: 提交所有已经排队的 I/O 请求。

- **`io_uring_wait_cqe()`**: 等待一个 I/O 请求完成,并返回一个 `completion queue entry (CQE)`。

- **`io_uring_peek_cqe()`**: 检查是否有已完成的 I/O 请求,而不阻塞当前线程。

- **`io_uring_register_buffers()`**: 将缓冲区注册到 `io_uring` 实例,以便复用这些缓冲区。

- **`io_uring_register_files()`**: 注册文件描述符,使得后续 I/O 请求可以直接使用这些描述符。

- **`io_uring_prep_readv()`**: 准备一个向量化读取请求。

- **`io_uring_prep_writev()`**: 准备一个向量化写入请求。

- **`io_uring_prep_poll_add()`**: 准备一个 `poll` 事件,用于监视文件描述符的变化。

### 3. **从 Linux 5.1 到现在的演进**

虽然 `io_uring` 在 Linux 5.1 中首次引入,但它在后续的内核版本中得到了大量的改进和新功能的添加。每个新版本都可能引入一些新特性和优化。以下是一些关键的内核版本和新增的功能:

- **Linux 5.4**: 添加了超时处理和文件同步功能。

- **Linux 5.6**: 引入了支持 `splice()` 操作和批量处理功能,进一步提升了性能。

- **Linux 5.7**: 添加了对 I/O 超时 (`timeout`) 的支持。

- **Linux 5.9**: 增加了通知机制,可以减少系统调用开销。

- **Linux 5.11**: 引入了更丰富的 `poll` 和缓冲区管理功能。

随着每个新版本的发布,`io_uring` 提供的功能越来越丰富,适用的场景也越来越广泛。特别是在处理高并发和低延迟需求的应用场景中,`io_uring` 展现了其优势。

历史版本的时间点

以下是 `io_uring` 相关的关键 Linux 内核版本及其发布时间:

1. **Linux 5.1** - 引入 `io_uring`:

   - **发布时间:2019年5月5日**

   - `io_uring` 在这个版本中首次引入,提供了一个高效的异步 I/O 接口。

2. **Linux 5.4** - 添加超时处理和文件同步功能:

   - **发布时间:2019年11月24日**

   - 增加了对 `io_uring` 操作的超时处理,以及文件同步操作的支持。

3. **Linux 5.6** - 引入 `splice()` 操作和批量处理功能:

   - **发布时间:2020年3月29日**

   - 支持 `splice()` 操作,允许批量处理多个 I/O 请求,进一步提升性能。

4. **Linux 5.7** - 添加对 I/O 超时 (`timeout`) 的支持:

   - **发布时间:2020年5月31日**

   - 提供了 I/O 请求的超时支持,使得某些 I/O 操作可以在设定的时间内超时。

5. **Linux 5.9** - 增加通知机制,减少系统调用开销:

   - **发布时间:2020年10月11日**

   - 增加了 `io_uring` 的通知机制,减少了用户态和内核态之间的系统调用开销。

6. **Linux 5.11** - 引入更丰富的 `poll` 和缓冲区管理功能:

   - **发布时间:2021年2月14日**

   - 为 `io_uring` 引入了更高级的 `poll` 事件处理以及改进的缓冲区管理功能。

随着每个新版本的发布,`io_uring` 不断优化其性能和扩展功能,尤其适用于高性能、低延迟的 I/O 场景。

是否可移植到低内核版本

移植 `io_uring` 到较低版本的内核是有可能的,但相当复杂,涉及到修改和集成内核中大量的底层组件。以下是一些关键步骤和注意事项:

### 1. **代码依赖性和兼容性检查**

   - `io_uring` 引入了很多新机制和优化(如 ring buffer、异步 I/O 提交与完成队列等),依赖于较新内核中的底层基础设施。

   - 首先,你需要确保较低版本的内核是否有实现这些依赖组件,如果没有,你需要将它们一起移植。

     - 例如,`io_uring` 依赖于 `epoll`、`aio`、`splice` 等功能,并与内核调度、I/O 子系统、锁机制等深度结合。

### 2. **移植步骤**

   - **获取 `io_uring` 代码**:你可以从较新版本的内核(如 5.1 或之后的版本)中提取与 `io_uring` 相关的代码,通常位于 `fs/io_uring.c`。

   - **移植必要的依赖模块**:查看较低版本内核是否缺少对 `io_uring` 所需的依赖模块。如果缺少(例如 `aio` 改进、ring buffer 支持),你需要从新版本中一起移植。

   - **修改内核中的接口**:内核 API 在不同版本中可能有所变化,较低版本的内核中可能没有 `io_uring` 所需的一些 API。你需要根据新旧内核之间的 API 差异做适配。

     - 例如,`io_uring` 使用了一些高效的内存管理和调度机制,如果这些机制在低版本内核中不存在,可能需要将其相关的代码一并移植。

   - **编译和测试**:在代码移植完成后,你需要编译内核,验证移植的 `io_uring` 能否正确工作。通过用户态程序发起异步 I/O 请求,检查其行为是否与较新内核中的 `io_uring` 相同。

### 3. **移植的挑战**

   - **内核 API 差异**:随着内核的升级,内核 API 和子系统实现会发生较大的变化。在低版本内核中,很多 `io_uring` 所依赖的特性和优化可能不存在,这意味着你可能不仅需要移植 `io_uring`,还要移植这些相关功能。

   - **性能问题**:即使移植成功,性能可能无法达到较新内核中的水准,因为较低版本内核缺乏许多对 `io_uring` 优化过的底层支持。

   - **维护难度**:移植意味着要维护一套和内核不同步的代码,将来可能面临维护和更新的额外工作量。

### 4. **替代方案**

   如果完整移植 `io_uring` 过于复杂,可以考虑以下替代方案:

   - **升级内核**:如果可能,考虑将系统内核升级到 5.1 或更高版本,直接使用官方的 `io_uring` 支持。这是最简单和稳定的方式。

   - **使用现有的异步 I/O 接口**:如果不能升级内核,可以考虑使用较低版本内核支持的其他异步 I/O 接口,例如 `epoll` 或 `aio`,虽然性能上不如 `io_uring`,但移植和实现的难度要小得多。

### 结论

虽然理论上可以将 `io_uring` 移植到低版本内核,但难度相当大,特别是在 API 兼容性、依赖组件、性能优化等方面。如果没有非常强的需求,建议考虑升级内核或者使用其他异步 I/O 方案。如果你决定进行移植,需要对内核开发有深入的了解,并准备充分的时间进行测试和调试。

实例

一个tcp server 在监听 可以连接多个client,用io_uring的方式写个例子,把收到的消息异步打印 后回复给客户端

为了使用 `io_uring` 实现一个 TCP 服务器,可以通过异步方式接收和发送消息。本示例中,服务器将接收客户端发送的消息,异步打印,并回复相同的消息给客户端。我们将使用 `io_uring` 的 `accept`、`recv` 和 `send` 操作来处理多个客户端的连接。

### 示例代码说明

1. **服务器端**使用 `io_uring` 进行异步 `accept` 连接、接收消息 (`recv`),然后通过 `send` 异步回复。

2. **多客户端处理**:服务器可以同时处理多个客户端的连接。

3. **异步消息处理**:消息的接收和发送都是通过 `io_uring` 异步完成。

### 环境依赖

- `liburing`:你需要安装该库来使用 `io_uring`。可以通过以下命令在系统上安装:

  ```bash

  sudo apt-get install liburing-dev

  ```

### 示例代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <liburing.h>

/*
结合了 多方例程的优点
*/

#define PORT 8080
#define QUEUE_DEPTH 256
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 1024 // client的最多数量

#define EV_ACCEPT 0
#define EV_READ  1
#define EV_SEND  2

#define log(cnt, ...) fprintf(stderr, "[%s][%d]: " cnt "\n", __func__, __LINE__, ##__VA_ARGS__);

struct io_data {
    int fd;
    char buffer[BUFFER_SIZE];
    struct sockaddr_in client_addr;
    socklen_t client_len;
	int event;
};

static struct io_data **p_clients = NULL;
static int server_fd;
static struct io_uring ring;

// 信号处理函数
void handle_signal(int sig) {
    if (sig == SIGINT) {
        log("Caught SIGINT (Ctrl+C)!, %d", sig);
        // 执行一些清理操作或退出程序
        if (server_fd > 0) close(server_fd);

        int i = 0;
        for (i = 0; i < MAX_CLIENTS; i++) {
            if (p_clients[i] != NULL) {
				close(p_clients[i]->fd);
                free(p_clients[i]);
                p_clients[i] = NULL;
            }
        }

        free(p_clients);
	    io_uring_queue_exit(&ring);
        exit(0);
    }
}

void add_accept(struct io_uring *ring, int server_fd, struct io_data *data) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_accept(sqe, server_fd, (struct sockaddr *)&data->client_addr, &data->client_len, 0);
	data->event = EV_ACCEPT;
    io_uring_sqe_set_data(sqe, data);
}

void add_recv(struct io_uring *ring, struct io_data *data) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	memset(data->buffer, 0, sizeof(data->buffer));
    io_uring_prep_recv(sqe, data->fd, data->buffer, BUFFER_SIZE, 0);
	data->event = EV_READ;
    io_uring_sqe_set_data(sqe, data);
}

void add_send(struct io_uring *ring, struct io_data *data, int len) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_send(sqe, data->fd, data->buffer, len, 0);
	data->event = EV_SEND;
    io_uring_sqe_set_data(sqe, data);
}

int init_server()
{
    int server_fd, opt = 1;
    struct sockaddr_in server_addr;

    // 创建服务器 socket
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置 socket 选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 绑定地址和端口
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);

    if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 开始监听
    if (listen(server_fd, 5) < 0) {
        perror("listen failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }	
	return server_fd;
}

int get_cient_pos()
{
	int i;
	for (i = 0; i < MAX_CLIENTS; i++) {
		if (p_clients[i] == NULL) break;
	}
	if (i == MAX_CLIENTS) {
		log("no client pos support");
		return -1;
	}
	p_clients[i] = malloc(sizeof(struct io_data));
	if (p_clients[i] == NULL) {
		log("malloc err");
		return -1;
	}
	return i;
}

void free_client_source(struct io_data *p)
{
	int i;
	for (i = 0; i < MAX_CLIENTS; i++) {
		if (p_clients[i] == p && p != NULL) {
			close(p->fd);
            free(p);
			p_clients[i] = NULL;
			break;
		}
	}	
}


int main(int argc, char *aegv[]) {
	
	server_fd = init_server();
	signal(SIGINT, handle_signal);

	p_clients = calloc(sizeof(struct io_data (*)[]), MAX_CLIENTS);
	if (p_clients == NULL) {
		log("calloc err");
		close(server_fd);
		return -1;
	}

    // 初始化 io_uring
    io_uring_queue_init(QUEUE_DEPTH, &ring, 0);

    int pos = get_cient_pos();
	if (pos == -1) {
		io_uring_queue_exit(&ring);
		close(server_fd);
		return -1;
	}
    p_clients[pos]->fd = server_fd;
    add_accept(&ring, server_fd, p_clients[pos]);
	log("server_fd: %d", server_fd);

    while (1) {
        struct io_uring_cqe *cqe;
        io_uring_submit(&ring);  // 提交 SQE

        io_uring_wait_cqe(&ring, &cqe); // 等待完成队列的事件 -- 阻塞等待一个事件
        (void)cqe;
        //struct io_data *data = (struct io_data *)(cqe->user_data);

		struct io_uring_cqe *cqes[QUEUE_DEPTH];
		int nready = io_uring_peek_batch_cqe(&ring, cqes, QUEUE_DEPTH); // 非阻塞拿出所有的完成事件
		int n;
		for (n = 0; n < nready; n++) {
			struct io_data *data = (struct io_data *)(cqes[n]->user_data);
			log("res: %d, data->fd: %d", cqes[n]->res, data->fd);
	        if (cqes[n]->res < 0) {
	            log("Async operation failed: %s", strerror(-cqes[n]->res));
	            free_client_source(data);
	            io_uring_cqe_seen(&ring, cqes[n]);
	            continue;
	        }

	        if (data->fd == server_fd) {
	            // 处理新的客户端连接
	            log("New client connected");
	            pos = get_cient_pos();
				if (pos == -1) continue;
	            p_clients[pos]->fd = cqes[n]->res;  // 客户端文件描述符
	            p_clients[pos]->client_len = sizeof(p_clients[pos]->client_addr);
	            add_recv(&ring, p_clients[pos]);  // 异步接收数据
	            add_accept(&ring, server_fd, data);  // 准备处理下一个客户端
	        } else {
				if (EV_READ == data->event) {
					int nread = cqes[n]->res;
					if (nread == 0) {
						log("close client: %d", data->fd);
						free_client_source(data);
					} else {
						log("recv client %d: %s", data->fd, data->buffer);
						add_send(&ring, data, nread);
					}
				} else if (EV_SEND == data->event) {
					int nsend = cqes[n]->res;
					log("send: %d bytes", nsend);
					add_recv(&ring, data);
				}
	        }
		}

        //io_uring_cqe_seen(&ring, cqe);  // 标记 CQE 为已处理 和 io_uring_wait_cqe 搭配使用
        io_uring_cq_advance(&ring, nready); // 通知内核处理了 nready 个 CQE 和 io_uring_peek_batch_cqe 搭配使用
    }

    // 清理资源
    io_uring_queue_exit(&ring);
    close(server_fd);
    return 0;
}

epool_server和io_uring server的性能对比

说明手写一个client测试程序创建20个线程,每个线程发送10万包数据,统计总时长。

结果显示io_uring 要好15%左右。


Sign in · GitLab


http://www.kler.cn/a/447417.html

相关文章:

  • 单调栈基础用法
  • Linux系统的阻塞方式和非阻塞方式是什么意思?
  • OpenHarmony 3.2 网卡获取ip地址缓慢分析
  • shutil 标准库: Python 文件操作的万用刀
  • 7-2 排序
  • WeakAuras NES Script(lua)
  • 黑马Java面试教程_P7_常见集合_P4_HashMap
  • homebrew,gem,cocoapod 换源,以及安装依赖
  • uniapp实现手写签名,并在app中将其转为base64格式的图片
  • springboot中的AOP以及面向切面编程思想
  • Vue.js前端框架教程8:Vue消息提示ElMessage和ElMessageBox
  • Win/Mac 如何实现测试 IP 和端口
  • ​在VMware虚拟机上设置Ubuntu与主机共享文件夹​
  • ubuntu 开机自动mount 的方法
  • 行情接入手册
  • 信息安全管理与评估赛题第6套
  • 【初阶数据结构与算法】八大排序算法之选择排序(直接选择排序、堆排)
  • 使用C#绘制具有平滑阴影颜色的曼德布洛特集分形
  • 国产操作系统openEuler22.09系统OpenStackYoga 部署指南
  • [笔记]关于Qt的nativeEvent事件无法接收window消息的Bug
  • 【从零开始入门unity游戏开发之——C#篇17】C#面向对象的封装——类(Class)和对象、成员变量和访问修饰符、成员方法
  • Liquibase结合SpringBoot使用实现数据库管理
  • 使用 mstsc 远程桌面连接时无法复制粘贴本地文件或文字解决方法
  • SAP PP ECN CSAP_MAT_BOM_MAINTAIN
  • run postinstall error, please remove node_modules before retry!
  • PyTorch实战-模拟线性函数和非线性函数