C++ IO多路复用 poll模型
原文链接:C++ IO多路复用 poll模型
预备知识
poll模型前置需要了解的可以参考:
IO控制:fcntl库:IO控制库
多线程:C++ Linux多线程同步通信-信号量
socket:C++ Linux多进程Socket通信
select模型:C++ IO多路复用 select模型
poll模型
特性
原理
poll是对select的改进,主要与select进行比较
优点
- IO无数量限制:select用位图检测IO,存在一个MAX_SIZE,但是poll引入链表解决上限问题
- poll将监听集合与响应集合分别设置,不需要每次重置集合
缺点
- 没有改变select的线性遍历的轮询方法,连接数越多,耗时越长
库函数<sys/poll>
# include <poll>
#include <errno.h>
int poll ( struct pollfd * fdarray, unsigned int nfds, int timeout);
fdarray: 监听的IO数组
nfds: fds数组的长度
timeout: 0(非阻塞) -1(阻塞) >0(时限)
return >0(存在事件的IO数量) 0(超出时限无IO事件) -1(错误)
错误的errno类型:EBADF(存在无效IO),EFAULT(fds越界),EINTR(收到中断信号),EINVAL(fds超过上限),ENOMEM(内存不足)
struct pollfd {
int fd; // 需要监听的IO
short events; // 等待的事件类型
short revents; // 实际发生的事件类型
} ;
events(监听位标志):
POLLIN 所有可读
POLLRDNORM 正常数据可读
POLLRDBAND 高优先级数据可读 (如缓冲区需要优先读取的数据)
POLLPRI 紧急数据可读 (要求立即读取,如TCP外带指针的控制信息) 四种标志区分用于对读事件有更高区分要求的场景
POLLOUT 所有可写
POLLWRNORM 正常数据可写
POLLWRBAND 高优先级数据可写 (特殊协议里面使用,当该IO优先数据的缓冲区可写时标志)
POLLRDHUP 该IO半关闭,不会再给主机发数据,但是还可以接受数据
revents(返回位标志): 相比events,还增加了3个标志
POLLERR IO错误时返回
POLLHUP IO已经关闭
POLLNVAL IO未打开
实例: 实现非阻塞请求响应形式的一个服务器,提供一个四则运算的API, 客户端同步(要求响应后才能再次请求)
请求响应格式
- 请求格式
以http协议为例,请求包含请求头和请求体 最简单的请求头(本例实现的)包括:请求类型,请求主体的字节长度,是否保持连接,是否编码加密.
请求类型这种枚举表示可以用二进制位表示,减小数据包大小
编码加密我们使用简单的数据+10表示.解码-10即可
struct RequestData{ //20B
struct RequestHeader; //8B short会对齐
struct RequestBody; //12B
};
struct RequestHeader{
short type; // 按位表示|encode|keep-alive| 两位表示4种情况
int bodylen; //
};
struct RequestBody{
short method; // 按位表示: add|sub|times|divide| 两位表示4种情况
int num1;
int num2;
};
- 响应格式
struct ResponseData{ //16B
struct ResponseHeader; //8B
struct ResponseBody; //8B
};
struct ResponseHeader{
short type; // 直接返回请求相同类型
int bodylen; // 数据大小
};
struct ResponseBody{
short method; // 直接返回请求相同类型
int result;
};
IO连接的处理
这种请求响应格式的服务,需要考虑IO响应后的处理,如keepalive的需要保持连接,否则直接发送完后立即断开连接(close fd,从fds中删除 TCP断开是4挥手过程,可以等响应被读出后再断开)
问题
实际写的时候有几个问题:
-
消息的处理时,IO复用+线程池+线程同步最关键的就是响应集合的结构:如果是哈希表,每个请求必须响应后才能再次请求(因为哈希表key唯一). 如果是队列,可以支持同一个服务器多个请求,但是队列的结构需要设计
-
队列消息最大的问题就是消息不能直接查找,当遍历fd时,需要每次遍历队列找到相应的消息. 当按照消息遍历每次也不能定位fd在数组哪里,并且fd不一定立即可写. 哈希表能够通过fd直接定位消息.
-
队列结构如何设计实现:fd可写和首个消息定位都为O(1)?
-
响应结束要求断开连接怎么实现?因为fds数组idx和fd对应,如果断开连接,fds的对应就变化了,因此哈希表不能用来记录索引和fd对应项.断开连接只能重新移动赋值一个新的fds
程序
实际遇到的bug:
- 事件判断的时候错误使用了|符号confds[0].revents|POLLIN
正确的为&按位与
-
accept没有错误检测
添加 -
循环开始时要清空所有IO的revent=0
server.cpp:接收请求,创建线程处理,将结果添加哈希表项,等响应时写回,并删除表项,同时对连接数组重新处理
#include<iostream>
#include<cstring>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<sys/poll.h>
#include<unordered_map>
#include<semaphore.h>
#include<thread>
#include<climits>
#include<unistd.h>
using namespace std;
#define SLEEP_TIME 1
#define MAX_CONNECT 4
#define MAX_RUNNING 4
#define REQUESTDATA_SIZE 20
#define REQUESTHEADER_SIZE 8
#define REQUESTBODY_SIZE 12
#define RESPONSEDATA_SIZE 16
#define RESPONSEHEADER_SIZE 8
#define RESPONSEBODY_SIZE 8
#define ENCODE 1
#define ALIVE 2
#define ADD 0
#define SUB 1
#define TIMES 2
#define DIVIDE 3
struct RequestHeader{
short type; // 按位表示|encode|keep-alive| 两位表示4种情况
int bodylen; //
};
struct RequestBody{
short method; // 按位表示: add|sub|times|divide| 两位表示4种情况
int num1;
int num2;
};
struct RequestData{ //20B
struct RequestHeader header; //8B short会对齐
struct RequestBody body; //12B
};
struct ResponseHeader{
short type; // 直接返回请求相同类型
int bodylen; // 数据大小
};
struct ResponseBody{
short method; // 直接返回请求相同类型
int result; // return nullptr if error
};
struct ResponseData{ //16B
struct ResponseHeader header; //8B
struct ResponseBody body; //8B
};
int running_threads=0;
unordered_map<int,struct ResponseData*> responses;
sem_t sem_thread;
// 线程处理请求函数
int processing_func(int fd,RequestData reqdata){
struct ResponseHeader resheader;
struct ResponseBody resbody;
resheader.type=reqdata.header.type;
resheader.bodylen=RESPONSEBODY_SIZE;
resbody.method=reqdata.body.method;
resbody.result=INT_MAX;
struct ResponseData resdata;
resdata.header=resheader;
resdata.body=resbody;
int result=0,num1=(reqdata.body).num1,num2=(reqdata.body).num2;
if(resheader.type&ENCODE){
num1-=10;num2-=10;
}
//处理
if(resbody.method&ADD){
result=num1+num2;
}else if(resbody.method&SUB){
result=num1-num2;
}else if(resbody.method&TIMES){
result=num1*num2;
}else if(resbody.method&DIVIDE){
if(num2==0) result=INT_MAX;
else{
result=num1/num2;
}
}else{
perror("error request body.method");
}
resbody.result=result;
//临界区添加数据
sem_wait(&sem_thread);
if(!responses.count(fd)){
responses[fd]=&resdata;
}else{
perror("responses get request from same fd twice");
}
sem_post(&sem_thread);
return 0;
}
int main(){
//信号量同步
sem_init(&sem_thread, 0, 1);
string server_ip="127.0.0.1";
uint16_t server_port=8001;
int server_fd=socket(AF_INET,SOCK_STREAM,0);
if(server_fd==-1){
perror("socket");exit(EXIT_FAILURE);
}
struct sockaddr_in server_addr;
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(server_port);
inet_pton(AF_INET,server_ip.c_str(),(struct sockaddr*)&server_addr.sin_addr.s_addr);
if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){
perror("bind");exit(EXIT_FAILURE);
}
if(listen(server_fd,10)==-1){
perror("listen");exit(EXIT_FAILURE);
}
struct pollfd confds[MAX_CONNECT+1];
confds[0].fd=server_fd;
confds[0].events=POLLIN;
struct RequestHeader reqheader;
struct RequestBody reqbody;
reqheader.type=0;
reqheader.bodylen=REQUESTBODY_SIZE;
reqbody.method=0;
reqbody.num1=0;
reqbody.num2=0;
struct RequestData reqdata;
reqdata.header=reqheader;
reqdata.body=reqbody;
int cur_cons=1;
while(true){
int nevents=poll(confds,cur_cons,0);
if(nevents==-1){
perror("poll");
}else if(nevents==0){
cout<<"no events"<<"\n";
sleep(SLEEP_TIME);
continue;
}else ;
//添加新连接
if((confds[0].revents|POLLIN)&&cur_cons<MAX_CONNECT+1){
int new_fd=accept(server_fd,nullptr,nullptr);
confds[cur_cons].fd=new_fd;
confds[cur_cons].events=POLLIN|POLLOUT;
confds[cur_cons].revents=0;
cur_cons++;
}
//遍历处理IO事件
for(int i=1;i<cur_cons;i++){
//遍历接受数据
if((confds[i].revents&POLLIN)&&(running_threads<MAX_RUNNING)){
ssize_t size=recv(confds[i].fd,&reqdata,REQUESTDATA_SIZE,0);
if(size<=0){
perror("recv");exit(EXIT_FAILURE);
}else{
thread t(processing_func,confds[i].fd,reqdata);
if(t.joinable()) t.join();
cout<<"recv request: request size=="<<size<<"(REQUESTDATA_SIZE=20)\n";
}
}
//遍历发送数据
if(confds[i].revents&POLLOUT){
int fd=confds[i].fd;
if(responses.count(fd)){
ssize_t size=send(confds[i].fd,responses[fd],RESPONSEDATA_SIZE,0);
if(size<=0){
perror("send");
}else{
cout<<"send response: response size=="<<size<<"(RESPONSEDATA_SIZE=16)\n";
if(!(responses[fd]->header.type&ALIVE)){
if(close(fd)==-1){
perror("close");
}
struct pollfd new_confds[MAX_CONNECT+1];
//删除第i个连接
memcpy(new_confds, confds, i*sizeof(struct pollfd));
memcpy(new_confds + i * sizeof(struct pollfd), confds+(i+1)*sizeof(struct pollfd), (MAX_CONNECT-i-1)*sizeof(struct pollfd));
//删除哈希表数据,连接数-1
sem_wait(&sem_thread);
responses.erase(fd);
sem_post(&sem_thread);
cur_cons--;
}
}
}
}
}
}
return 0;
}
client.cpp 单个进程计算请求,阻塞,等待结果返回输出.
#include<sys/socket.h>
#include<arpa/inet.h>
#include<cstring>
#include<unistd.h>
#include<iostream>
#include<thread>
#include<unistd.h>
#include<random>
#include<unordered_map>
using namespace std;
#define REQUESTDATA_SIZE 20
#define REQUESTHEADER_SIZE 8
#define REQUESTBODY_SIZE 12
#define RESPONSEDATA_SIZE 16
#define RESPONSEHEADER_SIZE 8
#define RESPONSEBODY_SIZE 8
#define ENCODE 1
#define ALIVE 2
#define ADD 0
#define SUB 1
#define TIMES 2
#define DIVIDE 3
struct RequestHeader{
short type; // 按位表示|encode|keep-alive| 两位表示4种情况
int bodylen; //
};
struct RequestBody{
short method; // 按位表示: add|sub|times|divide| 两位表示4种情况
int num1;
int num2;
};
struct RequestData{ //20B
struct RequestHeader header; //8B short会对齐
struct RequestBody body; //12B
};
struct ResponseHeader{
short type; // 直接返回请求相同类型
int bodylen; // 数据大小
};
struct ResponseBody{
short method; // 直接返回请求相同类型
int result; // return nullptr if error
};
struct ResponseData{ //16B
struct ResponseHeader header; //8B
struct ResponseBody body; //8B
};
unordered_map<int,string> um={
{ADD,"ADD"},
{SUB,"SUB"},
{TIMES,"TIMES"},
{DIVIDE,"DIVIDE"}
};
random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> randtype(0, 2);
uniform_int_distribution<> randmethod(0, 3);
uniform_int_distribution<> randnum(0, 100);
void postfunc(string id,int server_fd,struct sockaddr_in server_addr,struct RequestData reqdata){
if(connect(server_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){
perror("connect");
return ;
}
reqdata.header.type=randtype(gen);
reqdata.body.method=randmethod(gen);
reqdata.body.num1=randnum(gen);
reqdata.body.num2=randnum(gen);
if((reqdata.header.type&ENCODE)){
reqdata.body.num1+=10;
reqdata.body.num2+=10;
}
// send
ssize_t size=send(server_fd,(void*)&reqdata,REQUESTDATA_SIZE,0);
if(size<0){
perror("send");
exit(EXIT_FAILURE);
}else if(size==0){
perror("connect closed");
}else{
cout<<"thread"<<id<<" send success"<<"\n";
}
struct ResponseData resdata;
// recv
size=recv(server_fd,(void*)&resdata,sizeof(resdata),0);
if(size<0){
perror("recv");
}else if(size==0){
perror("connect closed");
}else{
cout<<"thread"<<id<<":"<<"[ENCODE="<<(reqdata.header.type&ENCODE)<<" ALIVE="<<(reqdata.header.type&ALIVE)
<<"num1="<<reqdata.body.num1<<" "<<um[reqdata.body.method]<<" num2="<<reqdata.body.num2<<" Result="<<resdata.body.result<<"\n";
}
}
int main(int argc,char* argv){
string id;
if(argc==0){
cout<<"no id args"<<"\n";
}else{
id=argv[0];
}
//socket
int server_fd=socket(AF_INET,SOCK_STREAM,0);
if(server_fd==-1){
perror("socket");exit(EXIT_FAILURE);
}
struct sockaddr_in server_addr;
string server_ip="127.0.0.1";
uint16_t server_port=8001;
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(server_port);
inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr);
struct RequestHeader reqheader;
struct RequestBody reqbody;
reqheader.type=0;
reqheader.bodylen=REQUESTBODY_SIZE;
reqbody.method=0;
reqbody.num1=0;
reqbody.num2=0;
struct RequestData reqdata;
reqdata.header=reqheader;
reqdata.body=reqbody;
postfunc(id,server_fd,server_addr,reqdata);
return 0;
}
multi_client.cpp 10个进程同时请求
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <vector>
#include<filesystem>
using namespace std;
int child_process(const char* program_name,const char* args[]){
pid_t cpid=fork();
if(cpid==-1){
perror("fork");
exit(EXIT_FAILURE);
}else if(cpid>0){
return cpid;
}else{
execve(program_name,const_cast<char* const*>(args),nullptr);
perror("execve");
exit(EXIT_FAILURE);
}
}
int main()
{
char* program_name=(char*)"./c.cpp";
if(!std::filesystem::exists(program_name)){
cout<<"file not exists\n";
exit(EXIT_FAILURE);
}
vector<pid_t> childs(10);
for(int i=0;i<10;i++){
string process_id="process_"+to_string(i);
const char* args[]={program_name,process_id.c_str(),nullptr};
childs[i]=child_process(program_name,args);
}
pid_t cpid;
while((cpid=wait(nullptr))>0){
cout<<"child."<<cpid<<"terminated\n";
}
return 0;
}