当前位置: 首页 > article >正文

Select多路转接

        在之前的文章中,我们介绍了五种IO模型。曾提到过,效率最高的模型一般是多路转接,即同时有多个socked套接字在等待资源就绪,将各自的等待时间重叠,从而减少等待时间的占比,提高效率。

        那么在Linux系统中就给我们提供了select这样一个接口,他就能完成多路转接,同时他也是最早出现的多路转接解决方案,在一些配置很低的系统中,也能保证多路转接的使用。

一、select函数的原型

参数解释:

参数 nfds 是需要监视的最大的文件描述符值+1。
rdset,wrset,exset 分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集 合及异常文件描述符的集合;(这其实是一个位图)因为就绪事件可分为读就绪,写就绪和异常事件。
参数 timeout 为结构 timeval,用来设置 select()的等待时间。

timeout是什么意思?

NULL:则表示 select()没有 timeout,select 将一直被阻塞,直到某个文件描述符上发生了事件。
0:仅检测描述符集合的状态,然后立即返回,即非阻塞,并不等待外部事件的发生。
特定的时间值:如果在指定的时间段里没有事件发生,select 将超时返回;如果在该时间内有事件发生,则该输入输出型参数返回的是剩余时间。

fd_set的定义:

用来操作fd_set的宏函数:

select函数的返回值以及错误码信息:

•执行成功则返回文件描述词状态已改变的个数
•如果返回 0 代表在描述词状态改变前已超过 timeout 时间,没有返回
•当有错误发生时则返回-1,错误原因存于 errno,此时参数 readfds,writefds,exceptfds 和 timeout 的值变成不可预测。

错误码可以是以下几种:

二、理解select函数的执行过程

        select函数几个参数中,除了第一个参数表示要监视的套接字的文件描述符,剩余的几个参数都是输入输出型参数。即当你调用他的时候,要对该套接字进行什么样子的监视,监视时间多长告诉给内核,函数返回的时候是内核把监视的结果及剩余时间返还给用户。

        那这就出现了一个比较繁琐的问题:每一次调用select的时候,都需要进行参数重置,即输入参数之前要先保存参数信息,方便下一次调用。

while(1)
{
    //在调用select之前,先要保存参数
    fd_set readset;
    FD_SET(fd,&readset);
    select(fd+1,&readset,nullptr,nullptr,nullptr);
    if(FD_ISSET(fd,readset))
    {
        //事件就绪的处理代码
    }
}

补充说明:什么叫做事件就绪?

        如果是读事件,则就绪的情况应该是读缓冲区中有数据存在;如果是写事件,则应该是写缓冲区中有剩余空间可以被使用。想想看,我们在创建一个套接字的时候,读写缓冲区默认就是没有数据的,那么此时是不是读事件天然不就绪,而写事件天然就绪。

        至于异常就绪则和tcp协议中的紧急指针有关。

select的特点:

不过fd_set的大小是可以修改的,至于怎么修改我也不清楚,可以在需要的时候查一下相关资料。

select的缺点:

        注意:这里的遍历fd不仅仅只是说用户在使用方便为了插入位图,判断位图修改等的遍历,而是操作系统为了判断哪些文件描述符是就绪的,而不得不遍历。

三、select的使用示例

        多路转接被研究出来就是为了解决网络通信中套接字的等待问题的,我们直接套用之前封装好的socket套接字来完成select编程。

// 模板方法类
// 套接字的封装---TCP

#pragma once
#include <iostream>
#include <string>
#include <string.h>
// #include"nocopy.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
#include <memory>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <unistd.h>
#include <functional>

enum
{
    SOCKET_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERR
};
using namespace log_ns;
const static int gbacklog = 8;



namespace socket_ns
{
    class Socket;
    //因为这里要用到Socket这个类,但是此时他还没有被定义出来,所以我们在他前面声明一下,告诉编译器他的定义在后面,让我们在这一行可以使用他
    using SockPtr = std::shared_ptr<Socket>;
    
    // 基本套接字:只负责提供接口,他的子类通过让接口做组合,完成固定的逻辑
    class Socket
    {
    public:
        
    public:
        virtual void CreateSocketOrDie() = 0;
        virtual void CreateBindOrDie(uint16_t port) = 0;
        virtual void CreateListenOrDie(int backlog = gbacklog) = 0;
        virtual SockPtr Accepter(InetAddr *clientAddr) = 0;
        virtual bool Connector(const std::string& peerip,uint16_t peerport) = 0;
        virtual int sockfd()=0;
        virtual void Close()=0;

        virtual int Recv(std::string* out)=0;
        virtual int Send(const std::string& in)=0;

    public:
        //父类中只是对虚函数进行组合,以固定的逻辑顺序执行函数组合。
        //但是实际上调用的时候用父类的指针或者引用调用BuildListenSocket这种函数,会走子类重写的函数,这是多态的特性
        void BuildListenSocket(uint16_t port)
        {
            // 1.创建套接字
            CreateSocketOrDie();
            // 2.绑定
            CreateBindOrDie(port);
            // 3.设置监听状态
            CreateListenOrDie();
        }

        bool BuildClientSocket(const std::string& peerip,uint16_t peerport)
        {
            //1.创建套接字
            CreateSocketOrDie();
            //2.连接
            return Connector(peerip,peerport);
        }
        
    };


    class TcpSocket : public Socket
    {
    public:
        TcpSocket() {}
        TcpSocket(int sockfd) : _sockfd(sockfd)
        {}
        ~TcpSocket()
        {}

        void CreateSocketOrDie() override
        {
            // 1. 创建socket
            _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
            if (_sockfd < 0)
            {
                LOG(FATAL, "socket create error\n");
                exit(SOCKET_ERROR);
            }
            LOG(INFO, "socket create success, sockfd: %d\n", _sockfd); // 3
        }
        void CreateBindOrDie(uint16_t port) override
        {
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_family = AF_INET;
            local.sin_port = htons(port);
            local.sin_addr.s_addr = INADDR_ANY;

            // 2. bind sockfd 和 Socket addr
            if (::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
            {
                LOG(FATAL, "bind error\n");
                exit(BIND_ERROR);
            }

            LOG(INFO, "bind success, sockfd: %d\n", _sockfd);
        }
        void CreateListenOrDie(int backlog) override
        {
            // 3. 因为tcp是面向连接的,tcp需要未来不断地能够做到获取连接
            if (::listen(_sockfd, backlog) < 0)
            {
                LOG(FATAL, "listen error\n");
                exit(LISTEN_ERR);
            }
            LOG(INFO, "listen success\n");
        }
        SockPtr Accepter(InetAddr *clientAddr) override
        {
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            // 4. 获取新连接
            int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
            if (sockfd < 0)
            {
                LOG(WARNING, "accept error\n");
                return nullptr;
            }
            *clientAddr = InetAddr(client);
            LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", clientAddr->AddrStr().c_str(), sockfd);
            return std::make_shared<TcpSocket>(sockfd);
        }
        bool Connector(const std::string& peerip,uint16_t peerport) override
        {
            struct sockaddr_in server;
            memset(&server, 0, sizeof(server));
            server.sin_family = AF_INET;
            server.sin_port = htons(peerport);
            ::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr);

            int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
            if (n < 0)
            {
                return false;
            }
            return true;
        }

        int sockfd() override
        {
            return _sockfd;
        }

        void Close() override
        {
            if(_sockfd>0)
            {
                ::close(_sockfd);
            }
        }

        int Recv(std::string* out)override
        {
            char inbuffer[1024]; // 当做字符串
            int n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1,0);
            if (n > 0)
            {
                inbuffer[n] = 0;
                //覆盖式不可取,要使用+=
                *out+=inbuffer;
            }
            return n;
        }
        int Send(const std::string& in)override
        {
            return ::send(_sockfd,in.c_str(),in.size(),0);
        }
        
    private:
        int _sockfd; // 可以是listen也可是普通套接字
    };

    // //使用的时候:
    // Socket* sock=new TcpSocket();
    // sock->BuildListenSocket();

    // class UdpSocket:public Socket
    // {

    // };
}

select服务器的代码如下:

#pragma once
#include <iostream>
#include "socket.hpp"
#include <sys/select.h>
#include <memory>
#include "InetAddr.hpp"

using namespace socket_ns;

class SelectServer
{
    const static int gnum = sizeof(fd_set) * 8;
    const static int gdefaultfd = -1;

public:
    SelectServer(uint16_t port)
        : _port(port), _listensock(std::make_unique<TcpSocket>())
    {
    }
    ~SelectServer() {}

    void InitServer()
    {
        _listensock->BuildListenSocket(_port);
        for (int i = 0; i < gnum; i++)
        {
            fd_array[i] = gdefaultfd;
        }
        // 添加linsten套接字到该数组(默认填到第0个位置)
        fd_array[0] = _listensock->sockfd();
    }

    void Accepter()
    {
        // listen套接字得到一个新连接请求-----读事件就绪
        // 因为已经就绪了,就不会被阻塞了,即accept不会再等了
        InetAddr client;
        SockPtr sock = _listensock->Accepter(&client);
        if (sock->sockfd() > 0)
        {
            LOG(DEBUG, "get a new link,client info %s:%d\n", client.Ip().c_str(), client.Port());
            // 处理(但是这里不能直接处理,如果客户端不发消息那我仍然阻塞了)
            // 那么如何得知fd底层的数据是否就绪了呢?仍然是select!这些fd都要由select管理起来
            // 所以select中的文件描述符会越来越多
            // 只需要将新获得的连接套接字放入到fd_array中即可

            bool flag = false;

            // 看辅助数组中有没有空余位置给新fd使用,有则插入
            for (int pos = 1; pos < gnum; pos++)
            {
                if (fd_array[pos] == gdefaultfd)
                {
                    flag = true;
                    fd_array[pos] = sock->sockfd();
                    LOG(INFO, "add %d to fd_array success!\n", sock->sockfd());
                    break;
                }
            }
            // 遍历完成发现是满的
            if (flag == false)
            {
                LOG(WARNING, "Server is Full!\n");
                // 因为处理不了了,所以直接关闭刚刚获得到的连接的套接字
                ::close(sock->sockfd());
            }
        }
    }

    void Handler_IO(int i)
    {
        char buffer[1024];
        // 这里读就不会阻塞了,因为select已经等过了
        ssize_t n = ::recv(fd_array[i], buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client say#" << buffer << std::endl;
            // 回复的时候也需要用select找到就绪的,但任何一个sockfd被创建的时候,他的读写缓冲区一定是空的,我们之前关心读事件,是关心
            // 读缓冲区有没有数据,所以读天然就是不就绪的,但是写天然是就绪的
            //  std::string echo_str="[server echo info]";
            //  echo_str+=buffer;
            std::string content = "<html><body></h1>hello world</body></html>";
            std::string echo_str = "HTTP/1.0 200 OK\r\n";
            echo_str += "Content-Type: text/html\r\n";
            echo_str += "Cotent-Length:" + std::to_string(content.size()) + "\r\n\r\n";
            echo_str += content;
            ::send(fd_array[i], echo_str.c_str(), echo_str.size(), 0);
        }
        // 对方把连接关了
        else if (n == 0)
        {
            LOG(INFO, "client quit...\n");
            ::close(fd_array[i]);
            // 把该文件描述符从fd_array中拿出来
            fd_array[i] = gdefaultfd;
        }
        else
        {
            LOG(ERROR, "recv error!\n");
        }
    }

    // 一定有大量的fd就绪,可能是普通sockfd套接字,也可能是linsten套接字
    void HandlerEvent(fd_set &rfds)
    {
        //事件派发-----把一个就绪的sockfd交给合适的函数去执行
        // 遍历查找是哪些就绪了,就绪了则处理
        for (int i = 0; i < gnum; i++)
        {
            // 如果该位置是-1则说明未被填入fd,跳过他
            if (fd_array[i] == gdefaultfd)
            {
                continue;
            }
            // 判断当前位置的fd是否就绪
            if (FD_ISSET(fd_array[i], &rfds))
            {
                // 处理
                // 到底是listenfd还是普通socket呢?
                if (_listensock->sockfd() == fd_array[i])
                {
                    Accepter();
                }
                // 普通文件描述符就绪,正常读
                else
                {
                    Handler_IO(i);
                }
            }
        }
    }

    void Loop()
    {
        while (1)
        {
            // 一直获取客户端的信息,不能调用accept,因为他底层封装的是等+获取连接
            //_listensock->Accepter();

            // 1.对文件描述符集合进行初始化
            fd_set rfds;
            FD_ZERO(&rfds);
            int max_fd = gdefaultfd;

            // 2.把关心的fd,添加到集合中
            for (int i = 0; i < gnum; i++)
            {
                if (fd_array[i] == gdefaultfd)
                {
                    continue;
                }
                else
                {
                    FD_SET(fd_array[i], &rfds);
                }
                // 更新出最大的fd的值
                if (max_fd < fd_array[i])
                {
                    max_fd = fd_array[i];
                }
            }

            // 3.调用select
            struct timeval timeout = {3, 0};
            // 由于select是一个输入输出型参数,所以必须要有一个辅助数组来保存fd信息,用来重置参数
            int n = ::select(max_fd + 1, &rfds, nullptr, nullptr, &timeout);
            switch (n)
            {
            case 0:
                LOG(DEBUG, "time out,%d.%d\n", timeout.tv_sec, timeout.tv_usec);
                break;
            case -1:
                LOG(ERROR, "select error\n");
                break;
            default:
                // 如果事件已经就绪了,但是我没有做处理,则底层会一直通知我,告诉我有文件描述符就绪了,所以下一次调用select就不会再判断了,直接通知
                LOG(INFO, "have event ready,n: %d\n", n);
                // 处理
                HandlerEvent(rfds);
                PrintDebug();
                break;
            }
            sleep(3);
        }
    }

    void PrintDebug()
    {
        std::cout << "fd list:" << std::endl;
        for (int i = 0; i < gnum; i++)
        {
            if (fd_array[i] == gdefaultfd)
            {
                continue;
            }
            std::cout << fd_array[i] << " ";
            std::cout << std::endl;
        }
    }

private:
    uint16_t _port;
    std::unique_ptr<Socket> _listensock;
    int fd_array[gnum];
};

        在这里我们可以看到,为了解决参数重置的问题,我们使用了一个int类型的数组来保存套接字的文件描述符。

                其中,数组的内容存储的是套接字的文件描述符的值,用来表示当前一共有哪些文件描述符正在被进程使用。而我们还有一个fd_set rfds位图,他存储的则是真正的需要被select监视的文件描述符,由于我们这里只考虑了读事件,所以只创建了一个位图。

        当select返回值n>0的时候,说明有事件就绪了,我们则需要判断是监听套接字_listensock就绪还是通过监听套接字获取的通信套接字就绪了,对这两种情况进行区分,并进行任务派发。最终实现多路转接。


http://www.kler.cn/a/599582.html

相关文章:

  • 【机器学习】什么是线性回归?
  • Go常见问题与回答(下)
  • 【HTTP 传输过程中的 cookie】
  • 详细Linux中级知识(不断完善)
  • 高级java每日一道面试题-2025年3月09日-微服务篇[Eureka篇]-说一说Eureka自我保护模式
  • AI日报 - 2025年3月25日
  • 2018扬州大学876农业机械学概论填空名词解释简答
  • leetcode1109. 航班预订统计-medium
  • 批量配置Linux ~/.bash_profile
  • 【蓝桥杯每日一题】3.20
  • 阅读li2019-DOT源码--逐步调试
  • 第八课:Python高级排序算法:分治策略的深度应用与实践
  • 生产部署与多框架支持
  • 从零开始实现 C++ TinyWebServer 项目总览
  • 常见框架漏洞—中间件IIS
  • Filnk并行度和算子链
  • Python前缀和(例题:异或和,求和)
  • Java中static final才是修饰常量的,单独的final并不能修饰常量这样理解对吗?
  • Codeforces Round 1012 (Div. 2)
  • JSONP 漏洞