当前位置: 首页 > article >正文

C++ IO多路复用 poll模型

原文链接:C++ IO多路复用 poll模型

预备知识

poll模型前置需要了解的可以参考:

IO控制:fcntl库:IO控制库

多线程:C++ Linux多线程同步通信-信号量

socket:C++ Linux多进程Socket通信

select模型:C++ IO多路复用 select模型

poll模型

特性
原理

poll是对select的改进,主要与select进行比较

优点
  1. IO无数量限制:select用位图检测IO,存在一个MAX_SIZE,但是poll引入链表解决上限问题
  2. poll将监听集合与响应集合分别设置,不需要每次重置集合
缺点
  1. 没有改变select的线性遍历的轮询方法,连接数越多,耗时越长
库函数<sys/poll>
# include <poll>
#include <errno.h> 

int poll ( struct pollfd * fdarray, unsigned int nfds, int timeout);
    fdarray: 监听的IO数组
    nfds: fds数组的长度
    timeout: 0(非阻塞) -1(阻塞) >0(时限)

return >0(存在事件的IO数量) 0(超出时限无IO事件) -1(错误)
    错误的errno类型:EBADF(存在无效IO),EFAULT(fds越界),EINTR(收到中断信号),EINVAL(fds超过上限),ENOMEM(内存不足)


struct pollfd {
    int fd;         	// 需要监听的IO
    short events;       // 等待的事件类型
    short revents;      // 实际发生的事件类型
} ; 

events(监听位标志):
    POLLIN 所有可读
    POLLRDNORM 正常数据可读
    POLLRDBAND 高优先级数据可读 (如缓冲区需要优先读取的数据)
    POLLPRI 紧急数据可读 (要求立即读取,如TCP外带指针的控制信息) 四种标志区分用于对读事件有更高区分要求的场景
    POLLOUT 所有可写
    POLLWRNORM 正常数据可写
    POLLWRBAND 高优先级数据可写 (特殊协议里面使用,当该IO优先数据的缓冲区可写时标志) 
    POLLRDHUP 该IO半关闭,不会再给主机发数据,但是还可以接受数据
revents(返回位标志): 相比events,还增加了3个标志
    POLLERR IO错误时返回
    POLLHUP IO已经关闭
    POLLNVAL IO未打开


实例: 实现非阻塞请求响应形式的一个服务器,提供一个四则运算的API, 客户端同步(要求响应后才能再次请求)
请求响应格式
  1. 请求格式
    以http协议为例,请求包含请求头和请求体 最简单的请求头(本例实现的)包括:请求类型,请求主体的字节长度,是否保持连接,是否编码加密.

请求类型这种枚举表示可以用二进制位表示,减小数据包大小

编码加密我们使用简单的数据+10表示.解码-10即可

struct  RequestData{ //20B
    struct  RequestHeader; //8B short会对齐 
    struct  RequestBody; //12B
};

struct  RequestHeader{
    short  type; // 按位表示|encode|keep-alive| 两位表示4种情况
    int bodylen; //
};

struct  RequestBody{
    short method; // 按位表示: add|sub|times|divide| 两位表示4种情况
    int num1; 
    int num2;
};
  1. 响应格式
struct  ResponseData{ //16B
    struct  ResponseHeader; //8B
    struct  ResponseBody; //8B
};

struct  ResponseHeader{
    short  type; // 直接返回请求相同类型
    int bodylen; // 数据大小
};

struct  ResponseBody{
    short method; // 直接返回请求相同类型
    int result; 
};
IO连接的处理

这种请求响应格式的服务,需要考虑IO响应后的处理,如keepalive的需要保持连接,否则直接发送完后立即断开连接(close fd,从fds中删除 TCP断开是4挥手过程,可以等响应被读出后再断开)

问题

实际写的时候有几个问题:

  1. 消息的处理时,IO复用+线程池+线程同步最关键的就是响应集合的结构:如果是哈希表,每个请求必须响应后才能再次请求(因为哈希表key唯一). 如果是队列,可以支持同一个服务器多个请求,但是队列的结构需要设计

  2. 队列消息最大的问题就是消息不能直接查找,当遍历fd时,需要每次遍历队列找到相应的消息. 当按照消息遍历每次也不能定位fd在数组哪里,并且fd不一定立即可写. 哈希表能够通过fd直接定位消息.

  3. 队列结构如何设计实现:fd可写和首个消息定位都为O(1)?

  4. 响应结束要求断开连接怎么实现?因为fds数组idx和fd对应,如果断开连接,fds的对应就变化了,因此哈希表不能用来记录索引和fd对应项.断开连接只能重新移动赋值一个新的fds

程序

实际遇到的bug:

  1. 事件判断的时候错误使用了|符号confds[0].revents|POLLIN

正确的为&按位与

  1. accept没有错误检测
    添加

  2. 循环开始时要清空所有IO的revent=0

server.cpp:接收请求,创建线程处理,将结果添加哈希表项,等响应时写回,并删除表项,同时对连接数组重新处理
#include<iostream>
#include<cstring>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<sys/poll.h>
#include<unordered_map>
#include<semaphore.h>
#include<thread>
#include<climits>
#include<unistd.h>
using namespace std;

#define SLEEP_TIME 1
#define MAX_CONNECT 4
#define MAX_RUNNING 4


#define REQUESTDATA_SIZE 20
#define REQUESTHEADER_SIZE 8
#define REQUESTBODY_SIZE 12
#define RESPONSEDATA_SIZE 16
#define RESPONSEHEADER_SIZE 8
#define RESPONSEBODY_SIZE 8

#define ENCODE 1
#define ALIVE 2
#define ADD 0
#define SUB 1
#define TIMES 2
#define DIVIDE 3


struct  RequestHeader{
    short  type; // 按位表示|encode|keep-alive| 两位表示4种情况
    int bodylen; //
};
struct  RequestBody{
    short method; // 按位表示: add|sub|times|divide| 两位表示4种情况
    int num1; 
    int num2;
};
struct  RequestData{ //20B
    struct  RequestHeader header; //8B short会对齐 
    struct  RequestBody body; //12B
};


struct  ResponseHeader{
    short  type; // 直接返回请求相同类型
    int bodylen; // 数据大小
};
struct  ResponseBody{
    short method; // 直接返回请求相同类型
    int result; // return nullptr if error 
};
struct  ResponseData{ //16B
    struct  ResponseHeader header; //8B
    struct  ResponseBody body; //8B
};




int running_threads=0;

unordered_map<int,struct ResponseData*> responses;

sem_t sem_thread;

// 线程处理请求函数
int processing_func(int fd,RequestData reqdata){
    struct ResponseHeader resheader;
    struct ResponseBody resbody;
    resheader.type=reqdata.header.type;
    resheader.bodylen=RESPONSEBODY_SIZE;
    resbody.method=reqdata.body.method;
    resbody.result=INT_MAX;
    
    struct ResponseData resdata;
    resdata.header=resheader;
    resdata.body=resbody;
    
    int result=0,num1=(reqdata.body).num1,num2=(reqdata.body).num2;
    if(resheader.type&ENCODE){
        num1-=10;num2-=10;
    }
    
    //处理
    if(resbody.method&ADD){
        result=num1+num2;
    }else if(resbody.method&SUB){
        result=num1-num2;
    }else if(resbody.method&TIMES){
        result=num1*num2;
    }else if(resbody.method&DIVIDE){
        if(num2==0) result=INT_MAX;
        else{
            result=num1/num2;
        }
    }else{
        perror("error request body.method"); 
    }
    
    resbody.result=result;
    
    //临界区添加数据
    sem_wait(&sem_thread);
    if(!responses.count(fd)){
        responses[fd]=&resdata;
    }else{
        perror("responses get request from same fd twice");
    }
    sem_post(&sem_thread);
    return 0;
}


int main(){
    //信号量同步
    sem_init(&sem_thread, 0, 1);
    
    string server_ip="127.0.0.1";
    uint16_t server_port=8001;
    
    int server_fd=socket(AF_INET,SOCK_STREAM,0);
    if(server_fd==-1){
        perror("socket");exit(EXIT_FAILURE);
    }
    
    struct sockaddr_in server_addr;
    server_addr.sin_family=AF_INET;
    server_addr.sin_port=htons(server_port);
    inet_pton(AF_INET,server_ip.c_str(),(struct sockaddr*)&server_addr.sin_addr.s_addr);
    
    if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){
        perror("bind");exit(EXIT_FAILURE);
    }
    
    if(listen(server_fd,10)==-1){
        perror("listen");exit(EXIT_FAILURE);
    }
    
    struct pollfd confds[MAX_CONNECT+1];
    confds[0].fd=server_fd;
    confds[0].events=POLLIN;
    
    struct RequestHeader reqheader;
    struct RequestBody reqbody;
    reqheader.type=0;
    reqheader.bodylen=REQUESTBODY_SIZE;
    reqbody.method=0;
    reqbody.num1=0;
    reqbody.num2=0;
    
    struct RequestData reqdata;
    reqdata.header=reqheader;
    reqdata.body=reqbody;
    

    

    
    int cur_cons=1;
    while(true){
        int nevents=poll(confds,cur_cons,0);
        if(nevents==-1){
            perror("poll");
        }else if(nevents==0){
            cout<<"no events"<<"\n";
            sleep(SLEEP_TIME);
            continue;
        }else ;
        
        //添加新连接
        if((confds[0].revents|POLLIN)&&cur_cons<MAX_CONNECT+1){
            int new_fd=accept(server_fd,nullptr,nullptr);
            confds[cur_cons].fd=new_fd;
            confds[cur_cons].events=POLLIN|POLLOUT;
            confds[cur_cons].revents=0;
            cur_cons++;
        }
        
        //遍历处理IO事件
        for(int i=1;i<cur_cons;i++){
            //遍历接受数据
            if((confds[i].revents&POLLIN)&&(running_threads<MAX_RUNNING)){
                ssize_t size=recv(confds[i].fd,&reqdata,REQUESTDATA_SIZE,0);
                if(size<=0){
                    perror("recv");exit(EXIT_FAILURE);
                }else{
                    thread t(processing_func,confds[i].fd,reqdata);
                    if(t.joinable()) t.join();
                    cout<<"recv request: request size=="<<size<<"(REQUESTDATA_SIZE=20)\n";
                }
            }
            
            //遍历发送数据
            if(confds[i].revents&POLLOUT){
                int fd=confds[i].fd;
                if(responses.count(fd)){
                    ssize_t size=send(confds[i].fd,responses[fd],RESPONSEDATA_SIZE,0);
                    if(size<=0){
                        perror("send");
                    }else{
                        cout<<"send response: response size=="<<size<<"(RESPONSEDATA_SIZE=16)\n";
                        if(!(responses[fd]->header.type&ALIVE)){
                            if(close(fd)==-1){
                                perror("close");
                            }
                            struct pollfd new_confds[MAX_CONNECT+1];
                            //删除第i个连接
                            memcpy(new_confds, confds, i*sizeof(struct pollfd));
                            memcpy(new_confds + i * sizeof(struct pollfd), confds+(i+1)*sizeof(struct pollfd), (MAX_CONNECT-i-1)*sizeof(struct pollfd));
                        
                            //删除哈希表数据,连接数-1
                            sem_wait(&sem_thread);
                            responses.erase(fd);
                            sem_post(&sem_thread);
                            cur_cons--;
                        }
                    }
                }
            }
        }
    }
    return 0;
}
client.cpp 单个进程计算请求,阻塞,等待结果返回输出.
#include<sys/socket.h>
#include<arpa/inet.h>
#include<cstring>
#include<unistd.h>
#include<iostream>
#include<thread>
#include<unistd.h>
#include<random>
#include<unordered_map>
using namespace std;

#define REQUESTDATA_SIZE 20
#define REQUESTHEADER_SIZE 8
#define REQUESTBODY_SIZE 12
#define RESPONSEDATA_SIZE 16
#define RESPONSEHEADER_SIZE 8
#define RESPONSEBODY_SIZE 8

#define ENCODE 1
#define ALIVE 2
#define ADD 0
#define SUB 1
#define TIMES 2
#define DIVIDE 3


struct  RequestHeader{
    short  type; // 按位表示|encode|keep-alive| 两位表示4种情况
    int bodylen; //
};
struct  RequestBody{
    short method; // 按位表示: add|sub|times|divide| 两位表示4种情况
    int num1; 
    int num2;
};
struct  RequestData{ //20B
    struct  RequestHeader header; //8B short会对齐 
    struct  RequestBody body; //12B
};


struct  ResponseHeader{
    short  type; // 直接返回请求相同类型
    int bodylen; // 数据大小
};
struct  ResponseBody{
    short method; // 直接返回请求相同类型
    int result; // return nullptr if error 
};
struct  ResponseData{ //16B
    struct  ResponseHeader header; //8B
    struct  ResponseBody body; //8B
};

unordered_map<int,string> um={
    {ADD,"ADD"},
    {SUB,"SUB"},
    {TIMES,"TIMES"},
    {DIVIDE,"DIVIDE"}
};



random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> randtype(0, 2);
uniform_int_distribution<> randmethod(0, 3);
uniform_int_distribution<> randnum(0, 100);


void postfunc(string id,int server_fd,struct sockaddr_in server_addr,struct RequestData reqdata){
    
    if(connect(server_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){
        perror("connect");
        return ;
    }
    
    reqdata.header.type=randtype(gen);
    reqdata.body.method=randmethod(gen);
    reqdata.body.num1=randnum(gen);
    reqdata.body.num2=randnum(gen);
    
    
    if((reqdata.header.type&ENCODE)){
        reqdata.body.num1+=10;
        reqdata.body.num2+=10;
    }
    
    // send
    ssize_t size=send(server_fd,(void*)&reqdata,REQUESTDATA_SIZE,0);
    if(size<0){
        perror("send");
        exit(EXIT_FAILURE);
    }else if(size==0){
        perror("connect closed");
    }else{
        cout<<"thread"<<id<<" send success"<<"\n";
    }
    
    
    struct ResponseData resdata;
    
    
    // recv
    size=recv(server_fd,(void*)&resdata,sizeof(resdata),0);
    if(size<0){
        perror("recv");
    }else if(size==0){
        perror("connect closed");
    }else{
        cout<<"thread"<<id<<":"<<"[ENCODE="<<(reqdata.header.type&ENCODE)<<" ALIVE="<<(reqdata.header.type&ALIVE)
        <<"num1="<<reqdata.body.num1<<" "<<um[reqdata.body.method]<<" num2="<<reqdata.body.num2<<" Result="<<resdata.body.result<<"\n";
    }
    
}



int main(int argc,char* argv){
    
    string id;
    if(argc==0){
        cout<<"no id args"<<"\n";
    }else{
        id=argv[0];
    }

    //socket 
    int server_fd=socket(AF_INET,SOCK_STREAM,0);
    if(server_fd==-1){
        perror("socket");exit(EXIT_FAILURE);
    }
    
    struct sockaddr_in server_addr;
    string server_ip="127.0.0.1";
    uint16_t server_port=8001;
    
    server_addr.sin_family=AF_INET;
    server_addr.sin_port=htons(server_port);
    inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr);
    
    
    
    struct RequestHeader reqheader;
    struct RequestBody reqbody;
    reqheader.type=0;
    reqheader.bodylen=REQUESTBODY_SIZE;
    reqbody.method=0;
    reqbody.num1=0;
    reqbody.num2=0;
    
    struct RequestData reqdata;
    reqdata.header=reqheader;
    reqdata.body=reqbody;
    
    
    postfunc(id,server_fd,server_addr,reqdata);
    
    return 0;
}
multi_client.cpp 10个进程同时请求


#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <vector>
#include<filesystem>
using namespace std;

int child_process(const char* program_name,const char* args[]){
    pid_t cpid=fork();
    if(cpid==-1){
        perror("fork");
        exit(EXIT_FAILURE);
    }else if(cpid>0){
        return cpid;
    }else{
        execve(program_name,const_cast<char* const*>(args),nullptr);
        perror("execve");
        exit(EXIT_FAILURE);
    }
    
}

int main()
{
    char* program_name=(char*)"./c.cpp";
    if(!std::filesystem::exists(program_name)){
        cout<<"file not exists\n";
        exit(EXIT_FAILURE);
    }
    
    vector<pid_t> childs(10);
    
    for(int i=0;i<10;i++){
        string process_id="process_"+to_string(i);
        const char* args[]={program_name,process_id.c_str(),nullptr};
        childs[i]=child_process(program_name,args);
    }
    
    pid_t cpid;
    while((cpid=wait(nullptr))>0){
        cout<<"child."<<cpid<<"terminated\n";
    }
    
    return 0;
}


http://www.kler.cn/a/353868.html

相关文章:

  • 【马来西亚理工大学主办,ACM出版】2025年大数据、通信技术与计算机应用国际学术会议(BDCTA 2025)
  • (长期更新)《零基础入门 ArcGIS(ArcScene) 》实验七----城市三维建模与分析(超超超详细!!!)
  • vscode通过ssh连接服务器实现免密登录
  • 打开idea开发软件停留在加载弹出框页面进不去
  • 【Unity笔记】资源包导入后是洋红色(粉色)怎么办?
  • 如何轻松反转C# List<T>中的元素顺序
  • 前端学习-CSS的三大特性(十七)
  • python 爬虫 入门 一、基础工具
  • 分布式数据库环境(HBase分布式数据库)的搭建与配置
  • [算法日常] 逆序对
  • 音乐播放器项目专栏介绍​
  • Linux的kafka安装部署
  • 自动化测试与敏捷开发的重要性
  • docker (desktopcompose) download
  • KVM 建黑群晖
  • HarmonyOS NEXT 应用开发实战(六、组件导航Navigation使用详解)
  • JavaWeb Servlet--09深入:注册系统03--删除用户业务
  • 使用WordPress从零开始搭建一个本地网站实现远程访问
  • [Python]将pdf文件转为svg
  • VMware16的安装及VMware配置Linux虚拟机
  • Windows模拟电脑假死之键盘鼠标无响应
  • 【Java后端】Spring vs SpringBoot
  • Spring Boot比Spring多哪些注解
  • extern与static
  • sd卡挂载返回FR_NOT_READY等错误
  • 我谈Sobel算子与高斯一阶微分的关系