【Linux】多路转接--select、poll、epoll,非阻塞等待
1.IO的概念
IO=等+拷贝数据
- 等:发送缓冲区满了或者接受缓冲区没有数据,就需要等待
高效IO就是:减少单位时间内,"等"的比重
2. 阻塞IO和非阻塞IO
2.1.阻塞IO
阻塞等待会在read的地方等待
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <errno.h>
using namespace std;
int main()
{
char buffer[1024] = {0};
// 阻塞IO
while (1)
{
ssize_t s = read(0, buffer, sizeof(buffer) - 1);
if (s > 0)
{
//去掉回车
buffer[s-1] = 0;
printf("read success,echo:%s\n",buffer);
}
else if (s == 0)
{
printf("write closed,errno:%d\n", errno);
}
else
{
printf("read failed,errno:%d\n", errno);
}
}
return 0;
}
2.2.非阻塞等待
2.2.1.设置非阻塞等待
1.直接在打开文件的时候设置O_NONBLOCK或者O_NDELAY ,返回的文件描述符就是非阻塞的
int fd1=open("test.txt",O_WRONLY|O_CREAT|O_NONBLOCK,0644);
2.使用fcntl系统函数修改文件描述符
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <fcntl.h>
#include <error.h>
using namespace std;
void SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if (fl < 0)
{
cerr << "fcntl failed" << endl;
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
int main()
{
SetNonBlock(0);
return 0;
}
2.2.2.非阻塞的特点
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
using namespace std;
void SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if (fl < 0)
{
cerr << "fcntl failed" << endl;
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
int main()
{
SetNonBlock(0);
char buf[1024];
while (1)
{
ssize_t s = read(0, buf, sizeof(buf) - 1);
if (s > 0)
{
//去掉回车
buf[s-1] = 0;
printf("read success,echo: %s\n", buf);
}
else if (s == 0)
{
printf("write closed,errno:%d\n", errno);
}
else
{
printf("read failed,errno:%d\n", errno);
}
sleep(1);
}
return 0;
}
结果如下:非阻塞等待,也就是轮询检测,当如果等待没就绪的话read的返回值是<0的,和错误的返回值都是<0的;我们怎么区分read错误和等待没就绪了
区分错误和等待未就绪方法
- 错误都会被设置错误码,为什么会设置C语言的错误码因为Linux是使用C语言编写的,EAGAIN/EWOULDBLOCK的值都是11;
优化后的代码
while (1)
{
ssize_t s = read(0, buf, sizeof(buf) - 1);
if (s > 0)
{
//去掉回车
buf[s-1] = 0;
printf("read success,echo: %s\n", buf);
}
else if (s == 0)
{
printf("write closed,errno:%d\n", errno);
}
else
{
//等待未就绪,这两个宏都被定义为11
if(errno==EAGAIN||errno==EWOULDBLOCK)
{
cout<<"time out"<<endl;
}
else
{
printf("read failed,errno:%d\n", errno);
}
}
sleep(1);
}
3.多路转接--select
select就是可以同是等待一批IO,这样单位时间内的等待时间减少了;
select函数
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
返回值:等待就绪的个数
int nfds参数 :等待位图中最大的一个文件描述符+1,因为OS执行select使用循环的方式来判断是否等待就绪的:for(int i=0;i<nfds;i++)
fd_set *readfds/writefds/exceptfds参数 :fd_set是一个位图结构,这些参数都是输入输出型参数,需要用户输入需要等待哪些文件描述符,select等待就绪成功,也是把等待就绪的文件描述符写入到这个fd_set位图结构的
struct timeval *timeout参数 :传值nullptr表示阻塞等待,{0,0}表示非阻塞等待,{1,0}两个参数有不为0表示对应的时间没有等待就绪就返回一次;{s,us}前一个是秒,后一个是微秒
3.1.select的执行过程
3.1.1.fd_set位图
因为是一个位图结构,所以需要使用对应函数来操作;
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
3.1.2.执行过程
3.2.写一个代码:
封装的套接字接口socket.hpp
#pragma once #include<iostream> #include<cstdlib> #include<sys/socket.h> #include<sys/types.h> #include<netinet/in.h> #include<arpa/inet.h> namespace ns_socket{ class sock{ public: static int Socket() { int sock=socket(AF_INET,SOCK_STREAM,0); if(sock<0) { std::cerr<<"socket"<<std::endl; exit(1); } return sock; } static void Bind(int sock,uint16_t port) { struct sockaddr_in local; local.sin_family=AF_INET; local.sin_addr.s_addr=INADDR_ANY; local.sin_port=htons(port); if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0) { std::cerr<<"bind"<<std::endl; exit(2); } } static void Listen(int sock) { if(listen(sock,5)<0) { std::cerr<<"listen"<<std::endl; exit(3); } } static int Accept(int sock) { struct sockaddr_in tmp; socklen_t tlen=sizeof(tmp); int new_sock=accept(sock,(struct sockaddr*)&tmp,&tlen); if(new_sock<0) { std::cerr<<"accept"<<std::endl; exit(4); } return new_sock; } static void Connect(int sock,char* server_ip,char* server_port) { struct sockaddr_in local; local.sin_family=AF_INET; local.sin_addr.s_addr=inet_addr(server_ip); local.sin_port=htons(atoi(server_port)); if(connect(sock,(struct sockaddr*)&local,sizeof(local))<0) { std::cerr<<"connect"<<std::endl; exit(5); } else { std::cout<<"connet success"<<std::endl; } } }; }
建立创建、bind、监听;
#include "socket.hpp" #include <unistd.h> #include <sys/select.h> using namespace ns_socket; using namespace std; #define NUM (sizeof(fd_set)*8)//保存数组的大小 void Usage() { std::cout<<"Usage:"<<"./select_server port"<<std::endl; } int main(int argc,char* argv[]) { if(argc!=2){ Usage(); return -2; } //fd_set readfds;//读等待位图 //std::cout<<sizeof(readfds)<<std::endl;//测试fd_set位图结构有多少个数 int sock_listen=sock::Socket(); cout<<"sock_liste: "<<sock_listen<<endl; sock::Bind(sock_listen,uint16_t(atoi(argv[1]))); sock::Listen(sock_listen); }
创建保存数组和fd_set
fd_set readfds;//读等待位图 int* fds_array=new int[NUM];//记录select的文件描述符 for(int i=0;i<NUM;i++) { fds_array[i]=-1; } fds_array[0]=sock_listen;
select的注意要有新fd或者删除fd都要添加到或者删除保存数组
while(true) { FD_ZERO(&readfds);//情况位图结构 int max_fd=fds_array[0];//最大文件描述符 for(int i=0;i<NUM;i++) { if(fds_array[i]!=-1){ max_fd=fds_array[i]; FD_SET(fds_array[i],&readfds); } } timeval tm={1,0}; int select_retrun=select(max_fd+1,&readfds,nullptr,nullptr,&tm);//第一个参数为最大文件描述符+1; if(select_retrun>0) { for(int sock=0;sock<NUM;sock++)//哪些文件描述符等待成功了并处理他 { if(FD_ISSET(sock,&readfds)) { if(sock==sock_listen)//新链接 { int new_sock=sock::Accept(sock_listen); if(new_sock>=0) { int tmp=0; for(int tmp=0;tmp<NUM;tmp++)//新链接的文件描述符添加会fds数组 { if(fds_array[tmp]==-1) { fds_array[tmp]=new_sock; cout<<"获得新链接: "<<new_sock<<endl; break; } } if(tmp==NUM) { close(new_sock); cout<<"链接已满,请重试,关闭描述符: "<<new_sock<<endl; } } } else//新数据读取 { cout<<sock<<"号文件描述符等待成功"<<endl; char buffer[1024]{0}; ssize_t s=read(sock,buffer,sizeof(buffer)-1); if(s>0) { buffer[s]=0; cout<<buffer<<endl; } else if(s==0) { cout<<"对端关闭,关闭文件描述符: "<<sock<<endl; close(sock); for(int i=0;i<NUM;i++)//fds数组移除对应文件描述符 { if(fds_array[i]==sock) fds_array[i]=-1; } } else { cout<<"读取数据失败,关闭文件描述符: "<<sock<<endl; close(sock); for(int i=0;i<NUM;i++)//fds数组移除对应文件描述符 { if(fds_array[i]==sock) fds_array[i]=-1; } } } } } } else if(select_retrun==0) { cout<<"没有等待成功文件描述符,继续"<<endl; continue; } else { cout<<"select失败,终止进程"<<endl; return -1; } } close(sock_listen); return 0;
3.3.select的优缺点
优点:
- 在单进程就可以等待一批进程,单位时间内减少等待的时间
缺点:
- 每次等待就绪fd_set位图结构等待的fd就会改变,需要用保存fd的数组再次设置;
- fd_set位图结构的大小有上限,所以同时检测的fd是有限的;
- select底层需要轮询检测哪些fd的事件就绪了;select的第一个参数要加一的原因;
- select可能会较高频率的从用户到内核,从内核到用户的频繁拷贝问题;每次都要重新设置fd_set和就绪改变fd_set;
4.多路转接--poll
poll函数
- 第一个参数是一个结构体,有文件描述符和short位图;
- 第二个参数数组的元素多少个;
- 第三个参数以毫秒为单位;1000就是1s;
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout); // pollfd结构 struct pollfd { int fd; /* file descriptor */ short events; /* requested events */ short revents; /* returned events */ };
struct pollfd:
event和revent的每个位置代表的事件:
写一个poll代码:
#include "socket.hpp"
#include <unistd.h>
#include <poll.h>
using namespace ns_socket;
using namespace std;
#define NUM 128
void Usage()
{
std::cout << "Usage:"<< "./poll_server port" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage();
return -2;
}
int sock_listen = sock::Socket();
cout << "sock_liste: " << sock_listen << endl;
sock::Bind(sock_listen, uint16_t(atoi(argv[1])));
sock::Listen(sock_listen);
struct pollfd poll_arr[NUM];
for (int i = 0; i < NUM; i++)
{
poll_arr[i].fd = -1;
poll_arr[i].events = 0;
poll_arr[i].revents = 0;
}
poll_arr[0].fd = sock_listen;
poll_arr[0].events = POLLIN;
poll_arr[0].revents = 0;
while (true)
{
int ret = poll(poll_arr, sizeof(poll_arr) / sizeof(poll_arr[0]), 2000);
if (ret > 0)
{
for (int i = 0; i < NUM; i++) // 哪些文件描述符等待成功了并处理他
{
if (poll_arr[i].revents & POLLIN)
{
if (poll_arr[i].fd == sock_listen) // 新链接
{
int new_sock = sock::Accept(sock_listen);
if (new_sock >= 0)
{
int tmp=0;
for (int tmp = 0; tmp < NUM; tmp++) // 新链接的文件描述符添加会fds数组
{
if (poll_arr[tmp].fd == -1)
{
poll_arr[tmp].fd = new_sock;
poll_arr[tmp].events = POLLIN;
poll_arr[tmp].revents = 0;
cout << "获得新链接: " << new_sock << endl;
break;
}
}
if (tmp == NUM)
{
close(new_sock);
cout << "链接已满,请重试,关闭描述符: " << new_sock << endl;
}
}
}
else // 新数据读取
{
cout << poll_arr[i].fd << "号文件描述符等待成功" << endl;
char buffer[1024]{0};
ssize_t s = read(poll_arr[i].fd, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
cout << buffer << endl;
}
else if (s == 0)
{
cout << "对端关闭,关闭文件描述符: " << poll_arr[i].fd << endl;
close(poll_arr[i].fd);
for (int j = 0; j < NUM; j++) // fds数组移除对应文件描述符
{
if (poll_arr[j].fd == poll_arr[i].fd)
{
poll_arr[j].fd = -1;
poll_arr[j].events = 0;
poll_arr[j].revents = 0;
}
}
}
else
{
cout << "读取数据失败,关闭文件描述符: " << poll_arr[i].fd << endl;
close(i);
for (int j = 0; j < NUM; j++) // fds数组移除对应文件描述符
{
if (poll_arr[j].fd == poll_arr[i].fd)
{
poll_arr[j].fd = -1;
poll_arr[j].events = 0;
poll_arr[j].revents = 0;
}
}
}
}
}
}
}
else if (ret == 0)
{
cout << "没有等待成功文件描述符,继续" << endl;
continue;
}
else
{
cout << "poll失败,终止进程" << endl;
return -1;
}
}
close(sock_listen);
return 0;
}
较于select的优点:
- 就绪了不用再重新拷贝
- 等待的IO数量没有限制了
- 代码的书写更简单
5.多路转接--epoll
5.1.epoll函数
1.创建一个epoll模型,这个模型有3部分:一颗红黑树、回调机制、就绪队列
- 返回值:一个文件描述符,用完记得关闭;
- 参数:自从linux2.6.8之后,size参数是被忽略的,当时还是要填一个非零值;
int epoll_create(int size);
2.用户告诉内核等待那些fd和对应的事件
- 参数1:创建epoll模型的返回值文件描述符
- 参数2:删除、修改、增加struct epoll_event结构体的;
- 参数3:文件描述符,便于查找插入红黑树
- 参数4:一个结构体,包含时间位图由一个int实现,和fd
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
删除、修改、增加struct epoll_event结构体的宏;
EPOLL_CTL_ADD :注册新的fd到epfd中 EPOLL_CTL_MOD :修改已经注册的fd的监听事件 EPOLL_CTL_DEL :从epfd中删除一个fd
struct epoll_event结构体定义:
3.内核告诉用户哪些文件描述符就绪了
- 返回值:就绪fd的个数;
- 参数1:创建epoll模型的返回值文件描述符;
- 参数2:输出型参数需要自己创建一个struct epoll_event数组;
- 参数3:上面数组的个数;
- 参数4:毫秒为单位,1000表示1s;
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
5.2.epoll原理:
epoll_create:创建一个epoll模型,这个模型有3部分:一颗红黑树、回调机制、就绪队列
epoll_ctl(epfd, EPOLL_CTL_ADD,fd, event):添加一个需要检测的fd;添加到红黑树并建立对应的回调机制;
epoll_wait:轮询检测等待fd,就绪回调机制会知道,并把对应的struct epoll_event添加到就绪队列;
5.3.写一份代码:
#include "socket.hpp"
#include <unistd.h>
#include <sys/epoll.h>
using namespace ns_socket;
using namespace std;
#define SIZE 128
#define NUM 68
void Usage()
{
cout << "usage: ./epoll_server port" << endl;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage();
return -1;
}
int listen_sock = sock::Socket();
sock::Bind(listen_sock, uint16_t(atoi(argv[1])));
sock::Listen(listen_sock);
// 建立epoll模型
int epfd = epoll_create(SIZE);
cout << "epfd: " << epfd << endl;
// 设置fd对应的event的事件和fd
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLOUT;
epevent.data.fd = listen_sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &epevent) != 0)
{
cout << "listen_sock epoll_ctl fail" << endl;
return -2;
}
volatile bool quit = false;
struct epoll_event events[NUM];
while (!quit)
{
int timeout = 1000;
int wait_num = epoll_wait(epfd, events, NUM, timeout);
if (wait_num > 0)
{
// cout<<"有事件就绪了"<<endl;
for (int i = 0; i < wait_num; i++)
{
// 等待成功先拿出fd和判断是哪个事件成功
int fd = events[i].data.fd;
if (events[i].events & EPOLLIN)
{
cout << fd << "号文件描述符读就绪" << endl;
// 新链接
if (fd == listen_sock)
{
cout << fd << "号文件描述符获取新链接" << endl;
int new_sock = sock::Accept(fd);
if (new_sock >= 0)
{
struct epoll_event add_event;
add_event.events = EPOLLIN;
add_event.data.fd = new_sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, new_sock, &add_event) == 0)
cout << new_sock << "号链接被添加到epoll" << endl;
else
{
close(new_sock);
cout << "epoll_ctl fail,close" << new_sock << endl;
}
}
}
// 读取数据
else
{
cout << fd << "号文件描述符读取数据" << endl;
char buffer[1024] = {0};
ssize_t s = read(fd, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
cout << fd << "client: " << buffer << endl;
}
else if (s == 0)
{
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
cout << "对端关闭,已关闭文件描述符和在epoll中去除" << endl;
}
else
{
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
cout << "读取失败,已关闭文件描述符和在epoll中去除" << endl;
}
}
}
}
}
else if (wait_num == 0)
cout << "timeout ..." << endl;
else
cout << "epoll error" << endl;
}
close(epfd);
close(listen_sock);
return 0;
}
5.4.epoll的两种工作模式:LT(水平触发 level drigger)和ET(边缘触发edge drigger) (重要)
LT(水平触发):epoll的默认工作模式
- 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分,后序还会通知;
- 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回;
- 支持阻塞读写和非阻塞读写
ET(边缘触发):将socket新链接添加到epoll描述符的时候设置了EPOLLET标志, epoll进入ET工作模式.
- 通知场景:从无到有,从有到多,就是必须有新数据才会通知一次
- epoll检测到socket文件描述符必须立即处理并且必须把数据处理完,因为不会再次通知;
- 支持非阻塞读写:如果阻塞等待;有100字节的数据,一次读100字节,一次刚好读完阻塞等待还是会再次读读,但是已经没有数据了,那么就会阻塞在这个读取的地方直到有新数据来到;
ET模式较于LT模式的通知效率更高,实际的情况还是要看具体情况,整体效率不仅仅看通知效率,还有对端的发送效率(一次发多少,多少时间发一次)和自己的接受效率(一次读多少);