高级IO__
文章目录
- 1. 前置知识
- 1.1 高效IO的理论认识
- 1.2 五种IO模型
- 2. 非阻塞IO
- 2.1 fcntl函数介绍
- 2.2 阻塞IO实现
- 2.3 非阻塞IO实现
- 3. 多路转接之select
- 3.0 多路转接
- 3.1 select函数介绍
- 3.2 select 服务器处理读事件
- 3.3 完善服务器
- 3.4 select的缺点
- 4. 多路转接之poll
- 4.1 poll函数介绍
- 4.2 poll服务器处理读事件
- 4.3 poll的缺点
- 5. 多路转接之epoll
- 5.1 epoll函数介绍
- 5.1.1 epoll_create
- 5.1.2 epoll_ctl
- 5.1.3 epoll_wait
- 5.2 epoll的原理
- 5.3 epoll服务器
- 5.4 ET和LT
- 5.4.1 概念
- 5.4.2 LT和ET在`epoll`的工作模式
- 6. Reactor
1. 前置知识
1.1 高效IO的理论认识
IO是input
和output
的缩写,也就是输入和输出。下面是一个事实
- 在写TCP服务器时,应用层IO的时候,本质上就是把数据从用户层写给OS。(回想TCP的发送缓冲区和接受缓冲区)
- IO = 等 + 拷贝。不要忽略 等 这段时间,要想进行拷贝,需要判断读写事件的条件成立
何为高效的IO:单位时间内,IO过程中等的比重越少,IO效率越高(这是几乎所有提高IO效率的策略)
1.2 五种IO模型
5个人来钓鱼,下面是他们的策略:
- 小A(阻塞式IO):放下鱼竿后,就一直盯着鱼漂,等着鱼上钩,期间别人都不能与它对话交互,直到鱼上钩,像
read
,write
这样的接口都是这样的。 - 小B(非阻塞式IO):放下鱼竿后,定义一个时间段(假设5min),最开始检测到鱼竿上没有鱼后, 在5min内,就可以干别的事。5min后,再检测,有鱼后就钓上鱼了,没鱼就继续等。该策略的特点是轮询检查。小A和小B的鱼竿可以看做文件描述符
- 小C(信号驱动式IO):鱼漂上面放一个铃铛,放下鱼竿后就去干别的事情,当铃铛响后,钓上鱼,没响就一直干自己的事情
- 小D(多路复用,同多路转接):此人拿了好多鱼竿,全部放到水里,接着向小A或者小B一样等待鱼上钩
- 小E(异步IO):此人是个老板,看到前面4人钓鱼(同步IO),自己也想钓,但是忙于公务,认识到自己并不是想钓鱼,而是想要吃鱼。于是让自己的下属小F来替他钓鱼,给了他鱼竿和桶,接着自己就去开会去了。小F一人在钓鱼,直至桶装满鱼,打电话向小E报告。这里小F就是OS,鱼就是数据,此进程委托操作系统来帮其IO,自己去干别的事,不关心IO的具体过程,只要最终的数据。
阻塞IO和非阻塞IO的区别?
回答:IO = 等 + 拷贝。这两种IO方式等的策略不一样,一个是一直等,期间不干别的事,另一个期间可以干别的事。
同步IO和异步IO的区别?
回答:重点在于有没有参与IO,异步IO不参与IO,只是发起IO,最后拿到结果就行
2. 非阻塞IO
2.1 fcntl函数介绍
简介:fcntl
(file control)是Linux系统中用于文件描述符和套接字操作的系统调用,提供了对文件描述符的各种控制功能
函数原型:
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd);
int fcntl(int fd, int cmd, long arg);
int fcntl(int fd, int cmd, struct flock *lock);
fd
:要操作的文件描述符。cmd
:要执行的命令。arg
或lock
:根据cmd
的不同,可能需要的额外参数。
fcntl
支持多种命令,用于不同的操作:
- F_DUPFD:复制文件描述符,返回一个新的文件描述符。
- F_GETFD和F_SETFD:获取或设置文件描述符的标志。
- F_GETFL和F_SETFL:获取或设置文件状态标志。
- F_GETLK、F_SETLK、F_SETLKW:文件锁操作。
设置非阻塞模式
基于fcntl()
, 我们实现一个setNoBlock
函数, 将文件描述符设置为非阻塞
bool setNoBlock(int fd)
{
// 1.使用F_GETFL将当前的文件描述符的属性取出来
int fl = fcntl(fd, F_GETFL);
// 2. 差错处理
if(fl < 0) {
perror("fcntl");
return false;
}
// 3.然后再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
std::cout << "Set no block done.\n";
return true;
}
2.2 阻塞IO实现
有下面的代码,会阻塞读取用户输入,然后输出
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
static const int NUM = 1024;
int main()
{
char buff[NUM];
for(; ; ) {
std::cout << "Please enter: ";
fflush(stdout);
int n = read(0, buff, sizeof(buff) - 1);
if(n > 0) {
buff[n] = 0;
std::cout << "echo: " << buff;
} else if(n == 0) {
std::cout << "read done\n";
break;
} else {
std::cout << "read error! errno: " << errno << " why: " << strerror(errno) << '\n';
break;
}
}
return 0;
}
输出如下
lyf@hcss-ecs-3db9:~/pro/pro25_1_21高级IO$ ./a.out
Please enter: aaaa
echo: aaaa
Please enter: nihao
echo: nihao
Please enter: read done # 这里键入了ctrl+D, 表示EOF, 输入结束, 可以终止当前的输入流
2.3 非阻塞IO实现
修改代码,在2.2的代码中添加2.1中提到的setNoBlock()
函数。具体代码如下
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
static const int NUM = 1024;
bool setNoBlock(int fd)
{
// 1.使用F_GETFL将当前的文件描述符的属性取出来
int fl = fcntl(fd, F_GETFL);
// 2. 差错处理
if(fl < 0) {
perror("fcntl");
return false;
}
// 3.然后再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
std::cout << "Set no block done.\n";
return true;
}
int main()
{
// 将标准输入设置为非阻塞
setNoBlock(0);
char buff[NUM];
for(; ; ) {
int n = read(0, buff, sizeof(buff) - 1);
if(n > 0) {
buff[n] = 0;
std::cout << "echo: " << buff;
} else if(n == 0) {
std::cout << "read done\n";
break;
} else {
std::cout << "read error! errno: " << errno << " why: " << strerror(errno) << '\n';
break;
}
}
return 0;
}
运行结果如下
lyf@hcss-ecs-3db9:~/pro/pro25_1_21高级IO$ ./a.out
Set no block done.
read error! errno: 11 why: Resource temporarily unavailable
可以观察到,errno
被设置了11,表示数据未就绪,EWOULDBLOCK
常量就是该值,需要判断。根据1.2中可以得知,当数据未就绪时,该进程可以做其他事情,所以代码改正如下(重复代码不再重新展示)。
void do_something_else()
{
// 这里可以做其它事情(*^▽^*)
sleep(1);
}
int main()
{
// ...
else {
if(errno == EWOULDBLOCK) {
std::cout << "Data not ready, try again.\n";
do_something_else();
} else {
std::cout << "read error! errno: " << errno << " why: " << strerror(errno) << '\n';
break;
}
}
}
运行结果如下,当我按下回车后,才会echo刚才我输入的全部内容。
g++ TestNoBlock.cc -o a.out -std=c++11
lyf@hcss-ecs-3db9:~/pro/pro25_1_21高级IO$ ./a.out
Set no block done.
Data not ready, try again.
Data not ready, try again.
Data not ready, try again.
Data not ready, try again.
Data not ready, try again.
AAAAAAAData not ready, try again.
AAAAAAAAAAAAAAAAAAAAData not ready, try again.
AAAAAAAAAAAAAAAAAAAAData not ready, try again.
AAA
echo: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
Data not ready, try again.
Data not ready, try again.
不仅仅是read
,其它类似recv, write, send
均可以写成非阻塞IO的形式。用图像表示如下
3. 多路转接之select
3.0 多路转接
I/O 多路转接允许一个进程同时监视多个文件描述符,等待其中任何一个文件描述符变为“就绪”状态(如可读、可写)。
3.1 select函数介绍
简介:select
是一种用于多路复用 I/O 的系统调用,允许程序同时监视多个文件描述符(包括套接字),以确定它们是否准备好进行读取、写入或是否有异常条件发生。
函数原型:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
/* 下面是位图操作 */
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
参数说明:
-
nfds
:表示需要监视的最大文件描述符编号加1(即max(fd) + 1
)。select
会监视编号从 0 到nfds-1
的所有文件描述符。 -
readfds
:输入输出型参数,指向fd_set
类型的指针,表示需要监视的文件描述符集合,用于检测是否可读。 -
writefds
:输入输出型参数,指向fd_set
类型的指针,表示需要监视的文件描述符集合,用于检测是否可写。 -
exceptfds
:输入输出型参数,指向fd_set
类型的指针,表示需要监视的文件描述符集合,用于检测是否有异常条件发生(如带外数据)。 -
timeout
:-
指向
struct timeval
类型的指针,用于设置select
的超时时间。-
该结构体定义如下
struct timeval { time_t tv_sec; /* Seconds */ suseconds_t tv_usec; /* Microseconds */ };
-
-
如果设置为
{5, 0}
,select
表示select
函数最多等待 5 秒,直到某个文件描述符准备好,或者超时返回。 -
如果设置为
{0, 0}
,select
将立即返回,用于轮询。 -
如果为
NULL
,select
将阻塞等待,直到某个文件描述符准备好。 -
如果不设置
NULL
,该参数是一个输入输出型参数
-
返回值:
- 返回值 > 0:表示有文件描述符准备好,返回值表示准备好的文件描述符数量。
- 返回值 = 0:表示超时,没有任何文件描述符准备好。
- 返回值 < 0:表示发生错误,通常是因为无效的文件描述符或超时参数。
readfds
:
数据类型是 fd_set
,它是位图,是一个输入输出型参数
- 做输入参数时:用户告诉内核,我给你一个或者多个
fd
,你要帮我关心fd
上面的读事件,如果读事件就绪了,就要告诉用户我 - 做输出参数时:内核告诉用户,用户你让我关心的多个
fd
中,有哪些已经就绪了,用户你抓紧读取。如果用户不处理,select
会一直通知用户 - 举个例子,比如用户想给内核说明让其关心
0,1,4
3个fd
,所以该位图就可以设置为0001 0011
(作为输入参数)
当用户想给内核返回4
这个fd
表明此fd
已经就绪,所以该位图可以设置为0001 0000
(作为输出参数)
操作fd_set
:
FD_ZERO(fd_set *set)
:清空文件描述符集合。FD_SET(int fd, fd_set *set)
:将文件描述符fd
添加到集合中。FD_CLR(int fd, fd_set *set)
:从集合中移除文件描述符fd
。FD_ISSET(int fd, fd_set *set)
:检查文件描述符fd
是否在集合中。
3.2 select 服务器处理读事件
SelectServer.hpp
如下,并未处理到来的新连接,只是打印在服务器的终端上
#ifndef __SELECT_SERVER__
#define __SELECT_SERVER__
#include "Socket.hpp"
#include <sys/select.h>
static const uint16_t DEFAULT_PORT = 9000;
extern Log log;
class SelectServer
{
public:
SelectServer(const uint16_t& port = DEFAULT_PORT) : _port(port) {}
void initServer()
{
_listenSocket.Socket();
// 允许套接字绑定到一个处于 TIME_WAIT 状态的端口
int reuse = 1;
setsockopt(_listenSocket.GetFd(), SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &reuse, sizeof(reuse));
_listenSocket.Bind(_port);
_listenSocket.Listen();
}
void startServer()
{
/* accept会阻塞等待客户端的连接,一次只能等一个,
但我们这是一个实现多路转接的服务器,需要一次等待多个客户端,所以要先select
每次有一个新连接到来,等价于读事件就绪。*/
int listenSocket = _listenSocket.GetFd();
for(; ;) {
// 初始化读位图
fd_set readfds;
FD_ZERO(&readfds);
// 将文件描述符fd添加到集合中
FD_SET(listenSocket, &readfds);
// 进行select
timeval tv = {2, 0}; // 设置2s超时,由于是输入输出参数,所以可能需要周期重复的设置
int n = select(listenSocket + 1, &readfds, nullptr, nullptr, &tv);
// 判断n,进行不同的操作
switch (n)
{
case 0:
log(INFO, "Time out!: %d, %d\n", tv.tv_sec, tv.tv_usec);
sleep(1);
break;
case -1:
log(ERROR, "Select error!\n");
break;
default:
// 有事件就绪了。如果事件就绪,上层不处理,select就会一直通知用户,让其处理
// 一个事实是:如果select告诉用户数据已经就绪了,接下来,用户读取fd时不会被阻塞。
log(INFO, "Get a new link!\n");
handlerEvent(&readfds);
break;
}
}
}
bool handlerEvent(fd_set* set)
{
std::string ip;
uint16_t port;
// 由于此事件已经就绪了,所以accept不会被阻塞,这样就省去了等的过程
int socket = _listenSocket.Accept(&ip, &port);
if(socket < 0) return false;
log(INFO, "Accept success, ip: %s, port: %d\n", ip.c_str(), port);
// 这里不可以直接read,因为虽然可以建立连接,
// 但不能保证客户端一定会立马给服务端发消息,如果没发,就会一直阻塞在这里
// read();
sleep(1);
return true;
}
private:
Sock _listenSocket;
uint16_t _port;
};
#endif
main.cc
如下,包含了main()
函数
#include "SelectServer.hpp"
#include <memory>
int main()
{
std::unique_ptr<SelectServer> svr(new SelectServer());
svr->initServer();
svr->startServer();
return 0;
}
当运行后,服务器每过2s会打印Time out
,当有一个客户端(telnet)连接服务器时,由于没处理事件,会打印Get a new link
然后继续超时。如下图
3.3 完善服务器
现在需要将新连接交给select
来处理。让select
有获取更多文件描述符的能力。而且每次经过startServer()
外层的for
循环后,readfds
都需要更新(因为这是一个输入输出参数,每次输出后就会覆盖,所以需要更新),所以再添加一个成员变量fdArray[]
,默认所有初始化为-1,当有一个新连接到来,将其的值设置为fd
。数组长度为1024,因为fd_set
为128B,即1024bit,可以表示1024的fd
。
下面是修改后的代码,SelectServer.hpp
#ifndef __SELECT_SERVER__
#define __SELECT_SERVER__
#include "Socket.hpp"
#include <sys/select.h>
static const uint16_t DEFAULT_PORT = 9000;
static const int DEFAULT_FD = -1; // fdArray默认FD是-1
static const int ARRAY_SIZE = 1024; // fdArray的长度是1024
extern Log log;
class SelectServer
{
public:
SelectServer(const uint16_t& port = DEFAULT_PORT) : _port(port), _fdArray(new int[ARRAY_SIZE])
{
// 初始化fdArray,全部初始化为-1
for(int i = 0; i < ARRAY_SIZE; ++i) {
_fdArray[i] = DEFAULT_FD;
}
}
~SelectServer()
{
_listenSocket.close();
delete _fdArray;
}
void initServer()
{
_listenSocket.Socket();
// 允许套接字绑定到一个处于 TIME_WAIT 状态的端口
int reuse = 1;
setsockopt(_listenSocket.GetFd(), SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &reuse, sizeof(reuse));
_listenSocket.Bind(_port);
_listenSocket.Listen();
}
void startServer()
{
/* accept会阻塞等待客户端的连接,一次只能等一个,
但我们这是一个实现多路转接的服务器,需要一次等待多个客户端,所以要先select
每次有一个新连接到来,等价于读事件就绪。*/
int listenSocket = _listenSocket.GetFd();
_fdArray[0] = listenSocket; // 第一个元素是监听套接字的fd
for(; ;) {
// 初始化读位图
fd_set readfds;
FD_ZERO(&readfds);
int maxFd = listenSocket; // 最大的套接字,用于设置select的第一个参数
// 该循环每次都会根据_fdArray更新readfds
for(int i = 0; i < ARRAY_SIZE; ++i) {
if(_fdArray[i] == -1) continue;
// 不是-1,证明该文件描述符需要被监视,将其加入到数组
FD_SET(_fdArray[i], &readfds);
if(_fdArray[i] > maxFd) {
maxFd = _fdArray[i];
log(INFO, "Max fd update: %d\n", maxFd);
}
}
// 进行select
timeval tv = {2, 0}; // 设置2s超时,由于是输入输出参数,所以需要周期重复的设置
// int n = select(maxFd + 1, &readfds, nullptr, nullptr, &tv);
int n = select(maxFd + 1, &readfds, nullptr, nullptr, nullptr);
// 判断n,进行不同的操作
switch (n)
{
case 0:
log(INFO, "Time out!: %d, %d\n", tv.tv_sec, tv.tv_usec);
sleep(1);
break;
case -1:
log(ERROR, "Select error!\n");
break;
default:
// 有事件就绪了。如果事件就绪,上层不处理,select就会一直通知用户,让其处理
// 一个事实是:如果select告诉用户数据已经就绪了,接下来,用户读取fd时不会被阻塞。
log(INFO, "Get a new link!\n");
handlerEvent(&readfds);
break;
}
}
}
bool handlerEvent(fd_set* set)
{
// 这个循环_fdArray中所有已经就绪的事件,包括监听套接字创建连接和读事件
for(int i = 0; i < ARRAY_SIZE; ++i) {
if(_fdArray[i] == DEFAULT_FD)
continue;
int fd = _fdArray[i];
// 是监听套接字并且有读事件,证明是要创建连接
if(fd == _listenSocket.GetFd() && FD_ISSET(fd, set)) {
std::string ip;
uint16_t port;
// 由于此事件已经就绪了,所以accept不会被阻塞,这样就省去了等的过程
int socket = _listenSocket.Accept(&ip, &port);
if(socket < 0) continue;
log(INFO, "Accept success, ip: %s, port: %d\n", ip.c_str(), port);
// 该循环会在_fdArray找一个空闲的位置,把连接套接字放进去
int i = 1;
for(; i < ARRAY_SIZE; ++i) {
if(_fdArray[i] != DEFAULT_FD)
continue;
else
break;
}
if(i == ARRAY_SIZE) {
log(WARNING, "Fd array has no size!\n");
close(socket);
continue;
} else {
_fdArray[i] = socket;
printFdArray();
}
} else if(fd != _listenSocket.GetFd() && FD_ISSET(fd, set)) {
// printf("不是监听套接字, 但是有读事件, 证明可以read, fd: %d\n", fd);
// 不是监听套接字,但是有读事件,证明可以read
char buff[ARRAY_SIZE];
ssize_t n = read(fd, buff, sizeof buff - 1);
if(n > 0) {
buff[n] = 0;
log(INFO, "echo: %s", buff);
} else if(n == 0) {
log(INFO, "read done\n");
close(fd);
_fdArray[i] = DEFAULT_FD; // 将其从数组中移除
} else {
log(WARNING, "read error! errno: %d, why: %s\n", errno, strerror(errno));
close(fd);
_fdArray[i] = DEFAULT_FD;
}
} else if(fd != _listenSocket.GetFd() && !FD_ISSET(fd, set)) {
log(INFO, "不是监听套接字, 没有读事件\n");
} else if(fd == _listenSocket.GetFd() && !FD_ISSET(fd, set)) {
log(INFO, "是监听套接字, 没有读事件\n");
} else {
log(INFO, "其它情况...\n");
}
// sleep(1);
}
return true;
}
private:
void printFdArray()
{
printf("fdArray: ");
for(int i = 0; i < ARRAY_SIZE; ++i) {
if(_fdArray[i] == DEFAULT_FD)
break;
printf("%d ", _fdArray[i]);
}
printf("\n");
}
Sock _listenSocket;
uint16_t _port;
int* _fdArray;
};
#endif
运行结果如下,两个客户端并没有同时发送消息。注意已经将select的等待策略换成了阻塞等待
接下来可以做的事:可以把上面的代码95行到144行不同的处理情况分装成几个函数,进行解耦。如果想处理写事件,可以再增加一个数组。
3.4 select的缺点
- 等待的
fd
是有上限的 - 输入输出型参数比较多,数据从内核到用户,从用户到内核拷贝的频率比较高
- 输入输出型参数比较多,需要用户每次关心的
fd
进行事件重置 - 在用户层:使用第三方数组管理用户的
fd
,需要经过很多次遍历。在内核层:内核中select()
检验fd
事件是否就绪,也需要遍历。
详解第2点:
select
函数在执行时会直接修改传入的 fd_set
集合。具体来说:
- 如果某个文件描述符在调用
select
时已经准备好,select
会将该文件描述符从集合中移除。 - 如果文件描述符未准备好,
select
会保留该文件描述符在集合中。
这意味着,每次调用 select
之后,fd_set
的内容会发生变化,不再包含所有需要监视的文件描述符。如果不重新设置 fd_set
,下次调用 select
时可能会遗漏某些需要监视的文件描述符。
4. 多路转接之poll
4.1 poll函数介绍
简介:poll
是一种用于多路复用 I/O 的系统调用,类似于 select
,但它通过 pollfd
结构数组来管理文件描述符,而不是使用位掩码。poll
在某些情况下比 select
更高效,尤其是在处理大量文件描述符时。
函数原型:
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数说明:
-
fds
:-
指向
struct pollfd
类型的数组,每个元素表示一个需要监视的文件描述符。下面是该结构体的定义struct pollfd { int fd; /* 文件描述符 */ short events; /* 请求的事件 */ short revents; /* 实际发生的事件 */ };
-
-
nfds
:- 表示数组
fds
中的元素数量。
- 表示数组
-
timeout
:- 指定
poll
的超时时间,单位为毫秒。 - 如果
timeout
为负值,poll
将阻塞等待,直到某个文件描述符准备好。 - 如果
timeout
为 0,poll
将立即返回,用于轮询。
- 指定
常用事件标志:
POLLIN
:文件描述符准备好读取。POLLOUT
:文件描述符准备好写入。POLLERR
:发生错误条件。POLLHUP
:挂起(如对端关闭连接)。POLLNVAL
:无效的文件描述符。
返回值:
- 返回值 > 0:表示有文件描述符准备好,返回值表示准备好的文件描述符数量。
- 返回值 = 0:表示超时,没有任何文件描述符准备好。
- 返回值 < 0:表示发生错误,通常是因为无效的文件描述符或超时参数。
4.2 poll服务器处理读事件
与3.2差距不大,下面是PollServer.hpp
#ifndef __SELECT_SERVER__
#define __SELECT_SERVER__
#include "Socket.hpp"
#include <vector>
#include <poll.h>
static const uint16_t DEFAULT_PORT = 9000; // 默认端口号
static const int DEFAULT_FD = -1; // 默认FD是-1
static const int DEFAULT_ARRAY_SIZE = 1024; // 默认的长度是1024
static const short DEFAULT_EVENT = 0; // 默认事件
extern Log log;
class PollServer
{
public:
PollServer(const uint16_t& port = DEFAULT_PORT) : _port(port)
{
// 初始化_fds
_fds.resize(DEFAULT_ARRAY_SIZE);
for(int i = 0; i < DEFAULT_ARRAY_SIZE; ++i) {
_fds[i].fd = DEFAULT_FD;
_fds[i].events = DEFAULT_EVENT;
_fds[i].revents = DEFAULT_EVENT;
}
}
~PollServer()
{
_listenSocket.close();
}
void initServer()
{
_listenSocket.Socket();
// 允许套接字绑定到一个处于 TIME_WAIT 状态的端口
int reuse = 1;
setsockopt(_listenSocket.GetFd(), SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &reuse, sizeof(reuse));
_listenSocket.Bind(_port);
_listenSocket.Listen();
}
void startServer()
{
/* accept会阻塞等待客户端的连接,一次只能等一个,
但我们这是一个实现多路转接的服务器,需要一次等待多个客户端,所以要先poll
每次有一个新连接到来,等价于读事件就绪。*/
int listenSocket = _listenSocket.GetFd();
_fds[0].fd = listenSocket; // 第一个元素是监听套接字的fd
_fds[0].events = POLLIN; // 关心监听套接字的读事件
int timeout = -1; // 设置为负值,用于阻塞等待
for(; ;) {
// 进行poll
int n = poll(_fds.data(), _fds.size(), timeout);
// 判断n,进行不同的操作
switch (n)
{
case 0:
log(INFO, "Time out!\n");
break;
case -1:
log(ERROR, "Select error!\n");
break;
default:
log(INFO, "Get a new link!\n");
handlerEvent();
break;
}
}
}
bool handlerEvent()
{
// 这个循环处理_fds中所有已经就绪的事件,包括监听套接字创建连接和读事件
for(int i = 0; i < _fds.size(); ++i) {
int fd = _fds[i].fd;
int rEvent = _fds[i].revents;
int listenSocket = _listenSocket.GetFd();
if(fd == DEFAULT_FD)
continue;
if(fd == listenSocket) {
if(rEvent & POLLIN) {
// 是监听套接字并且有读事件,证明是要创建连接
acceptLink();
} else {
// 是监听套接字并且没有读事件
log(INFO, "%d是监听套接字, 没有读事件\n", fd);
}
} else if(fd != listenSocket) {
if(rEvent & POLLIN) {
// 不是监听套接字,但是有读事件,证明可以read
readData(fd, i);
} else {
// 不是监听套接字, 没有读事件
log(INFO, "%d不是监听套接字, 没有读事件\n", fd);
}
} else {
log(INFO, "其它情况...\n");
}
}
return true;
}
private:
void printFdArray()
{
printf("fdArray: ");
for(int i = 0; i < _fds.size(); ++i) {
if(_fds[i].fd == DEFAULT_FD)
break;
printf("%d ", _fds[i].fd);
}
printf("\n");
}
// 创建连接
void acceptLink()
{
std::string ip;
uint16_t port;
// 由于此事件已经就绪了,所以accept不会被阻塞,这样就省去了等的过程
int socket = _listenSocket.Accept(&ip, &port);
if(socket < 0) return;
log(INFO, "Accept success, ip: %s, port: %d\n", ip.c_str(), port);
// 该循环会在_fds找一个空闲的位置,把连接套接字放进去
int i = 1;
size_t fdSize = _fds.size();
for(; i < fdSize; ++i) {
if(_fds[i].fd != DEFAULT_FD)
continue;
else
break;
}
if(i == fdSize) {
// 扩容,2倍
log(WARNING, "Fd array has no size! In the process of expansion...\n");
_fds.resize(fdSize * 2);
close(socket);
return;
} else {
// 将该连接套接字添加进_fds中
_fds[i].fd = socket;
_fds[i].events = POLLIN;
_fds[i].revents = DEFAULT_EVENT; // 这个由OS填写
printFdArray();
}
}
// 读数据
void readData(int fd, int i)
{
char buff[DEFAULT_ARRAY_SIZE];
ssize_t n = read(fd, buff, sizeof buff - 1);
if(n > 0) {
buff[n] = 0;
log(INFO, "echo: %s", buff);
} else if(n == 0) {
log(INFO, "read done\n");
close(fd);
_fds[i].fd = DEFAULT_FD; // 将其从数组中移除
} else {
log(WARNING, "read error! errno: %d, why: %s\n", errno, strerror(errno));
close(fd);
_fds[i].fd = DEFAULT_FD;
}
}
Sock _listenSocket;
uint16_t _port;
std::vector<pollfd> _fds;
};
#endif
4.3 poll的缺点
当文件描述符增多时:
- 和
select
函数一样,poll
返回后,需要轮询遍历pollfd
来获取就绪的描述符. - 每次调用poll都需要把大量的
pollfd
结构从用户态拷贝到内核中. - 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.
5. 多路转接之epoll
5.1 epoll函数介绍
5.1.1 epoll_create
简介:用于创建一个 epoll 实例。它是使用 epoll 机制进行 I/O 多路复用的第一步。epoll 是一种高效的 I/O 事件通知机制,适用于处理大量并发连接。
函数原型:
#include <sys/epoll.h>
int epoll_create(int size);
参数说明:
size
:这是一个历史遗留参数,从 Linux 2.6.8 开始,该参数不再起作用,但必须大于零。
返回值:
- 成功:返回一个大于 0 的整数,表示 epoll 实例的文件描述符。
- 失败:返回 -1,并设置
errno
。
5.1.2 epoll_ctl
简介:用于向 epoll
实例中添加、修改或删除文件描述符,并让OS关心这些fd
上面的事件。
函数原型:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数说明:
-
epfd
:由epoll_create
或epoll_create1
创建的epoll
实例的文件描述符。 -
op
:指定对文件描述符的操作,可以是以下值之一:EPOLL_CTL_ADD
:向epoll
实例中添加一个新的文件描述符。EPOLL_CTL_MOD
:修改已存在的文件描述符的事件类型。EPOLL_CTL_DEL
:从epoll
实例中删除一个文件描述符。
-
fd
:需要操作的目标文件描述符。 -
event
:指向struct epoll_event
的指针,该结构指定了需要监听的事件类型。下面是该结构体的定义-
struct epoll_event { uint32_t events; // 监听的事件类型 epoll_data_t data; // 用户自定义数据 };
-
events
:指定要监听的事件类型。可以是下面几个宏的集合- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭)
- EPOLLOUT : 表示对应的文件描述符可以写
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来)
- EPOLLERR : 表示对应的文件描述符发生错误
- EPOLLHUP : 表示对应的文件描述符被挂断
- EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的
- EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里
-
data
:用户自定义的数据,通常用于存储与文件描述符相关的上下文信息。
返回值
- 成功:返回 0。
- 失败:返回 -1,并设置
errno
。
-
5.1.3 epoll_wait
简介:阻塞一段时间并等待事件发生,返回事件集合,也就是获取内核的事件通知。
函数原型:
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数说明:
epfd
:- 由
epoll_create
或epoll_create1
创建的epoll
实例的文件描述符。
- 由
events
:- 输出型参数,指向
struct epoll_event
类型的数组,用于存储epoll_wait
返回的事件信息。
- 输出型参数,指向
maxevents
:- 指定
events
数组的最大容量,即epoll_wait
最多返回的事件数量。 - 必须大于 0。
- 指定
timeout
:- 指定
epoll_wait
的超时时间,单位为毫秒。 - 如果
timeout
为负值,epoll_wait
将阻塞等待,直到有事件发生。 - 如果
timeout
为 0,epoll_wait
将立即返回,用于轮询。
- 指定
返回值:
- 返回值 > 0:表示有文件描述符准备好,返回值表示准备好的文件描述符数量。
- 返回值 = 0:表示超时,没有任何文件描述符准备好。
- 返回值 < 0:表示发生错误,通常是因为无效的文件描述符或超时参数。
5.2 epoll的原理
该系统调用帮我们维护了一个红黑树,一个就绪队列(双向链表),一个回调函数。下图只是方便理解
epoll_create
要加入监听事件的描述符都需要添加到这棵红黑树来。当红黑树上监听的描述符发生对应的监听事件时,内核会将这个节点插入到就绪队列中来。epoll_ctl
通过操作红黑树上的节点来控制epoll
对象epoll_wait
遍历双向链表,把双向链表里的节点数据拷贝出来,拷贝完毕后就从双向链表移除。
5.3 epoll服务器
NoCopy.hpp
不允许拷贝
#ifndef __NO_COPY__
#define __NO_COPY__
class NoCopy
{
public:
NoCopy() = default;
NoCopy(const NoCopy&) = delete;
NoCopy& operator=(const NoCopy&) = delete;
};
#endif
Epoller.hpp
封装了上面的三个接口,继承NoCopy
#ifndef __EPOLLER__
#define __EPOLLER__
#include <sys/epoll.h>
#include <string.h>
#include <exception>
#include "log.hpp"
#include "NoCopy.hpp"
extern Log log;
class Epoller : public NoCopy
{
public:
Epoller()
{
_epFd = epoll_create(1);
if(_epFd < 0) {
log(ERROR, "Epoll create error! why: ", strerror(errno));
throw std::runtime_error("Epoll create failed");
} else {
// throw std::logic_error("I feel awesome!\nJust test!");
log(INFO, "Epoll create success, _epFd: %d.\n", _epFd);
}
}
~Epoller()
{
if(_epFd >= 0) {
log(INFO, "~Epoller(), close _epFd now.\n");
close(_epFd);
}
}
void epollerCtl(int op, int fd, uint32_t event)
{
if(op == EPOLL_CTL_DEL) {
// 对该文件描述符指向的节点进行删除
int n = epoll_ctl(_epFd, op, fd, nullptr);
if(n < 0) {
log(ERROR, "Epoll delete control error! why: ", strerror(errno));
throw std::runtime_error("Epoll delete control failed");
}
} else {
// 添加或者修改
struct epoll_event ee;
ee.events = event;
ee.data.fd = fd; // 添加data字段的fd,目的是方便后期我们得知是哪一个fd就绪了。在handlerEvent中会永奥
int n = epoll_ctl(_epFd, op, fd, &ee);
if(n < 0) {
log(ERROR, "Epoll control error! why: ", strerror(errno));
throw std::runtime_error("Epoll control failed");
}
}
}
int epollerWait(struct epoll_event *events, int maxevents)
{
return epoll_wait(_epFd, events, maxevents, _timeOut);
}
private:
int _epFd;
int _timeOut = -1;
};
#endif
EpollServer.hpp
处理各种逻辑,接受客户端的消息,向其写回字符串
#ifndef __EPOLL_SERVER__
#define __EPOLL_SERVER__
#include "Socket.hpp"
#include "Epoller.hpp"
#include <memory>
class EpollServer
{
static const uint16_t DEFAULT_PORT = 9000;
static const int NUM = 128;
static const int DEFAULT_ARRAY_SIZE = 1024;
public:
EpollServer(uint16_t port = DEFAULT_PORT) : _port(port)
{
try{
// 创建Epoller
_epoller = std::make_unique<Epoller>();
// 初始化监听套接字
_listenSocket = std::make_unique<Sock>();
} catch(const std::exception& e) {
throw;
}
}
~EpollServer()
{
log(INFO, "~EpollServer(), close _listenSocket now.\n");
_listenSocket->Close();
}
void initServer()
{
// 创建监听套接字
_listenSocket->Socket();
// 允许套接字绑定到一个处于 TIME_WAIT 状态的端口
int reuse = 1;
setsockopt(_listenSocket->GetFd(), SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &reuse, sizeof(reuse));
_listenSocket->Bind(_port);
_listenSocket->Listen();
log(INFO, "Create listen socket done: %d.\n", _listenSocket->GetFd());
}
void runServer()
{
// 让服务器关心监听套接字上的读事件。实际上是将监听套接字添加到红黑树上
_epoller->epollerCtl(EPOLL_CTL_ADD, _listenSocket->GetFd(), EPOLLIN);
struct epoll_event recEvents[NUM];
for(; ;) {
// 等待事件发生,如果发生了,将事件放到recEvents数组中
int length = _epoller->epollerWait(recEvents, NUM);
switch (length)
{
case 0:
log(INFO, "Time out.\n");
break;
case -1:
log(ERROR, "Epoller wait error, why: \n", strerror(errno));
throw std::runtime_error("Epoller wait failed");
default:
handlerEvent(recEvents, length);
break;
}
}
}
// 事件派发
void handlerEvent(struct epoll_event* recEvents, int length)
{
// log(INFO, "handlerEvent.\n");
int listenSocekt = _listenSocket->GetFd();
for(int i = 0; i < length; ++i) {
int nowFd = recEvents[i].data.fd;
uint32_t event = recEvents[i].events;
if(event & EPOLLIN) {
// 有读事件
if(nowFd == listenSocekt) {
// 是监听套接字, 创建连接
acceptLink();
} else {
// 不是监听套接字, 读取数据
readData(nowFd);
}
} else if(event & EPOLLOUT) {
log(INFO, "A write event is ready.\n");
} else {
log(INFO, "Other events ready.\n");
}
}
sleep(1);
}
// 创建连接
void acceptLink()
{
int listenSocekt = _listenSocket->GetFd();
std::string ip;
uint16_t port;
int newFd = _listenSocket->Accept(&ip, &port);
// 让epoll关心该套接字
_epoller->epollerCtl(EPOLL_CTL_ADD, newFd, EPOLLIN);
log(INFO, "Server get a new link: %d. Ready to read!\n", newFd);
}
// 读数据
void readData(int fd)
{
char buff[DEFAULT_ARRAY_SIZE];
ssize_t n = read(fd, buff, sizeof buff - 1);
if(n > 0) {
buff[n] = 0;
log(INFO, "echo: %s", buff);
// 向客户端写回
std::string echoString = "Server has already get your string. your fd is ";
echoString += to_string(fd);
echoString += '\n';
write(fd, echoString.c_str(), echoString.size());
} else if(n == 0) {
log(INFO, "%d fd read done.\n", fd);
// 不需要让epoll关心了
_epoller->epollerCtl(EPOLL_CTL_DEL, fd, 0);
close(fd);
} else {
log(WARNING, "%d fd read error! errno: %d, why: %s\n", fd, errno, strerror(errno));
_epoller->epollerCtl(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
}
private:
uint16_t _port;
std::unique_ptr<Epoller> _epoller;
std::unique_ptr<Sock> _listenSocket;
};
#endif
main.cc
#include "EpollServer.hpp"
int main()
{
try {
std::unique_ptr<EpollServer> svr(new EpollServer());
svr->initServer();
svr->runServer();
} catch(const std::exception& e) {
std::cerr << e.what() << '\n';
}
return 0;
}
运行结果如下
向队列中添加一次,还是次次都添加
5.4 ET和LT
epoll
有2种工作方式:水平触发(LT)和边缘触发(ET)
5.4.1 概念
Level Triggered(LT,水平触发)
- 定义:当文件描述符上的事件就绪后,
epoll_wait
会通知应用程序,即使应用程序没有立即处理该事件,epoll_wait
在后续调用中仍然会继续通知该事件,直到事件被处理。 - 特点:
- 应用程序可以延迟处理事件,
epoll_wait
会持续提醒。 - 更适合简单的应用场景,因为不需要严格管理事件的处理状态。
- 应用程序可以延迟处理事件,
Edge Triggered(ET,边缘触发)
- 定义:当文件描述符上的事件就绪后,
epoll_wait
只会通知应用程序一次。如果应用程序没有立即处理该事件,或者事件未完全处理完成,epoll_wait
在后续调用中不会再次通知该事件。 - 特点:
- 应用程序必须立即处理事件,并且需要处理完成,否则可能会错过事件(所以需要程序员做到,每次通知,必须把本轮数据全部取走。可以通过非阻塞循环读取该
fd
上的做到)。 - 更高效,减少了同一事件被重复触发的次数,适合高性能场景。
- 应用程序必须立即处理事件,并且需要处理完成,否则可能会错过事件(所以需要程序员做到,每次通知,必须把本轮数据全部取走。可以通过非阻塞循环读取该
ET vs LT:
一般来说,ET的通知效率更高,IO效率也更高。但实际上:LT也可以将所有的fd
设置成非阻塞,然后循环读取,这样通知第一次时,就把数据全部取走,和ET是一样的。
5.4.2 LT和ET在epoll
的工作模式
epoll
默认状态下就是LT工作模式:
- 当
epoll
检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分。 - 比如缓冲区中有2K数据,但第一次调用
epoll_wait
返回时,read()
只读了1K数据, 缓冲区中还剩1K数据 - 在第二次调用
epoll_wait
时,epoll_wait
仍然会立刻返回并通知socket读事件就绪 - 直到缓冲区上所有的数据都被处理完,
epoll_wait
才不会立刻返回 - 支持阻塞读写和非阻塞读写
如果我们在第1步将socket添加到epoll
描述符的时候使用了EPOLLET
标志, epoll
进入ET工作模式
- 当
epoll
检测到socket上事件就绪时, 必须立刻处理 - 同上面一样,虽然只读了1K的数据,缓冲区还剩1K的数据,但
epoll_wait
只会通知一次 - 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会。
- 只支持非阻塞的读写
6. Reactor
ET模式下的Reactor:Reactor 模式是一种基于事件驱动的设计模式,它通过将 I/O 事件的检测和处理分离,使得程序能够高效地处理多个客户端的并发请求。在 Reactor 模式中,有一个专门的事件多路分离器(Event Demultiplexer)负责监听多个 I/O 源上的事件,当某个 I/O 源上有事件发生时,将事件分发给对应的事件处理器(Event Handler)进行处理。
具体代码在此