当前位置: 首页 > article >正文

【Linux】其他备选高级IO模型

其他高级 I/O 模型

以上基本介绍的都是同步IO相关知识点,即在同步I/O模型中,程序发起I/O操作后会等待I/O操作完成,即程序会被阻塞,直到I/O完成。整个I/O过程在同一个线程中进行,程序在等待期间不能执行其他任务。下面将会介绍除同步IO之外的其他常见IO模型。

什么是IO?什么是高效的IO?

**I/O(输入/输出)**是计算机与外部世界进行数据交换的过程。在Linux系统中,I/O操作通常指的是程序与硬盘、网络设备、终端等进行数据交换的操作。I/O性能直接影响到系统的响应速度和吞吐量,是很多应用系统优化的关键目标。

高效的IO 涉及优化I/O操作的延迟、吞吐量和资源消耗。在一个复杂的系统中,I/O效率通常通过以下方式得到提升:

  • 减少I/O等待时间(例如,通过非阻塞I/O或异步I/O)
  • 最大化吞吐量(例如,通过数据预读或缓存机制)
  • 优化系统资源的利用(例如,通过多路复用和线程池等技术)

高效的IO不仅可以提升程序的响应速度,还能减少系统的负载,提高并发处理能力。

IO模型分析方法

出处:Linux五种IO模型-CSDN博客

分析IO模型需要了解2个问题:

  • 问题1:发送IO请求,IO请求可以理解为用户空间和内核空间数据同步,根据发起者不同分为以下两种情况:

    • 由用户程序发起(同步IO)。
    • 由内核发起(异步IO)。
  • 问题2:等待数据到来,等待数据到来的方式有以下几种:

    • 阻塞(阻塞IO)。
    • 轮询(非阻塞IO)。
    • 信号通知(信号驱动IO)。

内核空间和用户空间数据同步由谁发起是分析Linux IO模型最核心的问题

1.阻塞式I/O(Blocking I/O)

  • 阻塞式I/O是最常见的I/O模型,在这种模型下,当程序请求I/O操作时,会阻塞当前线程直到I/O操作完成(如读/写数据)。在等待期间,线程无法进行其他操作。
  • 优点:简单,易于理解和实现。
  • 缺点:性能瓶颈,当进行大量I/O操作时,阻塞会导致线程无法有效利用,浪费CPU资源。
代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>

#define PORT 8080
#define BUFFER_SIZE 1024

int main() {
    int sockfd;
    struct sockaddr_in server_addr;
    char buffer[BUFFER_SIZE];

    // 创建一个TCP套接字
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("套接字创建失败");
        return -1;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(PORT);
    server_addr.sin_addr.s_addr = INADDR_ANY;

    // 绑定套接字
    if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("绑定失败");
        close(sockfd);
        return -1;
    }

    // 监听端口
    if (listen(sockfd, 3) < 0) {
        perror("监听失败");
        close(sockfd);
        return -1;
    }

    printf("等待客户端连接...\n");

    int new_sock;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);

    // 接受客户端连接
    if ((new_sock = accept(sockfd, (struct sockaddr*)&client_addr, &client_len)) < 0) {
        perror("接受连接失败");
        close(sockfd);
        return -1;
    }

    printf("客户端已连接。等待数据...\n");

    // 阻塞式recv:等待客户端发送数据
    while (1) {
        printf("发起 I/O 请求:调用 recv() 等待数据...\n");
        ssize_t bytes_received = recv(new_sock, buffer, BUFFER_SIZE - 1, 0); // BUFFER_SIZE -1 保证留出 '\0'

        if (bytes_received < 0) {
            perror("recv 失败");
            break;
        }

        if (bytes_received == 0) {
            printf("客户端已断开连接。\n");
            break;
        }

        buffer[bytes_received] = '\0';  // 确保字符串结束
        printf("收到数据:%s\n", buffer); // 数据到达并唤醒进程
    }

    close(new_sock);
    close(sockfd);
    return 0;
}
原理分析:

image-20241207224835694

阶段 1: 用户程序调用 recv 发起 I/O 请求(同步 I/O)

背景: 在阻塞 I/O 模式下,用户程序发起 I/O 操作时(比如通过 recv 函数从套接字读取数据),如果内核空间的套接字缓冲区没有数据准备好,用户进程会被阻塞,直到数据可用。

步骤 1: 发起 I/O 请求

ssize_t bytes_received = recv(sockfd, buffer, BUFFER_SIZE, 0);
  • 当用户程序调用 recv 函数时,内核会检查指定的套接字是否有足够的数据可以读取。如果套接字的缓冲区为空,内核会将调用进程从 TASK_RUNNING 状态切换到 TASK_INTERRUPTIBLE 状态。

步骤 2: 进程切换状态并阻塞

  • 进程状态的切换意味着 CPU 将会把当前进程放入 进程等待队列,并且该进程不再占用 CPU 资源。系统调度程序会选择其他可以运行的进程来执行,这就是“进程阻塞”的表现。
  • 被阻塞的进程会被加入到该套接字的等待队列中,等待数据的到来。一旦数据可用,进程就会被唤醒并继续执行。

重要的概念:

  • 阻塞 I/O 并不意味着阻塞 CPU,它只会使进程切换到阻塞状态,让出 CPU 的控制权,其他进程可以继续执行。
  • 进程的阻塞状态是由内核进行管理的,通常是 TASK_INTERRUPTIBLETASK_UNINTERRUPTIBLE,这取决于 I/O 请求的性质。TASK_INTERRUPTIBLE 状态表示进程可以响应信号中断,而 TASK_UNINTERRUPTIBLE 状态下进程不会响应信号。
阶段 2: 网卡收到数据包并唤醒进程

背景: 数据包通过网络接口卡(NIC)到达时,内核通过硬件中断机制将数据拷贝到内核空间。内核会将这些数据存入相应的套接字缓冲区,进而唤醒之前等待的进程。

步骤 1: 网卡接收数据包

  • 网络接口卡(NIC)通过 DMA(直接内存访问)机制将收到的网络数据包直接拷贝到内核空间的环形缓冲区(RingBuffer)中,这一过程在硬件和内核中自动完成。
  • 一旦数据包被成功接收,NIC 会触发硬件中断,通知操作系统数据包已到达。

步骤 2: 数据包复制到套接字接收缓冲区

  • 中断处理程序会将数据包从环形缓冲区复制到对应套接字的接收缓冲区。
  • 在这时,内核会检查是否有任何进程(如在第一阶段被阻塞的进程)正在等待该套接字的接收数据。如果有,内核会唤醒等待队列中的进程,使其恢复执行。

步骤 3: 唤醒等待的进程

  • 被唤醒的进程会重新进入 TASK_RUNNING 状态,系统调度器会将该进程从等待队列中移除,并将其放回可运行队列。
  • 用户进程被恢复执行后,recv 函数会从套接字接收缓冲区中读取数据,并将数据返回到用户空间的缓冲区(例如 buffer)。

步骤 4: 用户程序完成 I/O 操作

  • 一旦数据被成功读取,recv 函数会返回读取的字节数,用户程序可以继续处理接收到的数据。


2.非阻塞IO(Non-blocking I/O)

首先我们先来认识一个新的函数fcntl():

2.1 fcntl 函数的基本用法

fcntl 函数主要用于对已打开的文件描述符进行控制操作,以改变文件描述符的属性或获取其状态。fcntl函数通常在处理低级别的文件I/O时使用,例如复制文件描述符、获取和设置文件描述符标志、获取和设置文件状态标志、文件锁定(共享锁和排他锁)、设置文件所有者。

fcntl函数的基本原型如下:

#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );

参数说明

  • fd 是文件描述符。
  • cmd 是要执行的命令。
  • arg 可选参数,可能还需要一个或多个附加参数,具体取决于 cmd 的值。

返回值

  • 成功时,根据cmd的不同,可能需要一个或多个附加参数。
  • 失败时,返回-1,并设置errno以只是错误原因。
fcntl 的常见命令
  1. 复制文件描述符

    • F_DUPFD:返回一个大于或等于指定值的最小可用文件描述符,该描述符与原来的描述符指向同一个文件。
    • F_DUPFD_CLOEXEC:与 F_DUPFD 类似,但新描述符设置 FD_CLOEXEC 标志。

    示例代码:

    #include <fcntl.h> // 包含 fcntl 相关的头文件
    #include <unistd.h> // 包含 close 和 open 函数的头文件
    #include <iostream> // 包含输入输出流的头文件
    
    int main() {
        int old_fd = open("example.txt", O_RDONLY); // 打开文件用于只读
        if (old_fd == -1) {
            std::perror("打开文件失败:"); // 输出错误信息
            return -1;
        }
    
        int new_fd = fcntl(old_fd, F_DUPFD, 3); // 复制文件描述符,要求新的描述符至少为 3
        if (new_fd == -1) {
            std::perror("复制文件描述符失败:");
            close(old_fd);
            return -1;
        }
    
        std::cout << "新的文件描述符:" << new_fd << std::endl; // 输出新的文件描述符
    
        close(old_fd); // 关闭旧的文件描述符
        close(new_fd); // 关闭新的文件描述符
    
        return 0;
    }
    
  2. 获取/设置文件描述符标志

    • F_GETFD:获取文件描述符的标志。

    • F_SETFD:设置文件描述符的标志。

    • FD_CLOEXEC:设置或取消 close-on-exec 标志,该标志表示在执行 exec 系列函数时关闭该描述符。

    标志

    • O_NONBLOCK:非阻塞模式。
    • O_APPEND:追加模式。
    • O_ASYNC:启用信号驱动 I/O。
    • O_DIRECT:直接 I/O(尽量绕过缓冲区缓存)。
    • O_NOATIME:不更新文件的访问时间。

    示例代码:

    #include <fcntl.h> // 包含 fcntl 相关的头文件
    #include <unistd.h> // 包含 close 和 open 函数的头文件
    #include <iostream> // 包含输入输出流的头文件
    
    int main() {
        int fd = open("example.txt", O_RDONLY); // 打开文件用于只读
        if (fd == -1) {
            std::perror("打开文件失败:"); // 输出错误信息
            return -1;
        }
    
        int flags = fcntl(fd, F_GETFD); // 获取文件描述符的标志
        if (flags == -1) {
            std::perror("获取文件描述符标志失败:");
            close(fd);
            return -1;
        }
    
        // 设置 FD_CLOEXEC 标志,使文件描述符在执行 exec 时关闭
        if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) == -1) {
            std::perror("获取文件描述符标志失败:");
            close(fd);
            return -1;
        }
    
        std::cout << "设置 FD_CLOEXEC 前的标志:" << flags << std::endl; // 输出设置前的标志
        std::cout << "设置 FD_CLOEXEC 后的标志:" << new_flags << std::endl; // 输出设置后的标志
    
        close(fd); // 关闭文件描述符
        return 0;
    }
    
  3. 获取/设置文件状态标志

    • F_GETFL:获取文件状态标志。
    • F_SETFL:设置文件状态标志。

    示例代码:

    #include <fcntl.h> // 包含 fcntl 相关的头文件
    #include <unistd.h> // 包含 close 和 open 函数的头文件
    #include <iostream> // 包含输入输出流的头文件
    
    int main() {
        int fd = open("example.txt", O_RDONLY); // 打开文件用于只读
        if (fd == -1) {
            std::perror("打开文件失败:"); // 输出错误信息
            return -1;
        }
    
        int flags = fcntl(fd, F_GETFL); // 获取文件状态标志
        if (flags == -1) {
            std::perror("获取文件状态标志失败:");
            close(fd);
            return -1;
        }
    
        // 设置 O_NONBLOCK 标志,使文件描述符处于非阻塞模式
        fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    
        int new_flags = fcntl(fd, F_GETFL); // 再次获取文件状态标志
        if (new_flags == -1) {
            std::perror("获取文件状态标志失败:");
            close(fd);
            return -1;
        }
    
        std::cout << "设置 O_NONBLOCK 前的标志:" << flags << std::endl; // 输出设置前的标志
        std::cout << "设置 O_NONBLOCK 后的标志:" << new_flags << std::endl; // 输出设置后的标志
    
        close(fd); // 关闭文件描述符
    
        return 0;
    }
    

2.2 阻塞模式 vs 非阻塞模式

  1. 阻塞模式:(上面已有介绍)

    • 默认情况下,文件描述符处于阻塞模式。
    • 当你尝试从一个没有数据可读的文件描述符中读取数据,或者尝试向一个写缓冲区已满的文件描述符写入数据时,进程会被阻塞,直到操作可以完成。
  2. 非阻塞模式

    • 如果设置了 O_NONBLOCK 标志,那么当尝试读取没有数据可读或写缓冲区已满的情况下,不会阻塞进程,而是立即返回一个错误。
    • 典型的错误码是 EAGAINEWOULDBLOCK,这表明操作不能立即完成。

    非阻塞I/O模型下,I/O请求立即返回,不会阻塞程序。如果数据没有准备好,程序会收到错误或“没有数据”的通知,之后可以重新尝试。

    • 优点:线程可以进行其他任务,而不是等待I/O操作完成。
    • 缺点:需要不断轮询(polling)I/O操作,增加了额外的CPU开销。

代码示例:

#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <cstring>

using namespace std;
#define PORT 8080
#define BUFFER_SIZE 1024

// 设置套接字为非阻塞模式
int setNonBlocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    if (flags == -1) {
        cerr << "获取文件状态标志失败: " << strerror(errno) << endl;
        return -1;
    }
    if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
        cerr << "设置非阻塞模式失败: " << strerror(errno) << endl;
        return -1;
    }
    return 0;
}

int main() {
    // 创建套接字
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        cerr << "创建套接字失败: " << strerror(errno) << endl;
        return -1;
    }

    // 设置套接字为非阻塞
    if (setNonBlocking(server_fd) == -1) {
        close(server_fd);
        return -1;
    }

    // 绑定地址和端口
    struct sockaddr_in address;
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        cerr << "绑定地址和端口失败: " << strerror(errno) << endl;
        close(server_fd);
        return -1;
    }

    // 监听连接
    if (listen(server_fd, 3) < 0) {
        cerr << "监听连接失败: " << strerror(errno) << endl;
        close(server_fd);
        return -1;
    }

    cout << "服务器正在监听端口 " << PORT << endl;

    // 接受客户端连接
    struct sockaddr_in client_address;
    socklen_t addr_len = sizeof(client_address);
    int client_fd = -1;

    while (client_fd == -1) {
        client_fd = accept(server_fd, (struct sockaddr *)&client_address, &addr_len);

        if (client_fd == -1) {
            // 如果没有连接,errno 会被设置为 EWOULDBLOCK 或 EAGAIN
            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                cout << "没有可用连接,继续等待..." << endl;
                sleep(1);  // 延时 1s 后重试
            } else {
                cerr << "接受客户端连接失败: " << strerror(errno) << endl;
                close(server_fd);
                return -1;
            }
        }
    }

    cout << "接受到客户端连接" << endl;

    // 设置客户端套接字为非阻塞
    if (setNonBlocking(client_fd) == -1) {
        close(server_fd);
        close(client_fd);
        return -1;
    }

    char buffer[BUFFER_SIZE];
    int bytes_read;

    while (true) {
        // 尝试读取客户端数据
        bytes_read = recv(client_fd, buffer, sizeof(buffer), 0);

        if (bytes_read == -1) {
            // 如果没有数据,errno 会被设置为 EWOULDBLOCK 或 EAGAIN
            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                cout << "没有可用数据,执行其他任务..." << endl;
                usleep(100000);  // 假设其他任务的延时(100ms)
            } else {
                cerr << "接收数据失败: " << strerror(errno) << endl;
                break;
            }
        } else if (bytes_read == 0) {
            // 客户端关闭连接
            cout << "客户端已断开连接" << endl;
            break;
        } else {
            // 成功接收到数据
            buffer[bytes_read] = '\0';  // 确保接收到的字符串以 '\0' 结尾
            cout << "收到数据: " << buffer << endl;
        }
    }

    // 关闭连接
    close(client_fd);
    close(server_fd);

    return 0;
}

原理分析:

image-20241210170602345

非阻塞 I/O/阶段1:用户程序调用 recv 发起 I/O 请求

  • 在循环中调用 recv,如果没有数据,recv 会立即返回 -1 并设置 errnoEWOULDBLOCKEAGAIN
  • 程序检测到 EWOULDBLOCK 错误后,执行其他任务(如打印信息和休眠),然后继续尝试接收数据。
  • 这对应于用户程序调用 recv 发起 I/O 请求,读取 socket 缓冲区数据。由于 socket 缓冲区没有就绪数据包,非阻塞 I/O recv 直接返回 EWOULDBLOCK 错误码,用户如果一直调用 recv 函数则一直返回 EWOULDBLOCK 错误码,直到数据准备好。

非阻塞 I/O/阶段2:数据到达并唤醒进程

  • 当数据通过网络接口卡(NIC)接收并被内核处理后,数据被复制到套接字接收缓冲区。
    • 随后,recv 调用会成功读取数据,返回接收到的字节数,程序继续处理数据。
  • 这与阻塞 I/O 的阶段2 相同,即网卡收到数据包并唤醒进程,包括数据复制到套接字接收缓冲区和唤醒等待的进程。

拓展知识Ctrl+CCtrl+D:

组合键作用退出状态码典型场景
Ctrl+C发送 SIGINT 信号给前台进程,请求程序终止通常为 130128 + 2用于中断正在运行的程序
Ctrl+D发送 EOF 信号,表示输入结束通常为 0用于结束输入或退出交互式 shell


3.IO多路复用(Multiplexing)

IO多路复用是一种高效的IO处理方式,它可以让一个进程同时监控多个文件描述符,当其中任意一个文件描述符就绪时,就可以进行相应的IO操作。

相比于传统的阻塞IO和非阻塞IO,IO复用可以打打提高IO效率,减少CPU资源的浪费。

在Linux中,常用的IO复用模型有selectpollepoll等。

IO复用模型请求由用户程序发起,所以IO复用模型为同步IO。

3.1 IO复用select模型

3.1.1 认识select函数

系统调用select()会一直阻塞,直到一个或多个文件描述符集合成为就绪态。它通过传递三个文件描述符集合(读集合、写集合和异常集合)给内核,内核会在这些集合中等待任意一个文件描述符就绪。

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数:
  • nfds:设为比下面三个文件描述符集合中所包含的最大文件描述符号 +1

  • readfds:是用来检测输入是否就绪的文件描述符集合。

  • writefds:是用来检测输出是否就绪的文件描述符集合。

  • exceptfds:是用来检测异常情况是否发生的文件描述符集合。

  • timeout:超时时间,这个参数控制着select()的阻塞行为。

    • 该参数可指定为NULL,此时select()会一直阻塞。又或者指向一个timeval结构体。
    • 如果结构体timeval的两个域都为0的话,此时select()不会阻塞。它只是简单低轮询指定的文件描述符集合,查看其中是否有就绪的文件描述符并立即返回。
    • 否则,timeval的两个域都不为0的话,timeval将为select()指定一个等待时间上限值。代表多久之后返回,即超时时间。
返回值:
  • 如果有文件描述符变得可读、可写或发生异常,返回值为准备就绪的文件描述符的数目。
  • 如果 timeout 到达,返回值为 0。这种情况下每个返回的文件描述符集合将被清空。
  • 如果发生错误,返回值为 -1。

timeval结构体原型

struct timeval
{
__time_t tv_sec;		/* 秒.  */
__suseconds_t tv_usec;	/* 微秒.  */
};

timeout设为NULL,或其指向的结构体字段非零时,select()将阻塞直到下列事件发生:

  • readfdswritefdsexceptfds中指定的文件描述符中至少有一个成为就绪态。
  • 该调用被信号处理中断。
  • timeout 中指定的时间上限已超时。

关于 fd_setfd_set 是一个位图结构,用于标识一组文件描述符(通常是网络套接字)。fd_set 的定义通常是由底层实现细节决定的,但通常它是一个足够大的位字段,能够容纳系统中可能的最大文件描述符值。fd_set 的主要用途包括:

  • 输入参数:告诉 select 哪些文件描述符应该被监控。你可以在调用 select 之前,通过调用 FD_ZERO 清空集合,然后用 FD_SET 向集合中添加感兴趣的文件描述符。
  • 输出参数select 返回后,被监控的文件描述符集合中,就绪的描述符对应的比特位会被设置为 1。你可以通过 FD_ISSET 宏来检查特定的文件描述符是否已经就绪。
3.1.2 fd_set类型和宏

通常数据类型fd_set以位掩码的形式来实现。但是,我们并不需要知道这些细节,因为所有关于文件描述符集合的操作都是通过四个宏完成的:

#include <sys/select.h>
void FD_ZERO(fd_set *set);			/* 将fdset所指向的集合初始化为空 */
void FD_SET(int fd, fd_set *set);	/* 将文件描述符fd添加到由fdset所指向的集合中 */
void FD_CLR(int fd, fd_set *set);	/* 将文件描述符fd从fdset所指向的集合中移除 */
int FD_ISSET(int fd, fd_set *set);	/* 检查已连接的套接字是否有数据可读 */

参数readfdswritefdsexceptfds所指向的结构体都是保存结果值的地方。在调用select()之前,这些指向的结构体必须初始化(通过FD_ZERO()FD_SET()),以包含我们感兴趣的文件描述符集合。之后select()调用会修改这些结构体,当select()返回时,它们包含的就是已处于就绪态的文件描述符集合了(由于这些结构体会在调用中被修改,如果在循环中重复调用select(),我们必须保证每次都要重新初始化它们)。之后这些结构体可以通过FD_ISSET()来检查是否有数据可读。

3.1.3 select特点
  1. 文件描述符上限

    select 函数能同时等待的文件描述符(fd)是有上限的。这个上限在很多操作系统中默认为 1024,可以通过调整内核参数来增加这个上限,但即使增加,仍然存在一个固定的上限。这是因为 select 使用位图来表示文件描述符集合,而位图的大小是固定的。如果需要更大的文件描述符数量,需要修改内核或使用其他 I/O 多路复用技术,如 epoll

  2. 维护合法的文件描述符

    使用 select 时,需要维护一个第三方数组来保存合法的文件描述符。这是因为在实际应用中,文件描述符可能会动态增加或减少,而 select 本身并不提供任何机制来自动跟踪这些变化。因此,程序员需要自己维护这样一个列表,确保每次调用 select 时提供的文件描述符集合是最新的。

  3. 输入输出型参数

    select 的参数是输入输出型的。这意味着在调用 select 之前,你需要设置好各个集合(readfdswritefdsexceptfds),告诉内核你需要监控哪些文件描述符的状态。而在 select 返回后,你需要检查这些集合,确定哪些文件描述符就绪。这种模式要求在每次调用 select 之前重置集合,并在调用之后检查集合,这增加了用户的负担。

  4. 第一个参数是最大 fd+1
    select 的第一个参数 nfds 是最大的文件描述符值加 1。这是因为 select 在内核中需要遍历从 0 到最大文件描述符值的范围,来检查哪些文件描述符处于就绪状态。如果最大文件描述符是 n,那么实际上需要检查的范围是从 0n,共 n+1 个文件描述符。例如,如果最大的文件描述符是 5,那么 select 实际上需要检查 0 到 5 的文件描述符,共 6 个。

  5. 位图的使用

    select 使用位图(fd_set)来表示文件描述符集合。位图是一种高效的数据结构,每个比特位代表一个文件描述符的状态。当用户向内核传递文件描述符集合时,实际上是传递了一个位图。内核在监控过程中会修改这个位图,将就绪的文件描述符标记出来。当 select 返回后,用户可以根据位图来确定哪些文件描述符已经准备好。这种方式简化了内核与用户空间之间的交互,但也带来了额外的内存拷贝成本。

代码示例:
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>
#include <fcntl.h>
#include <sys/select.h>

#define SERVER_PORT 8080
#define MAX_CLIENTS 10
#define BUF_SIZE 1024

void set_socket_non_blocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int server_fd, client_fd, max_fd, new_socket;
    struct sockaddr_in server_addr;
    char buffer[BUF_SIZE];
    fd_set read_fds, master_fds;
    struct timeval timeout;

    // 创建 TCP socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        perror("创建套接字失败");
        return -1;
    }

    // 设置服务器地址
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(SERVER_PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定失败");
        return -1;
    }

    // 开始监听客户端连接
    if (listen(server_fd, MAX_CLIENTS) == -1) {
        perror("监听失败");
        return -1;
    }

    // 设置 server_fd 为非阻塞
    set_socket_non_blocking(server_fd);

    // 初始化 fd_set
    FD_ZERO(&master_fds);
    FD_SET(server_fd, &master_fds);
    max_fd = server_fd;

    while (true) {
        // 将 master_fds 赋值给 read_fds,因为 select() 会修改 read_fds
        read_fds = master_fds;

        // 设置超时,阻塞 5 秒
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;

        // 调用 select() 进行 I/O 多路复用
        int activity = select(max_fd + 1, &read_fds, nullptr, nullptr, &timeout);
        if (activity == -1) {
            perror("select 错误");
            break;
        } else if (activity == 0) {
            std::cout << "没有活动,继续等待...\n";
            continue;
        }

        // 遍历所有文件描述符
        for (int fd = 0; fd <= max_fd; ++fd) {
            // 如果 fd 是活动的文件描述符
            if (FD_ISSET(fd, &read_fds)) {
                if (fd == server_fd) {
                    // 处理新的客户端连接
                    if ((new_socket = accept(server_fd, nullptr, nullptr)) == -1) {
                        perror("接受连接失败,继续...");
                        continue;
                    }

                    std::cout << "新连接,套接字 fd: " << new_socket << "\n";

                    // 设置新的套接字为非阻塞
                    set_socket_non_blocking(new_socket);

                    // 将新的客户端套接字加入 fd_set
                    FD_SET(new_socket, &master_fds);
                    if (new_socket > max_fd) {
                        max_fd = new_socket;
                    }
                } else {
                    // 处理已连接客户端的 I/O 操作
                    int bytes_read = read(fd, buffer, sizeof(buffer) - 1);
                    if (bytes_read == 0) {
                        // 客户端关闭连接
                        std::cout << "客户端断开连接,套接字 fd: " << fd << "\n";
                        close(fd);
                        FD_CLR(fd, &master_fds);
                    } else if (bytes_read > 0) {
                        // 处理收到的数据
                        buffer[bytes_read] = '\0';
                        std::cout << "收到来自客户端 " << fd << " 的数据: " << buffer << "\n";

                        // 回送数据给客户端
                        send(fd, buffer, bytes_read, 0);
                    } else {
                        perror("读取错误");
                        close(fd);
                        FD_CLR(fd, &master_fds);
                    }
                }
            }
        }
    }

    close(server_fd);
    return 0;
}
原理分析:

image-20241219120501775


3.2 IO复用poll模型

poll是一种改进的 IO 多路复用模型,它解决了select模型中的一些局限性,尤其是在处理大量文件描述符和提高性能方面。pollselect类似,但它通过使用pollfd结构体数组来管理文件描述符,而不是像select那样依赖位图(bitmask)。这使得poll在某些方面更加灵活和高效。

3.2.1 poll的基本使用方法
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

参数说明

  • fds:指向 pollfd 结构体数组的指针,每个结构体代表一个需要监视的文件描述符及其事件。
  • nfdspollfd 数组中元素的数量,即有多少个文件描述符需要被监视。
  • timeout:等待事件的超时时间,单位为毫秒。-1 表示无限等待直到至少有一个文件描述符准备好;0 表示非阻塞;如果是正数,则表示最大等待的时间长度。

pollfd 结构体

struct pollfd {
    int   fd;         // 要监视的文件描述符
    short events;     // 监视的事件
    short revents;    // 实际发生的事件
};

事件类型常用值:

  • POLLIN:数据可读。
  • POLLOUT:可以写数据。
  • POLLERR:发生错误。
  • POLLHUP:挂断。
  • POLLPRI:高优先级数据可读。
3.2.2 poll解决select存在的问题

poll相较于select在多个方面进行了改进,解决了select模型中的一些关键限制。以下是poll解决select存在问题的主要方面:

  1. 无需重新设定参数:与select每次调用前都需要重新初始化监视的文件描述符集合不同,poll使用一个独立的结构体数组(pollfd),该数组在函数调用后保持不变,只需更新事件结果。这使得代码更简洁、易于维护。
  2. 消除文件描述符数量的上限select受限于系统定义的最大文件描述符数(如FD_SETSIZE),而poll通过动态管理文件描述符数组,仅受系统资源和内核能力的限制,适合处理大量并发连接。
  3. 更高效的事件通知机制:尽管两者都可能需要扫描所有文件描述符,poll得益于其灵活的结构和操作系统的优化,在实际应用中通常性能更优。
  4. 支持更多事件类型:相较于select仅支持基本事件类型,poll支持更多种类的事件,如高优先级数据和挂断事件,提供更细致的监控能力。
  5. 更好的跨平台兼容性poll在多种现代操作系统中的实现更为一致,简化了跨平台应用的开发难度。
代码示例:
#include <iostream>
#include <vector>
#include <poll.h>
#include <unistd.h>
#include <cstring>
#include <arpa/inet.h>
#include <fcntl.h>

#define PORT 8080
#define MAX_EVENTS 1024
#define BUFFER_SIZE 1024

int set_non_blocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    
    // 创建监听套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    
    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
        perror("setsockopt");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    
    // 设置地址结构
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    
    // 开始监听
    if (listen(server_fd, 10) < 0) {
        perror("listen");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    
    // 设置服务器套接字为非阻塞
    if (set_non_blocking(server_fd) < 0) {
        perror("set_non_blocking");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    
    // 初始化pollfd数组
    std::vector<struct pollfd> fds;
    struct pollfd server_pollfd;
    server_pollfd.fd = server_fd;
    server_pollfd.events = POLLIN;
    server_pollfd.revents = 0;
    fds.push_back(server_pollfd);
    
    std::cout << "服务器正在端口 " << PORT << " 上监听" << std::endl;
    
    while (true) {
        int activity = poll(fds.data(), fds.size(), -1);
        
        if (activity < 0) {
            perror("poll error");
            break;
        }
        
        for (size_t i = 0; i < fds.size(); ++i) {
            // 检测是否有事件发生
            if (fds[i].revents & POLLIN) {
                if (fds[i].fd == server_fd) {
                    // 有新的连接请求
                    if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {
                        perror("accept");
                        continue;
                    }
                    
                    // 设置新套接字为非阻塞
                    if (set_non_blocking(new_socket) < 0) {
                        perror("set_non_blocking");
                        close(new_socket);
                        continue;
                    }
                    
                    // 添加新套接字到pollfd数组
                    struct pollfd client_pollfd;
                    client_pollfd.fd = new_socket;
                    client_pollfd.events = POLLIN;
                    client_pollfd.revents = 0;
                    fds.push_back(client_pollfd);
                    
                    std::cout << "新连接,socket fd: " << new_socket << std::endl;
                } else {
                    // 处理已连接的客户端数据
                    char buffer[BUFFER_SIZE];
                    int valread = read(fds[i].fd, buffer, BUFFER_SIZE);
                    
                    if (valread <= 0) {
                        // 客户端关闭连接或发生错误
                        close(fds[i].fd);
                        std::cout << "连接关闭,socket fd: " << fds[i].fd << std::endl;
                        fds.erase(fds.begin() + i);
                        --i;
                        continue;
                    }
                    
                    buffer[valread] = '\0';
                    std::cout << "收到: " << buffer << " 来自socket fd: " << fds[i].fd << std::endl;
                    
                    // 回显数据给客户端
                    send(fds[i].fd, buffer, valread, 0);
                }
            }
        }
    }
    
    // 关闭所有套接字
    for (auto &pfd : fds) {
        close(pfd.fd);
    }
    
    return 0;
}
原理分析:

poll模型和select非常相似,主要区别为poll模型把位图改为链表,poll通过链表实现IO复用,将 socket 注册 poll_list 链表,通过poll系统调用轮询链表,获取 socket 事件。成功获取到 socket 事件后,poll成功返回,此时可以通过接收函数读取 socket 缓冲区数据。

image-20250115234811074


3.3 IO复用epoll模型

epoll是Linux下高效的IO多路复用机制,相较于selectpollepoll在处理大量并发连接时具有更好的性能和扩展性。epoll的工作机制主要包括下面几个步骤:

3.3.1 epoll的基本使用方法
  1. 创建 epoll 实例

使用 epoll_createepoll_create1 系统调用创建一个 epoll 实例,该调用返回一个 epoll 文件描述符(epoll_fd)。这个文件描述符用于后续的事件注册和事件等待。

#include <sys/epoll.h>
int epoll_create1(int flags);
  • 参数:
    • flags: 可以是0或EPOLL_CLOEXEC。如果设置为EPOLL_CLOEXEC,则会在新创建的文件描述符上设置“执行时关闭”标志(close-on-exec),这意味着当调用 exec 系列函数执行新程序时,这个文件描述符会自动关闭,防止不必要的资源泄漏。
  • 返回值:成功时返回一个非负整数的文件描述符,失败时返回 -1 并设置相应的 errno 错误码。
  1. 注册感兴趣的事件
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctlepoll API 的核心组成部分之一,它用于控制 epoll 实例中的文件描述符集合。通过这个函数,可以向 epoll 实例中添加、修改或删除感兴趣的文件描述符及其对应的事件类型。使用 epoll_ctl 系统调用向 epoll 实例中注册需要监控的文件描述符及其感兴趣的事件类型(如可读、可写等)

  • 参数
    • epfd: epoll_create1epoll_create 返回的 epoll 实例的文件描述符。
    • op: 操作类型,可以是以下三种之一:
      • EPOLL_CTL_ADD: 向 epoll 实例中添加新的文件描述符,并注册感兴趣的事件。
      • EPOLL_CTL_MOD: 修改已存在的文件描述符上的事件类型。
      • EPOLL_CTL_DEL: 从 epoll 实例中移除指定的文件描述符,不再监听其上的任何事件。
    • fd: 需要操作的目标文件描述符。
    • event: 如果 op 不是 EPOLL_CTL_DEL,则需要提供一个指向 epoll_event 结构体的指针,用于指定事件类型和用户数据。
  • 返回值:成功时返回 0;失败时返回 -1 并设置相应的 errno 错误码。

事件结构体 epoll_event

struct epoll_event {
    uint32_t events;    /* Epoll events */
    epoll_data_t data;  /* User data variable */
};

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

在使用 epoll_ctl 之前,我们需要定义一个 epoll_event 结构体来描述要监控的事件和关联的数据。这个结构体有两个主要成员:

  • events:这是一个位掩码,指定了我们对文件描述符感兴趣的事件类型。常见的事件类型包括:
    • EPOLLIN:文件描述符可读(例如,socket接收缓冲区中有数据)。
    • EPOLLOUT:文件描述符可写(例如,socket发送缓冲区有空间)。
    • EPOLLET:边缘触发模式(Edge Triggered),意味着只有状态变化时才会触发事件。
    • EPOLLRDHUP:远程端关闭连接或半关闭连接(TCP-specific)。
  • data:这是一个联合体,通常用来存储与该文件描述符相关的用户数据。最常见的做法是使用 data.fd 来存储文件描述符本身,以便在事件发生时能够快速识别出哪个描述符触发了事件。

示例代码

struct epoll_event event;
event.events = EPOLLIN | EPOLLET; 	// 监听可读事件,采用边缘触发
event.data.fd = sock_fd;			// 将sock_fd绑定到event.data.fd
// 使用EPOLL_CTL_ADD操作将sock_fd加入epoll实例
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) == -1) {
    perror("epoll_ctl: add");
    exit(EXIT_FAILURE);
}
  1. 等待事件的发生

使用 epoll_wait 系统调用阻塞等待注册的事件发生。当有事件就绪时,epoll_wait 返回就绪事件的数量,并提供epoll_event数组中填充准备好的事件信息。

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • 参数
    • epfd: 指向 epoll 实例的文件描述符。
    • events: 一个指向 epoll_event 结构数组的指针,用于存储已就绪的事件。
    • maxevents: 表示 events 数组的最大大小,即一次最多能返回多少个事件。
    • timeout: 超时时间(毫秒)。如果设置为 -1,则无限期等待;如果设置为 0,则立即返回,即使没有事件发生;如果设置为正数,则表示最长等待的时间。
  • 返回值:成功时返回已就绪的文件描述符的数量;如果没有事件发生且超时到期,则返回 0;出错时返回 -1 并设置 errno
struct epoll_event events[MAX_EVENTS];
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (n == -1) {
    perror("epoll_wait");
    exit(EXIT_FAILURE);
}
for (int i = 0; i < n; i++) {
    if (events[i].events & EPOLLIN) {
        // 处理可读事件
    }
    // 处理其他事件类型
}
  1. 处理事件

根据就绪事件的类型,执行相应的 I/O 操作,如读取数据、写入数据或关闭连接等。在边缘触发模式下,必须一次性将所有可读或可写的数据处理完毕,否则可能会错过后续的事件通知。

5.关闭 epoll 实例

当不再需要 epoll 实例时,使用 close 关闭 epoll 文件描述符,释放资源。

close(epoll_fd);
3.3.2 epoll原理
  1. 事件就绪的定义:底层的IO条件满足了,可以进行某种IO行为了,就叫做事件就绪。selectpollepoll的工作方式都是基于“等待”到“IO就绪”的事件通知机制。它们通过不同的数据结构和算法实现对文件描述符的高效管理和事件通知。

  2. 红黑树epoll内部使用红黑树来管理事件的注册和监控。红黑树作为一种自平衡的二叉搜索树,能够高效的执行增、删、改、查操作。红黑树的使用大大提高了epoll在大量文件描述符管理中的效率,尤其是在处理动态注册和插销事件时。

  3. 回调通知机制:当 epoll 监听的套接字上有数据可读或可写时,内核会通过回调机制通知用户进程。这种机制能够精确地通知程序哪些文件描述符上的事件发生了,而不需要每次都循环遍历检查数据是否到达以及数据该由哪个进程处理。这样,程序只需关注那些已经就绪的文件描述符,避免了不必要的轮询。

3.3.3 水平触发(LT)模式 vs 边缘触发(ET)模式

epoll提供了两种主要的事件触发模式:水平触发(LT) vs 边缘触发(ET)。它们决定了在文件描述符的状态发生变化时,内核如何通知应用程序。理解这两种触发的特点及其使用场景,对高效使用epoll至关重要。

  1. 工作原理
  • 水平触发(Level-triggered LT):当一个文件描述符准备好了,内核会一直报告这个文件描述符,直到应用程序处理这个事件。
  • 边缘触发(Edge-triggered ET):当一个文件描述符从无数据变为有数据时,内核只会报告一次,如果应用程序没有及时处理,那么需要等下一次数据变动时才会再次报告。
  1. 水平触发和边缘触发的比较
特性水平触发(LT)边缘触发(ET)
通知次数只要事件未被处理完,内核会持续通知。只会在文件描述符的状态发生变化时通知一次。
对阻塞 I/O 的要求不要求,I/O 可以是阻塞的。必须使用非阻塞 I/O,防止错过后续事件。
复杂度实现简单,事件处理较为直观。需要更复杂的事件处理逻辑,必须一次性处理所有数据。
资源消耗可能会造成不必要的重复通知,增加 CPU 占用。更加高效,减少了重复的事件通知。
适用场景普通的 I/O 密集型应用,如文件处理、轻量级的网络服务等。高并发、高性能的网络应用,如 Web 服务器、实时流处理等。
  1. 边缘触发(ET)模式的优化技巧

由于 ET 模式要求事件处理更为高效,一些优化策略可以确保高并发下的稳定运行:

  • 非阻塞 I/O
    必须将所有受 epoll 监控的文件描述符设置为非阻塞模式。否则,在数据未及时读取或写入的情况下,ET 模式可能错过后续事件通知,导致程序不响应新事件。

    int flags = fcntl(sock_fd, F_GETFL, 0);
    fcntl(sock_fd, F_SETFL, flags | O_NONBLOCK);
    
  • 一次性处理所有数据
    在 ET 模式下,应用程序必须确保在收到事件通知时一次性处理所有可读或可写的数据。如果不处理完所有数据,下一次数据到达时 epoll 可能不会再次通知,从而导致数据丢失。

    例如,对于一个可读事件的套接字,应该不断调用 read()recv() 来读取所有数据,直到 read() 返回 EAGAINEWOULDBLOCK,表示数据已读取完毕。

    ssize_t bytes_read;
    while ((bytes_read = read(sock_fd, buffer, sizeof(buffer))) > 0) {
        // 处理数据
    }
    if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
        perror("read failed");
        // 错误处理
    }
    
  • 处理数据时避免阻塞
    ET 模式通常与非阻塞 I/O 一起使用,因此处理事件时要特别小心。确保每次操作不会阻塞整个进程,避免应用程序挂起。

  • 合理选择超时值
    如果你的应用程序不需要实时响应,也可以考虑使用超时等待(epoll_wait 的第四个参数设置为适当的超时值),这样可以避免应用程序因过于频繁的调用 epoll_wait 而浪费 CPU 时间。

代码示例:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define MAX_EVENTS 10
#define PORT 8080
#define BUFFER_SIZE 1024

// 设置文件描述符为非阻塞
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl(F_GETFL)");
        return -1;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl(F_SETFL)");
        return -1;
    }
    return 0;
}

// 处理客户端数据的函数
void handle_client_data(int client_fd) {
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read = read(client_fd, buffer, sizeof(buffer));
    if (bytes_read == -1) {
        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            perror("read failed");
            close(client_fd);
        }
    } else if (bytes_read == 0) {
        // 客户端关闭连接
        printf("Client disconnected\n");
        close(client_fd);
    } else {
        // 处理读取的数据
        buffer[bytes_read] = '\0';
        printf("Received data: %s\n", buffer);

        // 发送数据到客户端
        ssize_t bytes_written = write(client_fd, buffer, bytes_read);
        if (bytes_written == -1) {
            perror("write failed");
            close(client_fd);
        }
    }
}

int main() {
    int server_fd, epoll_fd;
    struct sockaddr_in server_addr;
    struct epoll_event ev, events[MAX_EVENTS];

    // 创建监听套接字
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 监听连接
    if (listen(server_fd, 10) == -1) {
        perror("listen failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 创建 epoll 实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1 failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 将服务器套接字加入 epoll 事件监听
    ev.events = EPOLLIN | EPOLLET;  // 监听可读事件,采用边缘触发模式
    ev.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
        perror("epoll_ctl failed");
        close(server_fd);
        close(epoll_fd);
        exit(EXIT_FAILURE);
    }

    // 主事件循环
    while (1) {
        int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (num_events == -1) {
            perror("epoll_wait failed");
            break;
        }

        // 处理就绪事件
        for (int i = 0; i < num_events; i++) {
            if (events[i].data.fd == server_fd) {
                // 服务器套接字有连接请求
                int client_fd = accept(server_fd, NULL, NULL);
                if (client_fd == -1) {
                    perror("accept failed");
                    continue;
                }

                // 设置客户端套接字为非阻塞
                if (set_nonblocking(client_fd) == -1) {
                    close(client_fd);
                    continue;
                }

                // 将客户端套接字添加到 epoll 中
                ev.events = EPOLLIN | EPOLLET;  // 监听客户端的可读事件,采用边缘触发模式
                ev.data.fd = client_fd;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
                    perror("epoll_ctl: add client failed");
                    close(client_fd);
                    continue;
                }

                printf("New client connected\n");

            } else if (events[i].events & EPOLLIN) {
                // 客户端有数据可读
                handle_client_data(events[i].data.fd);
            }
        }
    }

    // 清理资源
    close(server_fd);
    close(epoll_fd);
    return 0;
}
原理分析:

epoll 是一种高效的 I/O 事件通知机制,它通过以下方式提高效率:

  1. 红黑树管理文件描述符:使用 epoll_ctl 系统调用将感兴趣的 socket(或其他文件描述符)注册到 epoll 实例中。这些描述符被存储在一个内部的红黑树结构中,这使得添加、删除和查找操作都非常高效。
  2. 就绪队列记录活动事件:当某个 socket 上有数据到达时,内核会自动将该 socket 标记为就绪,并将其加入到一个就绪队列中。这里并没有使用回调函数,而是依赖于内核的通知机制。
  3. 事件通知而非轮询:与 selectpoll 不同,epoll 使用的是事件驱动的通知机制。这意味着只有当有实际事件发生时(如可读或可写),epoll_wait 才会返回并告知应用程序哪些 socket 已准备好进行 I/O 操作。这样避免了对所有文件描述符的重复检查,提高了效率。
  4. epoll_wait 获取事件:应用程序调用 epoll_wait 来等待事件的发生。一旦有事件发生,epoll_wait 就会返回一个包含所有就绪事件的列表,供应用程序处理。

image-20250123111604603



4.信号驱动式IO

信号驱动式IO是Linux提供的一种IO模型,通过信号通知应用程序某个文件描述符的状态发生变化,适用于需要非阻塞IO操作的场景。它利用信号机制将内核的事件通知传递给用户空间,使得程序无需主动伦旭即可对IO事件做出响应。

4.1 工作原理

  1. 信号和信号处理:
  • 在信号驱动I/O中,当某个I/O事件(如数据准备好读或写)发生时,内核会向进程发送一个信号。进程通过信号处理函数来处理该事件。
  • 该信号通常是 SIGIO(I/O信号)或 SIGURG(紧急数据的信号)等。
  1. 非阻塞模式:
  • 为了使信号驱动I/O工作,通常需要将相关的套接字设置为非阻塞模式。这样,I/O操作(如 recvsend)不会因为没有数据而阻塞。
  • 一旦数据准备好,操作系统会发送 SIGIO 信号,通知进程可以进行读取或写入。
  1. 信号处理函数:
  • 在信号触发时,操作系统会中断进程的正常执行流程,并转到预先注册的信号处理函数中执行。这使得信号驱动I/O成为一种异步机制。
  • 信号处理函数中通常会包含对 recvsend 等函数的调用,以便处理就绪的I/O数据。

4.2 为什么使用信号驱动IO

  1. **避免轮询:**传统的非阻塞I/O通常需要通过 selectpollepoll 等方式轮询文件描述符,检查是否有数据可以读取。信号驱动I/O消除了这种轮询机制,避免了CPU时间的浪费。
  2. **减少阻塞:**通过信号驱动I/O,应用程序不再需要阻塞等待I/O事件发生,而是通过信号触发事件,从而使得程序可以在等待I/O的同时执行其他任务。
  3. **资源高效:**信号驱动I/O能够实现资源的高效利用,因为它允许应用程序在I/O事件到达时立即处理,而不需要检查文件描述符状态。
  4. **适用于实时应用:**对于一些需要及时响应的实时应用,信号驱动I/O提供了一种快速响应数据的方式。

4.3 实现流程

信号驱动式I/O的核心思想是预先告知内核当某个描述符准备发生某件事情(如数据到达)时发送一个信号(SIGIO)给进程。这使得进程可以在等待数据的过程中不被阻塞,只有在接收到SIGIO信号后才去处理I/O事件。程序需要按照如下步骤执行:

  1. 为内核发送的通知信号安装一个信号处理例程。默认情况下,这个通知信号为SIGIO
  2. 设定文件符的属主,也就是当文件描述符山可执行IO时会接收通知信号的进程或进程组。通常我们让调用进程成为属主。设定属主可通过fcntl()F_SETOWN操作来完成
fcntl(fd, F_SETOWN , pid);
  1. 通过设定O_NONBLOCK标识使能非阻塞IO

  2. 通过打开O_ASYNC标志使能信号驱动IO。这可以和上一步合并为一个操作,因为它们都需要用到fcntl()F_SETFL操作

flags = fcntl(fd, F_GETFL);  // get current flags
fcntl(fd, F_SETFL, flags | O_ASYNC | O_NONBLOCK);
  1. 调用进程线程可以执行其他任务了。当IO操作就绪时,内核为进程发送一个信号,然后调用在第1步中安装好的信号处理例程
  2. 信号驱动IO提供的是边缘触发通知。这表示一旦进程被通知IO就绪,它就应该尽可能的能多地执行 I/O(例如尽可能多地读取字节)。假设文件描述符时非阻塞的,这表示需要在循环中执行IO系统调用直到失败位置,此时的错误码为EAGAIN(再来一次)或者EWOULDBLOCK(期望阻塞)。

示例代码及图解

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>

#define PORT 8080
#define MAXLINE 1024

static int sockfd;  // 监听套接字
static struct sockaddr_in cli_addr;
static socklen_t clilen = sizeof(cli_addr);

// 信号处理函数
void do_sometime(int signal) {
    char buffer[MAXLINE] = {0};
    int len = recvfrom(sockfd, buffer, MAXLINE, 0, (struct sockaddr *)&cli_addr, (socklen_t*)&clilen);
    if (len > 0) {
        printf("收到客户端消息: %s\n", buffer);
        strcat(buffer, "→[Msg]");
        sendto(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr *)&cli_addr, clilen);  // 回显消息
    } else {
        printf("没有收到数据或出现错误\n");
    }
}

int main(int argc, char const *argv[]) {
    // 创建套接字
    if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }
    
    // 注册信号处理函数
    struct sigaction act;
    act.sa_handler = do_sometime;
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART;  // 重新启动被信号中断的系统调用
    sigaction(SIGIO, &act, NULL);

    // 创建并初始化地址结构
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    servaddr.sin_addr.s_addr = INADDR_ANY;
    
    // 设置文件描述符的拥有者为当前进程
    fcntl(sockfd, F_SETOWN, getpid());
    int flags = fcntl(sockfd, F_GETFL, 0);
    // 启用信号驱动模式 | 设置文件描述符为非阻塞模式
    fcntl(sockfd, F_SETFL, flags | O_ASYNC | O_NONBLOCK);
    
    // 绑定地址
    if (bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    while (1)
        sleep(1);   // 等待信号

    close(sockfd);
    return 0;
}

image-20250123111713690

异步IO通知的工作流程

  1. 注册异步通知

在用户空间,应用程序通过 fcntl 系统调用来设置文件描述符的异步通知机制:

  • 设置 O_ASYNC 标志:使用 fcntl(fd, F_SETFL, flags | O_ASYNC) 来启用异步通知。
  • 设置接收 SIGIO 信号的进程 ID:使用 fcntl(fd, F_SETOWN, getpid()) 来指定哪个进程应该接收 SIGIO 信号,通知它可以进行IO操作了。
  1. 内核空间处理

在内核空间,驱动程序需要支持异步通知机制。具体步骤如下:

  • 管理 fasync_struct 队列:使用 fasync_helper 函数来管理一个链表(队列),该链表存储了所有注册了异步通知的进程信息。
  • 触发事件:当某个事件发生时(如数据到达),驱动程序调用 kill_fasync 函数来通知所有注册了异步通知的进程。
  1. 触发事件

当驱动程序检测到某个事件(如数据到达)时,它会调用 kill_fasync 函数来通知所有注册了异步通知的进程。kill_fasync 函数会遍历 fasync_struct 队列,并向每个进程发送 SIGIO 信号。

  1. 处理信号

用户空间的应用程序收到 SIGIO 信号后,可以在其信号处理函数中执行相应的操作。例如,处理接收到的数据或进行其他必要的操作。



5.异步IO

异步IO(Asynchronous IO,AIO)是一种处理输入/输出操作的方式,它允许程序在发起IO操作后立即返回,而不是等待操作完成。这种方式可以显著提高应用程序的并发性和吞吐量,特别是在IO密集型的应用场景中。下面将从多个角度深入探讨异步IO的概念,实现机制及其应用场景。

5.1 异步I/O的基本概念

5.1.1 同步 vs 异步
  • 同步I/O:当一个进程发起I/O请求时,它会被阻塞直到该请求完成。这意味着在此期间,进程不能执行其他任务。
  • 异步I/O:当一个进程发起I/O请求时,它可以立即继续执行其他任务,而不需要等待I/O操作完成。一旦I/O操作完成,系统会通过某种方式通知进程结果。
5.1.2 阻塞 vs 非阻塞
  • 阻塞I/O:如果文件描述符未准备好进行读写操作,调用将被挂起,直到操作准备好为止。
  • 非阻塞I/O:如果文件描述符未准备好进行读写操作,调用会立即返回一个错误码,允许进程尝试其他操作或稍后再试。

虽然“异步”和“非阻塞”听起来相似,但它们实际上是不同的概念。异步I/O指的是整个操作由内核而非用户进程来完成,并且在完成后通知用户进程;而非阻塞I/O则是指用户进程可以在没有数据可读/写时不被阻塞,但仍需主动轮询检查状态

5.1.3 异步IO的工作原理

异步 I/O 的实现通常依赖于以下几个关键组件:

  1. 事件通知机制:内核提供一种机制,能够在 I/O 操作完成时通知应用程序。常见的事件通知机制有:
    • 信号通知:如 SIGIO
    • 回调通知:如 io_uringAIO 中的回调机制。
    • 轮询机制:应用程序主动检查 I/O 状态,类似于 select()poll()
  2. 文件描述符和 I/O 操作:文件描述符是 I/O 操作的基础。内核会根据文件描述符的状态来决定 I/O 操作是否可以完成,并在完成时通知程序。
  3. 非阻塞模式:异步 I/O 需要文件描述符处于非阻塞模式。通过设置 O_NONBLOCK 或其他标志,程序可以立即返回,而不是等待 I/O 完成。
POSIX AIO 示例:

POSIX AIO 是 Linux 提供的一套接口,用于执行异步 I/O 操作。在异步 I/O 中,程序可以发起 I/O 操作(如 aio_readaio_write),然后继续执行其他任务,而不是等待 I/O 操作完成。当 I/O 操作完成时,程序可以通过轮询、信号或回调来获取结果。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <aio.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <errno.h>

#define PORT 8080
#define BUFFER_SIZE 1024

int main() {
    // 创建 TCP 套接字
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);

    // 设置服务器地址结构
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);

    // 绑定地址
    bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));

    // 监听连接
    listen(server_fd, 5);

    printf("Server listening on port %d...\n", PORT);

    // 接受客户端连接
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    int client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);

    // 设置异步读操作
    char buffer[BUFFER_SIZE];
    struct aiocb aio_read_cb;
    memset(&aio_read_cb, 0, sizeof(struct aiocb));
    aio_read_cb.aio_fildes = client_fd;
    aio_read_cb.aio_buf = buffer;
    aio_read_cb.aio_nbytes = sizeof(buffer);
    aio_read_cb.aio_offset = 0;

    // 发起异步读操作
    aio_read(&aio_read_cb);

    // 等待读操作完成
    while (aio_error(&aio_read_cb) == EINPROGRESS) {
        // 可以执行其他操作
        usleep(10000);  // 等待10ms
    }

    // 读取完成,获取结果
    int bytes_read = aio_return(&aio_read_cb);

    printf("Received message: %s\n", buffer);

    // 发送数据到客户端
    send(client_fd, buffer, bytes_read, 0);

    close(client_fd);
    close(server_fd);
    return 0;
}

图:异步IO-POISX AIO


http://www.kler.cn/a/516580.html

相关文章:

  • Java学习教程,从入门到精通,JDBC创建数据库语法知识点及案例代码(99)
  • 小样本学习中的Prototypical Network(原型网络)详解
  • 当 Facebook 窥探隐私:用户的数字权利如何捍卫?
  • 【2024年华为OD机试】 (C卷,100分)- 用户调度问题(JavaScriptJava PythonC/C++)
  • 【橘子ES】Kibana的分析能力Analytics简易分析
  • 【2024年华为OD机试】(A卷,200分)- 优雅子数组 (JavaScriptJava PythonC/C++)
  • IPhone16 Plus 设备详情
  • 详解:TCP/IP五层(四层)协议模型
  • 23.日常算法
  • CVPR 2024 无人机/遥感/卫星图像方向总汇(航空图像和交叉视角定位)
  • pandas基础:文件的读取和写入
  • leetcode——矩阵置零(java)
  • 亚马逊新店铺流量怎么提升?自养号测评新趋势
  • rabbitmq单机与集群模式的部署
  • 刷题笔记 贪心算法-1 贪心算法理论基础
  • 拒绝 Github 投毒,通过 Sharp4SuoBrowser 分析 Visual Studio 隐藏文件
  • 前后分离Vue3+Django 之简单的登入
  • C++函数——fill
  • leetcode刷题记录(八十四)——739. 每日温度
  • 2.2.2 大小写敏感性
  • Facebook广告零支出无消耗:可能原因与解决方法
  • 鞅的定义_
  • Ubuntu二进制部署K8S 1.29.2
  • C语言数组详解:从基础到进阶的全面解析
  • bat批处理删除此电脑左侧及另存为下文档视屏等多余项
  • [java] java基础-字符串篇