当前位置: 首页 > article >正文

解密高性能异步I/O:io_uring的魔力与应用

在 Linux 系统的发展历程中,I/O 操作的效率一直是影响系统性能的关键因素。从早期简单的阻塞式 I/O,到后来的非阻塞 I/O、I/O 多路复用,每一次的技术演进都在不断突破 I/O 性能的瓶颈。而 io_uring 的出现,更是为 Linux 异步 I/O 领域带来了一场革命。

它的高效性和创新性,使得开发者能够构建出性能更卓越的应用程序,无论是在高性能网络服务、数据库系统,还是大规模文件处理等场景中,io_uring 都展现出了巨大的潜力。接下来,让我们一起深入探索 io_uring 的实现原理与应用案例。

一、io_uring简介

1.1io_uring概述

io_uring 是一个Linux内核提供的高性能异步 I/O 框架,最初在 Linux 5.1 版本中引入。它的设计目标是解决传统的异步 I/O 模型(如epoll或者 POSIX AIO)在大规模 I/O 操作中效率不高的问题。

在传统的 Linux I/O 操作中,存在一些性能瓶颈。例如,系统调用的开销较大,同步 I/O 操作会导致线程在等待 I/O 完成时被阻塞,浪费了 CPU 资源。随着对高性能、高并发服务器和应用程序的需求不断增加,需要一种更高效的 I/O 处理机制。io_uring 应运而生,它是由 Jens Axboe 开发的,目的是为了解决这些传统 I/O 机制的效率问题。

*过往IO接口的缺陷

(1)同步IO接口

最原始的文件IO系统调用就是read,write。read系统调用从文件描述符所指代的打开文件中读取数据。write系统调用将数据写入一个已打开的文件中。在文件特定偏移处的IO是pread,pwrite。调用时可以指定位置进行文件IO操作,而非始于文件的当前偏移处,且他们不会改变文件的当前偏移量。

分散输入和集中输出(Scatter-Gather IO)是readv, writev,调用并非只对单个缓冲区进行读写操作,而是一次即可传输多个缓冲区的数据,免除了多次系统调用的开销,提高文件 I/O 的效率,特别是当需要读写多个连续或非连续的数据块时。

该机制使用一个数组iov定义了一组用来传输数据的缓冲区,一个整形数iovcnt指定iov的成员个数,其中,iov中的每个成员都是如下形式的数据结构。

struct iovec {
   void  *iov_base;    /* Starting address */
   size_t iov_len;     /* Number of bytes to transfer */
};

上述接口在读写IO时,系统调用会阻塞住等待,在数据读取或写入后才返回结果。同步导致的后果就是在阻塞的同时无法继续执行其他的操作,只能等待IO结果返回。存储场景中对性能的要求非常高,所以需要异步IO。

(2)异步IO接口:AIO

Linux 的异步 IO(AIO,Asynchronous I/O)是一种高级的文件 IO 模型,允许应用程序在发起 IO 操作后不必等待操作完成,而是可以继续执行其他任务。这与传统的同步 IO 模型不同,后者在 IO 操作完成之前会阻塞应用程序的执行。

1.2io_uring设计思路

(1)解决“系统调用开销大”的问题?

针对这个问题,考虑是否每次都需要系统调用。如果能将多次系统调用中的逻辑放到有限次数中来,就能将消耗降为常数时间复杂度。

(2)解决“拷贝开销大”的问题?

之所以在提交和完成事件中存在大量的内存拷贝,是因为应用程序和内核之间的通信需要拷贝数据,所以为了避免这个问题,需要重新考量应用与内核间的通信方式。我们发现,两者通信,不是必须要拷贝,通过现有技术,可以让应用与内核共享内存。

要实现核外与内核的零拷贝,最佳方式就是实现一块内存映射区域,两者共享一段内存,核外往这段内存写数据,然后通知内核使用这段内存数据,或者内核填写这段数据,核外使用这部分数据。因此,需要一对共享的ring buffer用于应用程序和内核之间的通信。

  • 一块用于核外传递数据给内核,一块是内核传递数据给核外,一方只读,一方只写。

  • 提交队列SQ(submission queue)中,应用是IO提交的生产者,内核是消费者。

  • 完成队列CQ(completion queue)中,内核是IO完成的生产者,应用是消费者。

  • 内核控制SQ ring的head和CQ ring的tail,应用程序控制SQ ring的tail和CQ ring的head

(3)解决“API不友好”的问题?

问题在于需要多个系统调用才能完成,考虑是否可以把多个系统调用合而为一。有时候,将多个类似的函数合并并通过参数区分不同的行为是更好的选择,而有时候可能需要将复杂的函数分解为更简单的部分来进行重构。

如果发现函数中的某一部分代码可以独立出来成为一个单独的函数,可以先进行这样的提炼,然后再考虑是否需要进一步使用参数化方法重构。

1.3与其他 I/O 模型的对比

阻塞 I/O:阻塞 I/O 是最基本的 I/O 模型,在这种模型下,当应用程序调用 I/O 操作(如 read、write)时,线程会被阻塞,直到 I/O 操作完成。例如,当读取一个文件时,线程会一直等待,直到数据从磁盘读取到内存中。这种模型的优点是简单直观,易于理解和实现,但缺点也很明显,在 I/O 操作期间,线程无法执行其他任务,这在高并发场景下会导致大量线程被阻塞,严重降低系统的性能和响应速度。例如,在一个同时处理多个客户端请求的服务器中,如果使用阻塞 I/O,每个请求都可能导致线程阻塞,当请求数量较多时,服务器将无法及时响应其他请求。

非阻塞 I/O:非阻塞 I/O 允许应用程序在 I/O 操作未完成时立即返回,线程不会被阻塞。应用程序可以通过轮询的方式检查 I/O 操作的状态,以确定是否完成。虽然这种模型避免了线程的阻塞,但频繁的轮询会消耗大量的 CPU 资源,并且在 I/O 操作较多时,管理和协调这些操作会变得非常复杂。以网络编程为例,在非阻塞 I/O 模式下,应用程序需要不断地检查 socket 是否有数据可读或可写,这会增加 CPU 的负担,降低系统的整体性能。

epoll:epoll 是一种 I/O 多路复用技术,它允许应用程序同时监控多个文件描述符的事件(如可读、可写、异常等)。当有事件发生时,epoll 会通知应用程序进行处理。epoll 在一定程度上提高了 I/O 操作的效率,特别是在高并发场景下,它通过减少系统调用的次数,降低了 CPU 的开销。然而,epoll 本质上还是同步 I/O,它只是提供了一种高效的事件通知机制,应用程序在处理 I/O 事件时,仍然需要进行实际的 I/O 操作,这可能会导致线程阻塞。比如,在一个使用 epoll 的网络服务器中,当有新的连接请求或数据到达时,epoll 会通知应用程序,但应用程序在读取或写入数据时,仍然可能会因为 I/O 操作的延迟而阻塞线程。

传统 AIO:传统 AIO 虽然提供了异步 I/O 的功能,但存在诸多限制。如前文所述,它只能在 Direct I/O 模式下使用,无法利用页缓存,这使得数据读写的效率受到影响。此外,传统 AIO 在 I/O 提交时可能会出现阻塞,导致其异步性并不完全可靠。在实际应用中,由于这些限制,传统 AIO 的使用场景相对较窄,难以满足大多数应用程序对高效 I/O 的需求。

相比之下,io_uring 具有明显的优势。它通过用户态和内核态共享提交队列(Submission Queue)和完成队列(Completion Queue) ,减少了系统调用的次数和上下文切换的开销。在 io_uring 中,应用程序只需将 I/O 请求放入提交队列,内核会在后台处理这些请求,并将结果放入完成队列,应用程序可以随时从完成队列中获取结果,无需频繁进行系统调用和轮询。此外,io_uring 支持更多的异步系统调用,不仅适用于存储文件的 I/O 操作,还能很好地应用于网络套接字(network sockets)的 I/O 处理,具有更广泛的适用性和更高的灵活性。

二、io_uring的实现原理

io_uring实现异步I/O的方式其实是一个生产者-消费者模型:

  • 用户进程生产I/O请求,放入提交队列(Submission Queue,简称SQ)。

  • 内核消费SQ中的I/O请求,完成后将结果放入完成队列(Completion Queue,简称CQ)。

  • 用户进程从CQ中收割I/O结果。

SQ和CQ是内核初始化io_uring实例的时候创建的。为了减少系统调用和减少用户进程与内核之间的数据拷贝,io_uring使用mmap的方式让用户进程和内核共享SQ和CQ的内存空间。

另外,由于先提交的I/O请求不一定先完成,SQ保存的其实是一个数组索引(数据类型 uint32),真正的SQE(Submission Queue Entry)保存在一个独立的数组(SQ Array)。所以要提交一个I/O请求,得先在SQ Array中找到一个空闲的SQE,设置好之后,将其数组索引放到SQ中。

用户进程、内核、SQ、CQ和SQ Array之间的基本关系如下:

图片

2.1核心组件解析

提交队列(SQ)与提交队列项(SQE):提交队列(Submission Queue,简称 SQ)是 io_uring 中用于存储 I/O 请求的队列,它是一个环形缓冲区,位于用户态和内核态共享的内存区域。每个 I/O 请求在提交队列中都以提交队列项(Submission Queue Entry,简称 SQE)的形式存在。SQE 是一个结构体,它存储了 I/O 请求的详细信息,包括操作类型(如读、写、异步连接等)、目标文件描述符、缓冲区地址、操作长度、偏移量等关键信息。

例如,在进行文件读取操作时,SQE 会记录要读取的文件描述符、读取数据的缓冲区地址、读取的字节数以及文件中的偏移量等信息。应用程序通过填充 SQE 结构体,并将其添加到 SQ 中,来向内核提交 I/O 请求。由于 SQ 是环形缓冲区,当队列满时,新的请求会覆盖旧的请求,从而保证 I/O 请求的持续提交。

完成队列(CQ)与完成队列项(CQE):完成队列(Completion Queue,简称 CQ)同样是一个环形缓冲区,用于存储 I/O 请求的完成结果。当内核完成一个 I/O 操作后,会将操作的结果封装成一个完成队列项(Completion Queue Entry,简称 CQE),并将其放入 CQ 中。CQE 结构体包含了 I/O 操作的返回值、状态码、用户自定义数据等信息。

通过这些信息,应用程序可以判断 I/O 操作是否成功,并获取操作的相关结果。比如,在文件读取操作完成后,CQE 中的返回值会表示实际读取的字节数,状态码则用于指示操作是否成功,若操作失败,状态码会包含具体的错误信息。应用程序可以通过轮询 CQ 或者等待特定的事件通知,来获取完成的 I/O 请求结果,从而进行后续的处理。

SQ Ring 与 CQ Ring:SQ Ring 和 CQ Ring 分别是提交队列和完成队列的环形缓冲区结构。它们包含了队列本身(即 SQ 和 CQ)、头部索引(head)、尾部索引(tail)以及队列大小等关键信息。头部索引(head)指向队列中第一个待处理的元素,而尾部索引(tail)则指向队列中下一个空闲的位置。当应用程序向 SQ 提交 I/O 请求时,它会将请求信息填充到 tail 指向的 SQE 中,然后将 tail 指针递增,指向下一个空闲位置。

内核在处理 I/O 请求时,会从 head 指向的 SQE 中获取请求信息,处理完成后,将结果放入 CQ 中。同样,CQ Ring 通过 head 和 tail 指针来管理完成队列,内核将完成的 I/O 结果放入 tail 指向的 CQE 中,并递增 tail 指针,应用程序则从 head 指向的 CQE 中获取结果。这种环形缓冲区结构以及基于 head 和 tail 指针的操作方式,实现了用户态和内核态之间高效的数据交换,减少了锁的使用和上下文切换的开销,从而大大提高了 I/O 操作的效率。

2.2系统调用详解

io_uring的实现仅仅使用了三个syscall:io_uring_setup, io_uring_enter和io_uring_register。

这几个系统调用接口都在io_uring.c文件中:

⑴io_uring_setup

io_uring_setup 是用于初始化 io_uring 环境的系统调用。在使用 io_uring 进行异步 I/O 操作之前,首先需要调用 io_uring_setup 来创建一个 io_uring 实例。它接受两个参数,第一个参数是期望的提交队列(SQ)的大小,即队列中可以容纳的 I/O 请求数量;第二个参数是一个指向 io_uring_params 结构体的指针,该结构体用于返回 io_uring 实例的相关参数,如实际分配的 SQ 和完成队列(CQ)的大小、队列的偏移量等信息。

在调用 io_uring_setup 时,内核会为 io_uring 实例分配所需的内存空间,包括 SQ、CQ 以及相关的控制结构。同时,内核还会创建一些内部数据结构,用于管理和调度 I/O 请求。如果初始化成功,io_uring_setup 会返回一个文件描述符,这个文件描述符用于标识创建的 io_uring 实例,后续的 io_uring 系统调用(如 io_uring_enter、io_uring_register)将通过这个文件描述符来操作该 io_uring 实例。若初始化失败,函数将返回一个负数,表示相应的错误代码。

io_uring_setup():

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
                struct io_uring_params __user *, params)                                                                                                                                                           
{
        return io_uring_setup(entries, params);
}
  • 功能:用于初始化和配置 io_uring 。

  • 应用用途:在使用 io_uring 之前,首先需要调用此接口初始化一个 io_uring 环,并设置其参数。

⑵io_uring_enter

io_uring_enter 是用于提交和等待 I/O 操作的系统调用。它的主要作用是将应用程序准备好的 I/O 请求提交给内核,并可以选择等待这些操作完成。io_uring_enter 接受多个参数,其中包括 io_uring_setup 返回的文件描述符,用于指定要操作的 io_uring 实例;to_submit 参数表示要提交的 I/O 请求的数量,即从提交队列(SQ)中取出并提交给内核的 SQE 的数量;min_complete 参数指定了内核在返回之前必须等待完成的 I/O 操作的最小数量;flags 参数则用于控制 io_uring_enter 的行为,例如可以设置是否等待 I/O 操作完成、是否获取完成的 I/O 事件等。当调用 io_uring_enter 时,如果 to_submit 参数大于 0,内核会从 SQ 中取出相应数量的 SQE,并将这些 I/O 请求提交到内核中进行处理。

同时,如果设置了等待 I/O 操作完成的标志,内核会阻塞等待,直到至少有 min_complete 个 I/O 操作完成,然后将这些完成的操作结果放入完成队列(CQ)中。应用程序可以通过检查 CQ 来获取这些完成的 I/O 请求的结果。通过 io_uring_enter,应用程序可以灵活地控制 I/O 请求的提交和等待策略,提高 I/O 操作的效率和灵活性。

io_uring_enter():

SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,                                                                                                                                                  
                u32, min_complete, u32, flags, const void __user *, argp,
                size_t, argsz)
  • 功能:用于提交和处理异步 I/O 操作。

  • 应用用途:在向 io_uring 环中提交 I/O 操作后,通过调用此接口触发内核处理这些操作,并获取完成的操作结果。

⑶io_uring_register

io_uring_register 用于注册文件描述符或事件文件描述符到 io_uring 实例中,以便在后续的 I/O 操作中使用。它接受四个参数,第一个参数是 io_uring_setup 返回的文件描述符,用于指定要注册到的 io_uring 实例;第二个参数 opcode 表示注册的类型,例如可以是 IORING_REGISTER_FILES(注册文件描述符集合)、IORING_REGISTER_BUFFERS(注册内存缓冲区)、IORING_REGISTER_EVENTFD(注册 eventfd 用于通知完成事件)等;

第三个参数 arg 是一个指针,根据 opcode 的类型不同,它指向不同的内容,如注册文件描述符时,arg 指向一个包含文件描述符的数组;注册缓冲区时,arg 指向一个描述缓冲区的结构体数组;第四个参数 nr_args 表示 arg 所指向的数组的长度。通过 io_uring_register 注册文件描述符或缓冲区等资源后,内核在处理 I/O 请求时,可以直接访问这些预先注册的资源,而无需每次都重新设置相关信息,从而提高了 I/O 操作的效率。例如,在进行大量文件读写操作时,预先注册文件描述符可以避免每次提交 I/O 请求时都进行文件描述符的查找和验证,减少了系统开销,提升了 I/O 性能。

io_uring_register():

SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
                void __user *, arg, unsigned int, nr_args)
  • 功能:用于注册文件描述符、缓冲区、事件文件描述符等资源到 io_uring 环中。

  • 应用用途:在进行 I/O 操作之前,需要将相关的资源注册到 io_uring 环中,以便进行后续的异步 I/O 操作。

2.3工作流程深度剖析

①创建 io_uring 对象

使用 io_uring 进行异步 I/O 操作的第一步是创建 io_uring 对象。内核提供了io_uring_setup系统调用来初始化一个io_uring实例,创建SQ、CQ和SQ Array,entries参数表示的是SQ和SQArray的大小,CQ的大小默认是2 * entries。params参数既是输入参数,也是输出参数。

该函数返回一个file descriptor,并将io_uring支持的功能、以及各个数据结构在fd中的偏移量存入params。用户根据偏移量将fd通过mmap内存映射得到一块内核用户共享的内存区域。这块内存区域中,有io_uring的上下文信息:SQ信息、CQ信息和SQ Array信息。

int io_uring_setup(int entries, struct io_uring_params *params);

这通过调用 io_uring_setup 系统调用来完成。在调用 io_uring_setup 时,用户需要指定提交队列(SQ)的大小,即期望的 I/O 请求队列长度。内核会根据这个请求,为 io_uring 对象分配必要的内存空间,包括提交队列(SQ)、完成队列(CQ)以及相关的控制结构。内核会创建一个 io_ring_ctx 结构体对象,用于管理 io_uring 的上下文信息。

同时,还会创建一个 io_urings 结构体对象,该对象包含了 SQ 和 CQ 的具体实现,如队列的头部索引(head)、尾部索引(tail)、队列大小等信息。在创建过程中,内核会初始化 SQ 和 CQ 的所有队列项(SQE 和 CQE),并设置好相关的指针和标志位。如果用户在调用 io_uring_setup 时设置了 IORING_SETUP_SQPOLL 标志位,内核还会创建一个 SQ 线程,用于从 SQ 队列中获取 I/O 请求并提交给内核处理。

创建完成后,io_uring_setup 会返回一个文件描述符,这个文件描述符是后续操作 io_uring 对象的关键标识,通过它可以进行 I/O 请求的提交、注册文件描述符等操作。

②准备 I/O 请求

在创建 io_uring 对象后,需要准备具体的 I/O 请求。这通常通过 io_uring_prep_XXX 系列函数来完成,这些函数用于准备不同类型的 I/O 请求,如 io_uring_prep_read 用于准备读取操作,io_uring_prep_write 用于准备写入操作,io_uring_prep_accept 用于准备异步接受连接操作等。

以 io_uring_prep_read 为例,它接受多个参数,包括指向提交队列项(SQE)的指针、目标文件描述符、读取数据的缓冲区地址、读取的字节数以及文件中的偏移量等。函数会根据这些参数,将 I/O 请求的相关信息填充到 SQE 结构体中,包括设置操作类型(如 IORING_OP_READ)、目标文件描述符、缓冲区地址、数据长度、偏移量等字段。

除了基本的 I/O 操作参数外,还可以设置一些额外的标志位和选项,如 I/O 操作的优先级、是否使用直接 I/O 等,以满足不同的应用需求。通过这些函数,应用程序可以灵活地构建各种类型的 I/O 请求,并将其准备好以便提交到内核中进行处理。

③提交 I/O 请求

当 I/O 请求准备好后,需要将其提交到内核中执行。这通过调用 io_uring_submit 函数(内部调用 io_uring_enter 系统调用)来实现。在提交 I/O 请求时,首先应用程序会将准备好的 SQE 添加到提交队列(SQ)中。SQ 是一个环形缓冲区,应用程序通过操作 SQ Ring 中的 tail 指针来将 SQE 放入队列。具体来说,应用程序会将 tail 指向的 SQE 填充为准备好的 I/O 请求信息,然后将 tail 指针递增,指向下一个空闲的 SQE 位置。在填充 SQE 时,需要注意按照 SQE 结构体的定义,正确设置各项字段,确保 I/O 请求的信息准确无误。

默认情况下,使用 io_uring 提交 I/O 请求需要:

  • 从SQ Arrary中找到一个空闲的SQE;

  • 根据具体的I/O请求设置该SQE;

  • 将SQE的数组索引放到SQ中;

  • 调用系统调用io_uring_enter提交SQ中的I/O请求。

图片

当所有要提交的 I/O 请求都添加到 SQ 中后,调用 io_uring_submit 函数,该函数会触发 io_uring_enter 系统调用,将 SQ 中的 I/O 请求提交给内核。内核接收到请求后,会从 SQ 中获取 SQE,并根据 SQE 中的信息执行相应的 I/O 操作。在这个过程中,由于 SQ 是用户态和内核态共享的内存区域,避免了数据的多次拷贝和额外的系统调用开销,提高了 I/O 请求提交的效率。

④等待 IO 请求完成

提交 I/O 请求后,应用程序可以选择等待请求完成。等待 I/O 请求完成有两种主要方式。一种是使用 io_uring_wait_cqe 函数,该函数会阻塞调用线程,直到至少有一个 I/O 请求完成,并返回完成的完成队列项(CQE)。当调用 io_uring_wait_cqe 时,它会检查完成队列(CQ)中是否有新完成的 I/O 请求。如果没有,线程会进入阻塞状态,直到内核将完成的 I/O 请求结果放入 CQ 中。一旦有新的 CQE 可用,io_uring_wait_cqe 会返回该 CQE,应用程序可以通过 CQE 获取 I/O 操作的结果。

另一种方式是使用 io_uring_peek_batch_cqe 函数,它是非阻塞的,用于检查 CQ 中是否有已经完成的 I/O 请求。如果有,它会返回已完成的 CQE 列表,应用程序可以根据返回的 CQE 进行相应的处理;如果没有完成的请求,函数会立即返回,应用程序可以继续执行其他任务,然后在适当的时候再次调用该函数检查 CQ。这两种方式为应用程序提供了灵活的等待策略,使其可以根据自身的业务需求和性能要求,选择合适的方式来处理 I/O 请求的完成事件。

⑤获取 IO 请求结果

当 I/O 请求完成后,应用程序需要从完成队列(CQ)中获取结果。这可以通过 io_uring_peek_cqe 函数来实现。io_uring_peek_cqe 函数用于从 CQ 中获取一个完成的 CQE,而不将其从队列中移除。应用程序获取到 CQE 后,可以根据 CQE 中的信息来处理完成的 I/O 请求。CQE 中包含了丰富的信息,如 I/O 操作的返回值、状态码、用户自定义数据等。例如,对于文件读取操作,CQE 中的返回值表示实际读取的字节数,状态码用于指示操作是否成功,若操作失败,状态码会包含具体的错误信息。

应用程序可以根据这些信息进行相应的处理,如读取数据并进行后续的业务逻辑处理,或者在操作失败时进行错误处理,如记录错误日志、重新尝试 I/O 操作等。在获取 CQE 后,应用程序通常会根据 I/O 操作的类型和结果,执行相应的业务逻辑,以实现应用程序的功能需求。

⑥释放 IO 请求结果

在获取并处理完 IO 请求结果后,需要释放该结果,以便内核可以继续使用完成队列(CQ)。这通过调用 io_uring_cqe_seen 函数来实现。io_uring_cqe_seen 函数的作用是标记一个完成的 CQE 已经被处理,它会将 CQ Ring 中的 head 指针递增,指向下一个未处理的 CQE。通过这种方式,内核可以知道哪些 CQE 已经被应用程序处理,从而可以继续向 CQ 中放入新的完成结果。

在释放 IO 请求结果时,需要注意确保已经完成了对 CQE 中信息的处理,避免在释放后再次访问已释放的 CQE。同时,及时释放 CQE 也有助于提高系统的性能和资源利用率,避免 CQ 队列被占用过多而影响后续 I/O 请求结果的存储和处理。通过正确地释放 IO 请求结果,保证了 io_uring 的工作流程能够持续高效地运行,为应用程序提供稳定的异步 I/O 服务。

三、io_uring案例分析

3.1简单文件读写案例

⑴代码实现

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <linux/io_uring.h>

int main() {
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    int fd, ret;
    // 打开文件
    fd = open("example.txt", O_RDONLY);
    if (fd < 0) {
        perror("Failed to open file");
        return 1;
    }
    // 初始化io_uring
    io_uring_queue_init(8, &ring, 0);
    // 获取一个提交队列条目
    sqe = io_uring_get_sqe(&ring);
    if (!sqe) {
        fprintf(stderr, "Could not get sqe\n");
        return 1;
    }
    // 准备异步读操作
    char *buf = malloc(1024);
    io_uring_prep_read(sqe, fd, buf, 1024, 0);
    // 提交请求
    io_uring_submit(&ring);
    // 等待完成
    ret = io_uring_wait_cqe(&ring, &cqe);
    if (ret < 0) {
        perror("io_uring_wait_cqe");
        return 1;
    }
    // 检查结果
    if (cqe->res < 0) {
        fprintf(stderr, "Async read failed: %s\n", strerror(-cqe->res));
    } else {
        printf("Read %d bytes: %s\n", cqe->res, buf);
    }
    // 释放资源
    io_uring_cqe_seen(&ring, cqe);
    io_uring_queue_exit(&ring);
    close(fd);
    free(buf);
    return 0;
}

代码解读

文件打开:fd = open("example.txt", O_RDONLY); 这行代码使用 open 函数打开名为 example.txt 的文件,以只读模式(O_RDONLY)打开。如果打开失败,open 函数会返回一个负数,并通过 perror 函数打印错误信息,然后程序返回错误代码 1。

io_uring 初始化:io_uring_queue_init(8, &ring, 0); 这行代码用于初始化 io_uring 实例。其中,第一个参数 8 表示提交队列(SQ)和完成队列(CQ)的大小,即队列中可以容纳的 I/O 请求数量;第二个参数 &ring 是指向 io_uring 结构体的指针,用于存储初始化后的 io_uring 实例;第三个参数 0 表示使用默认的初始化标志。

获取提交队列条目:sqe = io_uring_get_sqe(&ring); 从 io_uring 的提交队列中获取一个提交队列项(SQE)。如果获取失败,io_uring_get_sqe 函数会返回 NULL,程序会打印错误信息并返回错误代码 1。

准备异步读操作:

char *buf = malloc(1024); //分配 1024 字节的内存空间,用于存储读取的文件数据。

io_uring_prep_read(sqe, fd, buf, 1024, 0); 使用 io_uring_prep_read 函数准备一个异步读操作。它接受五个参数,第一个参数 sqe 是之前获取的提交队列项;第二个参数 fd 是要读取的文件描述符;第三个参数 buf 是用于存储读取数据的缓冲区;第四个参数 1024 表示要读取的字节数;第五个参数 0 表示从文件的起始位置开始读取。

提交请求:io_uring_submit(&ring); 将准备好的 I/O 请求提交到内核中执行。这个函数会触发 io_uring_enter 系统调用,将提交队列中的请求提交给内核。

等待完成:ret = io_uring_wait_cqe(&ring, &cqe); 等待 I/O 操作完成。这个函数会阻塞调用线程,直到至少有一个 I/O 请求完成,并返回完成的完成队列项(CQE)。如果等待过程中出现错误,io_uring_wait_cqe 函数会返回一个负数,程序会通过 perror 函数打印错误信息并返回错误代码 1。

检查结果:

if (cqe->res < 0) 检查 I/O 操作的结果。如果 cqe->res 小于 0,表示操作失败,通过 fprintf 函数打印错误信息。

else 分支表示操作成功,打印实际读取的字节数和读取到的数据。

释放资源:

io_uring_cqe_seen(&ring, cqe); /* 知内核已经处理完一个完成事件,
           释放相关资源。这通过将完成队列的头部指针递增来实现,以便内核可以继续使用完成队列。*/

io_uring_queue_exit(&ring); 释放 io_uring 实例所占用的资源,包括提交队列和完成队列等。

close(fd); 关闭之前打开的文件。

free(buf); 释放之前分配的内存缓冲区。

3.2网络编程案例(TCP 服务器)

⑴代码实现

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <liburing.h>

#define ENTRIES_LENGTH 4096
#define MAX_CONNECTIONS 1024
#define BUFFER_LENGTH 1024

char buf_table[MAX_CONNECTIONS][BUFFER_LENGTH] = {0};

enum {
    READ,
    WRITE,
    ACCEPT,
};

struct conninfo {
    int connfd;
    int type;
};

void set_read_event(struct io_uring *ring, int fd, void *buf, size_t len, int flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_recv(sqe, fd, buf, len, flags);
    struct conninfo ci = {.connfd = fd,.type = READ};
    memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
}

void set_write_event(struct io_uring *ring, int fd, const void *buf, size_t len, int flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_send(sqe, fd, buf, len, flags);
    struct conninfo ci = {.connfd = fd,.type = WRITE};
    memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
}

void set_accept_event(struct io_uring *ring, int fd, struct sockaddr *cliaddr, socklen_t *clilen, unsigned flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_accept(sqe, fd, cliaddr, clilen, flags);
    struct conninfo ci = {.connfd = fd,.type = ACCEPT};
    memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
}

int main() {
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) return -1;
    struct sockaddr_in servaddr, clientaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if (-1 == bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) {
        return -2;
    }
    listen(listenfd, 10);
    struct io_uring_params params;
    memset(&params, 0, sizeof(params));
    struct io_uring ring;
    memset(&ring, 0, sizeof(ring));
    /*初始化params 和 ring*/
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
    socklen_t clilen = sizeof(clientaddr);
    set_accept_event(&ring, listenfd, (struct sockaddr *)&clientaddr, &clilen, 0);
    while (1) {
        struct io_uring_cqe *cqe;
        io_uring_submit(&ring);
        int ret = io_uring_wait_cqe(&ring, &cqe);
        struct io_uring_cqe *cqes[10];
        int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10);
        unsigned count = 0;
        for (int i = 0; i < cqecount; i++) {
            cqe = cqes[i];
            count++;
            struct conninfo ci;
            memcpy(&ci, &cqe->user_data, sizeof(ci));
            if (ci.type == ACCEPT) {
                int connfd = cqe->res;
                char *buffer = buf_table[connfd];
                set_read_event(&ring, connfd, buffer, 1024, 0);
                set_accept_event(&ring, listenfd, (struct sockaddr *)&clientaddr, &clilen, 0);
            } else if (ci.type == READ) {
                int bytes_read = cqe->res;
                if (bytes_read == 0) {
                    close(ci.connfd);
                } else if (bytes_read < 0) {
                    close(ci.connfd);
                    printf("client %d disconnected!\n", ci.connfd);
                } else {
                    char *buffer = buf_table[ci.connfd];
                    set_write_event(&ring, ci.connfd, buffer, bytes_read, 0);
                }
            } else if (ci.type == WRITE) {
                char *buffer = buf_table[ci.connfd];
                set_read_event(&ring, ci.connfd, buffer, 1024, 0);
            }
        }
        io_uring_cq_advance(&ring, count);
    }
    return 0;
}

⑵代码解读

创建监听套接字:int listenfd = socket(AF_INET, SOCK_STREAM, 0); 使用 socket 函数创建一个 TCP 套接字,AF_INET 表示使用 IPv4 协议,SOCK_STREAM 表示使用流式套接字(即 TCP 协议),0 表示默认协议。如果创建失败,socket 函数会返回 -1,程序返回 -1。

绑定地址和端口:

填充服务器地址结构体 servaddr,包括地址族(AF_INET)、IP 地址(INADDR_ANY 表示绑定到所有可用的网络接口)和端口号(9999)。

if (-1 == bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) 使用 bind 函数将创建的套接字绑定到指定的地址和端口。如果绑定失败,bind 函数返回 -1,程序返回 -2。

监听连接:listen(listenfd, 10); 使用 listen 函数开始监听套接字,第二个参数 10 表示最大连接数,即允许同时存在的未处理连接请求的最大数量。

初始化 io_uring:

struct io_uring_params params; 和 struct io_uring ring; 分别定义了 io_uring 的参数结构体和实例结构体。
memset(&params, 0, sizeof(params)); 和 memset(&ring, 0, sizeof(ring)); 初始化这两个结构体的内容为 0。

io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params); 使用 io_uring_queue_init_params 函数初始化 io_uring 实例,ENTRIES_LENGTH 表示提交队列和完成队列的大小,&ring 是指向 io_uring 实例的指针,&params 是指向参数结构体的指针。

设置接受连接事件:set_accept_event(&ring, listenfd, (struct sockaddr *)&clientaddr, &clilen, 0); 调用 set_accept_event 函数设置一个接受连接的异步事件。在这个函数中,首先获取一个提交队列项(SQE),然后使用 io_uring_prep_accept 函数准备接受连接的请求,将相关信息(如监听套接字、客户端地址、地址长度等)填充到 SQE 中,并将自定义的连接信息结构体 conninfo 复制到 SQE 的用户数据区域,用于标识该请求的类型和相关连接信息。

事件循环处理:

  • while (1) 进入一个无限循环,用于持续处理 I/O 事件。

  • io_uring_submit(&ring); 提交准备好的 I/O 请求到内核。

  • int ret = io_uring_wait_cqe(&ring, &cqe); 等待 I/O 操作完成,获取完成的完成队列项(CQE)。

  • struct io_uring_cqe *cqes[10]; 和 int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10); 使用 io_uring_peek_batch_cqe 函数尝试批量获取完成的 CQE,最多获取 10 个。

遍历获取到的 CQE:

struct conninfo ci; 和 memcpy(&ci, &cqe->user_data, sizeof(ci)); 从 CQE 的用户数据区域复制之前设置的连接信息结构体 conninfo。

根据连接信息中的类型(ci.type)进行不同的处理:

如果是 ACCEPT 类型,表示有新的连接请求被接受。获取新的连接描述符 connfd,设置读取事件,准备从新连接中读取数据,并再次设置接受连接事件,以便继续接受新的连接请求。
如果是 READ 类型,表示有数据可读。根据读取的字节数进行处理,如果读取到的字节数为 0,表示客户端断开连接,关闭连接;如果读取失败(字节数小于 0),也关闭连接并打印断开连接的信息;如果读取成功,设置写入事件,将读取到的数据回显给客户端。
如果是 WRITE 类型,表示数据写入完成,设置读取事件,准备从客户端读取下一次的数据。

io_uring_cq_advance(&ring, count); 告知内核已经处理完 count 个完成事件,通过将完成队列的头部指针递增 count 个位置,以便内核可以继续使用完成队列。

3.3性能对比测试

⑴测试环境与方法

测试环境搭建:在一台配备 Intel (R) Xeon (R) CPU E5 - 2682 v4 @ 2.50GHz 处理器、16GB 内存、运行 Linux 5.10 内核的服务器上进行测试。使用的存储设备为 NVMe SSD,以确保 I/O 性能不受磁盘性能的过多限制。测试机器的网络配置为千兆以太网,以保证网络传输的稳定性。

⑵测试方法设计:

针对文件读写场景,使用 fio 工具进行测试。分别设置不同的 I/O 模式,包括阻塞 I/O、非阻塞 I/O、epoll 以及 io_uring。对于每种模式,进行多次测试,每次测试设置不同的文件大小(如 1MB、10MB、100MB)和 I/O 操作类型(如随机读、顺序读、随机写、顺序写)。在每次测试中,fio 工具会按照设定的参数进行 I/O 操作,并记录操作的时间、吞吐量等性能指标。例如,在随机读测试中,fio 会随机读取文件中的数据块,并统计单位时间内读取的数据量。

在网络编程场景下,搭建一个简单的 echo 服务器模型,分别使用 epoll 和 io_uring 实现。客户端通过多线程模拟大量并发连接,向服务器发送数据并接收服务器回显的数据。在测试过程中,逐渐增加并发连接数,从 100 个连接开始,每次增加 100 个,直到达到 1000 个连接。使用 iperf 等工具测量不同并发连接数下的 QPS(每秒查询率)、延迟等性能指标。iperf 工具会在客户端和服务器之间建立 TCP 连接,发送一定量的数据,并记录数据传输的速率、延迟等信息。

⑶测试结果分析

文件读写性能:在小文件(1MB)读写测试中,阻塞 I/O 由于线程阻塞等待 I/O 操作完成,导致其吞吐量最低,平均吞吐量约为 50MB/s。非阻塞 I/O 虽然避免了线程阻塞,但频繁的轮询使得 CPU 利用率较高,且由于 I/O 操作的碎片化,其吞吐量也不高,平均约为 80MB/s。epoll 在处理多个文件描述符的 I/O 事件时,通过高效的事件通知机制,提高了 I/O 操作的效率,平均吞吐量达到 120MB/s。

四、io_uring的应用场景及未来发展

4.1适用场景探讨

数据库系统:在数据库系统中,大量的数据读写操作对 I/O 性能要求极高。io_uring 的高效异步 I/O 特性能够显著提升数据库的性能。以关系型数据库 MySQL 为例,在处理大量并发查询和更新操作时,传统的 I/O 模型会导致线程频繁阻塞和上下文切换,从而降低系统的响应速度。而使用 io_uring,MySQL 可以将 I/O 请求异步提交到内核,内核在后台处理这些请求,当请求完成时,通过完成队列通知 MySQL。这样,MySQL 的线程在 I/O 操作期间可以继续执行其他任务,如查询优化、事务处理等,大大提高了系统的并发处理能力。

同时,io_uring 支持直接 I/O 模式,这对于数据库系统来说非常重要,因为数据库通常需要直接访问存储设备以提高数据读写的效率,避免了操作系统页缓存带来的额外开销。此外,io_uring 的批量提交和处理能力,使得数据库在进行大规模数据导入、导出等操作时,能够一次性提交多个 I/O 请求,减少系统调用次数,进一步提升了 I/O 性能。

网络服务器:在网络服务器领域,io_uring 同样展现出了巨大的优势。以 Nginx 服务器为例,传统的基于 epoll 的 I/O 模型在处理高并发连接时,虽然通过事件驱动机制提高了 I/O 的效率,但在 I/O 操作过程中,仍然存在一定的上下文切换开销。而 io_uring 通过用户态和内核态共享的提交队列和完成队列,减少了系统调用和上下文切换的次数,使得 Nginx 在处理大量并发连接时,能够更加高效地进行数据的读写操作。

例如,当有大量客户端同时请求 Nginx 服务器时,Nginx 可以使用 io_uring 将这些请求的 I/O 操作异步提交到内核,内核在后台处理这些请求,并将完成结果放入完成队列。Nginx 可以随时从完成队列中获取完成的 I/O 操作结果,进行相应的处理,如返回响应数据给客户端。这种方式大大提高了 Nginx 的并发处理能力,降低了延迟,提升了服务器的性能和响应速度。同时,io_uring 支持网络套接字的异步操作,使得 Nginx 在处理网络连接的建立、断开以及数据传输等操作时,能够更加灵活和高效。

文件存储系统:在文件存储系统中,io_uring 的应用可以有效提升文件的读写性能和系统的整体效率。以 Ceph 分布式文件系统为例,它需要处理大量的文件读写请求,并且要保证数据的一致性和可靠性。使用 io_uring 后,Ceph 可以将文件读写请求异步提交到内核,利用内核的高效 I/O 处理能力来完成这些请求。

在文件读取时,io_uring 可以提前将文件数据预读到内存中,当应用程序请求数据时,能够快速从内存中获取,减少了磁盘 I/O 的等待时间。在文件写入时,io_uring 可以将数据异步写入磁盘,同时应用程序可以继续执行其他任务,提高了系统的并发性能。此外,io_uring 的零拷贝特性在文件存储系统中也具有重要意义,它减少了数据在内存中的拷贝次数,提高了数据传输的效率,降低了 CPU 的开销。这对于大规模文件存储系统来说,能够显著提升系统的性能和可扩展性,更好地满足用户对文件存储和访问的需求。

4.2未来发展趋势展望

内核支持的增强:随着 Linux 内核的不断发展,对 io_uring 的支持有望进一步增强。未来的内核版本可能会优化 io_uring 的实现,减少其在高并发场景下的锁竞争和资源争用问题,从而进一步提升其性能。在多线程同时访问提交队列和完成队列时,内核可能会采用更高效的无锁数据结构或优化的锁机制,以确保多个线程能够高效地进行 I/O 请求的提交和结果的获取。此外,内核可能会增加对更多设备和文件系统的支持,使 io_uring 能够更好地应用于各种硬件平台和存储设备。

例如,对于新型的存储设备,如基于 3D XPoint 技术的非易失性内存,内核可能会优化 io_uring 的驱动程序,充分发挥这些设备的高性能优势。同时,对于不同的文件系统,如 ext4、XFS、Btrfs 等,内核可能会针对 io_uring 进行特定的优化,提高其在不同文件系统上的兼容性和性能表现。

应用领域的拓展:io_uring 在未来有望拓展到更多的应用领域。随着物联网(IoT)的快速发展,大量的物联网设备需要进行高效的数据传输和处理。io_uring 可以应用于物联网网关和边缘计算设备中,提高这些设备在处理大量传感器数据和设备通信时的 I/O 性能。在智能工厂中,物联网网关需要实时采集和处理大量的生产设备数据,使用 io_uring 可以实现高效的异步 I/O 操作,确保数据的及时传输和处理,提高生产效率和设备的智能化管理水平。

此外,在大数据处理和人工智能领域,io_uring 也具有广阔的应用前景。大数据处理框架如 Hadoop、Spark 等,在处理大规模数据集时,需要进行大量的文件读写和网络传输操作,io_uring 可以提高这些框架的数据处理速度和效率。在人工智能训练和推理过程中,需要频繁地读取和写入模型数据和训练样本,io_uring 的高效 I/O 特性可以加速这些操作,提升人工智能系统的性能和响应速度。

五、io_uring代码实践

#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
 
#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2
 
struct conn_info
{
  int fd;
  int event;
};
 
int init_server(unsigned short port)
{
 
  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in serveraddr;
  memset(&serveraddr, 0, sizeof(struct sockaddr_in));
  serveraddr.sin_family = AF_INET;
  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
  serveraddr.sin_port = htons(port);
 
  if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
  {
    perror("bind");
    return -1;
  }
 
  listen(sockfd, 10);
 
  return sockfd;
}
 
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
 
int set_event_recv(struct io_uring *ring, int sockfd,
           void *buf, size_t len, int flags)
{
 
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
 
  struct conn_info accept_info = {
    .fd = sockfd,
    .event = EVENT_READ,
  };
 
  io_uring_prep_recv(sqe, sockfd, buf, len, flags);
  memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
 
int set_event_send(struct io_uring *ring, int sockfd,
           void *buf, size_t len, int flags)
{
 
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
 
  struct conn_info accept_info = {
    .fd = sockfd,
    .event = EVENT_WRITE,
  };
 
  io_uring_prep_send(sqe, sockfd, buf, len, flags);
  memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
 
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
           socklen_t *addrlen, int flags)
{
 
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
 
  struct conn_info accept_info = {
    .fd = sockfd,
    .event = EVENT_ACCEPT,
  };
 
  io_uring_prep_accept(sqe, sockfd, (struct sockaddr *)addr, addrlen, flags);
  memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
 
int main(int argc, char *argv[])
{
 
  unsigned short port = 9999;
  int sockfd = init_server(port);
 
  struct io_uring_params params;
  memset(&params, 0, sizeof(params));
 
  struct io_uring ring;
  io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
 
#if 0
  struct sockaddr_in clientaddr;  
  socklen_t len = sizeof(clientaddr);
  accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
 
  struct sockaddr_in clientaddr;
  socklen_t len = sizeof(clientaddr);
  set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
 
#endif
 
  char buffer[BUFFER_LENGTH] = {0};
 
  while (1)
  {
 
    io_uring_submit(&ring);
 
    struct io_uring_cqe *cqe;
    io_uring_wait_cqe(&ring, &cqe);
 
    struct io_uring_cqe *cqes[128];
    int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_wait
 
    int i = 0;
    for (i = 0; i < nready; i++)
    {
 
      struct io_uring_cqe *entries = cqes[i];
      struct conn_info result;
      memcpy(&result, &entries->user_data, sizeof(struct conn_info));
 
      if (result.event == EVENT_ACCEPT)
      {
 
        set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
        // printf("set_event_accept\n"); //
 
        int connfd = entries->res;
 
        set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
      }
      else if (result.event == EVENT_READ)
      { //
 
        int ret = entries->res;
        // printf("set_event_recv ret: %d, %s\n", ret, buffer); //
 
        if (ret == 0)
        {
          close(result.fd);
        }
        else if (ret > 0)
        {
          set_event_send(&ring, result.fd, buffer, ret, 0);
        }
      }
      else if (result.event == EVENT_WRITE)
      {
        //
 
        int ret = entries->res;
        // printf("set_event_send ret: %d, %s\n", ret, buffer);
 
        set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
      }
    }
 
    io_uring_cq_advance(&ring, nready);
  }
}

5.1服务器初始化

int init_server(unsigned short port)
{
	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
	struct sockaddr_in serveraddr;
	memset(&serveraddr, 0, sizeof(struct sockaddr_in));
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
	serveraddr.sin_port = htons(port);
 
	if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
	{
		perror("bind");
		return -1;
	}
 
	listen(sockfd, 10);
	return sockfd;
}

该函数初始化了一个 TCP 服务器套接字,用于监听客户端连接请求。socket、bind 和 listen 是常规的服务器初始化步骤,将服务器绑定到指定的端口,并使其开始监听客户端连接。

5.2io_uring 环境初始化

struct io_uring_params params;
memset(&params, 0, sizeof(params));
 
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);

io_uring_queue_init_params 函数初始化了一个 io_uring 实例,这个实例将用于管理所有的异步 I/O 操作,ENTRIES_LENGTH 定义了提交队列和完成队列的大小,表示可以同时处理的最大 I/O 操作数量。

5.3设置 accept 事件

struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);

set_event_accept 函数将一个 accept 操作添加到 io_uring 的提交队列中。这个操作用于接受客户端连接请求。这一步是服务器启动时的初始操作,它告诉 io_uring 开始监听并处理客户端连接。

5.4主循环:提交操作和处理完成事件

while (1)
{
	io_uring_submit(&ring);
	struct io_uring_cqe *cqe;
	io_uring_wait_cqe(&ring, &cqe);
 
	struct io_uring_cqe *cqes[128];
	int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
  • io_uring_submit:将之前添加到提交队列中的所有操作提交给内核,由内核异步执行这些操作。

  • io_uring_wait_cqe:等待至少一个操作完成,这是一个阻塞调用。

  • io_uring_peek_batch_cqe:批量获取已经完成的操作结果,nready 表示完成的操作数量。

5.5处理完成的事件

for (i = 0; i < nready; i++)
{
	struct io_uring_cqe *entries = cqes[i];
	struct conn_info result;
	memcpy(&result, &entries->user_data, sizeof(struct conn_info));
 
	if (result.event == EVENT_ACCEPT)
	{
		set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
		int connfd = entries->res;
		set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
	}
	else if (result.event == EVENT_READ)
	{
		int ret = entries->res;
		if (ret == 0)
		{
			close(result.fd);
		}
		else if (ret > 0)
		{
			set_event_send(&ring, result.fd, buffer, ret, 0);
		}
	}
	else if (result.event == EVENT_WRITE)
	{
		int ret = entries->res;
		set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
	}
}
  • EVENT_ACCEPT:处理 accept 事件。当一个新的客户端连接到来时,io_uring 完成队列会返回 EVENT_ACCEPT 事件,表示一个新的连接已经建立。此时,服务器会:重新设置 accept 事件,继续监听新的客户端连接。获取新连接的文件描述符 connfd,并设置一个 recv 事件来准备接收数据。

  • EVENT_READ:处理 recv 事件。当从客户端接收到数据时,io_uring 返回 EVENT_READ 事件。如果接收到的数据长度大于0,则会设置一个 send 事件来将数据发送回客户端。如果 ret == 0,说明客户端关闭了连接,则关闭文件描述符。

  • EVENT_WRITE:处理 send 事件。当数据成功发送给客户端后,io_uring 返回 EVENT_WRITE 事件。此时,服务器会再次设置一个 recv 事件,准备接收更多数据。

5.6完成队列的推进

io_uring_cq_advance(&ring, nready);

这个函数通知 io_uring,你已经处理完了 nready 个完成队列条目(CQE)。io_uring 可以释放这些 CQE 供后续操作使用。

总结

io_uring 的作用:在这个示例中,io_uring 被用来高效地处理网络 I/O 操作。通过异步提交和处理 accept、recv、send 操作,服务器能够高效处理多个并发连接,而无需阻塞等待每个I/O操作完成。

异步模型:io_uring 提供了一种低延迟、高并发的异步 I/O 处理方式。操作在提交后由内核异步执行,完成后再由应用程序查询并处理结果。这种方式大大减少了系统调用的开销,提高了程序的并发处理能力。

关键点:

  • 提交操作:使用 io_uring_prep_* 函数准备操作,并提交给内核处理。

  • 等待完成:使用 io_uring_wait_cqe 等方法等待操作完成,并获取结果。

  • 处理结果:根据完成队列中的事件类型(如 EVENT_ACCEPT、EVENT_READ、EVENT_WRITE)进行相应的处理和后续操作。


http://www.kler.cn/a/529862.html

相关文章:

  • 穷举vs暴搜vs深搜vs回溯vs剪枝系列一>单词搜索
  • DeepSeek-R1 低成本训练的根本原因是?
  • 玩转Docker | 使用Docker部署SSCMS内容管理系统
  • 01.双Android容器解决方案
  • PHP 常用函数2025.02
  • QT知识点复习
  • 理解知识蒸馏中的散度损失函数(KLDivergence/kldivloss )-以DeepSeek为例
  • scrape登录(js逆向)
  • 负载均衡器高可用部署
  • 【数据结构】_链表经典算法OJ:链表判环问题
  • C#面试常考随笔9:什么是闭包?
  • C++泛型编程指南04-(对默认调用参数的类型推断)
  • 最新码支付个人免签支付系统源码 三网免挂版本 兼容易支付
  • 【数据结构】_链表经典算法OJ:相交链表
  • linux中统计文件中特定单词或字符串的出现次数
  • CMake项目编译与开源项目目录结构
  • 面试常考题目——状态码总结
  • 96,【4】 buuctf web [BJDCTF2020]EzPHP
  • JavaFX - 事件处理
  • Mac上的虚拟化软件推荐
  • Go 中 defer 的机制
  • 基于开源AI智能名片2 + 1链动模式S2B2C商城小程序源码在抖音招商加盟中的应用与创新
  • web前端13--动画
  • 129.求根节点到叶节点数字之和(遍历思想)
  • 面试题:React实现鼠标托转文字绕原点旋转
  • DeepSeek是什么?横空出世意味着什么?