当前位置: 首页 > article >正文

Linux——IO模型_多路转接(epoll)

目录

 0.往期文章

1.epoll的三个接口

1.epoll_create

2.epoll_ctl

结构体 epoll_event

3.epoll_wait

2. epoll的工作原理,和接口对应

1.理解数据到达主机

2.epoll的工作原理

 3.基于epoll的TCP服务器(代码)

 辅助库

基于TCP的Socket封装

服务器代码

测试

 4.epoll的工作模式

边缘触发(Edge Triggered, ET)模式

水平触发(Level Triggered, LT)模式

理解 ET 模式和非阻塞文件描述符


 0.往期文章

Linux--应用层协议HTTP协议(http服务器构建)-CSDN博客

Linux--传输层协议UDP-CSDN博客

Linux--传输层协议TCP-CSDN博客

1.epoll的三个接口

定位:只负责进行等,不进行拷贝。
作用:epoll系统调用是用来让我们的程序监视多个文件描述符的状态变化的;程序会停在epoll这里等待, 直到被监视的文件描述符有一个或多个发生了状态改变。

1.epoll_create

  • 参数
    • size:这是内核用来内部优化 epoll 实例的提示值,表示你预期将要添加到 epoll 实例中的文件描述符的最大数量。然而,这个参数在 Linux 2.6.8 及以后的版本中实际上被忽略了,因为内核能够动态地调整大小。
  • 返回值
    • 成功时,返回一个非负整数,即新创建的 epoll 实例的文件描述符。
    • 出错时,返回 -1,并设置 errno 以指示错误原因。

2.epoll_ctl

参数:

  • epfd:这是由 epoll_create 或 epoll_create1 返回的 epoll 实例的文件描述符。它指定了要操作的 epoll 实例。
  • op:这是一个操作码,指定了要对目标文件描述符 fd 执行的操作类型。有效的操作码包括:
    • EPOLL_CTL_ADD:向 epoll 实例注册一个新的文件描述符,以便监视其上的事件。
    • EPOLL_CTL_MOD:修改已经注册到 epoll 实例中的文件描述符的事件类型或用户数据。
    • EPOLL_CTL_DEL:从 epoll 实例中删除一个文件描述符,停止对其上事件的监视。
  • fd:这是要操作的目标文件描述符,即要注册、修改或删除的文件描述符。
  • event:这是一个指向 struct epoll_event 结构体的指针,它包含了要注册或修改的事件信息。如果操作是 EPOLL_CTL_DEL,则此参数可以为 NULL。

返回值:

  • 成功时,epoll_ctl 返回 0。
  • 出错时,返回 -1,并设置 errno 以指示错误原因。
结构体 epoll_event
typedef union epoll_data {  
    void    *ptr;  
    int      fd;  
    uint32_t u32;  
    uint64_t u64;  
} epoll_data_t;  
  
struct epoll_event {  
    uint32_t    events;   /* Epoll 事件类型 */  
    epoll_data_t data;    /* 用户数据 */  
};
  • events:这是一个位掩码,用于指定要监视的事件类型。常见的事件类型包括 EPOLLIN(可读事件)、EPOLLOUT(可写事件)、EPOLLERR(错误事件)等。此外,epoll 还支持 EPOLLET(边缘触发模式)和 EPOLLONESHOT(只触发一次,就移除该fd)等特殊事件类型。
  • data:这是一个联合体,包含了用户数据。它可以是一个指针、文件描述符、32位或64位无符号整数。这个数据在事件触发时会原样返回给用户,以便用户识别是哪个文件描述符触发了事件。

3.epoll_wait

参数:

  • epfd:这是由 epoll_create 或 epoll_create1 返回的 epoll 实例的文件描述符。它指定了要等待事件的 epoll 实例。
  • events:这是一个指向 struct epoll_event 结构体数组的指针,用于接收准备就绪的事件。在调用 epoll_wait 后,该数组会被填充为准备就绪的文件描述符和它们关联的事件(如读就绪、写就绪等)。
  • maxevents:这个参数指定了 events 数组可以容纳的最大事件数。epoll_wait 最多会返回这个数目的准备就绪事件。如果少于这个数目的事件准备就绪,那么实际返回的事件数会少于 maxevents
  • timeout:这个参数指定了 epoll_wait 在没有事件准备就绪时应等待的最长时间(以毫秒为单位)。如果设置为 -1,epoll_wait 将无限期地等待,直到至少有一个事件准备就绪。如果设置为 0,epoll_wait 将立即返回,不等待任何事件发生。如果设置为一个正整数 N,epoll_wait 将等待最多 N 毫秒。

返回值:

  • 当成功时,epoll_wait 返回准备就绪的文件描述符数量。如果返回值为 0,表示在指定的超时时间内没有事件发生。
  • 当出错时,返回 -1,并设置全局变量 errno 以指示错误类型。

注意事项:

  • epoll_wait 是阻塞调用,这意味着在指定的超时时间内如果没有任何事件准备就绪,调用线程将被阻塞。如果超时时间到达并且没有事件准备就绪,epoll_wait 将返回 0。
  • epoll_wait 使用的 epoll_event 结构体包含了与事件相关的文件描述符和数据。在调用 epoll_wait 后,用户可以通过遍历 events 数组来处理所有准备就绪的事件。
  • epoll_wait 是 Linux 下进行高性能网络编程和并发编程的重要工具之一,它能够显著提高处理大量并发连接时的效率和可扩展性。

2. epoll的工作原理,和接口对应

1.理解数据到达主机

        数据到达主机的过程是一个复杂的多层封装与解封装过程,涉及到了网络协议栈的各个层次以及网络设备的协同工作。这个过程确保了数据能够准确、可靠地从源主机传输到目的主机。

        但是数据到主机,一定先经过网卡的,由网卡将数据交给网络协议栈,OS又如何知道网卡中有数据呢?

        答案是,中断机制

        当网卡接收到数据时,它会通过中断的方式通知CPU。中断是CPU与硬件设备之间的一种通信方式,用于在硬件事件发生时请求CPU的注意。当网卡接收到一个数据包时,它会触发一个中断信号,该信号被发送到CPU的中断控制器。CPU在接收到中断信号后,会暂停当前正在执行的程序,转而执行一个中断服务例程(ISR),该例程负责处理网卡接收到的数据。

2.epoll的工作原理

        当OS创建epoll模型,首先要在底层构建一颗红黑树。每个红黑树的结点一定要包括以下几个字段:int fd; unit32_t events; struct rb_node*left ,*right。

        该红黑树用来标识用户让内核关心的fd及其对应的事件。该红黑树由epoll_ctl进行增加,删除,修改操作。 fd就是key值。除了维护红黑树,还要维护一个就绪队列,其中每个结点包括:int fd; unit32_t revents; struct node*next ,*prev。

        一旦网卡中有数据了,网卡同个中断交给OS,接着网络协议栈就拿到数据了,所以在输入和输出缓冲区里有没有数据,OS是很清楚的,OS在缓冲区中设置回调方法,该回调方法就是epoll_ctl构建的。底层一旦有数据就绪并且是用户关心的,此时OS就会调用回调方法构建就绪队列的结点,填充清楚,是哪一个fd的什么事件已经就绪了,并连入就绪队列。

        上层要知道哪些数据就绪了,就可以直接调用epoll_wait,他会将相关的结点通过events字段返回出去。所以epoll检测有没有就绪事件, 这个过程的事件复杂度就是O(1),因为epoll_wait只需要检测就绪队列是否为空。获取就绪事件只能是O(N),因为只能将结点一个一个拷贝到events字段,在这个过程中,epoll_wait会将就绪事件,依次言给按照顺序放入到我们定义的用户缓冲区数组中。

        那么使用epoll_create就能创建一个epoll模型,只要有需要,在OS中是可以同时存在多个epoll模型的,那么OS就要管理存在的epoll模型:

        当某一进程调用 epoll_create 方法时, Linux 内核会创建一个 eventpoll 结构体, 这个结构体中有两个成员(红黑树和就绪队列)与 epoll 的使用方式密切相关.

  • 每一个 epoll 对象都有一个独立的 eventpoll 结构体, 用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件.
  • 这些事件都会挂载在红黑树中, 如此, 重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是 lgn, 其中 n 为树的高度).
  • 而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系, 也就是说, 当响应的事件发生时会调用这个回调方法.
  • • 这个回调方法在内核中叫 ep_poll_callback,它会将发生的事件添加到 rdlist 双链表中.
  • 在 epoll 中, 对于每一个事件, 都会建立一个 epitem 结构体
     
struct epitem
{
    struct rb_node rbn;       // 红黑树节点
    struct list_head rdllink; // 双向链表节点
    struct epoll_filefd ffd;  // 事件句柄信息
    struct eventpoll *ep;     // 指向其所属的 eventpoll 对象
    struct epoll_event event; // 期待发生的事件类型
}
  • 当调用 epoll_wait 检查是否有事件发生时, 只需要检查 eventpoll 对象中的rdlist 双链表中是否有 epitem 元素即可.
  • 如果 rdlist 不为空, 则把发生的事件复制到用户态, 同时将事件数量返回给用户. 这个操作的时间复杂度是 O(1)

        当使用create创建一个epoll模型时,会返回一个fd,为什么呢?

        fd作为用户空间和内核空间之间的桥梁,允许你通过标准的文件描述符操作(如readwriteclose等,尽管对于epoll来说,主要使用的是epoll_ctl来添加、修改或删除监控的文件描述符,以及epoll_wait来等待事件)来与内核中的eventpoll结构体进行交互。

总结一下, epoll 的使用过程就是三部曲:

  • 调用 epoll_create 创建一个 epoll 句柄;
  • 调用 epoll_ctl, 将要监控的文件描述符进行注册;
  • 调用 epoll_wait, 等待文件描述符就绪;
     

 3.基于epoll的TCP服务器(代码)

 辅助库

用于封装和处理 IP 地址及其端口号:InetAddr.hpp

#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

class InetAddr
{
private:
    void ToHost(const struct sockaddr_in &addr)
    {
        _port = ntohs(addr.sin_port);
        // _ip = inet_ntoa(addr.sin_addr);
        char ip_buf[32];
        // inet_p to n
        // p: process
        // n: net
        // inet_pton(int af, const char *src, void *dst);
        // inet_pton(AF_INET, ip.c_str(), &addr.sin_addr.s_addr);
        ::inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf));
        _ip = ip_buf;
    }

public:
    InetAddr(const struct sockaddr_in &addr):_addr(addr)
    {
        ToHost(addr);
    }
    InetAddr()
    {}
    bool operator == (const InetAddr &addr)
    {
        return (this->_ip == addr._ip && this->_port == addr._port);
    }
    std::string Ip()
    {
        return _ip;
    }
    uint16_t Port()
    {
        return _port;
    }
    struct sockaddr_in Addr()
    {
        return _addr;
    }
    std::string AddrStr()
    {
        return _ip + ":" + std::to_string(_port);
    }
    ~InetAddr()
    {
    }

private:
    std::string _ip;
    uint16_t _port;
    struct sockaddr_in _addr;
};

日志库:Log.hpp

#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include <pthread.h>
#include "LockGuard.hpp"

namespace log_ns
{

    enum
    {
        DEBUG = 1,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    std::string LevelToString(int level)
    {
        switch (level)
        {
        case DEBUG:
            return "DEBUG";
        case INFO:
            return "INFO";
        case WARNING:
            return "WARNING";
        case ERROR:
            return "ERROR";
        case FATAL:
            return "FATAL";
        default:
            return "UNKNOWN";
        }
    }

    std::string GetCurrTime()
    {
        time_t now = time(nullptr);
        struct tm *curr_time = localtime(&now);
        char buffer[128];
        snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
                 curr_time->tm_year + 1900,
                 curr_time->tm_mon + 1,
                 curr_time->tm_mday,
                 curr_time->tm_hour,
                 curr_time->tm_min,
                 curr_time->tm_sec);
        return buffer;
    }

    class logmessage
    {
    public:
        std::string _level;
        pid_t _id;
        std::string _filename;
        int _filenumber;
        std::string _curr_time;
        std::string _message_info;
    };

#define SCREEN_TYPE 1
#define FILE_TYPE 2

    const std::string glogfile = "./log.txt";
    pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;

    // log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );
    class Log
    {
    public:
        Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE)
        {
        }
        void Enable(int type)
        {
            _type = type;
        }
        void FlushLogToScreen(const logmessage &lg)
        {
            printf("[%s][%d][%s][%d][%s] %s",
                   lg._level.c_str(),
                   lg._id,
                   lg._filename.c_str(),
                   lg._filenumber,
                   lg._curr_time.c_str(),
                   lg._message_info.c_str());
        }
        void FlushLogToFile(const logmessage &lg)
        {
            std::ofstream out(_logfile, std::ios::app);
            if (!out.is_open())
                return;
            char logtxt[2048];
            snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",
                     lg._level.c_str(),
                     lg._id,
                     lg._filename.c_str(),
                     lg._filenumber,
                     lg._curr_time.c_str(),
                     lg._message_info.c_str());
            out.write(logtxt, strlen(logtxt));
            out.close();
        }
        void FlushLog(const logmessage &lg)
        {
            // 加过滤逻辑 --- TODO

            LockGuard lockguard(&glock);
            switch (_type)
            {
            case SCREEN_TYPE:
                FlushLogToScreen(lg);
                break;
            case FILE_TYPE:
                FlushLogToFile(lg);
                break;
            }
        }
        void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
        {
            logmessage lg;

            lg._level = LevelToString(level);
            lg._id = getpid();
            lg._filename = filename;
            lg._filenumber = filenumber;
            lg._curr_time = GetCurrTime();

            va_list ap;
            va_start(ap, format);
            char log_info[1024];
            vsnprintf(log_info, sizeof(log_info), format, ap);
            va_end(ap);
            lg._message_info = log_info;

            // 打印出来日志
            FlushLog(lg);
        }
        ~Log()
        {
        }

    private:
        int _type;
        std::string _logfile;
    };

    Log lg;

#define LOG(Level, Format, ...)                                        \
    do                                                                 \
    {                                                                  \
        lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
    } while (0)
#define EnableScreen()          \
    do                          \
    {                           \
        lg.Enable(SCREEN_TYPE); \
    } while (0)
#define EnableFILE()          \
    do                        \
    {                         \
        lg.Enable(FILE_TYPE); \
    } while (0)
};

给日志库上锁,保证线程安全:LockGuard.hpp

#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t *_mutex;
};

基于TCP的Socket封装

使得Socket的使用更加面向对象。 

#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <pthread.h>
#include <memory>

#include "Log.hpp"
#include "InetAddr.hpp"
//以下是对socket的封装,方便面向对象式的使用socket
namespace socket_ns
{
    using namespace log_ns;
    class Socket;
    using SockSPtr = std::shared_ptr<Socket>;//Socket是虚基类,实际上是拿TcpSocket
                                             //定义的对象
    enum//创建失败的常量
    {
        SOCKET_ERROR = 1,
        BIND_ERROR,
        LISTEN_ERR
    };
    const static int gblcklog = 8;//监听队列默认大小。
    // 模版方法模式
    class Socket
    {
    public:
        virtual void CreateSocketOrDie() = 0;
        virtual void CreateBindOrDie(uint16_t port) = 0;
        virtual void CreateListenOrDie(int backlog = gblcklog) = 0;
        virtual SockSPtr Accepter(InetAddr *cliaddr) = 0;
        
        virtual bool Conntecor(const std::string &peerip, uint16_t peerport) = 0;
        virtual int Sockfd() = 0;
        virtual void Close() = 0;

        virtual ssize_t Recv(std::string *out) = 0;//进行读取
        virtual ssize_t Send(const std::string &in) = 0;//进行发送

    public:
        void BuildListenSocket(uint16_t port)//创建监听套接字
        {
            CreateSocketOrDie();
            CreateBindOrDie(port);
            CreateListenOrDie();

        }
        //创建客户端套接字
        bool BuildClientSocket(const std::string &peerip, uint16_t peerport)
        {
            CreateSocketOrDie();
            return Conntecor(peerip, peerport);
        }
        // void BuildUdpSocket()
        // {}
    };

    class TcpSocket : public Socket
    {
    public:
        TcpSocket()
        {
        }
        //监听套接字初始化/构造函数式的初始化
        TcpSocket(int sockfd) : _sockfd(sockfd)
        {
        }
        ~TcpSocket()
        {
        }
        void CreateSocketOrDie() override
        {
            // 1. 创建socket
            _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
            if (_sockfd < 0)
            {
                LOG(FATAL, "socket create error\n");
                exit(SOCKET_ERROR);
            }
            LOG(INFO, "socket create success, sockfd: %d\n", _sockfd); // 3
        }
        void CreateBindOrDie(uint16_t port) override//bind
        {
            
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_family = AF_INET;
            local.sin_port = htons(port);
            local.sin_addr.s_addr = INADDR_ANY;

            // 2. bind sockfd 和 Socket addr
            if (::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
            {
                LOG(FATAL, "bind error\n");
                exit(BIND_ERROR);
            }
            LOG(INFO, "bind success, sockfd: %d\n", _sockfd); // 3
        }
        //监听
        void CreateListenOrDie(int backlog) override
        {
            // 3. 因为tcp是面向连接的,tcp需要未来不断地能够做到获取连接
            if (::listen(_sockfd, gblcklog) < 0)
            {
                LOG(FATAL, "listen error\n");
                exit(LISTEN_ERR);
            }
            LOG(INFO, "listen success\n");
        }
        //方便获取客户端地址,accept获取一个新的文件描述符
        //而该文件描述符本质就是ip+端口号
        //之前我们使用文件描述符都是面向过程,都是作为函数参数进行传递的
        //我们需要面向对象的使用套接字,我们将得到的IO文件描述符设置进套接字里面
        //返回该套接字
        //using SockSPtr = std::shared_ptr<Socket>;//Socket是虚基类,实际上是拿TcpSocket
                                                   //定义的对象
        SockSPtr Accepter(InetAddr *cliaddr) override
        {
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            // 4. 获取新连接:得到一个新的文件描述符,得到新的客户端
            int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
            if (sockfd < 0)
            {
                LOG(WARNING, "accept error\n");
                return nullptr;
            }
            *cliaddr = InetAddr(client);
            LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", cliaddr->AddrStr().c_str(), sockfd);
            return std::make_shared<TcpSocket>(sockfd); // C++14
        }
        //连接目标服务器(是否成功)
        //客户端ip和端口号
        bool Conntecor(const std::string &peerip, uint16_t peerport) override
        {
            struct sockaddr_in server;
            memset(&server, 0, sizeof(server));
            server.sin_family = AF_INET;
            server.sin_port = htons(peerport);
            //将IPv4地址的字符串形式转换为网络字节顺序的二进制形式,
            //并将其存储在server.sin_addr中
            ::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr);
            
            int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
            if (n < 0)
            { 
                return false;
            }
            return true;
        }
        int Sockfd()//文件描述符
        {
            return _sockfd;
        }
        void Close()
        {
            if (_sockfd > 0)
            {
                ::close(_sockfd);
            }
        }
        ssize_t Recv(std::string *out) override//读到的消息
        {
            char inbuffer[4096];
            //从sockfd中读
            ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0);
            if (n > 0)
            {
                inbuffer[n] = 0;
                //这里不能是=,不可以覆盖式的读取,因为每一次可能并不是读取到一条完整的报文
                // "len"\r\n
                // "len"\r\n"{json}"\r\n
                //向上面的情况如果覆盖的读取将读取不到完整的报文了
                //所以要用+=
                *out += inbuffer;
            }
            return n;
        }
        ssize_t Send(const std::string &in) override
        {
            return ::send(_sockfd, in.c_str(), in.size(), 0);
        }

    private:
        int _sockfd; // 可以是listensock,普通socketfd
    };
    // class UdpSocket : public Socket
    // {};
} // namespace socket_n

代码逻辑:

  1. 命名空间和类定义
    • 定义了一个命名空间socket_ns,用于封装Socket相关的类和函数。
    • 定义了一个基类Socket,它是一个抽象类,提供了Socket操作的基本接口,如创建、绑定、监听、接收连接、发送和接收数据等。
    • 定义了一个派生类TcpSocket,它继承自Socket类,并实现了所有虚函数,提供了TCP Socket的具体实现。
  2. Socket基类
    • 定义了多个纯虚函数,包括创建Socket、绑定、监听、接受连接、连接服务器、获取文件描述符、关闭Socket、接收和发送数据等。
    • 提供了一个构建监听Socket的成员函数BuildListenSocket,它依次调用创建Socket、绑定和监听函数来初始化监听Socket。
    • 提供了一个构建客户端Socket的成员函数BuildClientSocket,它调用创建Socket和连接服务器函数来初始化客户端Socket。
  3. TcpSocket类
    • 实现了Socket类中的所有纯虚函数,提供了TCP Socket的具体实现。
    • 在构造函数中,可以初始化一个已存在的文件描述符,或者通过调用CreateSocketOrDie函数创建一个新的Socket文件描述符。
    • CreateSocketOrDie函数用于创建一个新的Socket文件描述符。
    • CreateBindOrDie函数用于将Socket绑定到一个指定的端口上。
    • CreateListenOrDie函数用于将Socket设置为监听模式,以便接受连接。
    • Accepter函数用于接受一个新的连接,并返回一个表示该连接的TcpSocket对象。
    • Conntecor函数用于连接到一个指定的服务器。
    • Sockfd函数用于获取Socket的文件描述符。
    • Close函数用于关闭Socket。
    • Recv函数用于从Socket接收数据。
    • Send函数用于向Socket发送数据。
  4. 日志和错误处理
    • 使用了自定义的日志系统(log_ns命名空间中的LOG宏)来记录日志和错误信息。
    • 在发生错误时,使用exit函数终止程序,并传递一个错误码。
  5. 内存管理
    • 使用了智能指针(std::shared_ptr)来管理TcpSocket对象的内存,以避免内存泄漏。

服务器代码

#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"

using namespace socket_ns;

class EpollServer
{
    const static int size = 128; //epoll fd size
    const static int num = 128;

public:
    EpollServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>())
    {
        _listensock->BuildListenSocket(port);
        _epfd = ::epoll_create(size);
        //创建成功了吗
        if (_epfd < 0)
        {
            LOG(FATAL, "epoll_create error!\n");
            exit(1);
        }
        LOG(INFO, "epoll create success, epfd: %d\n", _epfd);
    }
    void InitServer()
    {
        // 新链接到来,我们认为是读事件就绪
        struct epoll_event ev;
        ev.events = EPOLLIN;//读事件就绪
        // ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = _listensock->Sockfd(); // 为了在事件就绪的时候,得到是那一个fd就绪了
        // 必须先把listensock 添加到epoll中,这样epoll才知道你是否就绪了
        //EPOLL_CTL_ADD 创建一个新节点
        int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Sockfd(), &ev);
        if (n < 0)//添加失败
        {
            LOG(FATAL, "epoll_ctl error!\n");
            exit(2);
        }
        LOG(INFO, "epoll_ctl success, add new sockfd : %d\n", _listensock->Sockfd());
    }
    std::string EventsToString(uint32_t events)
    {
        std::string eventstr; 
        if (events & EPOLLIN)
            eventstr = "EPOLLIN";
        if (events & EPOLLOUT)
            eventstr += "|EPOLLOUT";
        return eventstr;
    }
    void Accepter()
    {
        InetAddr addr;
        int sockfd = _listensock->Accepter(&addr); // 肯定不会被阻塞,因为epoll知道就绪了,直接进行连接
        if (sockfd < 0)
        {
            LOG(ERROR, "获取连接失败\n");
            return;
        }
        LOG(INFO, "得到一个新的连接: %d, 客户端信息: %s:%d\n", sockfd, addr.Ip().c_str(), addr.Port());
        // 得到了一个新的sockfd,我们能不能要进行read、recv?不能.
        // 等底层有数据(读事件就绪), read/recv才不会被阻塞
        // 底层有数据 谁最清楚呢?epoll!
        // 将新的sockfd添加到epoll中!怎么做呢?
        struct epoll_event ev;
        ev.data.fd = sockfd;
        ev.events = EPOLLIN;
        ::epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);
        LOG(INFO, "epoll_ctl success, add new sockfd : %d\n", sockfd);
    }
    void HandlerIO(int fd)
    {
        char buffer[4096];
        // 你怎么保证buffer就是一个完整的请求?或者有多个请求??
        // 一个fd,都要有一个自己的缓冲区
        // 引入协议
        int n = ::recv(fd, buffer, sizeof(buffer) - 1, 0); // 会阻塞吗?不会
        //因为已经就绪了才recv  
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << buffer;
            std::string response = "HTTP/1.0 200 OK\r\n";
            std::string content = "<html><body><h1>hello world</h1></body></html>";
            response += "Content-Type: text/html\r\n";
            response += "Content-Length: " + std::to_string(content.size()) + "\r\n";
            response += "\r\n";
            response += content;

            ::send(fd, response.c_str(), response.size(), 0);
        }
        else if (n == 0)
        {
            LOG(INFO, "client quit, close fd: %d\n", fd);
            // 1. 从epoll中移除, 从epoll中移除fd,这个fd必须是健康&&合法的fd. 否则会移除出错
            ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
            // 2. 关闭fd
            ::close(fd);
        }
        else
        {
            LOG(ERROR, "recv error, close fd: %d\n", fd);
            // 1. EPOLL_CTL_DEL,从epoll中移除, 从epoll中移除fd,这个fd必须是健康&&合法的fd. 否则会移除出错
            ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
            // 2. 关闭fd
            ::close(fd);
        }
    }
    void HandlerEvent(int n)//处理就绪的n个事件
    {
        for (int i = 0; i < n; i++)
        {
            int fd = revs[i].data.fd;//哪个fd?
            uint32_t revents = revs[i].events;//什么事件?
            LOG(INFO, "%d 上面有事件就绪了,具体事件是: %s\n", fd, EventsToString(revents).c_str());
            if (revents & EPOLLIN)
            {
                // listensock 读事件就绪, 新连接到来了
                if (fd == _listensock->Sockfd())
                    Accepter();
                else
                    HandlerIO(fd);
            }
        }
    }
    void Loop()
    {
        int timeout = -1;
        while (true)
        {
            //这里只有epoll知道listensocket是否就绪,不让accept在这一直阻塞
            // 事件通知,事件派发
            int n = ::epoll_wait(_epfd, revs, num, timeout);//返回准备就绪的文件描述符数量
            switch (n)
            {
            case 0:
                LOG(INFO, "epoll time out...\n");
                break;
            case -1:
                LOG(ERROR, "epoll error\n");
                break;
            default:
                LOG(INFO, "haved event happend!, n : %d\n", n);
                HandlerEvent(n);
                break;
            }
        }
    } 
    ~EpollServer()
    {
        //关闭fd
        if (_epfd >= 0)
            ::close(_epfd);
        _listensock->Close();
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listensock;
    int _epfd;//epoll fd
    struct epoll_event revs[num];//缓冲区 指向 struct epoll_event 结构体数组的指针,用于接收准备就绪的事件
};
  1. 初始化服务器 (InitServer):
    • 正确地创建了监听套接字并将其添加到 epoll 实例中。
    • 使用 EPOLLIN 来监听读事件(即新连接的到来)。
  2. 接受新连接 (Accepter):
    • 从 TcpSocket 类的 Accepter 方法中接受新连接。
    • 将新连接的套接字添加到 epoll 实例中,以便监听读事件。
  3. 处理IO (HandlerIO):
    • 读取套接字数据并处理(例如,简单的 HTTP 响应)。
    • 根据读取结果(成功、客户端关闭连接、错误)采取不同的操作(包括发送响应、关闭套接字、从 epoll 中移除套接字)。
  4. 事件处理循环 (Loop):
    • 使用 epoll_wait 等待事件发生。
    • 遍历就绪的事件并调用相应的处理函数(这里是 HandlerEvent,但实际上是在 Loop 中直接处理)。

测试

Main.cc

#include "SelectServer.hpp"
#include <memory>


// ./tcpserver 8888
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;
        exit(0);
    }
    uint16_t port = std::stoi(argv[1]);
    EnableScreen();
    std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port);
    svr->InitServer();
    svr->Loop();

    return 0;
}

 4.epoll的工作模式

边缘触发(Edge Triggered, ET)模式

特点

  • 当文件描述符从非就绪状态变为就绪状态时,epoll会通知用户程序。
  • 如果用户程序没有将文件描述符中的所有数据读取完毕,即使文件描述符中还有剩余数据,epoll也不会再次发送通知,直到下一次文件描述符从非就绪状态再次变为就绪状态。
  • ET模式通常与非阻塞I/O结合使用,以避免因阻塞读/写操作而导致的性能问题。

使用场景

  • 适用于需要高效处理大量并发连接的场景,如高性能的Web服务器、数据库服务器等。

注意事项

  • 在ET模式下,用户程序需要确保在接收到通知后,尽可能多地读取或写入数据,直到文件描述符变为非就绪状态,以避免遗漏数据。
  • 由于ET模式只通知一次,如果处理不当,可能会导致“饥饿”现象,即某些文件描述符因为没有被及时处理而错过通知。

水平触发(Level Triggered, LT)模式

特点

  • 只要文件描述符处于就绪状态,epoll就会一直通知用户程序。
  • 无论用户程序是否读取或写入了数据,只要文件描述符仍然处于就绪状态,epoll就会继续发送通知。
  • LT模式是epoll的默认模式,同时支持阻塞和非阻塞socket。

使用场景

  • 适用于对实时性要求不是特别高,但希望确保不遗漏任何I/O事件的场景。

注意事项

  • 在LT模式下,如果用户程序没有及时处理通知,可能会导致大量通知被累积,从而增加系统的负担。
  • LT模式在处理大量并发连接时可能不如ET模式高效,因为它可能会产生更多的通知。

        epoll的ET模式和LT模式各有优缺点,选择哪种模式取决于具体的应用场景和需求。在需要处理大量并发连接和追求高性能的场景中,ET模式通常是更好的选择;而在对实时性要求不高或希望简化编程模型的场景中,LT模式可能更为合适。无论使用哪种模式,都需要仔细设计用户程序的处理逻辑,以确保能够高效地处理I/O事件并避免潜在的问题。

理解 ET 模式和非阻塞文件描述符

        使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 "工
程实践" 上的要求.
        假设这样的场景: 服务器接收到一个 10k 的请求, 会向客户端返回一个应答数据. 如果客
户端收不到应答, 不会发送第二个 10k 请求.

        如果服务端写的代码是阻塞式的 read, 并且一次只 read 1k 数据的话(read 不能保证一
次就把所有的数据都读出来, 参考 man 手册的说明, 可能被信号打断), 剩下的 9k 数据就会待在缓冲区中

        此时由于 epoll 是 ET 模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返
回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据.epoll_wait 才能返回

但问题来了:

  • 服务器只读到 1k 个数据, 要 10k 读完才会给客户端返回响应数据.
  • 客户端要读到服务器的响应, 才会发送下一个请求
  • 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据

        所以, 为了解决上述问题(阻塞 read 不一定能一下把完整的请求读完), 于是就可以使用
非阻塞轮训的方式来读缓冲区, 保证一定能把完整的请求都读出来

        ET模式下,只通知一次,本轮数据没有读完,epoll不再通知,因此ET模式下,一旦就绪就必须把数据全部读完。但是你怎么知道有没有把数据读完?只能循环读取,知道读不到数据,循环读取肯定是会遇到阻塞问题的,epoll当然是不敢阻塞的,否则进程会被挂起,因此fd必须是非阻塞的


http://www.kler.cn/news/289236.html

相关文章:

  • 监控平台之针对vue,react上报
  • ​yum安装/更新时报错:SyntaxError: invalid syntax​
  • 全局网络代理的使用与选择
  • css的position定位的属性
  • 【Java那些事】关于Git的使用
  • Datawhale X 李宏毅苹果书 AI夏令营|机器学习基础之案例学习
  • Vue -- 总结 02
  • adb大全指令(持续更新)
  • 动态住宅IP代理的搭建指南:实现高效网络访问
  • 数据访问:JPA关联MyBatis
  • Elasticsearch的Restful风格API
  • 达梦常用SQL及脚本工具
  • 哈希 详解
  • echart自适应tree树图,结构组织图模板
  • 国赛数模C题模型(五)
  • 将泛型和函数式编程结合,竟然会让代码这么优雅!
  • (一)、软硬件全开源智能手表,与手机互联,标配多表盘,功能丰富(ZSWatch-Zephyr)
  • 大数据系列之:OutOfMemoryError: unable to create new native thread
  • 简单好用的SD卡克隆软件:轻松克隆SD卡
  • 路径优化 minimum-snap(对A*的全局路径进行优化)
  • 使用Python写一个适用于Dify和FastGPT的JsonPath插件
  • VideoCrafter1:Open Diffusion models for high-quality video generation
  • 【Android】最好用的网络库:Retrofit
  • 深度学习中的PyTorch Tensor详解
  • IntelliJ IDEA 自定义字体大小
  • Milvus向量数据库-数据备份与恢复
  • Kotlin 流 Flow
  • pikachu文件包含漏洞靶场
  • JavaScript-document.write和innerHTML的区别
  • Unity(2022.3.41LTS) - UI详细介绍-Scroll View(滚动视图)