Reactor介绍,如何从简易版本的epoll修改成Reactor模型(demo版本代码+详细介绍)
目录
Reactor demo
引入
比喻
修改代码
connection
tcp_server
ET模式
主逻辑
处理事件
运行结果
代码
完善功能
读取数据
运行结果
编辑
代码
处理数据
回指指针
如何处理写事件
引入
循环内
处理对写事件的关心
异常处理
代码
server.hpp
server.cpp
运行结果
Reactor demo
引入
Reactor 模式是一种设计模式,广泛应用于处理高效的事件驱动编程和网络编程中
- 用于管理和分发来自不同事件源的事件
- 核心目的是优化系统在处理大量事件时的性能,特别是在 I/O 操作密集的环境中
Reactor 是一个半同步半异步模型
- reactor直译过来就是反应堆
- 同步 -- 调用epoll接口等待的过程
- 异步 -- 事件以回调方式进行处理
比喻
回想起我们曾经玩过的打地鼠游戏:
- 整个游戏界面就是reactor模型,操作的人就是多路转接方案,检测每个洞(连接)有没有地鼠出来(事件就绪),一旦出来就砸他(执行注册好的回调方法)
- 这里我们写的代码算是同步,因为需要在内部处理事件
- 如果要写异步,可以搞个线程池,将收到的数据直接push进任务队列中,交给线程池处理,我们只进行io,然后在内部搞个字段来接收线程池返回的结果
这里我们只是写一个demo版本的(当然,demo版本也很麻烦)
(大家如果遇到什么问题,可以评论交流)
修改代码
之前我们只调用一次read,无法确定是否读完了一份完整数据,并且只有读功能 -- epoll接口使用 -- 非阻塞式网络io(仅读事件)-CSDN博客
- 在这份代码中,我们并没有保证读完所有数据,并且也不会有机会拿到完整数据,因为每次读取都会创建新的临时缓冲区
- 所以,我们需要把没读完的数据临时缓存起来
因为应用层上存在大量连接,每个连接都对应一个套接字文件,这些连接都会遇到这个问题
- 所以需要给每个文件都设置输入输出缓冲区,并定义结构来管理
这次主要有两个模块:
connection
对应我们上面说的,是对文件的管理结构,结合网络通信+epoll,可以确定里面的成员变量:
- 要有每个连接对应的套接字fd
- 缓冲区肯定输入输出都要有(这里我们就用string就行,虽然它并不适合处理二进制流,应该用vector,但vector会有很多拷贝,所以方便起见,还是用string)
- 可以将处理读/写/异常事件就绪时的回调函数也放在里面,刚好可以实现自定义特定文件的处理方式 -- 这样可以使用类内的缓冲区,而不是在读数据时,调用公共函数,将数据添加到公共临时缓冲区中
- 定义一个回指指针,指向tcp_server(按下不表,在后续说明)
tcp_server
是我们服务器的类
- 肯定先要包括之前我们封装好的epoll接口和socket接口对象
需要管理多个连接,也就是需要一个结构来将多个connection对象组织起来
- 使用unordered_map结构,建立fd->连接结构的映射关系
- 每次将新的要关注的文件添加到connections中,一旦有文件上的事件就绪,就可以通过fd,找到处理事件的方法
我们目前将读事件分为两类,所以需要两种回调函数
- 获取新连接 和 读取数据
- 那么,最好是先定义出针对各种类型的处理函数,然后根据文件类型,手动设置好我们需要的方法
- 因为我们是在服务器内部进行回调函数的设置,所以将回调函数定义在类内,使用会更方便
以上,我们可以定义一个函数来解决,总的来说分为两步:
- 将需要关注的[新文件上的特定事件]添加进epoll模型中 (内核层)
- 将[新文件+如何处理特定事件]添加进connections中 (用户层)
除此之外,我们可以直接在类内定义一个struct epoll_event数组,存放从内核捞取出的就绪事件,然后交给事件派发器
ET模式
保证服务器以ET模式工作,要设置相应的标志位:
以及,为了保证全部读取,需要将fd设置为非阻塞io方式 -- fcntl()
主逻辑
服务器不断循环,循环过程中派发事件
- 然后在派发器逻辑中,每次获取一个就绪事件,分辨是哪个文件上的哪个事件就绪了,然后调用注册好的回调函数
判断是否就绪:
- 我们检测是否是读事件就绪,和epoll的工作模式无关,只检测EPOLLIN就行了
如果出现异常(EPOLLERR,EPOLLHUP),统一转化为读写问题(设置两个标记位)
- 因为一旦出现异常,读写一定会受到影响,只要转化,就能在读写函数内部解决 ??
而且,在正式处理前,我们需要一个函数,来判断当前连接是否安全(是否是我们需要关注的文件),以及当前对应的处理函数是否被设置
处理事件
连接/数据到来时,我们无法确定只有一个连接/一份数据,并且这里是在ET模式下,必须要读取出所有连接
- 所以,我们写一个循环来获取连接/读取数据,直到读完
运行结果
我们从本地连接上该服务,会发现连接成功:
最后一条的警告,是我们在socket.hpp中封装的accept里显示的,因为此时是非阻塞式+底层没有数据,所以系统调用的accept走了返回值<0的情况,然后打印到日志上
- 我们可以在内部添加判断语句,如果错误码=EAGAIN,就不打印日志
代码
#pragma once #include <memory> #include <errno.h> #include <string> #include <functional> #include <fcntl.h> #include "Log.hpp" #include "socket.hpp" #include "myepoll.hpp" #include <unordered_map> class connection; using func_t = std::function<void(std::shared_ptr<connection>)>; class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区 { int fd_; std::string in_buffer_; std::string out_buffer_; public: func_t read_cb_; func_t write_cb_; func_t except_cb_; // 回指指针 public: connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb) : fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {} ~connection() {} int get_fd() { return fd_; } private: }; class epoll_server { static const int def_timeout = 1000; static const int def_num = 64; static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式 static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET; int port_; std::shared_ptr<MY_SOCKET> p_listen_sock_; std::shared_ptr<MY_EPOLL> p_epoll_; std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系 struct epoll_event events_[def_num]; public: epoll_server(int port) : port_(port), p_listen_sock_(new MY_SOCKET), p_epoll_(new MY_EPOLL(def_timeout)) {} ~epoll_server() {} void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb) { // 添加到connections中 -- 用户层 connections_.insert(std::make_pair(fd, std::make_shared<connection>(fd, read_cb, write_cb, except_cb))); // 添加到epoll模型 -- 内核 p_epoll_->ctl(EPOLL_CTL_ADD, fd, event); } void init() { p_listen_sock_->Socket(); set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式 p_listen_sock_->Bind(port_); p_listen_sock_->Listen(); // 添加监听套接字 add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr); lg(DEBUG, "listen_socket add success\n"); } void loop() { init(); while (true) { int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件 for (int i = 0; i < n; ++i) { Dispatcher(events_[i]); // 每次处理一个就绪事件 } } } private: void accept(std::shared_ptr<connection> conn) // 处理连接事件 { // 连接到来时,我们要循环处理,直到无数据 while (true) { std::string clientip; uint16_t clientport; int sock = p_listen_sock_->Accept(clientip, clientport); if (sock > 0) { lg(INFO, "get a new connection ,fd : %d\n", sock); set_no_block(sock); // 设置为非阻塞式 // 将新套接字添加进connections和epoll模型 add_sock(sock, EVENT_IN, std::bind(&epoll_server::receiver, this, std::placeholders::_1), nullptr, nullptr); } else { // 如果底层无数据,也会错误返回,并设置错误码11 if (errno == EAGAIN) // 无数据 { break; } else if (errno == EINTR) // 被信号中断 { continue; } else { lg(ERROR, "accept error\n"); break; } } } } void receiver(std::shared_ptr<connection> conn) { } void Dispatcher(struct epoll_event &sock) { int fd = sock.data.fd; // 需要判断是否是我们关注的文件 if (!is_safe(fd)) { lg(DEBUG, "fd: %s is not safe\n", fd); return; } int event = sock.events; if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件 { event |= EPOLLIN; event |= EPOLLOUT; } if ((event & EPOLLIN) && connections_[fd]->read_cb_) // 如果读取回调存在 { connections_[fd]->read_cb_(connections_[fd]); // 调用读回调 } if ((event & EPOLLOUT) && connections_[fd]->write_cb_) // 如果读取回调存在 { connections_[fd]->write_cb_(connections_[fd]); // 调用读回调 } } bool is_safe(int fd) { auto it = connections_.find(fd); if (it != connections_.end()) { return true; } else { return false; } } void set_no_block(int fd) { int ret = fcntl(fd, F_GETFL); if (ret < 0) { perror("fcntl"); return; } fcntl(fd, F_SETFL, ret | O_NONBLOCK); } };
完善功能
读取数据
完善普通文件的回调函数(监听套接字只需要处理读事件,但通信用的套接字需要三种事件都处理)
我们先写好读取数据的函数:
- 循环读取至读完全部数据
- 读取一段就放入输入缓冲区中(服务器不应该关心数据格式/是否是一份完整数据,只要把数据全拿到手就行)
- 因为我们是非阻塞式,所以一旦读取完毕,会返回错误,我们需要将返回值<0的情况分类: 读完数据 / 因异常信号中断读取 / 真的出错
- 然后,我们在真的出错时,调用异常处理函数
- 同理,在对方关闭连接时,也需要进入异常处理阶段
这里为了日志好看,可以在connection结构中增加两个字段 -- ip和port
运行结果
可以看到,随着我们的输入,打印出的[输入缓冲区的内容]变得更多:
代码
#pragma once #include <memory> #include <errno.h> #include <string> #include <functional> #include <fcntl.h> #include "Log.hpp" #include "socket.hpp" #include "myepoll.hpp" #include <unordered_map> class connection; using func_t = std::function<void(std::shared_ptr<connection>)>; class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区 { int fd_; std::string in_buffer_; std::string out_buffer_; public: func_t read_cb_; func_t write_cb_; func_t except_cb_; // 方便日志打印 std::string ip_; uint16_t port_; // 回指指针 public: connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb) : fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {} ~connection() {} int get_fd() { return fd_; } void append(const std::string &str) { in_buffer_ += str; } std::string& inbuffer() { return in_buffer_; } private: }; class epoll_server { static const int def_timeout = 1000; static const int def_num = 64; static const int def_buffsize = 128; static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式 static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET; int port_; std::shared_ptr<MY_SOCKET> p_listen_sock_; std::shared_ptr<MY_EPOLL> p_epoll_; std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系 struct epoll_event events_[def_num]; public: epoll_server(int port) : port_(port), p_listen_sock_(new MY_SOCKET), p_epoll_(new MY_EPOLL(def_timeout)) {} ~epoll_server() {} void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb) { // 添加到connections中 -- 用户层 connections_.insert(std::make_pair(fd, std::make_shared<connection>(fd, read_cb, write_cb, except_cb))); // 添加到epoll模型 -- 内核 p_epoll_->ctl(EPOLL_CTL_ADD, fd, event); } void init() { p_listen_sock_->Socket(); set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式 p_listen_sock_->Bind(port_); p_listen_sock_->Listen(); // 添加监听套接字 add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr); lg(DEBUG, "listen_socket add success\n"); } void loop() { init(); while (true) { int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件 for (int i = 0; i < n; ++i) { Dispatcher(events_[i]); // 每次处理一个就绪事件 } } } private: void accept(std::shared_ptr<connection> conn) // 处理连接事件 { // 连接到来时,我们要循环处理,直到无数据 while (true) { std::string clientip; uint16_t clientport; int sock = p_listen_sock_->Accept(clientip, clientport); if (sock > 0) { lg(DEBUG, "get a new client, get info-> [%s:%d], sockfd : %d", clientip.c_str(), clientport, sock); set_no_block(sock); // 设置为非阻塞式 // 将新套接字添加进connections和epoll模型 add_sock(sock, EVENT_IN, std::bind(&epoll_server::receiver, this, std::placeholders::_1), std::bind(&epoll_server::sender, this, std::placeholders::_1), std::bind(&epoll_server::excepter, this, std::placeholders::_1)); } else { // 如果底层无数据,也会错误返回,并设置错误码11 if (errno == EAGAIN) // 无数据 { break; } else if (errno == EINTR) // 被信号中断 { continue; } else { lg(ERROR, "accept error\n"); break; } } } } void receiver(std::shared_ptr<connection> conn) { while (true) // 读取至底层无数据 { char buffer[def_buffsize]; int n = read(conn->get_fd(), buffer, sizeof(buffer) - 1); if (n > 0) // 还没读完 { buffer[n] = 0; conn->append(buffer); } else if (n == 0) // 对方关闭连接 { lg(INFO, "sockfd: %d, client info %s:%d quit...", conn->get_fd(), conn->ip_.c_str(), conn->port_); conn->except_cb_(conn); // 关注异常事件 return; } else // 出错/读完 { if (errno == EAGAIN) // 读完全部数据 { break; } else if (errno == EINTR) { continue; } else // 真的出错 { conn->except_cb_(conn); // 关注异常事件 return; } } } } void sender(std::shared_ptr<connection> conn) {} void excepter(std::shared_ptr<connection> conn) {} void Dispatcher(struct epoll_event &sock) { int fd = sock.data.fd; // 需要判断是否是我们关注的文件 if (!is_safe(fd)) { lg(DEBUG, "fd: %s is not safe\n", fd); return; } int event = sock.events; if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件 { event |= EPOLLIN; event |= EPOLLOUT; } if ((event & EPOLLIN) && connections_[fd]->read_cb_) // 如果读回调存在 { connections_[fd]->read_cb_(connections_[fd]); // 调用读回调 } if ((event & EPOLLOUT) && connections_[fd]->write_cb_) // 如果写回调存在 { connections_[fd]->write_cb_(connections_[fd]); // 调用写回调 } print(connections_[fd]); } bool is_safe(int fd) { auto it = connections_.find(fd); if (it != connections_.end()) { return true; } else { return false; } } void set_no_block(int fd) { int ret = fcntl(fd, F_GETFL); if (ret < 0) { perror("fcntl"); return; } fcntl(fd, F_SETFL, ret | O_NONBLOCK); } void print(std::shared_ptr<connection> conn) { std::cout << "fd: " << conn->get_fd() << " , "; std::cout << "in_buffer: " << conn->inbuffer().c_str() << std::endl; } };
处理数据
虽然我们存入了数据,但我们还没有处理数据
处理数据应该交给用户来决定(也就是使用回调函数,在参数中传入connection对象即可,里面包含该文件读到的所有数据),因为这部分属于应用层的事情
- 检测数据是否完整(协议定制,序列化/反序列化)
- 如果包含一份完整数据,进行处理(具体业务处理)
我们将之前写过的网络计算机代码拿过来用 -- 网络计算器(使用json序列化/反序列化,条件编译,注意点),json介绍+语法介绍_json序列化和反序列化工具-CSDN博客
网络计算器代码编写+注意点(序列化,反序列化,报头封装和解包,服务端和客户端,计算),客户端和服务端数据传递流程图,守护进程化+日志重定向到文件_计算器封装-CSDN博客
- 直接定义一个函数,使用自定义协议来处理数据(将输入缓冲区中的数据做处理,然后把结果写回输出缓冲区)
- 然后在实例化服务器时,将该函数作为实参传进去
- 这样,就可以在读取完数据之后进行处理,并且发送 (注意,发送数据应该由服务器来做,也就需要我们的回指指针发挥用处 -- 回指服务器,然后由服务器调用发送函数)
回指指针
因为connection和epoll_server会互相引用
- server里保存了全部connection结构的指针,用于管理
- 而connection里也需要引用epoll_server,来使用里面的函数
- 这样就造成了循环引用问题
所以,需要我们将connection里存放的回指指针的类型定义成weak_ptr,当需要使用时再转换为shared_ptr
- 而我们传参时,必须通过shared_ptr来转换为weak_ptr,所以我们调用shared_from_this()来返回当前对象的shared_ptr
- 而调用该函数的前提是,让需要使用的类继承enable_shared_from_this这个模板类
如何处理写事件
引入
因为写事件关注的是发送缓冲区是否有空间
- 缓冲区经常都是有空间的,所以写事件经常会就绪
- 而一旦事件就绪,wait就会返回
- 但我们通常真正关心的是"是否有数据可以发送",而不是"是否有空间"
- 所以写事件,要按需设置是否关心 -- 代码体现
对于读事件来说,我们设置常关心
- 因为读事件看的就是有无数据
循环内
那我们该如何写入呢?
- 直接调用写函数(send/write)
- 并且要像读数据一样,需要把输出缓冲区内的数据全部写进文件的发送缓冲区中才行
- 所以要循环写入
并且,和读取不一样,需要我们手动删除输出缓冲区中的数据
- 因为读取是从内核读到用户层,内核会自动帮我们删除已读出的数据
接下来,说说函数返回值的问题:
- 如果返回值>0,说明此时成功写入了数据,需要我们删除已经写入的数据(如果已经将数据全部写完了,退出循环)
- 返回值为0,说明此时缓冲区内没有数据,压根没有写入数据,直接返回
- 注意这里[退出循环]和[直接返回]的区别,因为我们要在循环结束后,处理对写事件的关心
发送出错,分几种情况(和处理读数据一样)
- 底层缓冲区没有空间了,返回EWOULDBLOCK(=EAGIN=11)
- 被信号中断
- 真的出错
处理对写事件的关心
出循环后分为两种情况:
如果outbuffer里还有数据没写完 -- 设置对写事件的关心
- 因为此时受限于底层的缓冲区空间,所以需要关注写事件
- 一旦发送缓冲区有空间了,就会通知我们,然后回调我们的写处理函数,继续发送数据
outbuffer里的数据已经被写完了 -- 取消对写事件的关心
- 数据已经写完了,即使有空间也不需要,所以不用关注
以上可以自定义一个使能事件的函数,可以自主决定是否开启读/写事件
- 是否开启 -- bool类型字段
- 然后在内部调用epoll_ctl函数,来修改特定文件对事件的关注
异常处理
一旦走到异常处理的函数中,一定是出问题了
- 那就直接移除epoll中对该文件上事件的关心
- 关闭这个连接
- 从自定义的连接管理结构中移除
代码
server.hpp
#pragma once #include <memory> #include <errno.h> #include <string> #include <functional> #include <fcntl.h> #include <unordered_map> #include "Log.hpp" #include "socket.hpp" #include "myepoll.hpp" class connection; class epoll_server; using func_t = std::function<void(std::shared_ptr<connection>)>; class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区 { int fd_; std::string in_buffer_; std::string out_buffer_; public: func_t read_cb_; func_t write_cb_; func_t except_cb_; // 方便日志打印 std::string ip_; uint16_t port_; // 使用 weak_ptr 防止循环引用 std::weak_ptr<epoll_server> p_svr_; public: connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb) : fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {} ~connection() {} void set_p_svr(std::weak_ptr<epoll_server> ptr) { p_svr_ = ptr; } int get_fd() { return fd_; } void in_append(const std::string &str) { in_buffer_ += str; } void out_append(const std::string &str) { out_buffer_ += str; } std::string &inbuffer() { return in_buffer_; } std::string &outbuffer() { return out_buffer_; } }; class epoll_server : public std::enable_shared_from_this<epoll_server>, public no_copy { static const int def_timeout = 1000; static const int def_num = 64; static const int def_buffsize = 128; static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式 static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET; int port_; func_t handle_; std::shared_ptr<MY_SOCKET> p_listen_sock_; std::shared_ptr<MY_EPOLL> p_epoll_; std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系 struct epoll_event events_[def_num]; public: epoll_server(int port, func_t handle) : port_(port), handle_(handle), p_listen_sock_(new MY_SOCKET()), p_epoll_(new MY_EPOLL(def_timeout)) {} ~epoll_server() {} void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb, const std::string &ip = "0.0.0.0", uint16_t port = 0) { std::shared_ptr<connection> new_connection(new connection(fd, read_cb, write_cb, except_cb)); new_connection->set_p_svr(shared_from_this()); // shared_from_this(): 返回当前对象的shared_ptr,要确保epoll_server已经以shared_ptr的形式存在(主函数中以shared_ptr形式实例化对象) new_connection->ip_ = ip; new_connection->port_ = port; connections_.insert(std::make_pair(fd, new_connection)); p_epoll_->ctl(EPOLL_CTL_ADD, fd, event); } void loop() { init(); while (true) { int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件 for (int i = 0; i < n; ++i) { Dispatcher(events_[i]); // 每次处理一个就绪事件 } } } void receiver(std::shared_ptr<connection> conn) { int fd = conn->get_fd(); while (true) // 读取至底层无数据 { char buffer[def_buffsize]; memset(buffer, 0, sizeof(buffer)); int n = read(fd, buffer, sizeof(buffer) - 1); if (n > 0) // 还没读完 { conn->in_append(buffer); } else if (n == 0) // 对方关闭连接 { lg(INFO, "sockfd: %d, client info %s:%d quit", fd, conn->ip_.c_str(), conn->port_); conn->except_cb_(conn); // 关注异常事件 return; } else // 出错/读完 { if (errno == EAGAIN) // 读完全部数据 { break; } else if (errno == EINTR) { continue; } else // 真的出错 { lg(WARNING, "sockfd: %d, client info %s:%d error", fd, conn->ip_.c_str(), conn->port_); conn->except_cb_(conn); // 关注异常事件 return; } } } // 读完了数据,就该处理了,但不一定包含了一份完整报文 handle_(conn); } void excepter(std::shared_ptr<connection> conn) { int fd = conn->get_fd(); lg(WARNING, "Excepter hander sockfd: %d, client info %s:%d excepter handler", fd, conn->ip_.c_str(), conn->port_); p_epoll_->ctl(EPOLL_CTL_DEL, fd, 0); close(fd); lg(DEBUG, "close %d done\n", fd); connections_.erase(fd); lg(DEBUG, "remove %d from _connections done\n", fd); } void sender(std::shared_ptr<connection> conn) { auto &buffer = conn->outbuffer(); int fd = conn->get_fd(); while (true) { ssize_t n = write(fd, buffer.c_str(), buffer.size()); // 将输出缓冲区的内容写入内核 if (n > 0) // 写入一定数据 { buffer.erase(0, n); if (buffer.empty()) // 数据写完了 { break; } } else if (n == 0) // 没有数据可写 { return; } else { if (errno == EAGAIN) { break; } else if (errno == EINTR) { continue; } else { lg(WARNING, "sockfd: %d, client info %s:%d send error...", conn->get_fd(), conn->ip_.c_str(), conn->port_); conn->except_cb_(conn); return; } } } // 判断接下来是否需要关注写事件 if (buffer.empty()) { enable_event(fd, true, false); } else { enable_event(fd, true, true); } } private: void init() { p_listen_sock_->Socket(); set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式 p_listen_sock_->Bind(port_); p_listen_sock_->Listen(); // 添加监听套接字 add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr); lg(DEBUG, "listen_socket add success\n"); } void accept(std::shared_ptr<connection> conn) // 处理连接事件 { while (true) { std::string clientip; uint16_t clientport; int sock = p_listen_sock_->Accept(clientip, clientport); if (sock > 0) { lg(DEBUG, "get a new client, get info-> [%s:%d], sockfd : %d", clientip.c_str(), clientport, sock); set_no_block(sock); // 设置为非阻塞式 // 将新套接字添加进connections和epoll模型 add_sock(sock, EVENT_IN, std::bind(&epoll_server::receiver, this, std::placeholders::_1), std::bind(&epoll_server::excepter, this, std::placeholders::_1), std::bind(&epoll_server::excepter, this, std::placeholders::_1), clientip, clientport); } else { if (errno == EAGAIN) // 无数据 { break; } else if (errno == EINTR) // 被信号中断 { continue; } else { lg(ERROR, "accept error\n"); break; } } } } void Dispatcher(struct epoll_event &sock) { int fd = sock.data.fd; // 需要判断是否是我们关注的文件 if (!is_safe(fd)) { lg(DEBUG, "fd: %d is not safe\n", fd); return; } auto conn = connections_[fd]; if (!conn) return; int event = sock.events; if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件 { event |= EPOLLIN; event |= EPOLLOUT; } if ((event & EPOLLIN) && conn->read_cb_) // 如果读回调存在 { conn->read_cb_(conn); // 调用读回调 } if ((event & EPOLLOUT) && conn->write_cb_) // 如果写回调存在 { conn->write_cb_(conn); // 调用写回调 } } bool is_safe(int fd) { return connections_.find(fd) != connections_.end(); // 是否在connections结构中存在 } void set_no_block(int fd) { int ret = fcntl(fd, F_GETFL); if (ret < 0) { perror("fcntl"); return; } fcntl(fd, F_SETFL, ret | O_NONBLOCK); } void enable_event(int fd, bool f_read, bool f_write) { if (fd < 0) { lg(ERROR, "Invalid file descriptor: %d", fd); return; } uint32_t event = 0; event |= ((f_read ? EPOLLIN : 0) | (f_write ? EPOLLOUT : 0) | EPOLLET); p_epoll_->ctl(EPOLL_CTL_MOD, fd, event); } };
server.cpp
#include "server.hpp" #include "cal.hpp" void def_handle(std::weak_ptr<connection> conne) { if (conne.expired()) return; auto conn = conne.lock(); calculate Cal; std::string str = Cal.cal(conn->inbuffer()); // 处理数据,得到结果 if (str.empty()) { return; } //lg(DEBUG, "get data: %s\n", str.c_str()); conn->out_append(str); // 添加到输出缓冲区 //lg(DEBUG, "out_append success\n"); // 写入 auto server = conn->p_svr_.lock(); // weak_ptr不拥有对象的所有权,需要转换为shared_ptr server->sender(conn); // 需要让服务器调用写处理函数,后续让服务器擦屁股(也许没有写入全部数据) //lg(DEBUG, "sender success\n"); } int main() { std::shared_ptr<epoll_server> epoll_svr(new epoll_server(8080, def_handle)); epoll_svr->loop(); return 0; }
其他代码在压缩包里
运行结果
我们把网络计算器的客户端也拿过来 -- 网络计算器代码编写+注意点(序列化,反序列化,报头封装和解包,服务端和客户端,计算),客户端和服务端数据传递流程图,守护进程化+日志重定向到文件_计算器封装-CSDN博客
直接做测试: