当前位置: 首页 > article >正文

【Linux网络-poll与epoll】epollserver设计(两个版本 Reactor)+epoll理论补充(LT ET)

四、epollserver设计

Main.cc

epollserver.cc

类基本设计:创建监听套接字

 创建epoll实例对象,其中size = 128(只要是大于0都可以)

初始化不一定先有,但一定要先给LOOP循环,服务器是不能挂掉的

接着初始化服务器,处理链接,新链接到来,我们认为是读事件就绪

把新链接的fd交给epoll管理

 接着等待事件就绪,也就是处理LOOP

【测试】服务器正常,并且每1s轮询检测一次;当新连接连入的时候,事件就绪,狂输出(LT模式 + 就绪后我们没有进行处理任务)

接着进行处理事件,就绪事件放在struct epoll_event revs[num]缓冲区中

我们直接低耦合编写,分模块

事件就绪后,

接受新链接

并且将新得到的sockfd添加到epoll中,让epoll检测底层有没有数据,设计的时候也只监视读事件

本质:添加Accept返回值sockfd,到RBTree中

处理IO

有数据就读数据

没有数据,或者读错误,一定要先移除epoll中的fd,再close

 再将析构补上,以及将struct epoll_event revs[num]; 定义在成员变量中

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listensock;
    int _epfd;
    struct epoll_event revs[num]; // 128

public:
    ~EpollServer()
    {
        if (_epfd >= 0)
            ::close(_epfd);
        _listensock->Close();
    }

【测试】一切正常,能发数据能收数据,因为我们处理事件了,也不会死循环一直输出了

总结

总结一下,epoll的使用过程就是三部曲:

  • 调用epoll_create 创建一个epoll句柄

  • 调用epoll_ctl,将要监控的文件描述符进行注册

  • 调用epoll_wait,等待文件描述符就绪

epoll的优点(和select的缺点对应)

  • 接口使用方便:虽然拆分成了三个函数,但是反而使用起来更方便高效。不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离开

  • 数据拷贝轻量:只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中,这个操作并不频繁(而select/poll 都是每次循环都要进行拷贝)

  • 事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度 O(1).即使文件描述符数目很多,效率也不会受到影响.

  • 没有数量限制:文件描述符数目无上限.

注意:网上有些博客说:epoll中使用了内存映射机制

  • 内存映射机制:内核直接将就绪队列通过mmap 的方式映射到用户态. 避免了拷贝内存这样的额外性能开销.

这种说法是不准确的,我们定义的struct epoll_event是我们在用户空间中分配好的内存,势必还是需要将内核的数据拷贝到这个用户空间的内存中的

五、epoll补充理论

1.epoll工作方式

epoll有两种工作模式,水平触发(LT)工作模式,边缘触发(ET)工作模式

【例子引入】现在有张三,李四两名快递员;小王有5个包裹,其中3个包裹分配给张三派送,2个包裹分给李四派送

小王住小区的5楼

  • 张三是一个非常负责任的快递员,首先张三打电话给小王,”小王这里你的3个快递到了,麻烦你下楼取一下”,张三下楼,只拿走一个快递;张三一看这不对啊,于是又打电话给小王“你的快递没有取完”,只要底层有快递,就一直通知上层小王

  • 李四是按照公司规则进行通知,李四刚到小区楼下给小王打电话,通知快递到了,如果小王没有一次性把快递全部拿走,小王就不通知了,小王只通知一次;这时候张三把剩下快递交给李四,李四才会再次打电话给小王,通知一次有快递到了;包裹从无到有,从有到多的时候(数据量在内核层面“变化”的时候),才会通知上层

这里的张三就是水平触发(LT)工作模式、李四就是边缘触发(ET)工作模式

快递员:多路转接的系统调用;张三、李四,epoll的不同工作模式

小王的5个包裹:数据

小王:应用层

【例子】假如有这样的一个例子,我们已经把一个tcp socket添加到epoll描述符中,这个时候socket的另一端写入了2KB的数据,调用epoll_wait,并且它会返回,说明它已经准备好读取操作,然后调用read,只读取了1KB的数据,继续调用epoll_wait。。。

2.水平触发 Level Triggered工作模式

epoll默认状态下就是LT工作模式

只要底层有事件就绪,epoll 就会一直通知用户。类似于数字电路中的高电平触发一样,只要一直处于高电平,则会一直触发。

由于在 LT 工作模式下,只要底层有事件就绪就会一直通知用户,因此当 epoll 检测到底层读事件就绪时,可以不立即进行处理,或者只处理一部分,因为只要底层数据没有处理完,下一次 epoll 还会通知用户事件就绪。

  • 如上面的例子,由于只读了1K数据,缓冲区中还剩1K数据,在第二次调用epoll_wait时,epoll_wait仍然会立刻返回并通知socket读事件继续,直到缓冲区上所有的数据都被处理完,epoll_wait才不会立刻返回

  • 支持阻塞读写和非阻塞读写。

3.边缘触发 Edge Triggered 工作模式

如果在第 1 步将 socket 添加到 epoll 描述符的时候使用了 EPOLLET 标志,epoll 进入 ET 工作模式。

只有底层就绪事件数量由无到有或由有到多发生变化的时候,epoll 才会通知用户。类似于数字电路中的上升沿触发一样,只有当电平由低变高的那一瞬间才会触发。

  • 当epoll检测到socket上事件就绪时,必须立刻处理

  • 如上面的例子,虽然只读了1K的数据,缓冲区还剩1K的数据,在第二次调用epoll_wait的时候,epoll_wait 不会再返回了

  • 也就是说,ET模式下,文件描述符上的事件就绪后,只有一次处理机会

  • ET的性能比LT性能更高(epoll_wait返回的次数少了很多),Nginx默认采用ET模式使用epoll

  • 只支持非阻塞读写

select和poll其实也是工作在LT模式下,epoll既可以支持LT,也可以支持ET

4.对比LT和ET

LT是epoll的默认行为

使用ET能够减少epoll触发的次数,但是代价就是强逼着程序员一次响应就绪过程中就把所有的数据都处理完

相当于一个文件描述符就绪之后,不会反复被提示就绪,看起来就比LT更高效一些,但是在LT情况下如果也能做到每次就绪的文件描述符都立刻处理,不让这个就绪被重复提示的话,其实性能也是一样的

另一方面,ET的代码复杂程度更高了

ET vs LT更高效:

1.ET的通知效率更高

2.IO效率更高

ET模式下,只通知一次,本轮数据没读完,epoll不再通知;ET模式一旦就绪,就必须把数据全部读完;你怎么知道你把数据全部读完了?我们选择循环读取,直到读取不到为止,这样会引发阻塞问题,那就设计fd为非阻塞;所以ET工作模式下,所有的fd都必须是非阻塞的,LT模式下,fd阻塞或者非阻塞都行

5.理解ET模式和非阻塞文件描述符

使用ET 模式的epoll,需要将文件描述设置为非阻塞.这个不是接口上的要求,而是“工程实践"上的要求. 假设这样的场景:服务器接收到一个 10k 的请求,会向客户端返回一个应答数据. 如果客户端收不到应答,不会发送第二个 10k 请求.

如果服务端写的代码是阻塞式的 read,并且一次只 read 1k 数据的话(read 不能保证一次就把所有的数据都读出来,参考 man 手册的说明,可能被信号打断),剩下的 9k 数据就会待在缓冲区中。

此时由于 epoll 是 ET 模式,并不会认为文件描述符读就绪,epoll_wait 就不会再次返回,剩下的 9k 数据会一直在缓冲区中,直到下一次客户端再给服务器写数据,epoll_wait 才能返回。

但是问题来了,服务器只读到 1k 个数据,要 10k 读完才会给客户端返回响应数据。客户端要读到服务器的响应,才会发送下一个请求。客户端发送了下一个请求,epoll_wait 才会返回,才能去读缓冲区中剩余的数据。

所以,为了解决上述问题(阻塞 read 不一定能一下把完整的请求读完),于是就可以使用非阻塞轮询的方式来读缓冲区,保证一定能把完整的请求都读出来,而如果是 LT 没这个问题。只要缓冲区中的数据没读完,就能够让 epoll_wait 返回文件描述符读就绪。

6.epoll 的使用场景

epoll 的高性能是有一定的特定场景的。如果场景选择的不适宜,epoll 的性能可能适得其反。

对于多连接,且多连接中只有一部分连接比较活跃时,比较适合使用 epoll。例如,典型的一个需要处理上万个客户端的服务器,例如各种互联网 APP 的入口服务器,这样的服务器就很适合 epoll。

如果只是系统内部,服务器和服务器之间进行通信,只有少数的几个连接,这种情况下用 epoll 就并不合适,具体要根据需求和场景特点来决定使用哪种 IO 模型。

7.epoll 中的惊群问题(了解)

//待补充

8.EPOLLET选项

EPOLLET:将EPOLL设为边缘触发模式,这是相对于水平触发来说的。

select/poll/epoll(默认)一旦有事件就绪,但是我们不处理,底层一直通知我就绪,循环打印日志信息;使用EPOLLET就可以解决这个问题

我们修改代码部分内容,添加EPOLLET,并且我们不处理了事件就绪

此时测试不会再一顿输出了

六、epollserver设计版本2

第一份代码中的问题:在读取的过程中,你怎么保证buffer就是一个完整的请求?或者有多个请求??

解决方案:1.每一个fd都要有一个自己的缓冲区 2.引入协议

对文件描述符做管理,如何管理-->先描述,再组织

我们把所有文件描述符叫做连接,封装成一个Connection

从现在开始,在EPOLL中没有文件描述符,在往后写服务器时所有的描述都叫连接,包括Listen套接字

Connection.hpp
#pragma once

#include <iostream>
#include <string>
#include <functional>
#include <unistd.h>
#include "InetAddr.hpp"

#define ListenConnection 0
#define NormalConnection 1

class Connection;
class Reactor;
using handler_t = std::function<void(Connection *con)>;

// 未来我们的服务器,一切皆Connection,对我们来将listensockfd也是一样
class Connection
{
public:
    Connection(int sockfd) : _sockfd(sockfd)
    {
    }
    void RegisterHandler(handler_t recver, handler_t sender, handler_t excepter)
    {
        _handler_recver = recver;
        _handler_sender = sender;
        _handler_excepter = excepter;
    }
    void SetEvents(uint32_t events)
    {
        _events = events;
    }
    uint32_t Events()
    {
        return _events;
    }
    void SetConnectionType(int type)
    {
        _type = type;
    }
    int Type()
    {
        return _type;
    }
    int Sockfd()
    {
        return _sockfd;
    }
    void SetReactor(Reactor *R)
    {
        _R = R;
    }
    void SetAddr(const InetAddr &addr)
    {
        _addr = addr;
    }
    void AppendInbuffer(const std::string &in)
    {
        _inbuffer += in;
    }
    void AppendOutbuffer(const std::string &in)
    {
        _outbuffer += in;
    }
    std::string &Inbuffer()
    {
        return _inbuffer;
    }
    std::string &OutBuffer()
    {
        return _outbuffer;
    }
    void DiscardOutbuffer(int n)
    {
        _outbuffer.erase(0, n);
    }
    void Close()
    {
        if (_sockfd >= 0)
            ::close(_sockfd);
    }
    ~Connection()
    {
    }

private:
    int _sockfd;
    uint32_t _events;
    std::string _inbuffer; // 我们在这里,用string充当缓冲区
    std::string _outbuffer;
    int _type;

public:
    handler_t _handler_recver;   // 处理读取
    handler_t _handler_sender;   // 处理写入
    handler_t _handler_excepter; // 处理异常

    Reactor *_R; // 回指向自己所属的Reactor
    InetAddr _addr;
};
Epoller.hpp && Comm.hpp

我们还是epoll模型,那么我们也专门需要一个类,将事件添加到epoll模型中,以及设置事件类型,以及对事件的处理;这里我们使用纯虚函数,通过继承与多态,为了满足我们对模型的拓展

我们实现的事基于ET模式下epoll模型,由于ET模式下,事件只会通知一次,所以在读写期间,为了防止数据没有读完,我们需要将套接字设置为非阻塞状态;为了对模块进行解耦,我们把设置非阻塞状态单独封装在Comm.hpp中

#pragma once

#include <iostream>
#include <unistd.h>
#include <fcntl.h>

enum
{
    SOCKET_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERROR,
    EPOLL_CREATE_ERROR
};

void SetNonBlock(int fd)
{
    int f1 = ::fcntl(fd, F_GETFL);
    if(f1 < 0)
    {
        std::cout << "fcntl error" << std::endl;
        return ;
    }
    ::fcntl(fd, F_SETFL, f1 | O_NONBLOCK);
}
#pragma once
#include <iostream>
#include <stdlib.h>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Comm.hpp"

static const int gsize = 128;

using namespace log_ns;

class Multiplex
{
public:
    virtual bool AddEvent(int fd, uint32_t events) = 0;
    virtual int Wait(struct epoll_event revs[], int num, int timeout) = 0;
    virtual bool ModEvent(int fd, uint32_t events) = 0;
    virtual bool DelEvent(int fd) = 0;
};

class Epoller : public Multiplex
{
private:
    // 对添加事件 更改事件进行单独的封装
    bool ModEventHelper(int fd, uint32_t events, int oper)
    {
        // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
        struct epoll_event ev;
        ev.events = events;
        ev.data.fd = fd;// 特别重要,通过这个字段,向上触发
        int n = :: epoll_ctl(_epfd, oper, fd, &ev);
        if(n < 0)
        {
            LOG(ERROR, "epoll_ctl %d events is %s failed\n", fd, EventsToString(events).c_str());
            return false;
        }
        LOG(INFO, "epoll_ctl %d events is %s seucees\n", fd, EventsToString(events).c_str());
        return true;
    }
public:
    Epoller() 
    {
        _epfd = ::epoll_create(gsize);
        if(_epfd < 0)
        {
            LOG(FATAL, "epoll create failed \n");
            exit(EPOLL_CREATE_ERROR);
        }
        LOG(INFO, "epoll create sucess, epfd: %d \n", _epfd);
    }

    // 将关心事件专程字符串 遍于观察
    std::string EventsToString(uint32_t events) 
    {
        std::string eventstr;
        if(events & EPOLLIN)
            eventstr = "EPOLLIN";
        if(events & EPOLLOUT)
            eventstr += " | EPOLLOUT";
        if(events & EPOLLET)
            eventstr += " | EPOLLET";
        
        return eventstr;
    }

    // 添加事件
    bool AddEvent(int fd, uint32_t events) override
    {
        return ModEventHelper(fd, events, EPOLL_CTL_ADD);
    }
    bool ModEvent(int fd, uint32_t events) override
    {
        return ModEventHelper(fd, events, EPOLL_CTL_MOD);
    }
    bool DelEvent(int fd) override
    {
        return 0 == ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
    }
    int Wait(struct epoll_event revs[], int num, int timeout)
    {
        return ::epoll_wait(_epfd, revs, num, timeout);
    }
    ~Epoller()
    {
    }

private:
    int _epfd;
};
 
// 未来我们可以通过继承实现扩展
// class Poller : public Multiplex
// {

// };

// class Selector: public Multiplex
// {

// };
Listener.hpp

因为我们把连接分为一个一个的Connection,那么我们就需要单独处理Listen套接字,和正常套接字,这里我们先处理Listen套接字,Listen套接字的作用就是通过accept接收新连接

#pragma once
#include <iostream>
#include <memory>
#include "Socket.hpp"
#include "Connection.hpp"

// 该类未来统一进行listensock的管理工作,获取新连接
using namespace socket_ns;

class Listener
{
public:
    Listener(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>())
    {
        _listensock->BuildListenSocket(_port);
    }

    int ListenSockfd()
    {
        return _listensock->Sockfd();
    }

    void Accepter(Connection *conn)
    {
        while (true)
        {
            errno = 0; // 重新设置错误码
            InetAddr addr;
            int sockfd = _listensock->Accepter(&addr, &code);
            if (sockfd > 0)
            {
                // 该socket获取成功了,我们该怎么办??
                LOG(INFO, "获取连接成功, 客户端: %s:%d, sockfd: %d\n", addr.Ip().c_str(), addr.Port(), sockfd);
                // 对于listen套接字 我们只关心读事件
                conn->_R->AddConnection(sockfd, EPOLLIN | EPOLLET, addr, NormalConnection);
            }
            else
            {
                if (code == EWOULDBLOCK)
                {
                    LOG(INFO, "底层连接全部获取完毕\n");
                    break;
                }
                else if (code == EINTR)
                {
                    continue;
                }
                else
                {
                    LOG(ERROR, "获取连接失败.!\n");
                    break;
                }
            }
        }
    }

    ~Listener()
    {
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listensock;
};
Reactor.hpp

我们之前都写了一个TCP服务器,这里我们更个名叫Reactor,那么TCP rename to Reactor,那么说明Reactor还是要有TCPSrver的方法,我们已经把Socket单独分类出来了,那么Reactor只需要管理链接 和 事件派发

#pragma once
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

#include "Connection.hpp"
#include "Epoller.hpp"

// 暂时叫做TcpServer->rename
// Reactor类似一个connection的容器,核心工作就是
// 1. 管理connection和对应的内核事件
// 2. 事件派发

class Reactor
{
    static const int gnum = 64;

public:
    Reactor() : _epoller(std::make_unique<Epoller>()), _isrunning(false)
    {
    }

    // 添加链接
    void AddConnection(int fd, uint32_t events, const InetAddr &addr, int type)
    {
        // 1.构建一个connection
        Connection *conn = new Connection(fd);
        conn->SetConnectionType(type);
        conn->SetEvents(events);
        conn->SetReactor(this); // 将当前对象设置进入所有的conn对象中
        conn->SetAddr(addr);

        // 设置对connection的上层处理,即,如果该connection就绪被激活,我们应该如何处理它?
        if (conn->Type() == ListenConnection)
        {
            conn->RegisterHandler(_OnConnect, nullptr, nullptr); // listen套接字只关心读事件
        }
        else
        {
            conn->RegisterHandler(_OnRecver, _OnSender, _OnExcepter);
        }

        // 2. fd和events写透到内核中,托管给epoll
        if (!_epoller->AddEvent(conn->Sockfd(), conn->Events()))
            return;

        // 3. 托管给_connections
        _connections.insert(std::make_pair(fd, conn));
    }

    // 确定事件类型
    void EnableConnectionReadWrite(int sockfd, bool readable, bool writeable)
    {
        if (!IsConnectionExists(sockfd))
        {
            return;
        }

        uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
        _connections[sockfd]->SetEvents(events);

        // 写透到内核中!
        _epoller->ModEvent(_connections[sockfd]->Sockfd(), _connections[sockfd]->Events());
    }

    void DelConnection(int sockfd)
    {
        // 0. 安全检测
        if (!IsConnectionExists(sockfd))
        {
            if (!IsConnectionExists(sockfd))
            {
                return;
            }
            LOG(INFO, "sockfd %d quit, 服务器释放所有资源\n", sockfd);
            // 1. 在内核中移除对sockfd的关心
            EnableConnectionReadWrite(sockfd, false, false);
            _epoller->DelEvent(sockfd);
            // 2. sockfd 关闭
            _connections[sockfd]->Close();
            // 3. 在Reactor中移除对Connection的关心
            delete _connections[sockfd];
            _connections.erase(sockfd);
            return;
        }
    }

    void LoopOnce(int timeout)
    {
        int n = _epoller->Wait(revs, gnum, timeout);
        for (int i = 0; i < n; i++)
        {
            int sockfd = revs[i].data.fd;
            uint32_t revents = revs[i].events;

            if (revents & EPOLLERR) // 错误事件
                revents |= (EPOLLIN | EPOLLOUT);
            if (revents & EPOLLHUP) // 挂起事件
                revents |= (EPOLLIN | EPOLLHUP);

            if (revents & EPOLLIN)
            {
                if(IsConnectionExists(sockfd) && _connections[sockfd]->_handler_recver)
                    _connections[sockfd]->_handler_recver(_connections[sockfd]); // 读事件就绪,派发给对应的conn
            }
            if (revents & EPOLLOUT)
            {
                if (IsConnectionExists(sockfd) && _connections[sockfd]->_handler_sender)
                    _connections[sockfd]->_handler_sender(_connections[sockfd]); // 写事件就绪,派发给对应的conn
            }
        }
    }

    void Dispather() // 事件派发
    {
        int timeout = -1;
        _isrunning = true;
        while (_isrunning)
        {
            LoopOnce(timeout); // 默认是阻塞等待的
            // 做做其他事情
            // 就可以获取新的fd,并添加了!
            PrintDebug();
        }
        _isrunning = false;
    }

    bool IsConnectionExists(int sockfd)
    {
        return _connections.find(sockfd) != _connections.end();
    }
    void SetOnConnect(handler_t OnConnect)
    {
        _OnConnect = OnConnect;
    }
    void SetOnNormalHandler(handler_t recver, handler_t sender, handler_t excepter)
    {
        _OnRecver = recver;
        _OnSender = sender;
        _OnExcepter = excepter;
    }
    void PrintDebug()
    {
        std::string fdlist;
        for (auto &conn : _connections)
        {
            fdlist += std::to_string(conn.second->Sockfd()) + " ";
        }
        LOG(DEBUG, "epoll管理的fd列表: %s\n", fdlist.c_str());
    }
    ~Reactor()
    {
    }

private:
    // key: sockfd
    // value: Connetion*
    std::unordered_map<int, Connection *> _connections; // Event
    std::unique_ptr<Multiplex> _epoller;
    bool _isrunning;
    struct epoll_event revs[gnum];

    // Reactor中添加处理socket的方法集合
    // 1. 处理新连接
    handler_t _OnConnect;
    // 2. 处理普通的sockfd,主要是IO处理
    handler_t _OnRecver;
    handler_t _OnSender;
    handler_t _OnExcepter;
};
HandlerConnection.hpp

我们已经在Reactor进行事件派发,接下来就需要处理事件派发

#pragma once

#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include "Log.hpp"
#include "Connection.hpp"

using namespace log_ns;
const static int buffersize = 512;

// 不应该让HandlerConnection来处理报文
class HandlerConnection
{
public:
    HandlerConnection(handler_t process) : _process(process)
    {
    }

    // 处理读事件
    void HandlerRecver(Connection *conn) // conn就是就绪的conn
    {
        errno = 0;
        // 1. 直接读取
        LOG(DEBUG, "client 给我发了消息: %d\n", conn->Sockfd());
        while (true)
        {
            char buffer[buffersize];
            // ssize_t recv(int sockfd, void *buf, size_t len, int flags);
            ssize_t n = ::recv(conn->Sockfd(), buffer, sizeof(buffer) - 1, 0);
            if (n > 0)
            {
                buffer[n] = 0; // 数据块
                conn->AppendInbuffer(buffer);
            }
            else
            {
                if (errno == EWOULDBLOCK) // 当前没有更多数据可读取
                {
                    break;
                }
                else if (errno == EINTR) // 信号中断
                {
                    continue; // 重新尝试接收数据
                }
                else
                {
                    conn->_handler_excepter(conn); // 统一执行异常处理
                    return;
                }
            }
        }

        // 2. 交给业务处理
        // 一定是读取完毕了,我们应该处理数据了
        std::cout << "%d Inbuffer 内容: " << conn->Inbuffer() << std::endl;

        // 未来不让他做处理, 我让他把报文解析,处理的方法,作为任务,交给线程池!
        // std::bind()
        _process(conn); // 内容分析
    }

    // 处理写事件
    void HandlerSender(Connection *conn)
    {
        errno = 0;
        // 1. 直接写
        while (true)
        {
            ssize_t n = ::send(conn->Sockfd(), conn->OutBuffer().c_str(), conn->OutBuffer().size(), 0);
            if (n > 0)
            {
                conn->DiscardOutbuffer(n);
                if (conn->OutBuffer().empty())
                    break;
            }
            else if (n == 0)
            {
                break;
            }
            else
            {
                if (errno == EWOULDBLOCK)
                {
                    // 发送条件不满足
                    break;
                }
                else if (errno == EINTR)
                {
                    continue;
                }
                else
                {
                    conn->_handler_excepter(conn);
                    return;
                }
            }
        }

        // 2. 只能是发送条件不满足 && 缓冲区还有数据
        if (!conn->OutBuffer().empty())
        {
            // 开启对写事件关心.
            conn->_R->EnableConnectionReadWrite(conn->Sockfd(), true, true);
            // 发送完了呢?
        }
        else
        {
            conn->_R->EnableConnectionReadWrite(conn->Sockfd(), true, false);
        }
    }

    void HanlderExcepter(Connection *conn)
    {
        // 整个代码的所有的逻辑异常处理,全部都在这里
        // 删除连接
        conn->_R->DelConnection(conn->Sockfd());
    }

private:
    handler_t _process;
};
NetCal.hpp && PackageParse.hpp && Protocol.hpp

我们主要内容模块已经写好了,现在拿着之前我们写的网络版本计算机处理业务

#pragma once

#include "Protocol.hpp"
#include <memory>

class NetCal
{
public:
    NetCal()
    {
    }
    ~NetCal()
    {
    }
    std::shared_ptr<Response> Calculator(std::shared_ptr<Request> req)
    {
        auto resp = Factory::BuildResponseDefault();
        switch (req->Oper())
        {
        case '+':
            resp->_result = req->X() + req->Y();
            break;
        case '-':
            resp->_result = req->X() - req->Y();
            break;
        case '*':
            resp->_result = req->X() * req->Y();
            break;
        case '/':
        {
            if (req->Y() == 0)
            {
                resp->_code = 1;
                resp->_desc = "div zero";
            }
            else
            {
                resp->_result = req->X() / req->Y();
            }
        }
        break;
        case '%':
        {
            if (req->Y() == 0)
            {
                resp->_code = 2;
                resp->_desc = "mod zero";
            }
            else
            {
                resp->_result = req->X() % req->Y();
            }
        }
        break;
        default:
        {
            resp->_code = 3;
            resp->_desc = "illegal operation";
        }
        break;
        }
        return resp;
    }
};
#pragma once
#include <iostream>
#include <functional>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Protocol.hpp"
#include "Connection.hpp"
#include "NetCal.hpp"

using namespace log_ns;
class PackageParse
{
public:
    void Excute(Connection *conn)
    {
        while (true)
        {
            // 我们能保证我们读到的是一个完整的报文吗?不能!
            // 2. 报文解析,提取报头和有效载荷
            std::string package = Decode(conn->Inbuffer());
            if (package.empty())
                break;
            // 我们能保证我们读到的是一个完整的报文吗?能!!
            auto req = Factory::BuildRequestDefault();
            std::cout << "package: \n"
                      << package << std::endl;
            // 3. 反序列化
            req->Deserialize(package);

            // 4. 业务处理
            // auto resp = _process(req); // 通过请求,得到应答
            auto resp = cal.Calculator(req);

            // 5. 序列化应答
            std::string respjson;
            resp->Serialize(&respjson);
            std::cout << "respjson: \n"
                      << respjson << std::endl;

            // 6. 添加len长度报头
            respjson = Encode(respjson);
            std::cout << "respjson add header done: \n"
                      << respjson << std::endl;

            // 7. 发回
            conn->AppendOutbuffer(respjson);
        }

        // 我们已经至少处理了一个请求,同时至少会有一个应答
        // if(!conn->OutBuffer().empty()) 
        // conn->_handler_sender(conn); // 方法1:直接发送数据
        if (!conn->OutBuffer().empty())
            conn->_R->EnableConnectionReadWrite(conn->Sockfd(), true, true); // 方法2:我只要进行激活对写事件的关心即可
    }

private:
    NetCal cal;
};
#pragma once

#include <iostream>
#include <memory>
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>

static const std::string sep = "\r\n";

// 设计一下协议的报头和报文的完整格式
// "len"\r\n"{json}"\r\n --- 完整的报文, len 有效载荷的长度!
// \r\n: 区分len 和 json 串
// \r\n: 暂是没有其他用,打印方便,debug
// 添加报头
std::string Encode(const std::string &jsonstr)
{
    int len = jsonstr.size();
    std::string lenstr = std::to_string(len);
    return lenstr + sep + jsonstr + sep;
}
// 不能带const
// "le
// "len"
// "len"\r\n
// "len"\r\n"{json}"\r\n (]
// "len"\r\n"{j
// "len"\r\n"{json}"\r\n"len"\r\n"{
// "len"\r\n"{json}"\r\n
// "len"\r\n"{json}"\r\n"len"\r\n"{json}"\r\n"len"\r\n"{json}"\r\n"len"\r\n"{json}"\r
std::string Decode(std::string &packagestream)
{
    // 分析
    auto pos = packagestream.find(sep);
    if (pos == std::string::npos)
        return std::string();
    std::string lenstr = packagestream.substr(0, pos);
    int len = std::stoi(lenstr);
    // 计算一个完整的报文应该是多长??
    int total = lenstr.size() + len + 2 * sep.size();
    if (packagestream.size() < total)
        return std::string();

    // 提取
    std::string jsonstr = packagestream.substr(pos + sep.size(), len);
    packagestream.erase(0, total);
    return jsonstr;
}

class Request
{
public:
    Request()
    {
    }
    Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper)
    {
    }

    bool Serialize(std::string *out)
    {
        // 1. 使用现成的库, xml, json(jsoncpp), protobuf
        Json::Value root;
        root["x"] = _x;
        root["y"] = _y;
        root["oper"] = _oper;
        Json::FastWriter writer;
        // Json::StyledWriter writer;
        std::string s = writer.write(root);
        *out = s;
        return true;
    }
    bool Deserialize(const std::string &in)
    {
        Json::Value root;
        Json::Reader reader;
        bool res = reader.parse(in, root);

        _x = root["x"].asInt();
        _y = root["y"].asInt();
        _oper = root["oper"].asInt();

        return true;
    }
    void Print()
    {
        std::cout << _x << std::endl;
        std::cout << _y << std::endl;
        std::cout << _oper << std::endl;
    }
    ~Request()
    {
    }
    int X()
    {
        return _x;
    }
    int Y()
    {
        return _y;
    }
    char Oper()
    {
        return _oper;
    }
    void SetValue(int x, int y, char oper)
    {
        _x = x;
        _y = y;
        _oper = oper;
    }

private:
    int _x;
    int _y;
    char _oper; // + - * / % // x oper y
};

// struct request resp={30, 0};
class Response
{
public:
    Response() : _result(0), _code(0), _desc("success")
    {
    }
    bool Serialize(std::string *out)
    {
        // 1. 使用现成的库, xml, json(jsoncpp), protobuf
        Json::Value root;
        root["result"] = _result;
        root["code"] = _code;
        root["desc"] = _desc;
        Json::FastWriter writer;
        // Json::StyledWriter writer;
        std::string s = writer.write(root);
        *out = s;
        return true;
    }
    bool Deserialize(const std::string &in)
    {
        Json::Value root;
        Json::Reader reader;
        bool res = reader.parse(in, root);
        if (!res)
            return false;
        _result = root["result"].asInt();
        _code = root["code"].asInt();
        _desc = root["desc"].asString();

        return true;
    }
    void PrintResult()
    {
        std::cout << "result: " << _result << ", code: " << _code << ", desc: " << _desc << std::endl; 
    }
    ~Response()
    {
    }

public:
    int _result;
    int _code; // 0: success, 1: div zero 2. 非法操作
    std::string _desc;
};

class Factory
{
public:
    static std::shared_ptr<Request> BuildRequestDefault()
    {
        return std::make_shared<Request>();
    }
    static std::shared_ptr<Response> BuildResponseDefault()
    {
        return std::make_shared<Response>();
    }
};

测试结果一:

测试结果二:

#include "Log.hpp"
#include "Reactor.hpp"
#include "Listener.hpp"
#include "HandlerConnection.hpp"
#include "PackageParse.hpp"

#include <iostream>
#include <memory>

using namespace log_ns;

// ./tcpserver 8888
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;
        exit(0);
    }
    uint16_t port = std::stoi(argv[1]);
    EnableScreen();
    InetAddr localaddr("0.0.0.0", port);

    PackageParse parse;

    // 专门用来处理新连接到来的模块
    Listener listener(port); // 连接管理器

    // 专门用来处理普通sockfd的模块
    HandlerConnection handlers(std::bind(&PackageParse::Excute, &parse, std::placeholders::_1)); // IO处理器
    
    // 主模块,事件派发
    std::unique_ptr<Reactor> R = std::make_unique<Reactor>(); // 事件派发器

    // 模块之间产生关联
    R->SetOnConnect(std::bind(&Listener::Accepter, &listener, std::placeholders::_1));
    R->SetOnNormalHandler(
        std::bind(&HandlerConnection::HandlerRecver, &handlers, std::placeholders::_1),
        std::bind(&HandlerConnection::HandlerSender, &handlers, std::placeholders::_1),
        std::bind(&HandlerConnection::HanlderExcepter, &handlers, std::placeholders::_1)
    );

    R->AddConnection(listener.ListenSockfd(), EPOLLIN|EPOLLET, localaddr, ListenConnection);

    R->Dispatcher();

    return 0;
}

 

细节补充


http://www.kler.cn/a/612181.html

相关文章:

  • vue ts+Windi CSS
  • CTFshow【命令执行】web29-web40 做题笔记
  • 未来工程项目管理新走向:云原生软件赋能绿色可持续建设
  • Kafka 面试备战指南
  • eureka与ribbon混合使用
  • Linux设置SSH免密码密钥登录
  • Netty和Project Reactor如何共同处理大数据流?
  • 无人机抗风测试技术要点概述!
  • failed to load steamui.dll”错误:Steam用户的高频崩溃问题解析
  • LLaMA-Factory使用实战
  • Elasticsearch 之 ElasticsearchRestTemplate 聚合查询
  • Java版Manus实现来了,Spring AI Alibaba发布开源OpenManus实现
  • Linux驱动开发--IIC子系统
  • 基于HTML5的3D魔方项目开发实践
  • leetcode 150. 逆波兰表达式求值
  • 22、web前端开发之html5(三)
  • HarmonyOS Next~鸿蒙系统开发类Kit深度解析与应用实践
  • 211、【图论】建造最大岛屿(Python)
  • 计算机网络——传输层(TCP)
  • 广东新政激发产业活力,凡拓数创以全场景AI3D方案领跑机器人赛道