当前位置: 首页 > article >正文

网络IO与IO多路复用

一、网络IO基础

  • 系统对象
    • 网络IO涉及用户空间调用IO的进程或线程以及内核空间的内核系统。
    • 例如,当进行read操作时,会经历两个阶段:
      1. 等待数据准备就绪。
      2. 将数据从内核拷贝到进程或线程中。
  • 多种网络IO模型的出现原因:由于上述两个阶段的不同情况,出现了多种网络IO模型。

二、阻塞IO(blocking IO)

  • 特点
    • 在Linux中,默认所有socket都是阻塞的。
    • 以读操作为例,当用户进程调用read系统调用,kernel开始IO的第一阶段(准备数据),对于网络IO,数据可能未到,kernel要等待,用户进程会被阻塞。
    • 直到kernel等到数据准备好,将数据从kernel拷贝到用户内存并返回结果,用户进程才解除阻塞状态。
    • 即阻塞IO在等待数据和拷贝数据两个阶段都阻塞进程。
    • 例如,使用listen()send()recv()等接口构建服务器/客户机模型,这些接口大多是阻塞型的。
    • 代码示例:
// 创建服务器端的socket
int serverSocket = socket(AF_INET, SOCK_STREAM, 0);
// 绑定地址
struct sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(8080);
serverAddr.sin_addr.s_addr = INADDR_ANY;
bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr));
// 监听连接
listen(serverSocket, 5);
// 接受连接
int clientSocket = accept(serverSocket, NULL, NULL);
// 接收数据,在此处进程会阻塞,直到接收到数据或出错
char buffer[1024];
recv(clientSocket, buffer, sizeof(buffer), 0);
// 发送数据,在此处进程也可能阻塞
send(clientSocket, buffer, strlen(buffer), 0);
- 解释:上述代码首先创建了一个服务器端的socket,然后绑定地址和端口,接着监听连接。当调用`accept`时,如果没有连接请求,会阻塞等待。一旦有连接,调用`recv`接收数据时,会阻塞直到有数据到达。发送数据时也可能阻塞,例如网络拥堵或接收方接收缓冲区满。
  • 多线程/多进程改进方案
    • 为解决单个连接阻塞影响其他连接的问题,可在服务器端使用多线程或多进程。
    • 多线程可使用pthread_create()创建,多进程使用fork()创建。
    • 多线程开销相对小,适合为较多客户机服务;进程更安全,适合单个服务执行体需要大量CPU资源的情况。
    • 例如,服务器端为多个客户机提供服务时,可在主线程等待连接请求,有连接时创建新线程或新进程提供服务。
    • 但当需要同时响应大量(成百上千)连接请求时,多线程或多进程会严重占用系统资源,导致系统效率下降和假死。

三、非阻塞IO(non-blocking IO)

  • 特点
    • 通过设置socket为非阻塞,如使用fcntl( fd, F_SETFL, O_NONBLOCK );
    • 当用户进程发出read操作,若kernel数据未准备好,不会阻塞用户进程,而是立即返回error
    • 用户进程可根据返回结果判断数据是否准备好,未准备好可再次发送read操作。
    • 当数据准备好,kernel会将数据拷贝到用户内存并返回。
    • 例如,recv()接口在非阻塞状态下调用后立即返回,返回值有不同含义:
      • recv()返回值大于 0,表示接受数据完毕,返回值是接收到的字节数。
      • recv()返回 0,表示连接正常断开。
      • recv()返回 -1,且errno等于EAGAIN,表示recv操作未完成。
      • recv()返回 - 1,且errno不等于EAGAIN,表示recv操作遇到系统错误errno
    • 代码示例:
// 创建服务器端的socket
int serverSocket = socket(AF_INET, SOCK_STREAM, 0);
// 设置为非阻塞
fcntl(serverSocket, F_SETFL, O_NONBLOCK);
// 绑定地址
struct sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(8080);
serverAddr.sin_addr.s_addr = INADDR_ANY;
bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr));
// 监听连接
listen(serverSocket, 5);
// 接受连接
int clientSocket = accept(serverSocket, NULL, NULL);
// 接收数据,此处不会阻塞,会根据返回值判断状态
char buffer[1024];
int recvResult;
while ((recvResult = recv(clientSocket, buffer, sizeof(buffer), 0)) == -1 && errno == EAGAIN) {
    // 数据未准备好,可进行其他操作或再次尝试接收
}
if (recvResult > 0) {
    // 处理接收到的数据
} else if (recvResult == 0) {
    // 连接断开
} else {
    // 其他错误
}
- 解释:在这个示例中,将服务器端socket设置为非阻塞后,调用`recv`时不会阻塞进程。如果`recv`返回-1且`errno`为`EAGAIN`,说明数据还未准备好,程序可继续执行其他操作或过段时间再尝试接收,而不是像阻塞IO那样一直等待。

四、多路复用IO(IO multiplexing)

  • 特点
    • 又称事件驱动IO(event driven IO),如select/epoll
    • 单个进程可同时处理多个网络连接的IO,基本原理是select/epoll函数不断轮询所负责的所有socket,当某个socket有数据到达,通知用户进程。
    • 流程:用户进程调用select会被阻塞,kernel监视select负责的socket,当有socket数据准备好,select返回,用户进程再调用read操作将数据从kernel拷贝到用户进程。
    • 虽然使用select需要两个系统调用(selectread),但优势是可在一个线程内处理多个socket的IO请求。
    • 代码示例:
#include <stdio.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>

int main() {
    int serverSocket = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(8080);
    serverAddr.sin_addr.s_addr = INADDR_ANY;
    bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr));
    listen(serverSocket, 5);
    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(serverSocket, &readfds);
    int maxFd = serverSocket;
    while (1) {
        fd_set tempFds = readfds;
        int activity = select(maxFd + 1, &tempFds, NULL, NULL, NULL);
        if (FD_ISSET(serverSocket, &tempFds)) {
            int clientSocket = accept(serverSocket, NULL, NULL);
            FD_SET(clientSocket, &readfds);
            if (clientSocket > maxFd) {
                maxFd = clientSocket;
            }
        }
        for (int i = 0; i <= maxFd; i++) {
            if (FD_ISSET(i, &tempFds) && i!= serverSocket) {
                char buffer[1024];
                int valread = recv(i, buffer, 1024, 0);
                if (valread == 0) {
                    close(i);
                    FD_CLR(i, &readfds);
                } else {
                    // 处理接收到的数据
                }
            }
        }
    }
    return 0;
}
- 解释:首先创建服务器socket,绑定并监听。`FD_ZERO`和`FD_SET`用于初始化和设置文件描述符集合。`select`函数会阻塞,等待集合中文件描述符的可读事件。当`serverSocket`有新连接时,添加新连接的socket到集合,并更新最大文件描述符。当其他socket可读时,接收数据并处理。
  • 接口原型
    • FD_ZERO(int fd, fd_set* fds):初始化fd_set
    • FD_SET(int fd, fd_set* fds):将句柄添加到fd_set
    • FD_ISSET(int fd, fd_set* fds):检查句柄是否在fd_set中。
    • FD_CLR(int fd, fd_set* fds):从fd_set中移除句柄。
    • int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout):用于探测多个文件句柄的状态变化,readfdswritefdsexceptfds作为输入和输出参数,可设置超时时间。
  • 问题
    • 当需要探测的句柄值较大时,select()接口本身需要大量时间轮询,很多操作系统提供更高效接口如epoll(Linux)、kqueue(BSD)、/dev/poll(Solaris)等。
    • 该模型将事件探测和事件响应夹杂,若事件响应执行体庞大,会降低事件探测的及时性。
    • 可使用事件驱动库如libeventlibev库解决上述问题。

五、异步IO(Asynchronous I/O)

  • 特点
    • Linux下的异步IO主要用于磁盘IO读写操作,从内核2.6版本开始引入。
    • 用户进程发起read操作后可做其他事,kernel收到asynchronous read后立刻返回,不会阻塞用户进程。
    • kernel等待数据准备完成,将数据拷贝到用户内存,完成后给用户进程发送信号通知。
    • 代码示例(使用aio_read):
#include <aio.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>

void aio_completion_handler(union sigval sigval) {
    struct aiocb *req = (struct aiocb *)sigval.sival_ptr;
    if (aio_error(req) == 0) {
        char *buffer = (char *)malloc(aio_return(req));
        // 处理读取到的数据
        free(buffer);
    }
    aio_destroy(req);
}

int main() {
    int fd = open("test.txt", O_RDONLY);
    if (fd < 0) {
        perror("open");
        return 1;
    }
    struct aiocb my_aiocb;
    memset(&my_aiocb, 0, sizeof(struct aiocb));
    my_aiocb.aio_fildes = fd;
    my_aiocb.aio_buf = malloc(1024);
    my_aiocb.aio_nbytes = 1024;
    my_aiocb.aio_offset = 0;
    my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;
    my_aiocb.aaiocb.sigev_notify_function = aio_completion_handler;
    my_aiocb.aio_sigevent.sigev_notify_attributes = NULL;
    my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;
    if (aio_read(&my_aiocb) < 0) {
        perror("aio_read");
        close(fd);
        return 1;
    }
    // 进程可做其他事情
    sleep(1);
    return 0;
}
- 解释:上述代码使用`aio_read`进行异步读操作。首先打开文件,设置`aiocb`结构(包含文件描述符、缓冲区、字节数等),并设置信号通知方式和处理函数。调用`aio_read`后,进程可以继续做其他事情,当读操作完成,会调用`aio_completion_handler`处理结果。
  • 重要性:异步IO是真正非阻塞的,对高并发网络服务器实现至关重要。

六、信号驱动IO(signal driven I/O, SIGIO)

  • 特点
    • 允许套接口进行信号驱动I/O并安装信号处理函数,进程继续运行不阻塞。
    • 当数据准备好,进程收到SIGIO信号,可在信号处理函数中调用I/O操作函数处理数据。
    • 优势在于等待数据报到达期间,进程可继续执行,避免了select的阻塞与轮询。

七、服务器模型Reactor与Proactor

  • Reactor模型
    • 是一种事件驱动机制,用于同步I/O。
    • 应用程序将处理I/O事件的接口注册到Reactor上,若相应事件发生,Reactor调用注册的接口(回调函数)。
    • 三个重要组件:
      1. 多路复用器:如selectpollepoll等系统调用。
      2. 事件分发器:将多路复用器返回的就绪事件分到对应的处理函数。
      3. 事件处理器:负责处理特定事件的处理函数。
    • 具体流程:
      1. 注册读就绪事件和相应事件处理器。
      2. 事件分离器等待事件。
      3. 事件到来,激活分离器,分离器调用事件对应的处理器。
      4. 事件处理器完成实际的读操作,处理数据,注册新事件,返还控制权。
    • 优点:响应快,编程相对简单,可扩展性和可复用性好。
    • 缺点:当程序需要使用多核资源时会有局限,因为通常是单线程的。
    • 代码示例:
#include <iostream>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>

class EventHandler {
public:
    virtual void handleEvent(int fd) = 0;
};

class Reactor {
private:
    int epollFd;
    std::vector<EventHandler*> handlers;
public:
    Reactor() {
        epollFd = epoll_create1(0);
    }
    ~Reactor() {
        close(epollFd);
    }
    void registerHandler(int fd, EventHandler* handler) {
        struct epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN;
        epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &event);
        handlers.push_back(handler);
    }
    void handleEvents() {
        struct epoll_event events[10];
        int numEvents = epoll_wait(epollFd, events, 10, -1);
        for (int i = 0; i < numEvents; i++) {
            int fd = events[i].data.fd;
            for (EventHandler* handler : handlers) {
                handler->handleEvent(fd);
            }
        }
    }
};

class EchoHandler : public EventHandler {
public:
    void handleEvent(int fd) override {
        char buffer[1024];
        int bytesRead = recv(fd, buffer, sizeof(buffer), 0);
        if (bytesRead > 0) {
            send(fd, buffer, bytesRead, 0);
        }
    }
};

int main() {
    Reactor reactor;
    int serverSocket = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(8080);
    serverAddr.sin_addr.s_addr = INADDR_ANY;
    bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr));
    listen(serverSocket, 5);
    EchoHandler handler;
    reactor.registerHandler(serverSocket, &handler);
    while (1) {
        reactor.handleEvents();
    }
    return 0;
}
- 解释:上述C++代码中,`Reactor`类负责创建`epoll`,注册和处理事件。`EventHandler`是抽象基类,`EchoHandler`是具体处理接收和发送的派生类。`main`函数创建服务器socket,注册`EchoHandler`到`Reactor`,并不断调用`handleEvents`处理事件。
  • Proactor模型
    • 最大特点是使用异步I/O,所有I/O操作都交由系统的异步I/O接口执行,工作线程只负责业务逻辑。
    • 具体流程:
      1. 处理器发起异步操作并关注I/O完成事件。
      2. 事件分离器等待操作完成事件。
      3. 分离器等待时,内核并行执行实际I/O操作并将结果存入用户缓冲区,通知分离器读操作完成。
      4. I/O完成后,通过事件分离器呼唤处理器。
      5. 事件处理器处理用户缓冲区中的数据。
    • 增加了编程复杂度,但给工作线程带来更高效率,可利用系统态的读写优化。
    • 在Windows上常用IOCP支持高并发,Linux上因aio性能不佳,主要以Reactor模型为主。
    • 也可使用Reactor模拟Proactor,但在读写并行能力上会有区别。

八、同步I/O和异步I/O的区别总结

  • 阻塞与非阻塞IO的区别
    • 调用阻塞IO会阻塞进程直到操作完成,非阻塞IO在kernel准备数据时会立刻返回。
  • 同步与异步IO的区别
    • 同步IO在做“IO operation”(如read系统调用)时会阻塞进程。阻塞IO、非阻塞IO、多路复用IO都属于同步IO。
    • 非阻塞IO在数据准备好时的拷贝数据阶段会阻塞进程;而异步IO在整个过程中,进程不会被阻塞,进程发起IO操作后直接做其他事,直到kernel发送信号通知完成。

http://www.kler.cn/a/508205.html

相关文章:

  • SQL-杂记1
  • 2025.1.16——六、BabySQL 双写绕过|联合注入
  • java spring,uName,kValue,前端传值后端接不到
  • 【Gossip 协议】Golang的实现库Memberlist 库简介
  • 若依分页插件失效问题
  • JVM直击重点
  • 前端入门(html)
  • 计算机网络 (45)动态主机配置协议DHCP
  • 回归预测 | MATLAB实RVM相关向量机多输入单输出回归预测
  • 安装k8s前置操作(Ubuntu / CentOS)
  • DeepSeek-v3在训练和推理方面的优化
  • WPF的C1DataGrid根据当前行实时选的值控制另外一列行是否可编辑
  • 群发邮件适合外贸行业吗
  • CES 2025:XEO 展出双眼8K VR头显,售价2000美元
  • python爬虫入门(理论)
  • error: 您尚未结束您的合并(存在 MERGE_HEAD)。 提示:请在合并前先提交您的修改。 fatal: 因为存在未完成的合并而退出。
  • SpringBoot 3.2.4整合Nacos详细流程
  • Django中的QueryDict对象
  • Qiankun 微前端框架全面解析:架构、原理与最佳实践
  • ideal jdk报错如何解决
  • 鸿蒙UI(ArkUI-方舟UI框架)-开发布局
  • Web端实时播放RTSP视频流(监控)
  • oracle goldengate from mongodb to oracle的实时同步
  • Git 仓库 大文件管理
  • Kafka客户端-“远程主机强迫关闭了一个现有的连接”故障排查及解决
  • 闪豆多平台视频批量下载器