Reactor
文章目录
- 正确的理解发送
- double free问题
- 1.把我们的reactor进行拆分
- 2.链接管理
- 3.Reactor的理论
listensock只需要设置_recv_cb,而其他sock,读,写,异常
所以今天写nullptr其实就不太对,添加为空就没办法去响应事件
获取新链接并没有对它做任何处理,那么我要对他做处理
Accepter 连接管理器
主要负责处理进行处理所有连接的
还有一批接口是事件管理器
也就是统一进行对所有事件来进行响应
获取了新链接,然后AddConnection把它设置进去 ,
之后已经把对应的新链接添加到内核里了,所以未来再进行事件循环时,一定会帮我们检测到已经就绪的事件,而这个时候就可能会包含普通的事件就绪,
普通的事件就绪他就会检测是否是读事件还是写事件,连接是否是健康的,然后直接执行曾经绑定的方法,对于普通的文件描述符,他绑定的方法刚好是它曾经设置的回调,也就是刚刚绑定的Recver
只要执行Recver,最终再把自己的connection对象指针传进去,所以未来在Recver这通过connection里面的fd , 此时直接读。
接下来的工作就是处理读了
你知道怎么处理数据吗?
你知道你要把数据处理的时候,从connection里面怎么读吗,我们调recv就可以读,我们曾经添加这个fd已经设置为非阻塞了,
而且他是ET的,根据上节课所讲,ET模式下一旦事件就绪我们只能疯狂的把数据全部读上来,可是数据里面有什么你自己知道吗?
对于tcpserver来讲它在读的时候,应不应该关心数据的格式???
同学们说应该,应该关心数据从哪到哪是个报文,从哪到哪是个什么东西。
答案是 不应该
你是个啥?你是个服务器,
你只需要关心IO数据就可以,至于这个数据有没有读完,报文的格式细节,这些你不用管。
你只需要帮我把数据全部收上来就可以 了
这才是比较关键的。
我们要处理一个业务,一定是先把数据收上来,收上来之后交给上层去处理,你别越俎代庖的把别人的事情做了,软件分层很重要。
所以接下来Recver只需要读就可以了
connection是形参传进来的,里面不是有fd吗。
我把fd拿进来
那读到哪里呢?
因为是ET模式必须得一直读,循环读,必须把本轮数据全读完,读取过程中根本不用关心读到的数据的格式
所以定义一个基本大小 g_buffer_size = 128
设置char buffer[ g_buffer_size ] 缓冲区
我们用recv 把数据读取到这个buffer中,而且标志位设置为0,代表阻塞读取,可是我们早把fd 设置为非阻塞了,所以这个recv 读取的时候还是非阻塞读取
recv 返回值
大于 0 读取成功
读取成功该怎么办?你知不知道对应的报文读完了没?你不知道,所以你要一直读,读到他出错。
我们在connection中设置 函数 向connection的string inbuffer直接拼接新内容就可以
等于 0 对端把链接关闭了
那我服务器就直接就不用处理这个链接了,
一旦读出现错误,我就直接让连接进入到异常处理回调
小于 0 读取出错
出错情况一 真的出错
真的出错,也进入到异常处理,所以所有的读和写所有的异常,全部在Excepter处理就可以了
这也就是为什么当前多路转接这里检测它如果有错误事件继续,异常事件继续,把他的事件全部设成读写事件继续,因为我们想把未来整个服务器所有的异常处理全部都放在这一个Excepter处理
出错情况二
非阻塞一直循环读取的时候,如果出错了,但是errno 错误码 如果等于 EWOULDBOLCK,说明这次读,一直读 读出错了,表明把本轮数据全读完了,读完了我们就不处理了,所以直接break就可以了
另外读取期间,如果 errno 错误码 等于 EINTR 表示读的时候被异常信号中断了,所以continue让他继续去读取。
其他情况才是真正的出错了,也就是第一种情况,这里用else完事。
所以可以看到connection里的接受缓冲区数据越来越多了。
读到了数据,可是读完了之后对我来讲,我怎么知道数据 要不要让上层去处理,你不是已经把一批数据全读上来了,此时我怎么知道这个数据在上层应该怎么处理呢?
处理应该交给上层做,你怎么交给上层呢?
服务器已经处理完了IO问题,我们给TcpServer再设置一个 让上层处理信息的回调OnMessage
把本轮数据全读完,到底这个数据能不能处理 读完之后交给上层,交给上层的时候要把connection对象交给这个回调,因为你读到的sock的所有的数据在connection结构体内部接受缓冲区里。
直接读完之后回调给上层
在代码中初始化的时候,构造tcpserver时,就需要给我再传个回调
对于tcpserver来讲OnMessage他是个回调,所有上层就要帮我们处理了,处理的时候一旦读成功了把数据本轮读完了,OnMessage就应该帮我处理,所以OnMessage上层要做的核心工作是:数据有了,但是不一定全,所以要求上层,1.检测 2.如果有完整报文,就处理
所以上层你怎么知道报文是一个完整报文呢?你怎么知道报文完整就要处理,怎么处理呢?
再往上写,就到了之前对特定缓冲区我们根据协议先把报文分包,如果能分出来就分,分不出来就不管,分出来了再反序列化再处理。
那这个回调是谁呢?
这里用输出这个connection缓冲区的数据打印出来测试
在往上就到了把报文分割,在做反序列化,就要有协议了,要有应用层了。
我们把之前网络版本计算器的协议相关头文件拿过来
TcpServer.hpp 服务器处理IO 的
Calculator .hpp 处理业务的
两个之间如何耦合呢?
Handler会将读上来的报文解码反序列化然后计算再序列化编码最后返回响应字符串结果
,接下来要把字符串结果发送出去。
得到计算结果之后要发送出去就和Calculator没关系了,就和服务器有关系,所以一会再绕进入服务器内部再进行处理。
我们的业务逻辑比较简单,没有特别耗时的操作
如果有耗时的操作,我们可以把读到的报文,解码,反序列化,处理计算工作就不做了,可以把之前的线程池拿进来,这里Handler的时候把所有的任务推送 到线程池里,让线程池处理,处理完之后在由线程池把处理的结果写给tcpserver让它再来发。
如果Handler处理结果字符串是空的,那我们就直接返回,让底层继续读,再事件就绪他会继续向inbuffer里追加,
最后一旦有完整的请求了,此时响应就处理完了,它也把缓冲区的字符串该移走就移走了。
下面就要开始发送了
该怎么发送呢?
你想发,前提条件是你要把对应的数据添加到发送缓冲区。
现在又提拱了AppendOutbuffer函数
我们要发怎么做呢?
我们在连接管理器里只设置了对fd的读关心,我们并没有关心它的写事件,那怎么处理写呢?
把计算的响应结果字符串添加到connection发送缓冲区里。
正确的理解发送
1、我们要把数据真正发出去,在epoll/select/poll中,因为写事件表明的本质是发送缓冲区是否有空间,而这个经常是有空间的,所以对于写事件而言,所以经常就是就绪的,
2、如果我们设置对EPOLLOUT事件关心,而EPOLLOUT几乎每次都会就绪,它会导致epoll经常返回,可我们真正关心的是有没有数据发,所以他会浪费CPU 的资源
结论
在写多路转接的代码时
对于读,设置常关心。
对于写,我们是按需设置。
什么是按需设置?
代码体现
3、那怎么处理写呢?
不用考虑epoll,直接写入,如果写入完成,就结束。
如果写入完成,但是数据没有写完,outbuffer里还有内容,我们就需要设置对写事件进行关心了,如果写完了,去掉对写事件的关心!
我们现在上层已经把数据放到outbuffer里,怎么发呢?
我们直接调用connection里面的Sender回调
首先走到Sender 的时候一定是上层已经在connect里面的outbuffer里面写了数据,当然你不写也行,我会做检测。
因为是ET模式的,非阻塞的,所以接下来while循环
我们直接send发就行了
send返回值是实际写入的个数
你上层在发的时候,TCP发送缓冲区只剩512字节,可你outbuffer里面可能是1024个字节,所以你期望全发,可实际只能发返回值个。
也就是返回值 > 0
如果返回值大于0,说明他发送成功了,他到底发了多少数据我们也不清楚,接下来发送成功我们要做的就是 从outbuffer里把发送成功的字符移除掉,移除掉返回值个,也就是n个
移除之后要判断,如果outbuffer已经是空了,下次就不发了直接break
返回值 == 0
outbuffer可能因为一些原因没数据,他没发,我们就返回就可以了不处理。
返回值 < 0
发送出错了
意味着当前我们要判断了,因为是一直在发,如果errno == EWOULDBLCOK
说明我一直发,可是最后底层发送缓冲区空间不够了,我上层可能还有数据,但底层不够了,不够了我就不能再发了,所以我就break。
另外errno == EINTR被信号中断了,我就continue
除此之外,发送时候就发出错了,那就打印调试信息,进入异常处理然后return不能让它往后走了
至此在上层就直接进行发送就行了。
现在while循环一直发,出来的时候有可能因为发的时候数据没发完,但是已经不能再发了,缓冲区满了。
也有可能outbuffer最后已经整体发完了。
走到异常处理我们不管了
所以接下来该怎么办呢?
发完有两种情况,要么把数据情况了,要么数据还没发完。
没法玩要做判断。
如果outbuffer数据没发完,我已经经过了不断的循环 给他发了很多,可是数据没有发完,此时开启对写事件的关心。
对写事件的关心,一旦底层的写缓冲区有空间了他会就绪,就绪他会继续回调Sender继续发(这里的回调是通过这里来回调的,
)
发完之后再去检测。
如果outbuffer为空,说明此次发完了,此时关闭对写事件的关心。
那该怎么开启对写事件的关心呢?
直接把fd 及其 关心的写事件 设置进epoll模型,利用EPOLL_CTL_MOD
这里把读也设置为true了,也就是也对读设置了关心,为什么?
一定要把EPOLLIN带上,要不它可能认为你不再关心EPOLLIN了,虽然确实是以追加的方式加的,但是这样设置的话确定性会高一些。
其实也证明EPOLLIN是常设的。
一个文件描述符,对应一对接受发送缓冲区,读写都是往一个fd读写。
最终上层就可以通过EnableEvent来开启对读写的关心
上层读取数据后获取响应字符串,之后直接调用Sender,这次发有没有可能没发完呢?
完全有可能,如果没发完,我们就在Sender开启对写事件关心,
一旦缓冲区有空间了,底层自动会帮我们做事件派发
其中就会响应到Sender,而Sender会继续再发,如果继续发还是没发完,继续打开对写事件关心,继续去写,如果此时他发完了,那么他会把写事件关心关闭。
错误更正
这里connection里面只能调用_send_cb
而不是上文中的Sender
可是发送只能由TcpServer来发,你直接拿connection的_send_cb发,这个方法不应该是你调的。
所以我们可以换种写法,你的connection里面包含了一个TcpServer的回指指针,回指指针可以调用类内的Sender方法。
所以直接调的其实是TcpServer内部的方法。
用我们之前网络版本计算器客户端来发10次请求测试
说明服务器是给了响应的,所以我们已经走了一圈了,可是我们最终还要处理异常处理问题。
因为右边是连接退出的时候打印的异常
现在我们到异常处理了
所有问题都转化成了读写的就绪,读写就绪我们也处理了连接的安全,安全的才进行后续的回调
现在到了异常这里呢,我认为他一定是出问题了,到底是读出问题了还是写出问题了还是其他的已经不重要了,为什么不重要了呢?
因为我要关掉连接了。
所以接下来该怎么做呢?
首先
- 你要在epoll中移除对事件的关心。
- 关闭异常的文件描述符
- 从unordered_map中移除
那还要不要把new出来的connection对象再给它 delete呢?
不用了,因为智能指针自动释放空间。
比如读出问题了,一旦进入异常处理 就不能再让他往后走了,一旦异常处理以后直接return
double free问题
因为tcpserver里的unordered_map里的智能指针指向了Connection
而 Connection里面的回指指针又指向了tcpserver。
把connection里面的shared_ptr 改为 weak_ptr就可以了
1.把我们的reactor进行拆分
我们可以把listen模块单独拆出来,然后把
listener关心的事件,就当成一个正常的连接添加进去就可以了。
有什么好处?
意味着往后如果想把服务设置成多线程
有一种最简单的方式
我们创建一个主线程,主线程内部包含一个TcpServer 这其实就是一个Reactor
未来这是你创建的base_server 主server,它只负责进行获取新链接。
未来把他启动之后呢,后面我们再创建多线程,创建多线程时他要做的事情是Loop
然后线程要执行的函数Loop
每一个线程内部继续创建服务器server,至此就相当于每一个线程都会有一个reactor
然后呢由主reactor 它往后获得的所有的连接不用再代码当中让他AddConnection
那么你做什么工作呢,你在tcpserver里维护一个vector fds
意思就是说,我在主reactor里我自己想办法获取新链接时,想办法维护一个数组
这个主reactor线程只负责把所有连接放上来,获取到的新fd放到vector里就可以了,主线程可以和其他线程通信,还记得之前写的阻塞队列吗,所以我们可以把vector推送给负载均衡式的交给其他reactor
其他reactor做什么呢?他不绑定listen套接字,其他reactor只需要把所有的文件描述符拿进来在他里面做AddConnection这个工作
也就是说每一个reactor呢,他都把fd获取上来
然后添加到自己对应的reactor里,所以对于fd生命周期的管理由上面的线程管理,而你的主reactor他只负责获取添加到vector里然后让所有线程读到就可以。
那其他线程怎么读到呢?
你可以设置阻塞队列,得到了多个连接把vector里 的文件描述符push往里面写,最后通知消费线程来消费,然后其他线程读到之后由其他线程处理
这种方案叫做 == 一个线程一个reactor==
一般叫做 one thread one loop
记住一定要把listen套接字拆出来
listener获取连接的时候,会回指tcp_server。
我们不要调用什么AddConection
他这里获取连接时只需要让它把新的fd push到对应的vector当中
push到对应的vector当中呢,再来进行操作
今天TcpServer不应该这么叫,他应该叫做事件循环 EventLoop
它最核心的工作就是它
2.链接管理
优秀的服务器必须得对连接进行管理
直接在connecion里添加
time_T last_active_time字段
获取当前时间戳
构造的时候就获取当前时间
把时间戳维护起来
然后每一次读事件就绪,我们直接更新last_active_time 更新当前时间
如果还有写,继续更新
如果不活跃的连接肯定是不会更新时间的,我们还有Loop
我们可以每一次有事件继续把epoll等待策略设置成每隔1S timeout一次
所以这里就可以让它do_other_thing()
做其他事情,每次进行循环,循环完了我们do_other_thing要遍历unordered_map,检测对应的connection的最近时间,如果哪一个连接长时间不动了,我怎么知道它不动了呢?
因为每一次connection时间都会做更新,你可以设置一个最大的超时时间,如果当前对应的connection他所指向的last_active_time 和 设定的未来超时时间timeout
比较一下 连接时间戳 小于 timeout
说明当前连接还没有超时
但如果 大于了 说明 超时了
那直接让连接进入异常处理,他会自动关闭连接。
还可以添加定时器的功能。
class Timer
定时器应该有什么呢?
unint64_t expired_time 未来过期时间
bool type 意味着定时器是否永久存在
func_t cb 回调
未来要有TimeManager
我们用优先级队列,把Timer放进来
将来会存在很多定时器,所有定时器都由堆管理,未来根据超时时间构建最小堆
服务器的属性里可以 添加TimeManager放到类里面
未来不是会Add添加事件吗,我们可以给每一个连接添加一个定时器
把他定好,初始化好,未来超时时间一设置,
然后再把定时器放入到最小堆里。
最终 定时器有了,有了之后呢,定时器是最小堆,从此往后事件循环时 不是有一个do other thing吗,事件派发完,不是有个 do other thing吗,你只需要在循环中检测最小堆 拿top()首部元素 检测是否超时
两种情况
没有超时
当前最小堆当中未来的超时时间 我们拿到 expired
int timeout = 未来超时时间expired - 当前时间
未来在派发时,下一次epoll的阻塞等待时间就改为timeout
所以每一次进行检测时,这里Dispatcher它应该等多长时间呢?
他应该等的按照最小堆当中最小的超时时间来进行等待。
那如果超时了呢?
直接拿到顶部元素top,你的Timer里面本来就有回调方法,直接执行它的回调,然后最小堆再进行pop,重复这个工作,就能把所有超时全处理了。
超时回调我该做什么工作呢?
你可以绑定内部检测特定连接的状态。
你要更优化的话,我还可以给Timer回指
tcpserver 和 connection
未来在服务器当中,每一个连接都有一个connection对象,每一个connection再定义一个timer对象,接着可以让他们两个互相指向,至少让timer指向对应的connection
所以一旦timer 超时了,你就可以定很多策略了,可是超时我不光要看超时,我还要看历史上它有没有活跃,此时我可以结合超时的时间,它超时了我要检测它,检测它可是不一定要移除它,我再要来查一下他的connection的最近活跃时间,如果最近很活跃,那么我重新更新他的超时时间,然后再把它重新定义一个定时器再加到最小堆里,此时就完成了对它里面的处理
3.Reactor的理论
Reactor叫做半同步半异步模型
主要原因是Reactor他要自己去select 和 poll 和epoll 他自己要等,他等的过程其实是同步的过程,IO分两步 等 + 拷贝
等的过程是由select 和 poll 和epoll 来做 这是体现的是同步,reactor参与了IO.
那异步体现在哪里呢?
异步体现在他可以直接回调处理
目前我们处理的时候是让单Reactor把所有事情全做了
但未来呢,我们记住,如果我们今天要处理的时候,如果有特别耗时的工作就可以更复杂了,
左边是IO逻辑
多Reactor可以再搞上一个线程池
每一个Reactor有数据要处理直接把他push到线程池里,线程池内部一定会有大量的其他线程进行处理任务
如果我们的业务特别复杂+耗时,比如有MySQL的操作,否则比较简单就没必要这么搞了。
还有一种模式叫做Proactor,纯异步方式 ,不考虑。
reactor 翻译过来 就是 反应堆模式
反应堆到底是什么意思呢?
reactor 相当于 打地鼠
我们每一个玩游戏的人就相当于是一个对应的多路转接
然后呢我们要检测每一个洞口上有没有对应的地鼠出来,虽然他没出来,但是我知道,一旦他出来了我就要执行我的回调方法来砸他。
游戏的面板相当于对应的reactor
每一个洞相当于我们的connecion
老鼠上来了叫做事件就绪
执行砸方法就叫做执行回调
这种就叫做反应堆
epoll参与了一次等多个fd,这就是同步,后来把数据读上来了你可以不处理,你想处理就处理,今天写的代码其实是算是一种同步的代码,基于非阻塞的同步代码。
你可以把对应收到的请求交到线程池,你就不处理了,只负责IO ( 等 + 拷贝 ) ,不负责处理,处理的事情推送到后端的任务队列里由线程池处理,线程池处理完了再把结果给你写 到对应的线程里,无非就是在服务器中再加一个对应的成员变量,然后由线程池直接去访问,写进去就可以
所以这里呢就可以体现出异步,当然这个呢比较复杂。
后半程说的所有东西,你要是能懂最好下来弄一弄
如果你不懂没关系,作为一个程序员,如果能 手写epoll server能搞定了,
基本上多路转接已经不再是问题了,一些服务器设计问题继续reactor
如果再写那就把后半程 reactor v2搞定
从今天开始往后,凡是很多的网络服务器的代码,同学们都能看懂了,包括什么redis用C写的,底层就是单reactor,它的处理用的是reactro的LT模式。
Linux课程我们就讲到这里,虽然课程结束了,但同学们现在的你们的学习并没有结束。
希望同学们持之以恒,一定要花很多时间再去学习。
如果压力比较大,唯一的做法是把基础能力变得非常强