当前位置: 首页 > article >正文

Linux——五种IO模型

目录

一IO基本理解

二五种IO模型

1五种IO模型示意图

2同步IO和异步IO 

二非阻塞IO

1fcntl

2实现非阻塞IO

三多路复用

1select

1.1定位和作用

1.2介绍参数

1.3编写多路复用代码

1.4优缺点

2poll

2.1作用和定位

2.2介绍参数 

2.3修改select代码

3epoll

3.1介绍参数

3.2工作原理

理解数据到达主机

​3.3编写epoll代码 

3.4优点

3.5工作模式

理解LT和ET

四Reactor

​完整源代码


一IO基本理解

前面学习了网络通信,我们认识到:网络通信本质上是进程间通信;而进程间通信本质上是IO;关于IO:I:Input;O:Output;这又是什么意思

站在进程角度上:Input是外面交给进程的数据;Output是进程发出去的数据;                                 站在系统角度上:Input是数据从硬件交给OS;Output是数据从OS输出出去;                                站在内存角度上:Input是内存数据交给磁盘;Output是磁盘数据交给内存

总之:IO本质上是外设与内设之间的交互

那对应我们来说:IO该如何理解呢??

C/C++解除到的IO接口:read/recv write/send 当底层有新数据到来时(前提接收/发送缓冲区未被占满的情况下),这些接口才能给我们返回fd:让我们读/写数据,否则会一直阻塞下去知道新数据的到来!

所以说:IO = 等 + 拷贝

那什么叫做高效的IO呢?<-> 在单位时间内,减少IO等的比率

二五种IO模型

在现实生活中,有很多人喜欢钓鱼,也被我们称之为钓鱼佬;它们钓鱼方式都是按照自己对钓鱼的理解去实现的,所以也就产生出不同的钓鱼方式;这也就总共可分为以下五种:(钓鱼 = 等 + 钓)

张三:一动不动,检测鱼漂,钓;(阻塞IO

李四:一直在动,随便检测鱼漂,钓;(非阻塞IO

王五:在鱼漂上挂铃铛(铃铛没响就刷抖音),钓;(信号驱动IO

赵六:准备很多钓鱼竿,定期检测鱼竿,;(多路复用IO

田七:找司机小王钓,自己去做别的事(田七没有参与调用,只是发起钓鱼);(异步IO

在以上例子中:

人:系统调用

鱼竿:sockfd

湖:系统内部

鱼:数据

鱼漂浮动:数据就绪

钓:接收数据 

田七:发起IO 小王:操作系统

通过上面的例子可以基本对应5中IO模型,但又有问题了:

阻塞IO vs 非阻塞IO

IO = 等 + 拷贝:拷贝(钓)的方式是一样的,就是等的方式不同(一个一直等,一个一直动)

谁的钓鱼效率最高呢?(单位时间内等待的时间短)

你可以会说是王五或者田七:但是答案是赵六!因为如果你是鱼的话,湖面上有104个诱饵(光赵六就有100个),你吃到赵六的鱼饵的概率高,也就是赵六更容易有鱼咬钩,自然比别人效率高,而王五田七他们只有一个鱼竿(假设与咬钩概率相等),一天内钓上来鱼的数量是差不多的,只是多做了一些事情(多刷了几个视频,多参加了几个会(对于别人来说))

1五种IO模型示意图

阻塞 IO(常见): 在内核将数据准备好之前, 系统调用会一直等待;所有的套接字,默认都是阻塞方式

非阻塞 IO: 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回错误码

信号驱动 IO: 内核将数据准备好的时候, 使用 SIGIO 信号通知应用程序进行 IO操作


 

IO 多路转接: 虽然从流程图上看起来和阻塞 IO 类似. 实际上最核心在于 IO 多路转接能够同时等待多个文件描述符的就绪状态

异步 IO: 由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)

2同步IO和异步IO 

我们也将IO模型分为两大类:同步IO(前四种IO模型)和异步IO

之前也有人为了信号驱动IO是同步还是异步争吵过,但这里把信号驱动IO归结为同步IO:看似王五在做别的事,但是鱼咬钩你是不是要去把鱼给钓上来呢?实际上还是受到IO影响(刷着视频突然鱼咬钩了你要立刻放下手机去钓鱼)!

两者区别:你有没有参与IO(等)的过程

二非阻塞IO

非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询:
对 CPU 来说是较大的浪费, 一般只有特定场景下才使用

1fcntl

作用:将fd设置成非阻塞IO

函数的第二个参数:

复制一个现有的描述符(cmd=F_DUPFD) .
获得/设置文件描述符标记(cmd=F_GETFD 或 F_SETFD).
获得/设置文件状态标记(cmd=F_GETFL 或 F_SETFL).
获得/设置异步 I/O 所有权(cmd=F_GETOWN 或 F_SETOWN).
获得/设置记录锁(cmd=F_GETLK,F_SETLK 或 F_SETLKW).

2实现非阻塞IO

#include <iostream>
#include <sys/fcntl.h>
#include <string>
#include <unistd.h>

// 设置非阻塞轮询方式
void NoBlock(int fd)
{
    int f1 = fcntl(fd, F_GETFL);
    if (f1 < 0)
    {
        std::cerr << "fcntl fail" << std::endl;
        return;
    }
    fcntl(fd, F_SETFL, f1 | O_NONBLOCK);
}

int main()
{
    char buffer[128];
    NoBlock(0);
    while (true)
    {
        ssize_t n = read(0, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "echo:" << buffer;
        }
        else if (n == 0)
        {
            std::cout << "read none" << std::endl;
            break;
        }
        else
        {
            // 如何区别read是出错了还是非阻塞了? -> 通过errno来判断!
            if (errno == EWOULDBLOCK)
            {
                std::cout << "Non blocking polling" << std::endl;
                std::cout << "Do other things" << std::endl;
                sleep(1);
                continue;
            }
            else
            {
                std::cout << "read error" << std::endl;
                break;
            }
        }
    }
    return 0;
}

现象: 

为什么我输入的时候信息会与打印的信息混杂在一起?--> OS(默认)回显

如果read被信号中断了,循环就会结束,不往下进行读了:我们也要保证该情况不会影响我们读取

#include <iostream>
#include <sys/fcntl.h>
#include <string>
#include <unistd.h>

// 设置非阻塞轮询方式
void NoBlock(int fd)
{
    int f1 = fcntl(fd, F_GETFL);
    if (f1 < 0)
    {
        std::cerr << "fcntl fail" << std::endl;
        return;
    }
    fcntl(fd, F_SETFL, f1 | O_NONBLOCK);
}

int main()
{
    char buffer[128];
    NoBlock(0);
    while (true)
    {
        ssize_t n = read(0, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "echo:" << buffer;
        }
        else if (n == 0)//ctrl+d
        {
            std::cout << "read none" << std::endl;
            break;
        }
        else
        {
            // 如何区别read是出错了还是非阻塞了? -> 通过errno来判断!
            if (errno == EWOULDBLOCK)
            {
                std::cout << "Non blocking polling" << std::endl;
                std::cout << "Do other things" << std::endl;
                sleep(1);
                continue;//未来是要break的!
            }
            else if(errno==EINTR)//read被信号中断
            {
                continue;
            }
            else
            {
                std::cout << "read error" << std::endl;
                break;
            }
        }
    }
    return 0;
}

三多路复用

IO多路复用(Input/Output Multiplexing)是一种在单个线程中管理多个输入/输出通道的技术。它允许一个线程同时监听多个输入流(例如sockfd、fd等),并在有数据可读或可写时进行相应的处理,而不需要为每个通道创建一个独立的线程。

1select

系统提供 select 函数来实现多路复用输入/输出模型

1.1定位和作用

定位:它只负责进行等,不负责IO

作用:等待多个fd(等该fd上面的新事件就绪),通知用户新事件已经就绪,可以来IO了

1.2介绍参数

1.3编写多路复用代码

使用select要注意以下细节: 

1.select后accept还是阻塞吗?

一定不会阻塞(事件已经就绪)

2.read能直接读吗?如果不能,谁最清楚底层fd的数据就绪了?

绝对不能读,读取的时候,条件不一定满足; 由select来统一监管(select知道)

3.select要正常工作,要借助辅助数组(fd_array),来保存所有合法的fd!                                  (select的fd_set* 是输入输出参数,它会继续参数重置)

4.select统一监管是怎么做到的?

(accept后)将最新的fd添加到辅助数组(fd_array)就行了

5.就绪了循环处理所有事件

//SelectServer.hpp
#pragma once

#include <cstdint>
#include <memory>
#include <sys/select.h>
#include "Socket.hpp"

const static int gnum = sizeof(fd_set) * 8;
const static int gfd = -1;

class SelectServer
{
public:
    SelectServer(uint16_t port) : _port(port), _st(std::make_unique<TcpSocket>())
    {
        _st->Tcp_ServerSocket(_port);
    }
    void Init()
    {
        for (int i = 0; i < gnum; i++)
        {
            fd_array[i] = gfd;
        }
        // 自己先设置
        fd_array[0] = _st->Sockfd();
    }

    void Loop()
    {
        while (true)
        {
            fd_set ft;
            FD_ZERO(&ft); // 使用前先清除
            int fd_max = gfd;
            // 合法的fd添加到ft中
            for (int i = 0; i < gnum; i++)
            {
                if (fd_array[i] == gfd)
                    continue;

                FD_SET(fd_array[i], &ft);
                // 更新出最大值fd_max
                if (fd_max < fd_array[i])
                    fd_max = fd_array[i];
            }
            struct timeval tl = {10, 0};
            // 只关心读事件
            int n = ::select(fd_max + 1, &ft, nullptr, nullptr, nullptr);
            switch (n)
            {
            case 0:
                std::cout << "time done " << tl.tv_sec << ' ' << tl.tv_usec << std::endl;
                break;
            case -1:
                std::cerr << "select fail" << std::endl;
                break;
            default:
                std::cout << "have even: " << n << std::endl;
                HandEvent(ft); // 处理事件(不出来会一直通知)
                PrintFd_array();
                break;
            }
        }
    }
    //一定会存在大量的不用类型的sockfd!
    void HandEvent(fd_set &ft)
    {
        for (int i = 0; i < gnum; i++)
        {
            if (fd_array[i] == gfd)
                continue;
            // 一定是合法的fd
            // 判断fd是否就绪
            if (FD_ISSET(fd_array[i], &ft)) // 动态检测
            {
                // 事件就绪
                // sockfd类型?
                if (fd_array[i] == _st->Sockfd())
                {
                    Link();
                }
                else
                {
                    Read_Write(i);
                }
            }
        }
    }

    void Link()
    {
        InetAddr ir;
        int sockfd = _st->AcceptSocket(&ir); // 一定不会阻塞!
        if (sockfd > 0)
        {
            // 新的sockfd -> fd_array 让select进行管理
            bool tmp = false;
            for (int i = 1; i < gnum; i++)
            {
                if (fd_array[i] == gfd)
                {
                    tmp = true;
                    fd_array[i] = sockfd;
                    std::cout << "add:" << fd_array[i] << std::endl;
                    break;
                }
            }
            // fd_array满了
            if (!tmp)
            {
                std::cout << "fd_array full" << std::endl;
                ::close(sockfd);
            }
        }
    }

    void Read_Write(int i)
    {
        //没协议,可以直接读(这里考虑IO)
        char buffer[1024];
        ssize_t n = ::read(fd_array[i], buffer, sizeof(buffer) - 1); // 阻塞?No
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "Server read:" << buffer << std::endl;

            std::string content = "<html><body><h1>hello warld</h1></body></html>";

            std::string ech_str = "HTTP/1.0 200 OK\r\n";
            ech_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";
            ech_str += content;
            //(这里)默认是就绪的
            ::write(fd_array[i], ech_str.c_str(), ech_str.size());//临时
        }
        else if (n == 0)
        {
            std::cout << "user quit.." << std::endl;
            ::close(fd_array[i]);
            //清理fd
            fd_array[i] = gfd;
        }
        else
        {
            std::cout << "recv fail.." << std::endl;
            ::close(fd_array[i]);
            //清理fd
            fd_array[i] = gfd;
        }
    }

    void PrintFd_array()
    {
        for (int i = 0; i < gnum; i++)
        {
            if (fd_array[i] == gfd)
                continue;
            std::cout << fd_array[i] << ' ';
        }
        std::cout << std::endl;
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _st;
    // fd数组->select管理
    int fd_array[gnum];
};

//Main.cc
#include <iostream>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pthread.h>
#include <cstring>
#include <string>

#include"Socket.hpp"
#include"SelectServer.hpp"

int main(int args, char *argv[])
{
    if (args != 2)
    {
        std::cerr << "./Server Localport" << std::endl;
        exit(0);
    }
    uint16_t serverport = std::stoi(argv[1]);

    std::unique_ptr<SelectServer> sr=std::make_unique<SelectServer>(serverport);
    sr->Init();
    sr->Loop();

    return 0;
}

//InetAddr.hpp
#pragma once
#include<iostream>
#include<string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

class InetAddr
{
    void ToHost()
    {
        //_ip=inet_ntoa(_addr.sin_addr);//4字节地址->char*
        _port=ntohs(_addr.sin_port);

        char buffer[124];
        inet_ntop(AF_INET,&_addr.sin_addr,buffer,sizeof(buffer));
        _ip=buffer;
    }
public:
    InetAddr()
    {
        
    }
    InetAddr(const struct sockaddr_in& addr)
    :_addr(addr)
    {
        ToHost();
    }

    std::string Ip()
    {
        return _ip;
    }

    uint16_t Port()
    {
        return _port;
    }

    struct sockaddr_in Addr()
    {
        return _addr;
    }

    bool operator==(const InetAddr& ad)
    {
        return (this->_ip==ad._ip&&this->_port==ad._port);
    }

    std::string User()
    {
        std::string tmp=_ip+" "+std::to_string(_port)+":";
        return tmp;
    } 
    
private:
    std::string _ip;
    uint16_t _port;
    struct sockaddr_in _addr;
};

//Socket.hpp
#pragma once


#include "InetAddr.hpp"

class Socket;
enum
{
    SOCK_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERROR,
};
const int gbacklog = 8;

class Socket
{
public:
    virtual void CreateSocket() = 0;
    virtual void InitSocket(uint16_t port) = 0;
    virtual int AcceptSocket(InetAddr *addr) = 0;                         // 对象/变量
    virtual bool ConnectSocket(uint16_t port, const std::string &ip) = 0; // clinet连接成功与失败

    virtual int Sockfd() = 0;
    virtual void Close() = 0;
    virtual ssize_t Recv(std::string *out) = 0;
    virtual ssize_t Send(const std::string &in) = 0;

public:
    void Tcp_ServerSocket(uint16_t port)
    {
        CreateSocket();
        InitSocket(port);
    }
    bool Tcp_ClientSocket(uint16_t port, const std::string &ip)
    {
        CreateSocket();
        return ConnectSocket(port, ip);
    }
};

class TcpSocket : public Socket
{
public:
    TcpSocket()
    {
    }
    TcpSocket(int sockfd)
        : _sockfd(sockfd)
    {
    }
    ~TcpSocket()
    {
    }
    virtual void CreateSocket() override
    {
        _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
        if (_sockfd < 0)
        {
            std::cerr << "socket fail" << std::endl;
            exit(SOCK_ERROR);
        }
        std::cout << "socket sucess" << std::endl;
    }
    virtual void InitSocket(uint16_t port) override
    {
        struct sockaddr_in perr;
        memset(&perr, 0, sizeof(perr));
        perr.sin_family = AF_INET;
        perr.sin_port = htons(port);
        perr.sin_addr.s_addr = INADDR_ANY;
        if (::bind(_sockfd, (struct sockaddr *)&perr, sizeof(perr)) < 0)
        {
            std::cerr << "bind fail" << std::endl;
            exit(BIND_ERROR);
        }
        std::cout << "bind sucess" << std::endl;

        if (::listen(_sockfd, gbacklog) < 0)
        {
            std::cerr << "listen fail" << std::endl;
            exit(LISTEN_ERROR);
        }
        std::cout << "listen sucess" << std::endl;
    }
    virtual int AcceptSocket(InetAddr *addr) override // 外层要获取客户端信息 TODO
    {
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
        if (sockfd < 0)
        {
            std::cerr << "accept fail" << std::endl;
            return -1;
        }
        *addr = client;
        std::cout << "get a new link " <<addr->User()<< std::endl;
        return sockfd;
    }

    virtual bool ConnectSocket(uint16_t port, const std::string &ip) override
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &server.sin_addr);
        int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
        if (n < 0)
        {
            return false;
        }
        return true;
    }

    virtual int Sockfd() override
    {
        return _sockfd;
    }
    virtual void Close() override
    {
        if (_sockfd > 0)
        {
            ::close(_sockfd);
        }
    }
    virtual ssize_t Recv(std::string *out) override
    {
        char buffer[4096];
        ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            *out += buffer; // 细节
        }
        return n;
    }
    virtual ssize_t Send(const std::string &in) override
    {
        return ::send(_sockfd, in.c_str(), in.size(), 0);
    }

private:
    int _sockfd; // 两个角色
};

1.4优缺点

优点:                                                                                                                                                实现简单                                                                                                                                          兼容性好,可以实现跨平台                                                                                                             在一些老内核中就只能使用select来实现多路复用 

缺点:                                                                                                                                                 需要手动设置 fd 集合(使用起来麻烦(fd 集合混杂着不同的事件类型))
需要把 fd 集合从用户态拷贝到内核态(OS不相信任何人)(无法避免
需要在内核遍历传递进来的所有 fd
select 支持的文件描述符数量太小进程打开的文件描述符也是有上限

2poll

poll主要来解决手动设置核fd太小的问题(其它缺点poll同样存在)

2.1作用和定位

作用:等待多个fd上面的事件就绪,通知用户事件已经就绪,可以进行IO了

定位:只负责等:等就绪后进行事件派发

2.2介绍参数 

events和revents的取值

仅仅用一个结构体就解决了:用户让OS关心的事件(设置)用户让我关心的事件(返回

不用因为select设计的缺陷, 对fd和fd关心的事件进行重新设定!

而数量太小问题:poll就从此就给了用户,由用户定义fd数量后交给poll

2.3修改select代码

//PollServer.hpp
#pragma once

#include <cstdint>
#include <memory>
#include <sys/poll.h>
#include "Socket.hpp"

const static int gnum = sizeof(fd_set) * 8;
const static int gfd = -1;

class PollServer
{
public:
    PollServer(uint16_t port) : _port(port), _st(std::make_unique<TcpSocket>())
    {
        _st->Tcp_ServerSocket(_port);
    }
    void Init()
    {
        for (int i = 0; i < gnum; i++)
        {
            fd_array[i].fd = gfd;
            fd_array[i].events = 0;
            fd_array[i].revents = 0;
        }
        // 自己先设置
        fd_array[0].fd = _st->Sockfd();
        fd_array[0].events = POLLIN;
    }

    void Link()
    {
        InetAddr ir;
        int sockfd = _st->AcceptSocket(&ir);
        if (sockfd > 0)
        {
            // 新的sockfd -> fd_array 让select进行管理
            bool tmp = false;
            for (int i = 1; i < gnum; i++)
            {
                if (fd_array[i].fd == gfd)
                {
                    tmp = true;
                    fd_array[i].fd = sockfd;
                    fd_array[i].events = POLLIN;
                    std::cout << "add:" << fd_array[i].fd << std::endl;
                    break;
                }
            }
            // fd_array满了
            if (!tmp)
            {
                std::cout << "fd_array full" << std::endl;
            }
        }
    }

    void Read_Write(int i)
    {
        char buffer[1024];
        ssize_t n = ::read(fd_array[i].fd, buffer, sizeof(buffer) - 1); // 阻塞?No
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "Server read:" << buffer << std::endl;

            std::string content = "<html><body><h1>hello warld</h1></body></html>";

            std::string ech_str = "HTTP/1.0 200 OK\r\n";
            ech_str += "Content-Type: text/html\r\n";
            ech_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";
            ech_str += content;

            ::write(fd_array[i].fd, ech_str.c_str(), ech_str.size());
        }
        else if (n == 0)
        {
            std::cout << "user quit.." << std::endl;
            ::close(fd_array[i].fd);

            fd_array[i].fd = gfd;
            fd_array[i].events = 0;
            fd_array[i].revents = 0;
        }
        else
        {
            std::cout << "recv fail.." << std::endl;
            ::close(fd_array[i].fd);

            fd_array[i].fd = gfd;
            fd_array[i].events = 0;
            fd_array[i].revents = 0;
        }
    }

    void HandEvent()
    {
        for (int i = 0; i < gnum; i++)
        {
            if (fd_array[i].fd == gfd)
                continue;
            // 哪个读事件就绪
            if (fd_array[i].revents & POLLIN)
            {
                // 处理特定sockfd
                if (fd_array[i].fd == _st->Sockfd())
                {
                    Link();
                }
                // 处理普通fd
                else
                {
                    Read_Write(i);
                }
            }
        }
    }
    void Loop()
    {
        while (true)
        {
            // 不用设置啦 -> 优雅
            int n = ::poll(fd_array, gnum, 1000);
            switch (n)
            {
            case 0:
                std::cout << "time done " << std::endl;
                break;
            case -1:
                std::cerr << "select fail" << std::endl;
                break;
            default:
                std::cout << "have even: " << n << std::endl;
                HandEvent();
                PrintFd_array();
                break;
            }
        }
    }

    void PrintFd_array()
    {
        for (int i = 0; i < gnum; i++)
        {
            if (fd_array[i].fd == gfd)
                continue;
            std::cout << fd_array[i].fd << ' ';
        }
        std::cout << std::endl;
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _st;
    // fd数组->select管理
    pollfd fd_array[gnum];
};

poll的底层也是要OS遍历所有的fd,来获取fd和对应的事件(效率与select一样不高)

所以就有了下面的epoll~ 

3epoll

按照 man 手册的说法: 为处理大批量句柄而作了改进的 poll
它是在 2.5.44 内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)它几乎具备了之前所说的一切优点, 被公认为 Linux2.6 下性能最好的多路 I/O 就绪通知方法

3.1介绍参数

event可以是几个宏的集合

EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);
EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外
数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平触发来说的.
EPOLLONESHOT: 只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里

虽然说是epoll的加强版,但两者差别真的挺大的!

3.2工作原理

介绍完参数,此刻的你或许有很多问题:不急先来谈谈epoll的工作原理

理解数据到达主机

主机从网络中收数据时通过网卡来解收的,从而往上交给数据链路层;数据链路层是OS管的,也就是说OS怎么知道网卡有数据了需要进行拷贝了?

有人会说:简单,让OS定期检测一遍不就行了?OS要管理小到进程行大到外设,所有事情都要轮询OS不得忙死,所以OS是通过网卡给我发送信号(硬件中断)来得知网卡有数据啦!

epoll函数与OS的关系

但实际上,OS管理就绪队列时不用创建节点的! 

OS也要对epoll模型作管理:先描述,在组织

3.3编写epoll代码 

//ServerEpoll.hpp
#pragma once

#include "Socket.hpp"

const static int gnum = 128;
const static int gsize = 128;

class ServerEpoll
{
public:
    ServerEpoll(uint16_t port)
        : _port(port), _st(std::make_unique<TcpSocket>())
    {
        _st->Tcp_ServerSocket(_port);
        _epfd = ::epoll_create(gsize);
        if (_epfd < 0)
        {
            LOG(FATAL, "epoll_creat fail\n");
            exit(1);
        }
        LOG(INFO, "epoll_creat sucess,_epfd:%d\n", _epfd);
    }
    ~ServerEpoll()
    {
        if (_epfd > 0)
            ::close(_epfd);
        _st->Close();
    }

    void Init()
    {
        epoll_event et;
        et.data.fd = _st->Sockfd(); // 为了在事件就绪时知道fd就绪了
        et.events = EPOLLIN;
        int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, _st->Sockfd(), &et);
        if (n < 0)
        {
            LOG(ERROR, "epoll_ctl fail\n");
            exit(2);
        }
        LOG(INFO, "epoll_ctl sucess,add %d\n", _st->Sockfd());
    }

    void Start()
    {
        int time = -1;
        while (true)
        {
            int n = ::epoll_wait(_epfd, ets, gnum, time);
            switch (n)
            {
            case 0:
                LOG(INFO, "epoll_wait time done\n");
                break;
            case -1:
                LOG(INFO, "epoll_wait fail\n");
                break;
            default:
                LOG(INFO, "epoll sucess total:%d\n", n);
                Handler(n);
                break;
            }
        }
    }

    void Handler(int n)
    {
        for (int i = 0; i < n; i++)
        {
            int sockfd = ets[i].data.fd;
            uint32_t event = ets[i].events;
            LOG(INFO, "%d 事件就绪 event:%s\n", sockfd, EventToString(event).c_str());
            if (event & EPOLLIN)
            {
                if (sockfd == _st->Sockfd())
                {
                    Accept();
                }
                else
                {
                    HandlerIO(sockfd);
                }
            }
        }
    }

    void Accept()
    {
        InetAddr ar;
        int sockfd = _st->AcceptSocket(&ar);
        if (sockfd < 0)
        {
            LOG(ERROR, "gain line error sockfd: %d\n", sockfd);
            return;
        }

        LOG(INFO, "gain link:%d client:%s\n", sockfd, ar.User().c_str());
        epoll_event et;
        et.data.fd = sockfd;
        et.events = EPOLLIN;
        // OS在底层建立红黑树节点 -- select/poll要循环自己维护~
        int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &et);
        if (n < 0)
        {
            LOG(ERROR, "epoll_ctl fail\n");
            exit(2);
        }
        LOG(INFO, "epoll_ctl sucess,add %d\n", sockfd);
    }

    void HandlerIO(int fd)
    {
        // 不保证读到的数据是正确的:要通过协议来保证~
        char buffer[4096];
        ssize_t n = ::recv(fd, buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << buffer;
            std::string content = "<html><body><h1>Hello Warld</h1></body></html>";
            std::string ech_str = "HTTP/1.0 200 OK\r\n";
            ech_str += "Content-Length:" + std::to_string(content.size()) + "\r\n";
            ech_str += "\r\n";
            ech_str += content;
            ::send(fd, ech_str.c_str(), ech_str.size(), 0);
        }
        else if (n == 0)
        {
            LOG(INFO, "Clinet quit fd:%d\n", fd);
            // epoll移除 fd fd要保证:健康&&合法
            ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
            // 再关闭
            ::close(fd);
        }
        else
        {
            LOG(INFO, "read fail\n");
            // epoll移除 fd fd要保证:健康&&合法
            ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
            ::close(fd);
        }
    }

    std::string EventToString(uint32_t event)
    {
        std::string et;
        if (event & EPOLLIN)
            et += "EPOLLIN";
        if (event & EPOLLOUT)
            et += " | EPOLLOUT";
        return et;
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _st;
    int _epfd;
    epoll_event ets[gnum]; // 不用自己维护
};

//Main.cc
#include<iostream>
#include<memory>
#include<sys/epoll.h>

#include"ServerEpoll.hpp"


int main(int args,char* argc[])
{
    if(args!=2)
    {
        std::cout<<"./Main Port"<<std::endl;
        exit(1);
    }
    uint16_t port=std::stoi(argc[1]);

    std::unique_ptr<ServerEpoll> usr=std::make_unique<ServerEpoll>(port);
    usr->Init();
    usr->Start();
    return 0;
}

//InetAddr.hpp
#pragma once
#include<iostream>
#include<string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

class InetAddr
{
    void ToHost()
    {
        //_ip=inet_ntoa(_addr.sin_addr);//4字节地址->char*
        _port=ntohs(_addr.sin_port);

        char buffer[124];
        inet_ntop(AF_INET,&_addr.sin_addr,buffer,sizeof(buffer));
        _ip=buffer;
    }
public:
    InetAddr()
    {
        
    }
    InetAddr(const struct sockaddr_in& addr)
    :_addr(addr)
    {
        ToHost();
    }

    std::string Ip()
    {
        return _ip;
    }

    uint16_t Port()
    {
        return _port;
    }

    struct sockaddr_in Addr()
    {
        return _addr;
    }

    bool operator==(const InetAddr& ad)
    {
        return (this->_ip==ad._ip&&this->_port==ad._port);
    }

    std::string User()
    {
        std::string tmp=_ip+" "+std::to_string(_port)+":";
        return tmp;
    } 
    
private:
    std::string _ip;
    uint16_t _port;
    struct sockaddr_in _addr;
};

//Log.hpp
#pragma once

#include<iostream>
#include<string>
#include<cstring>
#include<fstream>
#include <stdarg.h>
#include <unistd.h>
#include <sys/syscall.h>


enum
{
    DEBUG = 1,
    INFO,
    WARNING,
    ERROR,
    FATAL
};

std::string Getlevel(int level)
{
    switch (level)
    {
    case DEBUG:
        return "DEBUG";
        break;
    case INFO:
        return "INFO";
        break;
    case WARNING:
        return "WARNING";
        break;
    case ERROR:
        return "ERROR";
        break;
    case FATAL:
        return "FATAL";
        break;
    default:
        return "";
        break;
    }
}

std::string Gettime()
{
    time_t now = time(nullptr);
    struct tm *time = localtime(&now);
    char buffer[128];
    snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
             time->tm_year + 1900,
             time->tm_mon + 1,
             time->tm_mday,
             time->tm_hour,
             time->tm_min,
             time->tm_sec);
    return buffer;
}

struct log_message
{
    std::string _level;
    int _id;
    std::string _filename;
    int _filenumber;
    std::string _cur_time;
    std::string _message;
};

#define SCREAM 1
#define FILE 2

#define DEVELOP 3
#define OPERATION 4

const std::string gpath = "./log.txt";
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;

class log
{
public:
    log(const std::string &path = gpath, const int status = DEVELOP)
        : _mode(SCREAM), _path(path), _status(status)
    {
    }
    void SelectMode(int mode)
    {
        _mode = mode;
    }
    void SelectStatus(int status)
    {
        _status = status;
    }

    void PrintScream(const log_message &le)
    {
        printf("[%s][%d][%s][%d][%s] %s",
               le._level.c_str(),
               le._id,
               le._filename.c_str(),
               le._filenumber,
               le._cur_time.c_str(),
               le._message.c_str());
    }
    void PrintFile(const log_message &le)
    {
        std::fstream in(_path, std::ios::app);
        if (!in.is_open())
            return;
        char buffer[1024];
        snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s] %s",
                 le._level.c_str(),
                 le._id,
                 le._filename.c_str(),
                 le._filenumber,
                 le._cur_time.c_str(),
                 le._message.c_str());
        in.write(buffer, strlen(buffer)); // 不用sizeof
        in.close();
    }
    void PrintLog(const log_message &le)
    {
        // 过滤
        if (_status == OPERATION)
            return;

        // 线程安全
        pthread_mutex_lock(&gmutex);

        switch (_mode)
        {
        case SCREAM:
            PrintScream(le);
            break;
        case FILE:
            PrintFile(le);
            break;
        default:
            break;
        }

        pthread_mutex_unlock(&gmutex);
    }
    void logmessage(int level, const std::string &filename, int filenumber, const char *message, ...)
    {
        log_message le;
        le._level = Getlevel(level);
        le._id = syscall(SYS_gettid);
        le._filename = filename;
        le._filenumber = filenumber;
        le._cur_time = Gettime();

        va_list vt;
        va_start(vt, message);
        char buffer[128];
        vsnprintf(buffer, sizeof(buffer), message, vt);
        va_end(vt);
        le._message = buffer;

        // 打印日志
        PrintLog(le);
    }
    ~log()
    {
    }

private:
    int _mode;
    std::string _path;
    int _status;
};

// 方便上层调用
log lg;

// ##不传时可忽略参数
#define LOG(level, message, ...)                                          \
    do                                                                    \
    {                                                                     \
        lg.logmessage(level, __FILE__, __LINE__, message, ##__VA_ARGS__); \
    } while (0)

#define SleftScream()          \
    do                         \
    {                          \
        lg.SelectMode(SCREAM); \
    } while (0)
#define SleftFile()          \
    do                       \
    {                        \
        lg.SelectMode(FILE); \
    } while (0)

#define SleftDevelop()            \
    do                            \
    {                             \
        lg.SelectStatus(DEVELOP); \
    } while (0)
#define SleftOperation()            \
    do                              \
    {                               \
        lg.SelectStatus(OPERATION); \
    } while (0)

//#pragma once


#include "InetAddr.hpp"
#include "Log.hpp"

class Socket;
enum
{
    SOCK_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERROR,
};
const int gbacklog = 8;

class Socket
{
public:
    virtual void CreateSocket() = 0;
    virtual void InitSocket(uint16_t port) = 0;
    virtual int AcceptSocket(InetAddr *addr) = 0;                         // 对象/变量
    virtual bool ConnectSocket(uint16_t port, const std::string &ip) = 0; // clinet连接成功与失败

    virtual int Sockfd() = 0;
    virtual void Close() = 0;
    virtual ssize_t Recv(std::string *out) = 0;
    virtual ssize_t Send(const std::string &in) = 0;

public:
    void Tcp_ServerSocket(uint16_t port)
    {
        CreateSocket();
        InitSocket(port);
    }
    bool Tcp_ClientSocket(uint16_t port, const std::string &ip)
    {
        CreateSocket();
        return ConnectSocket(port, ip);
    }
};

class TcpSocket : public Socket
{
public:
    TcpSocket()
    {
    }
    TcpSocket(int sockfd)
        : _sockfd(sockfd)
    {
    }
    ~TcpSocket()
    {
    }
    virtual void CreateSocket() override
    {
        _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
        if (_sockfd < 0)
        {
            LOG(ERROR,"create socket fail\n");
            exit(SOCK_ERROR);
        }
        LOG(INFO,"create socket sucess,sockfd:%d\n",_sockfd);
    }
    virtual void InitSocket(uint16_t port) override
    {
        struct sockaddr_in perr;
        memset(&perr, 0, sizeof(perr));
        perr.sin_family = AF_INET;
        perr.sin_port = htons(port);
        perr.sin_addr.s_addr = INADDR_ANY;
        if (::bind(_sockfd, (struct sockaddr *)&perr, sizeof(perr)) < 0)
        {
            LOG(ERROR,"bind fail\n");
            exit(BIND_ERROR);
        }
        LOG(INFO,"bind sucess\n");

        if (::listen(_sockfd, gbacklog) < 0)
        {
            LOG(ERROR,"listen fail\n");
            exit(LISTEN_ERROR);
        }
        LOG(INFO,"listen sucess\n");
    }
    virtual int AcceptSocket(InetAddr *addr) override // 外层要获取客户端信息 TODO
    {
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
        if (sockfd < 0)
        {
            LOG(ERROR,"link fail\n");
            return -1;
        }
        *addr = client;
        return sockfd;
    }

    virtual bool ConnectSocket(uint16_t port, const std::string &ip) override
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &server.sin_addr);
        int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
        if (n < 0)
        {
            return false;
        }
        return true;
    }

    virtual int Sockfd() override
    {
        return _sockfd;
    }
    virtual void Close() override
    {
        if (_sockfd > 0)
        {
            ::close(_sockfd);
        }
    }
    virtual ssize_t Recv(std::string *out) override
    {
        char buffer[4096];
        ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            *out += buffer; // 细节
        }
        return n;
    }
    virtual ssize_t Send(const std::string &in) override
    {
        return ::send(_sockfd, in.c_str(), in.size(), 0);
    }

private:
    int _sockfd; // 两个角色
};

3.4优点

 • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而 select/poll 都是每次循环都要进行拷贝)
事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中(填写就绪队列的节点), epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度 O(1),拷贝到用户定义的数组虽然也是O(N),但它严格按顺序进行拷贝

3.5工作模式

在XX快递站中有两个快递员:张三和李四;张三工作相对负责,而李四工作较毛躁;有一天住在5楼的小王在网上买了5个快递,现在正在派发:有三个在张三手上,有两个在李四手上;现在张三先到了小王楼下,带电话给小王叫他下来去快递,但由于小王在玩游戏,也就没下去取;张三一看小王没下来就重复给小王打电话;等到小王游戏打完后就下楼拿快递了,但他只拿了两个就上楼了;现在李四到了问张三:你还有多少个快递没派完?“还有一个小王的”张三说;那正好我手上有2个小王的包裹,你帮我一起派发吧!张三也只好接下了2个小王新增的包裹,继续给小王打电话,知道他吧包裹全取走为止...

后来小王又在网上买了5个快递,派发是恰好3个在李四手上,2个在张三手上;现在轮到李四先到小王楼下;由于李四工作比较毛躁,打电话给小王说:我只打电话给你一次,如果你不下来取快递,派发完别人的我就先走了!小王一听对方语气有点强硬,也就刚上了就不下去取;此时张三到了小王楼下,看到李四问有小王的快递没?一听有,张三就把小王的快递交给了李四说:上次我帮了你,这次你帮我下;李四也是挺讲义气的,也就把包裹拿来了,这次手上新增了2个包裹,再次打给了小王,小王说:你不是说不再给我打电话的吗?没办法,手上又新增了你的2个快递,你快点下来拿把,不然我就走了!小王也就得毕竟是自己的快递没必要不下去拿,也就下去把5个快递都拿回家了~

张三:只要底层一直有快递(数据),就一直通知小王(上层)

李四:包裹从无到有,从有到多的时候,才会通知小王(上层)

张三和李四就对应着epoll的两种不同的工作模式:水平触发(LT)边缘触发(ET)

理解LT和ET

场景:客户端向(ET模式下)服务器发送10k请求数据,服务器只读了1k数据

在ET模式下只通知一次,本轮数据如果没读完不会再通知;如果服务器没读完数据,导致双方都要发数据给对方,对方才能进行处理数据(陷入死循环)

这就要求在ET模式下读事件一旦就绪,必须把数据读完;那我怎么知道我把数据读完了?

这就好比关系还不错的朋友来向你借钱,第一次借100,下一次再借100,那他下次会不会再向你借钱呢?肯定会(知道你有钱~);那他怎么知道你没钱了?第三次向你借钱时,你发信息给他:我只有36.66块钱了,这是你朋友就知道你没钱了!

也就是在设计时进行循环读取,只到读取不到数据就证明没数据了!

如果这时朋友发信息再次向你借钱,你没回复他,他也不知道是什么情况(没钱还是没看到信息),就会一直僵持着

也就是说:循环读取势必会引起阻塞问题,但服务器敢让你阻塞吗(一阻塞可能服务器就挂了)!所以我们要对fd设置非阻塞来解决该问题

以上便是为了引出结论:在ET模式下,所有的fd必须是非阻塞的!LT模式阻塞非阻塞都行~

ET更高效,体现在:1通知效率更高;2IO效率更高(给对方通告一个更大的接收窗口,让对方能发送更多的数据,提高IO效率)

那LT也可以设置非阻塞,也可以循环读取所有数据啊!但它会有退路可言(设置成阻塞,一次一次读数据)而ET一定要让上层读取全部数据!这在设计上ET也比LT要复杂些;但虽然ET更高效,也并不意味着LT就没用了:在一些场景中如果要实现响应是有顺序的或者是要求设计简单的,就选LT~

四Reactor

编写Reactor的基本框架

在Reactor的代码中,我们要来解决读和写的问题 

关于读:我们能够直接读吗?   不能!

我们要怎么知道接收缓冲区有数据了?   通过epoll等待读事件就绪!

我们能保证读到的就是一个完整的请求吗?   不能!

如何保证?   引入协议:对读到的内容进行报文解析

关于写:a当我们获取一个新的sockfd时,输入和输出缓冲区都是空的

b读事件就绪:就是输入缓冲区有了数据/底层有新连接到来

c写事件就绪:不是关心数据,而是关心发送缓冲区中有没有空间,有空间就证明写事件就绪;否则写事件不满足!

d把sockfd托管给select,poll,epoll:原因是sockfd上事件没就绪

所以默认sockfd新建的情况下,读事件是不就绪的:接收缓冲区暂时没数据,所以我们要把它进行托管

而默认sockfd新建的情况下,写事件是就绪的:接收缓冲区本来就有空间,所以我们直接写

当写条件不满足时,我们才要按需开启对EPOLLOUT的关心!

什么情况下写条件不满足?   发送缓冲区写满了&&数据还没发完

设置EPOLLOUT之后,我们就不用管了!后续的剩余数据epoll会自动给我发送

补充细节:

1如果我们直接开启EPOLLOUT关心,epoll就会一直就绪

2如果未来我们发完数据了,对EPOLLOUT的关心就会被关闭

3如果缓冲区没满&&数据发完了,我们就不用开启对EPOLLOUT关心

4如果我们设置对EPOLLOUT的关心,epoll默认只会就绪一次!

5直接发,我们怎么知道写入条件不满足?   我们写入时进行判断errno == EWOULDBLOCK就是缓冲区被写满了!

在代码中我们没有sockfd的概念,只有一个一个的connection到来:所以我们要对connection进行描述并构建成对象,有Reactor对象来管理connection对象(通过unordered_map数据结构来组织)

accept设置连接时并不知道有多少个连接即将到来,所以我们要把listensockfd设置成非阻塞,循环获取新连接!

获取新连接时,我们要对读 写 异常方法进行bind回调函数,我们不在listener类中进行bind,我们统一在Main.cc(外部)将方法bind,怎么实现?                                                                                在connection类中定义一个type:表示该连接是监听sockfd还是普通sockfd;在Reactor类中定义一批方法集(读 写 异常方法),对外提供接口设置这批方法(也就是先在外部bind);从此我们AddConnection()函数添加连接是通过type判断是那种连接需要添加从而进行注册对应的方法(注册进connection类的成员方法中 ,后面等新事件到来时就能通过connection找到要执行的方法从而回调出去!)

在connection类中定义一个Reactor* R指针?   进行回指找到Reacor中的AddConnection()函数(在获取连接时我们要把新连接添加到unordered_map中要通过Reactor对象才能实现)


完整源代码

未来服务器将不会有sockfd的概念,只有一个一个的Connection

//Connection.hpp
#pragma once
#include <string>
#include <functional>

#define ListenConnection 0
#define NornalConnection 1

class Connection;
class Reactor;
using handler = std::function<void(Connection *)>;


class Connection
{
public:
    Connection(int sockfd)
        : _sockfd(sockfd)
    {
    }
    ~Connection()
    {
    }

    void SetEvent(uint32_t event)
    {
        _event = event;
    }
    void SetReactor(Reactor *R)
    {
        _R = R;
    }
    void SetType(int type)
    {
        _type = type;
    }
    void SetAddr(InetAddr &addr)
    {
        _addr = addr;
    }
    void AppendInbuffer(const std::string &in)
    {
        _inbuffer += in;
    }
    void AppendOutbuffer(const std::string &in)
    {
        _outbuffer += in;
    }

    int Sockfd()
    {
        return _sockfd;
    }
    uint32_t Event()
    {
        return _event;
    }
    handler Recver()
    {
        return _recver;
    }
    handler Sender()
    {
        return _sender;
    }
    handler Excepter()
    {
        return _excepter;
    }
    Reactor *R()
    {
        return _R;
    }
    int Type()
    {
        return _type;
    }
    std::string &Inbuffer() // 引用!
    {
        return _inbuffer;
    }
    InetAddr Addr()
    {
        return _addr;
    }
    std::string &Outbuffer()
    {
        return _outbuffer;
    }

    void RegisterHandler(handler recver, handler sender, handler excepter)
    {
        _recver = recver;
        _sender = sender;
        _excepter = excepter;
    }

    void DiscardOutbuffer(ssize_t n)
    {
        _outbuffer.erase(0, n);
    }

    void Close()
    {
        if (_sockfd >= 0)
            ::close(_sockfd);
    }

private:
    int _sockfd;
    uint32_t _event;
    std::string _inbuffer;
    std::string _outbuffer;

    handler _recver;   // 处理读事件
    handler _sender;   // 处理写事件
    handler _excepter; // 处理异常事件

    Reactor *_R; // 定义了一个Reactor指针?

    int _type;
    InetAddr _addr;
};

Connection被Reactor进行管理 

//Reactor.hpp
#pragma once
#include <unordered_map>
#include <memory>

#include "Multiplex.hpp"
#include "Connection.hpp"

// 只对Connection进行管理
class Reactor
{
    const static int gnum = 128;

public:
    Reactor()
        : _epoller(std::make_unique<Epoller>()), _isrunning(false)
    {
    }
    ~Reactor()
    {
    }
    // 我不想让外部去new Connection对象
    // void AddConnection(int sockfd, uint32_t event, handler recver, handler sender, handler excepter)
    void AddConnection(int sockfd, uint32_t event, InetAddr &addr, int type)
    {
        // 构建Connection -- 不要用智能指针!!!
        Connection *conn = new Connection(sockfd);
        conn->SetEvent(event);
        conn->SetReactor(this); // 设置conn与R的回指
        conn->SetType(type);    // 设置connection的类型
        conn->SetAddr(addr);
        // 注册读写异常方法
        // conn->RegisterHandler(recver, sender, excepter);
        if (conn->Type() == ListenConnection)
            conn->RegisterHandler(_OnAccepter, nullptr, nullptr);
        if (conn->Type() == NornalConnection)
            conn->RegisterHandler(_OnRecver, _OnSender, _OnExcepter);
        // epoll托管 - epoll模型进行封装
        if (_epoller->AddEvent(conn->Sockfd(), conn->Event()))
            _conn.insert({sockfd, conn}); // TcpServer管理
    }
    void Dispatcher()
    {
        int timeout = -1;
        _isrunning = true;
        while (_isrunning)
        {
            LoopOnce(timeout);
            // Do Other Thing
            PrintDebug();
        }
        _isrunning = false;
    }
    void LoopOnce(int timeout)
    {
        int n = _epoller->WaitEvent(revs, gnum, timeout);
        for (int i = 0; i < n; i++)
        {
            int sockfd = revs[i].data.fd;
            uint32_t event = revs[i].events;
            if (event & EPOLLERR)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLHUP)
                event |= (EPOLLIN | EPOLLOUT);
            if (event & EPOLLIN)
            {
                if (IsConnExist(sockfd) && _conn[sockfd]->Recver())
                    _conn[sockfd]->Recver()(_conn[sockfd]); // 写事件派发
            }
            if (event & EPOLLOUT)
            {
                if (IsConnExist(sockfd) && _conn[sockfd]->Sender())
                    _conn[sockfd]->Sender()(_conn[sockfd]); // 读事件派发
            }
        }
    }
    bool IsConnExist(int sockfd)
    {
        return _conn.find(sockfd) != _conn.end();
    }

    void SetListenConnection(handler OnAccepter)
    {
        _OnAccepter = OnAccepter;
    }
    void SetNornalConnection(handler OnRecver, handler OnSender, handler OnExcepter)
    {
        _OnRecver = OnRecver;
        _OnSender = OnSender;
        _OnExcepter = OnExcepter;
    }

    handler OnAccepter()
    {
        return _OnAccepter;
    }
    handler OnRecver()
    {
        return _OnRecver;
    }
    handler OnSender()
    {
        return _OnSender;
    }
    handler OnExcepter()
    {
        return _OnExcepter;
    }

    void PrintDebug()
    {
        std::string str;
        for (auto &conn : _conn)
        {
            str += std::to_string(conn.second->Sockfd()) + " ";
        }
        LOG(DEBUG, "conn list:%s\n", str.c_str());
    }

    void EnableReadWriteEvent(int sockfd, bool read, bool write)
    {
        if (!IsConnExist(sockfd))
            return;
        uint32_t event = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
        // 修改
        _conn[sockfd]->SetEvent(event);
        // 设置进内核
        _epoller->ModEvent(_conn[sockfd]->Sockfd(), _conn[sockfd]->Event());
    }

    void DelConnection(int sockfd)
    {
        // 进行安全检测
        if (!IsConnExist(sockfd))
            return;
        LOG(INFO, "client %s quit,server delete all sources\n", _conn[sockfd]->Addr().User().c_str());
        // 先从epoller中移除->细节!epoller移除的sockfd必须是健康的
        // EnableReadWriteEvent(sockfd,false,false);
        _epoller->DelEvent(sockfd);
        // 关闭sockfd
        _conn[sockfd]->Close();
        // 从_conn中去除
        _conn.erase(sockfd);
    }

private:
    // sockfd与connection进行映射
    std::unordered_map<int, Connection *> _conn;
    std::unique_ptr<Multiplex> _epoller;
    bool _isrunning;
    epoll_event revs[gnum];
    // 设置方法集进行统一处理
    handler _OnAccepter;
    handler _OnRecver;
    handler _OnSender;
    handler _OnExcepter;
};

epoll接口封装成对象

//Multiplex.hpp
#pragma once
#include <sys/epoll.h>

#include "Log.hpp"
#include "Comm.hpp"

using namespace log_ns;
class Multiplex
{
public:
    virtual bool AddEvent(int sockfd, uint32_t event) = 0;
    virtual int WaitEvent(struct epoll_event revt[], int num, int timeout) = 0;
    virtual bool ModEvent(int sockfd, uint32_t event) = 0;
    virtual bool DelEvent(int sockfd) = 0;
};

class Epoller : public Multiplex
{
    const static int size = 128;

private:
    bool EventHelper(int sockfd, uint32_t event, int oper)
    {
        struct epoll_event evt;
        evt.data.fd = sockfd;
        evt.events = event;
        int n = ::epoll_ctl(_epfd, oper, sockfd, &evt);
        if (n < 0)
        {
            LOG(ERROR, "epoll_ctl failed\n");
            return false;
        }
        LOG(INFO, "epoll_ctl %d events is %s sucess\n", sockfd, EventToString(event).c_str());
        return true;
    }

public:
    Epoller()
    {
        _epfd = ::epoll_create(size);
        if (_epfd < 0)
        {
            LOG(FATAL, "epoll_create error\n");
            exit(EPOLL_CTL_ERROR);
        }
        LOG(INFO, "epoll_create sucess efd: %d\n", _epfd);
    }
    ~Epoller()
    {
    }
    std::string EventToString(uint32_t event)
    {
        std::string str;
        if (event & EPOLLIN)
            str += "EPOLLIN";
        if (event & EPOLLOUT)
            str += "|EPOLLOUT";
        if (event & EPOLLET)
            str += "|EPOLLET";
        return str;
    }
    virtual bool AddEvent(int sockfd, uint32_t event) override
    {
        return EventHelper(sockfd, event, EPOLL_CTL_ADD);
    }
    virtual bool ModEvent(int sockfd, uint32_t event) override
    {
        return EventHelper(sockfd, event, EPOLL_CTL_MOD);
    }
    virtual bool DelEvent(int sockfd) override
    {
        return EventHelper(sockfd, 0, EPOLL_CTL_DEL);
    }
    virtual int WaitEvent(epoll_event revs[], int num, int timeout) override
    {
        return ::epoll_wait(_epfd, revs, num, timeout);
    }

private:
    int _epfd;
};

// class Poller:private Multiplex
// {

// };

// class Select:private Multiplex
// {

// };

对listensockfd的封装,内含读事件(连接)方法

//Listener.hpp
#pragma once
#include <memory>

#include "Socket.hpp"
#include "Connection.hpp"
#include "Log.hpp"
#include "Reactor.hpp"

using namespace socket_ns;
using namespace log_ns;
class Listener
{
public:
    Listener(uint16_t port)
        : _port(port), _ListenSocket(std::make_unique<TcpSocket>())
    {
        _ListenSocket->Tcp_ServerSocket(port);
    }

    int Listensockfd()
    {
        return _ListenSocket->Sockfd();
    }

    // listensockfd 读方法
    void Accepter(Connection *conn)
    {
        while (true)
        {
            errno = 0;
            InetAddr addr;
            int code = 0;
            int sockfd = _ListenSocket->AcceptSocket(&addr, &code);
            if (sockfd > 0)
            {
                LOG(INFO, "get a new link %s sockfd: %d\n", addr.User().c_str(), sockfd);
                conn->R()->AddConnection(sockfd, EPOLLIN | EPOLLET, addr, NornalConnection); 
            }
            else
            {
                if (code == EWOULDBLOCK)
                {
                    LOG(INFO, "gain connection is finished\n");
                    break;
                }
                else if (code == EINTR)
                {
                    continue;
                }
                else
                {
                    LOG(ERROR, "gain connection failed\n");
                    break;
                }
            }
        }
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _ListenSocket;
};

//Socket.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <pthread.h>

#include "Log.hpp"
#include "InetAddr.hpp"
#include "Comm.hpp"

namespace socket_ns
{
    using namespace log_ns;
    const static int gbacklog = 8;

    // 模板方法模式
    class Socket
    {
    public:
        virtual void CreateSocket() = 0;
        virtual void InitSocket(uint16_t port, int backlog = gbacklog) = 0;
        virtual int AcceptSocket(InetAddr *addr, int *code) = 0;              // 对象/变量
        virtual bool ConnectSocket(uint16_t port, const std::string &ip) = 0; // clinet连接成功与失败

        virtual int Sockfd() = 0;
        virtual void Close() = 0;
        virtual ssize_t Recv(std::string *out) = 0;
        virtual ssize_t Send(const std::string &in) = 0;

        virtual void ReuseAddr() = 0;

    public:
        void Tcp_ServerSocket(uint16_t port)
        {
            CreateSocket();
            ReuseAddr();
            InitSocket(port);
        }
        bool Tcp_ClientSocket(uint16_t port, const std::string &ip)
        {
            CreateSocket();
            return ConnectSocket(port, ip);
        }
    };

    class TcpSocket : public Socket
    {
    public:
        TcpSocket()
        {
        }
        TcpSocket(int sockfd)
            : _sockfd(sockfd)
        {
        }

        ~TcpSocket()
        {
        }
        virtual void CreateSocket() override
        {
            _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
            if (_sockfd < 0)
            {
                LOG(FATAL, "socket fail\n");
                exit(SOCK_ERROR);
            }
            LOG(INFO, "socket sucess sockfd: %d\n", _sockfd);
            NoBlock(_sockfd); // 设置非阻塞
        }
        virtual void InitSocket(uint16_t port, int backlog) override
        {
            struct sockaddr_in perr;
            memset(&perr, 0, sizeof(perr));
            perr.sin_family = AF_INET;
            perr.sin_port = htons(port);
            perr.sin_addr.s_addr = INADDR_ANY;
            if (::bind(_sockfd, (struct sockaddr *)&perr, sizeof(perr)) < 0)
            {
                LOG(FATAL, "bind fail\n");
                exit(BIND_ERROR);
            }
            if (::listen(_sockfd, backlog) < 0)
            {
                LOG(ERROR, "listen fail\n");
                exit(LISTEN_ERROR);
            }
        }
        virtual int AcceptSocket(InetAddr *addr, int *code) override
        {
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
            *code = errno;
            if (sockfd < 0)
            {
                return -1;
            }
            NoBlock(sockfd); // 设置非阻塞
            *addr = client;
            return sockfd;
        }

        virtual bool ConnectSocket(uint16_t port, const std::string &ip) override
        {
            struct sockaddr_in server;
            memset(&server, 0, sizeof(server));
            server.sin_family = AF_INET;
            server.sin_port = htons(port);
            inet_pton(AF_INET, ip.c_str(), &server.sin_addr);
            int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
            if (n < 0)
            {
                return false;
            }
            return true;
        }

        virtual int Sockfd() override
        {
            return _sockfd;
        }
        virtual void Close() override
        {
            if (_sockfd > 0)
            {
                ::close(_sockfd);
            }
        }
        virtual ssize_t Recv(std::string *out) override
        {
            char buffer[4096];
            ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
            if (n > 0)
            {
                buffer[n] = 0;
                *out += buffer; // 细节
            }
            return n;
        }
        virtual ssize_t Send(const std::string &in) override
        {
            return ::send(_sockfd, in.c_str(), in.size(), 0);
        }
        virtual void ReuseAddr() override
        {
            int opt = 1;
            ::setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
        }

    private:
        int _sockfd; // 两个角色
    };
    // class UdpServer:public Socket
    //{}
}

//Log.hpp
#pragma once
#include <pthread.h>
#include <fstream>
#include <syscall.h>
#include <stdarg.h>
#include <unistd.h>
#include <cstring>

namespace log_ns
{
    enum
    {
        DEBUG = 1,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    std::string Getlevel(int level)
    {
        switch (level)
        {
        case DEBUG:
            return "DEBUG";
            break;
        case INFO:
            return "INFO";
            break;
        case WARNING:
            return "WARNING";
            break;
        case ERROR:
            return "ERROR";
            break;
        case FATAL:
            return "FATAL";
            break;
        default:
            return "";
            break;
        }
    }

    std::string Gettime()
    {
        time_t now = time(nullptr);
        struct tm *time = localtime(&now);
        char buffer[128];
        snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
                 time->tm_year + 1900,
                 time->tm_mon + 1,
                 time->tm_mday,
                 time->tm_hour,
                 time->tm_min,
                 time->tm_sec);
        return buffer;
    }

    struct log_message
    {
        std::string _level;
        int _id;
        std::string _filename;
        int _filenumber;
        std::string _cur_time;
        std::string _message;
    };

#define SCREAM_TYPE 1
#define FILE_TYPE 2

#define DEVELOP 3
#define OPERATION 4

    const std::string gpath = "./log.txt";
    pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;

    class log
    {
    public:
        log(const std::string &path = gpath, const int status = DEVELOP)
            : _mode(SCREAM_TYPE), _path(path), _status(status)
        {
        }
        void SelectMode(int mode)
        {
            _mode = mode;
        }
        void SelectStatus(int status)
        {
            _status = status;
        }

        void PrintScream(const log_message &le)
        {
            printf("[%s][%d][%s][%d][%s] %s",
                   le._level.c_str(),
                   le._id,
                   le._filename.c_str(),
                   le._filenumber,
                   le._cur_time.c_str(),
                   le._message.c_str());
        }
        void PrintFile(const log_message &le)
        {
            std::fstream in(_path, std::ios::app);
            if (!in.is_open())
                return;
            char buffer[1024];
            snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s] %s",
                     le._level.c_str(),
                     le._id,
                     le._filename.c_str(),
                     le._filenumber,
                     le._cur_time.c_str(),
                     le._message.c_str());
            in.write(buffer, strlen(buffer)); // 不用sizeof
            in.close();
        }
        void PrintLog(const log_message &le)
        {
            // 过滤
            if (_status == OPERATION)
                return;

            // 线程安全
            pthread_mutex_lock(&gmutex);

            switch (_mode)
            {
            case SCREAM_TYPE:
                PrintScream(le);
                break;
            case FILE_TYPE:
                PrintFile(le);
                break;
            default:
                break;
            }

            pthread_mutex_unlock(&gmutex);
        }
        void logmessage(const std::string &filename, int filenumber, int level, const char *message, ...)
        {
            log_message le;
            le._level = Getlevel(level);
            le._id = syscall(SYS_gettid);
            le._filename = filename;
            le._filenumber = filenumber;
            le._cur_time = Gettime();

            va_list vt;
            va_start(vt, message);
            char buffer[128];
            vsnprintf(buffer, sizeof(buffer), message, vt);
            va_end(vt);
            le._message = buffer;

            // 打印日志
            PrintLog(le);
        }
        ~log()
        {
        }

    private:
        int _mode;
        std::string _path;
        int _status;
    };

    // 方便上层调用
    log lg;
// ##不传时可忽略参数
#define LOG(level, message, ...)                                          \
    do                                                                    \
    {                                                                     \
        lg.logmessage(__FILE__, __LINE__, level, message, ##__VA_ARGS__); \
    } while (0)

#define SleftScream()               \
    do                              \
    {                               \
        lg.SelectMode(SCREAM_TYPE); \
    } while (0)
#define SleftFile()               \
    do                            \
    {                             \
        lg.SelectMode(FILE_TYPE); \
    } while (0)

}

//InetAddr.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

class InetAddr
{
    void ToHost()
    {
        //_ip=inet_ntoa(_addr.sin_addr);//4字节地址->char*
        _port = ntohs(_addr.sin_port);

        char buffer[124];
        inet_ntop(AF_INET, &_addr.sin_addr, buffer, sizeof(buffer));
        _ip = buffer;
    }

public:
    InetAddr(const std::string &ip, uint16_t port)
    {
        _ip = ip;
        _port = port;
        _addr.sin_family = AF_INET6;
        _addr.sin_port = htons(port);
        _addr.sin_addr.s_addr = INADDR_ANY;
    }

    InetAddr()
    {
    }
    InetAddr(const struct sockaddr_in &addr)
        : _addr(addr)
    {
        ToHost();
    }

    std::string Ip()
    {
        return _ip;
    }

    uint16_t Port()
    {
        return _port;
    }

    struct sockaddr_in Addr()
    {
        return _addr;
    }

    bool operator==(const InetAddr &ad)
    {
        return (this->_ip == ad._ip && this->_port == ad._port);
    }

    std::string User()
    {
        std::string tmp = _ip + ":" + std::to_string(_port);
        return tmp;
    }

private:
    std::string _ip;
    uint16_t _port;
    struct sockaddr_in _addr;
};

对读 写 异常 方法进行封装 

//HandlerConnection.hpp
#pragma once
#include "Connection.hpp"
#include "Log.hpp"

using namespace log_ns;

class HandlerConnection
{
    static const int gsize = 1024;

public:
    HandlerConnection(handler server)
    {
        _server = server;
    }
    // 就绪的conn
    void HandlerRecver(Connection *conn)
    {
        while (true)
        {
            errno = 0;
            char bufferstr[gsize];
            ssize_t n = ::recv(conn->Sockfd(), bufferstr, sizeof(bufferstr), 0);
            if (n > 0)
            {
                bufferstr[n] = 0;
                conn->AppendInbuffer(bufferstr);
            }
            else
            {
                if (errno == EWOULDBLOCK)
                {
                    break;
                }
                else if (errno == EINTR)
                {
                    continue;
                }
                else
                {
                    conn->Excepter()(conn); // 统一处理异常
                    return;                 // 这里是返回!
                }
            }
        }
        // 读取完成 - 进行内容解析(处理粘包问题)
        // 交给上层
        _server(conn);
    }
    void HandlerSender(Connection *conn)
    {
        // 直接写
        while (true)
        {
            errno = 0;
            ssize_t n = ::send(conn->Sockfd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);
            if (n > 0)
            {
                conn->DiscardOutbuffer(n);
            }
            else if (n == 0)
            {
                break;
            }
            else
            {
                if (errno == EWOULDBLOCK)
                {
                    break;
                }
                else if (errno == EINTR)
                {
                    continue;
                }
                else
                {
                    conn->Excepter()(conn);
                    return;
                }
            }
        }
        // 一定是读条件不就绪(发送缓冲区满了)
        if (!conn->Outbuffer().empty())
        {
            // 开启读事件->剩下的数据epoll会自动帮我发完
            conn->R()->EnableReadWriteEvent(conn->Sockfd(), true, true);
        }
        else
        {
            conn->R()->EnableReadWriteEvent(conn->Sockfd(), true, false);
        }
    }
    void HandlerExcepter(Connection *conn)
    {
        conn->R()->DelConnection(conn->Sockfd());
    }

private:
    handler _server;
};

读事件要进行报文解析,读到完整请求后进行处理,生成应答后发出!

//Protocol.hpp
#pragma once
#include <iostream>
#include <memory>
#include <jsoncpp/json/json.h>

// #define SELF 1

// "len\r\n{json}\r\n" -- 自己设计出完整报文 len json的长度
// "\r\n" 第一个:区分len 和json边界 第二个:观察现象方便
const std::string sym = "\r\n";
const std::string space = " ";

// jsonstr变成完整报文
std::string Encode(const std::string &jsonstr)
{
    int len = jsonstr.size();
    return std::to_string(len) + sym + jsonstr + sym;
}

// 把json提取出来
// "len\r"
// "len\r\n{json}\r"
// "len\r\n{json}\r\n"
// "len\r\n{json}\r\n""len\r\n{json}\r\n""len\r\n{j"
std::string Decode(std::string &nameplate)
{
    size_t pos = nameplate.find(sym);
    if (pos == std::string::npos)
    {
        return "";
    }
    std::string lenstr = nameplate.substr(0, pos);
    int len = std::stoi(lenstr);
    // 计算出完整报文长度
    int TotalLen = lenstr.size() + sym.size() + len + sym.size();
    if (nameplate.size() < TotalLen)
    {
        return "";
    }
    // 读报文
    std::string jsonstr = nameplate.substr(pos + sym.size(), len);
    // 删报文
    nameplate.erase(0, TotalLen);
    return jsonstr;
}

class req
{
public:
    req()
    {
    }
    req(int x, int y, std::string &sym)
        : _x(x), _y(y), _sym(sym)
    {
    }
    // 结构化->字符串
    void Serialize(std::string *out)
    {
#ifdef SELF
        // 1.自己做 -> "_x _sym _y"
        // 2.使用现成库:jsoncpp
        Json::Value root;
        root["x"] = _x;
        root["y"] = _y;
        root["sym"] = _sym;

        Json::FastWriter writer;
        *out = writer.write(root);
#else
        //"len\r\n {_x _sym _y} \r\n"
        std::string x = std::to_string(_x);
        std::string y = std::to_string(_y);
        *out = x + space + _sym + space + y;

#endif
    }
    // 字符串->结构化
    bool Deserialize(std::string &in)
    {
#ifdef SELF
        Json::Value root;
        Json::Reader reader;
        bool res = reader.parse(in, root);
        if (!res)
            return false;
        _x = root["x"].asInt();
        _y = root["y"].asInt();
        _sym = root["sym"].asString();
        return true;
#else
        //"len\r\n {_x _sym _y} \r\n"
        auto left_space = in.find(space);
        if (left_space == std::string::npos)
            return false;
        auto right_space = in.rfind(space);
        if (right_space == std::string::npos)
            return false;
        if (left_space + space.size() + 1 != right_space)
            return false;

        std::string x = in.substr(0, left_space);
        if (x.empty())
            return false;
        std::string sym = in.substr(left_space + space.size(), right_space);
        if (sym.empty())
            return false;
        std::string y = in.substr(right_space + space.size());
        if (y.empty())
            return false;
        _x = std::stoi(x.c_str());
        _sym = sym;
        _y = std::stoi(y.c_str());
        return true;

#endif
    }
    void SetValue(int x, int y, char sym)
    {
        _x = x;
        _y = y;
        _sym = sym;
    }
    ~req() {}

    int _x;
    int _y;
    std::string _sym; //-x +-*/ _y
};

class rep
{
public:
    rep()
        : _result(0), _code(0), _des("sucess")
    {
    }
    void Serialize(std::string *out)
    {
#ifdef SELF
        Json::Value root;
        root["result"] = _result;
        root["code"] = _code;
        root["des"] = _des;

        Json::FastWriter writer;
        *out = writer.write(root);
#else
        std::string result = std::to_string(_result);
        std::string code = std::to_string(_code);
        *out = result + space + code + space + _des;
#endif
    }
    bool Deserialize(std::string &in)
    {
#ifdef SELF
        Json::Value root;
        Json::Reader reader;
        bool res = reader.parse(in, root);
        if (!res)
            return false;
        _result = root["result"].asInt();
        _code = root["code"].asInt();
        _des = root["des"].asString();
        return true;
#else
        auto left_space = in.find(space);
        if (left_space == std::string::npos)
            return false;
        auto right_space = in.rfind(space);
        if (right_space == std::string::npos)
            return false;
        if (left_space + space.size() + 1 != right_space)
            return false;

        std::string result = in.substr(0, left_space);
        if (result.empty())
            return false;
        std::string code = in.substr(left_space + space.size(), right_space);
        if (code.empty())
            return false;
        std::string des = in.substr(right_space + space.size());
        if (des.empty())
            return false;
        _result = std::stoi(result.c_str());
        _code = std::stoi(code.c_str());
        _des = des;
        return true;
#endif
    }
    ~rep() {}

    int _result;
    int _code; // 0-> sucess 1->div zero 2->mod zero 3->fail
    std::string _des;
};

// 工厂模式
class Factory
{
public:
    static std::shared_ptr<req> BuidRequest()
    {
        return std::make_shared<req>();
    }
    static std::shared_ptr<rep> BuidReponse()
    {
        return std::make_shared<rep>();
    }
};

//PackageParse.hpp
#pragma once
#include <iostream>
#include <functional>

#include "Log.hpp"
#include "Protocol.hpp"
#include "Connection.hpp"
using namespace log_ns;

using protocol_t = std::function<std::shared_ptr<rep>(std::shared_ptr<req>)>;

class PackageParse
{
public:
    PackageParse(protocol_t process)
        : _process(process)
    {
    }
    ~PackageParse()
    {
    }
    void server(Connection *conn)
    {
        while (true)
        {
            // 报文解析 -- 不能保证读到完整的报文
            std::string message = ::Decode(conn->Inbuffer());
            if (message == "")
                break;

            // 反序列化 -- 能保证读到完整报文
            std::shared_ptr<req> q = Factory::BuidRequest();
            q->Deserialize(message);

            // 处理事务
            auto p = _process(q);

            // 序列化处理
            std::string result;
            p->Serialize(&result);

            LOG(DEBUG, "client request finish: %s\n", result.c_str());
            // 添加报头
            result = Encode(result);

            // 发送
            conn->AppendOutbuffer(result);
            // break;
        }
        // 至少处理了一个请求,有一个应答
        // 方法1:直接发 - 单进程
        if (!conn->Outbuffer().empty()) conn->Sender()(conn);

        // 方法2:激活对事件的关心即可,IO全都是由Reactor处理 -> 解决线程安全
        // 半同步半异步模式
        //if (!conn->Outbuffer().empty()) conn->R()->EnableReadWriteEvent(conn->Sockfd(),true,true);

        
    }

private:
    protocol_t _process;
};

//Netcal.hpp
#pragma once
#include "Protocol.hpp"

// req -> rep
class Cal
{
public:
    Cal() {}
    ~Cal() {}
    std::shared_ptr<rep> Count(std::shared_ptr<req> q)
    {
        auto p = Factory::BuidReponse();
        const char *a = q->_sym.c_str();
        switch (*a)
        {
        case '+':
            p->_result = q->_x + q->_y;
            break;
        case '-':
            p->_result = q->_x - q->_y;
            break;
        case '*':
            p->_result = q->_x * q->_y;
            break;
        case '/':
        {
            if (q->_y == 0)
            {
                p->_result = -1;
                p->_code = 1;
                p->_des = "div zero";
            }
            else
            {
                p->_result = q->_x / q->_y;
            }
            break;
        }
        case '%':
        {
            if (q->_y == 0)
            {
                p->_result = -1;
                p->_code = 2;
                p->_des = "mod zero";
            }
            else
            {
                p->_result = q->_x % q->_y;
            }
            break;
        }
        default:
            p->_result = -1;
            p->_code = 3;
            p->_des = "illegal operation";
            break;
        }
        return p;
    }
};

现象 

对单进程版Reactor进行改进:添加线程池

另外一种方案

实现思路

以上便是IO模型的全部内容,有错误欢迎在评论区指正,感谢观看!


http://www.kler.cn/a/374153.html

相关文章:

  • Cuebric:用AI重新定义3D创作的未来
  • 关于springboot跨域与拦截器的问题
  • Android Preference浅析(设置Setting)
  • 美团嵌入式面试题及参考答案(无人机团队)
  • 达梦数据迁移工具DTS使用实践
  • Ollama接口文档中文版
  • linux命令之top(Linux Command Top)
  • day14:RSYNC同步
  • 第72期 | GPTSecurity周报
  • 书生-第四期闯关:完成SSH连接与端口映射并运行hello_world.py
  • 如何使用 Vue CLI 创建 Vue 项目?
  • Java迭代器:深入理解与应用
  • 二百七十四、Kettle——ClickHouse中对错误数据表中进行数据修复(实时)
  • Spark集群管理脚本详解
  • 【数据结构-邻项消除】力扣2211. 统计道路上的碰撞次数
  • UDP-鼠李糖合成酶基因的克隆与鉴定-文献精读76
  • 系统学习CFD,常见收敛问题、及如何与机器学习相结合
  • zynq PS端跑Linux响应中断
  • windows下安装python库wordCloud报错
  • 【PGCCC】Postgresql BgWriter 原理
  • Java实现数据去重的几种方案及其去重原理
  • 【skywalking】监控 Spring Cloud Gateway 数据
  • flask框架用法介绍(一)
  • 从零学习大模型(十)-----剪枝基本概念
  • 【SSE】前端vue3使用SSE,EventSource携带请求头
  • H2 Database IDEA 源码 DEBUG 环境搭建