【计网】实现reactor反应堆模型 --- 处理数据发回问题 ,异常处理问题
没有一颗星,
会因为追求梦想而受伤,
当你真心渴望某样东西时,
整个宇宙都会来帮忙。
--- 保罗・戈埃罗 《牧羊少年奇幻之旅》---
实现Reactor反应堆模型
- 1 数据处理
- 2 数据发回问题
- 3 异常处理问题
- 4 运行效果
1 数据处理
在上一篇文章中我们搭建起了Reactor反应堆模型的基础框架,可以实现对数据接受。那么接下来就需要对数据进行处理之后发回。
数据处理方面,需要使用到协议,我这里采取使用JSON串的形式,将之前网络计算器项目的协议结构直接拿过来使用。
【计网】从零开始掌握序列化 — 实现网络计算器项目
在HandlerConnection
模块中connection
连接的缓冲区中会接受到新的数据。接收到数据之后HandlerConnection
模块的工作就完成了,下面需要进入协议解析模块进行处理。直接调用HandlerConnection
模块的回调函数_process
,进入协议解析模块。
协议解析模块的逻辑很简单:
- 判断缓冲区中是否有完整报文
- 对完整报文进行协议解析
- 对解析出来的数据进行处理,得到应答报文
- 将应答报文发送回去
#pragma once
#include "Connection.hpp"
#include "InetAddr.hpp"
#include "Protocol.hpp"
#include "Log.hpp"
#include "NetCal.hpp"
using namespace log_ns;
class PackageParse
{
public:
void Execute(Connection *conn)
{
LOG(INFO, "service start!!!\n");
while (true)
{
// 1.报文解析
std::string str = Decode(conn->Inbuffer()); // 通过去报头获取报文
// std::cout << "str: " << str << std::endl;
// 连接当前没有完整的报文! --- 直接退出
if (str.empty())
break;
// 到这里说明有完整的报文!!!
auto req = Factory::BuildRequestDefault();
// 2.反序列化 得到 Request
req->Deserialize(str);
// auto res = Factory::BuildResponseDefault();
// 3.业务处理
auto res = cal.Calculator(req);
// 4.进行序列化处理
std::string ret;
res->Serialize(&ret);
std::cout << "ret: " << ret << std::endl;
// 5.加入报头
std::string package = Encode(ret);
//std::cout << "package: \n"<< package << std::endl;
// 6.发回数据
// 直接进行发送 , 怎么知道写入条件不满足呢? 通过错误码errno是EAGAIN即可。
conn->AppendOutbuffer(package);
}
// 到了这里 说明至少处理了一个请求 只是有一个应答
// 进行发回数据
if (!conn->Outbuffer().empty())
conn->_handler_sender(conn); // 方法1:直接发回数据
// 方法2:将写事件设置进入EPOLL就可以了 会自动进行读取
}
~PackageParse()
{
}
private:
NetCal cal; // 计算器
};
这里处理结束后,要将数据发回,但是我们还没有实现数据发回的逻辑,接下来我们来分析一下发回数据要怎么处理
2 数据发回问题
对于多进程与多线程的情况下,write更加简单,有多少发多少,直接进行阻塞式写入。
但是对于多路转接来说,write比较复杂:
- 当我们获得一个新的的fd时, 输入输出缓冲区默认都是空的
- 读事件就绪:本质就是输入缓冲区有了数据,有了新连接。
- 写事件就绪:不关心数据是什么,而是关心发送缓冲区中有没有空间,如果有空间,发送条件就是就绪的,否则不满足。
- 把一个
sockfd
托管给select poll epoll
,原因sockfd
上事件没有就绪,还是事件就绪了?当然是不就绪的时候托管给EPOLL。 - 默认sockfd新建的情况下,读事件不是就绪的,因为输入缓冲区没有数据,所以读事件要常添加到epoll中托管
- 默认sockfd新建的情况下,写事件是就绪的,因为输出缓冲区没有数据,所以写事件默认是直接写的
- 所以 只有当写入条件不满足时,我们才按需开启对sockfd的EPOLLOUT事件进行托管, 一直写,到缓冲区写满时数据还没有发完,就需要开启对写事件的关心!
- 对于写来说,当写入时出现条件不满足的情况时,后续剩余的数据,EPOLL会自动进行发送!
- 如果直接对一个sockfd设置EPOLLOUT关心,epoll就会大量的就绪,因为输出缓冲区不会第一时间写满!
未来如果发完了,对于EPOLLOUT事件的关心,就要被关闭
如果缓冲区没写满,数据也发完了 ,就不需要开启写事件关心
如果我们设置了对EPOLLOUT的关心,EPOLL对EPOLLOUT首次设置关心的时候默认会就绪一次!
这是根据以上准则整理出的代码:
void Sender(Connection *conn)
{
errno = 0;
//进行发送
while(true)
{
//进行发送数据
ssize_t n = ::send(conn->Sockfd() ,conn->Outbuffer().c_str() , conn->Outbuffer().size() , 0);
//发送成功
if(n > 0)
{
//向将读取成功的数据从缓冲区删除
conn->DiscardOutbuffer(n);
//判断是否读取完
if(conn->Outbuffer().empty())
{
break ;
}
}
else if(n == 0)
{
break ;
}
else
{
//通过errno判断错误类型
if(errno == EWOULDBLOCK)
{
//说明输出缓冲区满了
break;
}
else if(errno == EINTR)
{
//信号中断 继续发送
continue;
}
else
{
//真的出错了
LOG(ERROR , "send error , errno:%d\n" ,errno);
//进入异常处理
conn->_handler_excepter(conn);
return ;
}
}
}
//到这里说明是写入条件不满足了
if(!conn->Outbuffer().empty())
{
//EPOLL进行托管
//开启对写事件的关心
//LOG(DEBUG , "----------\n");
conn->GetReactor()->EnableConnectionReadWrite(conn->Sockfd() , true , true);
//发送完了呢?
}
else
{
//将写事件关闭
conn->GetReactor()->EnableConnectionReadWrite(conn->Sockfd(), true , false);
}
}
这样我们就可以成功的将数据发回,并在发回条件不满足时,将写事件托管给EPOLL进行自动发送!发完之后需要将EPOLLOUT事件撤销。
这样写入的问题就解决了
3 异常处理问题
在写入和读取数据的过程中,所有的异常我们都是交给异常处理方法进行解决。而所以的异常,最终的都是要将连接中断,文件描述符关闭,解除EPOLL托管。
void Excepter(Connection *conn)
{
//整个代码所以的逻辑异常 都在这里处理
//删除连接
conn->GetReactor()->DelConnection(conn->Sockfd());
}
去除连接的代码在Reactor中:
void DelConnection(int sockfd)
{
// 安全检测
if (!IsConnExist(sockfd))
return;
LOG(INFO, "sockfd: %d quit , 服务器释放资源\n", sockfd);
// 在内核中移除sockfd关心
EnableConnectionReadWrite(sockfd, false, false);
_epoller->DelEvent(sockfd);
// Socketfd要进行关闭
_conn[sockfd]->Close();
// 在Reactor中移除Connection的关心
delete _conn[sockfd];
_conn.erase(sockfd);
}
经过这个处理,出现异常的连接,就直接别删除了。
4 运行效果
截止目前为止,我们已经实现了:
- 通过Reactor托管Listener获取新连接
- EPOLL对新连接的读事件进行托管,获取数据
- 得到数据之后可以进行上层的协议解析与业务处理
- 数据处理之后,可以进行发回数据,发回条件不满足时,可以将写事件托管给Reactor进行自动处理
来看效果:
效果非常可以了!
下一篇文章我们来解决如何加入多线程与多进程,提高效率!