Select多路转接
在之前的文章中,我们介绍了五种IO模型。曾提到过,效率最高的模型一般是多路转接,即同时有多个socked套接字在等待资源就绪,将各自的等待时间重叠,从而减少等待时间的占比,提高效率。
那么在Linux系统中就给我们提供了select这样一个接口,他就能完成多路转接,同时他也是最早出现的多路转接解决方案,在一些配置很低的系统中,也能保证多路转接的使用。
一、select函数的原型
参数解释:
•参数 nfds 是需要监视的最大的文件描述符值+1。
•rdset,wrset,exset 分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集 合及异常文件描述符的集合;(这其实是一个位图)因为就绪事件可分为读就绪,写就绪和异常事件。
•参数 timeout 为结构 timeval,用来设置 select()的等待时间。
timeout是什么意思?
•NULL:则表示 select()没有 timeout,select 将一直被阻塞,直到某个文件描述符上发生了事件。
•0:仅检测描述符集合的状态,然后立即返回,即非阻塞,并不等待外部事件的发生。
•特定的时间值:如果在指定的时间段里没有事件发生,select 将超时返回;如果在该时间内有事件发生,则该输入输出型参数返回的是剩余时间。
fd_set的定义:
用来操作fd_set的宏函数:
select函数的返回值以及错误码信息:
•执行成功则返回文件描述词状态已改变的个数
•如果返回 0 代表在描述词状态改变前已超过 timeout 时间,没有返回
•当有错误发生时则返回-1,错误原因存于 errno,此时参数 readfds,writefds,exceptfds 和 timeout 的值变成不可预测。
错误码可以是以下几种:
二、理解select函数的执行过程
select函数几个参数中,除了第一个参数表示要监视的套接字的文件描述符,剩余的几个参数都是输入输出型参数。即当你调用他的时候,要对该套接字进行什么样子的监视,监视时间多长告诉给内核,函数返回的时候是内核把监视的结果及剩余时间返还给用户。
那这就出现了一个比较繁琐的问题:每一次调用select的时候,都需要进行参数重置,即输入参数之前要先保存参数信息,方便下一次调用。
while(1)
{
//在调用select之前,先要保存参数
fd_set readset;
FD_SET(fd,&readset);
select(fd+1,&readset,nullptr,nullptr,nullptr);
if(FD_ISSET(fd,readset))
{
//事件就绪的处理代码
}
}
补充说明:什么叫做事件就绪?
如果是读事件,则就绪的情况应该是读缓冲区中有数据存在;如果是写事件,则应该是写缓冲区中有剩余空间可以被使用。想想看,我们在创建一个套接字的时候,读写缓冲区默认就是没有数据的,那么此时是不是读事件天然不就绪,而写事件天然就绪。
至于异常就绪则和tcp协议中的紧急指针有关。
select的特点:
不过fd_set的大小是可以修改的,至于怎么修改我也不清楚,可以在需要的时候查一下相关资料。
select的缺点:
注意:这里的遍历fd不仅仅只是说用户在使用方便为了插入位图,判断位图修改等的遍历,而是操作系统为了判断哪些文件描述符是就绪的,而不得不遍历。
三、select的使用示例
多路转接被研究出来就是为了解决网络通信中套接字的等待问题的,我们直接套用之前封装好的socket套接字来完成select编程。
// 模板方法类
// 套接字的封装---TCP
#pragma once
#include <iostream>
#include <string>
#include <string.h>
// #include"nocopy.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <functional>
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERR
};
using namespace log_ns;
const static int gbacklog = 8;
namespace socket_ns
{
class Socket;
//因为这里要用到Socket这个类,但是此时他还没有被定义出来,所以我们在他前面声明一下,告诉编译器他的定义在后面,让我们在这一行可以使用他
using SockPtr = std::shared_ptr<Socket>;
// 基本套接字:只负责提供接口,他的子类通过让接口做组合,完成固定的逻辑
class Socket
{
public:
public:
virtual void CreateSocketOrDie() = 0;
virtual void CreateBindOrDie(uint16_t port) = 0;
virtual void CreateListenOrDie(int backlog = gbacklog) = 0;
virtual SockPtr Accepter(InetAddr *clientAddr) = 0;
virtual bool Connector(const std::string& peerip,uint16_t peerport) = 0;
virtual int sockfd()=0;
virtual void Close()=0;
virtual int Recv(std::string* out)=0;
virtual int Send(const std::string& in)=0;
public:
//父类中只是对虚函数进行组合,以固定的逻辑顺序执行函数组合。
//但是实际上调用的时候用父类的指针或者引用调用BuildListenSocket这种函数,会走子类重写的函数,这是多态的特性
void BuildListenSocket(uint16_t port)
{
// 1.创建套接字
CreateSocketOrDie();
// 2.绑定
CreateBindOrDie(port);
// 3.设置监听状态
CreateListenOrDie();
}
bool BuildClientSocket(const std::string& peerip,uint16_t peerport)
{
//1.创建套接字
CreateSocketOrDie();
//2.连接
return Connector(peerip,peerport);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket() {}
TcpSocket(int sockfd) : _sockfd(sockfd)
{}
~TcpSocket()
{}
void CreateSocketOrDie() override
{
// 1. 创建socket
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket create error\n");
exit(SOCKET_ERROR);
}
LOG(INFO, "socket create success, sockfd: %d\n", _sockfd); // 3
}
void CreateBindOrDie(uint16_t port) override
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
// 2. bind sockfd 和 Socket addr
if (::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
{
LOG(FATAL, "bind error\n");
exit(BIND_ERROR);
}
LOG(INFO, "bind success, sockfd: %d\n", _sockfd);
}
void CreateListenOrDie(int backlog) override
{
// 3. 因为tcp是面向连接的,tcp需要未来不断地能够做到获取连接
if (::listen(_sockfd, backlog) < 0)
{
LOG(FATAL, "listen error\n");
exit(LISTEN_ERR);
}
LOG(INFO, "listen success\n");
}
SockPtr Accepter(InetAddr *clientAddr) override
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 4. 获取新连接
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
LOG(WARNING, "accept error\n");
return nullptr;
}
*clientAddr = InetAddr(client);
LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", clientAddr->AddrStr().c_str(), sockfd);
return std::make_shared<TcpSocket>(sockfd);
}
bool Connector(const std::string& peerip,uint16_t peerport) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(peerport);
::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr);
int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
return true;
}
int sockfd() override
{
return _sockfd;
}
void Close() override
{
if(_sockfd>0)
{
::close(_sockfd);
}
}
int Recv(std::string* out)override
{
char inbuffer[1024]; // 当做字符串
int n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1,0);
if (n > 0)
{
inbuffer[n] = 0;
//覆盖式不可取,要使用+=
*out+=inbuffer;
}
return n;
}
int Send(const std::string& in)override
{
return ::send(_sockfd,in.c_str(),in.size(),0);
}
private:
int _sockfd; // 可以是listen也可是普通套接字
};
// //使用的时候:
// Socket* sock=new TcpSocket();
// sock->BuildListenSocket();
// class UdpSocket:public Socket
// {
// };
}
select服务器的代码如下:
#pragma once
#include <iostream>
#include "socket.hpp"
#include <sys/select.h>
#include <memory>
#include "InetAddr.hpp"
using namespace socket_ns;
class SelectServer
{
const static int gnum = sizeof(fd_set) * 8;
const static int gdefaultfd = -1;
public:
SelectServer(uint16_t port)
: _port(port), _listensock(std::make_unique<TcpSocket>())
{
}
~SelectServer() {}
void InitServer()
{
_listensock->BuildListenSocket(_port);
for (int i = 0; i < gnum; i++)
{
fd_array[i] = gdefaultfd;
}
// 添加linsten套接字到该数组(默认填到第0个位置)
fd_array[0] = _listensock->sockfd();
}
void Accepter()
{
// listen套接字得到一个新连接请求-----读事件就绪
// 因为已经就绪了,就不会被阻塞了,即accept不会再等了
InetAddr client;
SockPtr sock = _listensock->Accepter(&client);
if (sock->sockfd() > 0)
{
LOG(DEBUG, "get a new link,client info %s:%d\n", client.Ip().c_str(), client.Port());
// 处理(但是这里不能直接处理,如果客户端不发消息那我仍然阻塞了)
// 那么如何得知fd底层的数据是否就绪了呢?仍然是select!这些fd都要由select管理起来
// 所以select中的文件描述符会越来越多
// 只需要将新获得的连接套接字放入到fd_array中即可
bool flag = false;
// 看辅助数组中有没有空余位置给新fd使用,有则插入
for (int pos = 1; pos < gnum; pos++)
{
if (fd_array[pos] == gdefaultfd)
{
flag = true;
fd_array[pos] = sock->sockfd();
LOG(INFO, "add %d to fd_array success!\n", sock->sockfd());
break;
}
}
// 遍历完成发现是满的
if (flag == false)
{
LOG(WARNING, "Server is Full!\n");
// 因为处理不了了,所以直接关闭刚刚获得到的连接的套接字
::close(sock->sockfd());
}
}
}
void Handler_IO(int i)
{
char buffer[1024];
// 这里读就不会阻塞了,因为select已经等过了
ssize_t n = ::recv(fd_array[i], buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
std::cout << "client say#" << buffer << std::endl;
// 回复的时候也需要用select找到就绪的,但任何一个sockfd被创建的时候,他的读写缓冲区一定是空的,我们之前关心读事件,是关心
// 读缓冲区有没有数据,所以读天然就是不就绪的,但是写天然是就绪的
// std::string echo_str="[server echo info]";
// echo_str+=buffer;
std::string content = "<html><body></h1>hello world</body></html>";
std::string echo_str = "HTTP/1.0 200 OK\r\n";
echo_str += "Content-Type: text/html\r\n";
echo_str += "Cotent-Length:" + std::to_string(content.size()) + "\r\n\r\n";
echo_str += content;
::send(fd_array[i], echo_str.c_str(), echo_str.size(), 0);
}
// 对方把连接关了
else if (n == 0)
{
LOG(INFO, "client quit...\n");
::close(fd_array[i]);
// 把该文件描述符从fd_array中拿出来
fd_array[i] = gdefaultfd;
}
else
{
LOG(ERROR, "recv error!\n");
}
}
// 一定有大量的fd就绪,可能是普通sockfd套接字,也可能是linsten套接字
void HandlerEvent(fd_set &rfds)
{
//事件派发-----把一个就绪的sockfd交给合适的函数去执行
// 遍历查找是哪些就绪了,就绪了则处理
for (int i = 0; i < gnum; i++)
{
// 如果该位置是-1则说明未被填入fd,跳过他
if (fd_array[i] == gdefaultfd)
{
continue;
}
// 判断当前位置的fd是否就绪
if (FD_ISSET(fd_array[i], &rfds))
{
// 处理
// 到底是listenfd还是普通socket呢?
if (_listensock->sockfd() == fd_array[i])
{
Accepter();
}
// 普通文件描述符就绪,正常读
else
{
Handler_IO(i);
}
}
}
}
void Loop()
{
while (1)
{
// 一直获取客户端的信息,不能调用accept,因为他底层封装的是等+获取连接
//_listensock->Accepter();
// 1.对文件描述符集合进行初始化
fd_set rfds;
FD_ZERO(&rfds);
int max_fd = gdefaultfd;
// 2.把关心的fd,添加到集合中
for (int i = 0; i < gnum; i++)
{
if (fd_array[i] == gdefaultfd)
{
continue;
}
else
{
FD_SET(fd_array[i], &rfds);
}
// 更新出最大的fd的值
if (max_fd < fd_array[i])
{
max_fd = fd_array[i];
}
}
// 3.调用select
struct timeval timeout = {3, 0};
// 由于select是一个输入输出型参数,所以必须要有一个辅助数组来保存fd信息,用来重置参数
int n = ::select(max_fd + 1, &rfds, nullptr, nullptr, &timeout);
switch (n)
{
case 0:
LOG(DEBUG, "time out,%d.%d\n", timeout.tv_sec, timeout.tv_usec);
break;
case -1:
LOG(ERROR, "select error\n");
break;
default:
// 如果事件已经就绪了,但是我没有做处理,则底层会一直通知我,告诉我有文件描述符就绪了,所以下一次调用select就不会再判断了,直接通知
LOG(INFO, "have event ready,n: %d\n", n);
// 处理
HandlerEvent(rfds);
PrintDebug();
break;
}
sleep(3);
}
}
void PrintDebug()
{
std::cout << "fd list:" << std::endl;
for (int i = 0; i < gnum; i++)
{
if (fd_array[i] == gdefaultfd)
{
continue;
}
std::cout << fd_array[i] << " ";
std::cout << std::endl;
}
}
private:
uint16_t _port;
std::unique_ptr<Socket> _listensock;
int fd_array[gnum];
};
在这里我们可以看到,为了解决参数重置的问题,我们使用了一个int类型的数组来保存套接字的文件描述符。
其中,数组的内容存储的是套接字的文件描述符的值,用来表示当前一共有哪些文件描述符正在被进程使用。而我们还有一个fd_set rfds位图,他存储的则是真正的需要被select监视的文件描述符,由于我们这里只考虑了读事件,所以只创建了一个位图。
当select返回值n>0的时候,说明有事件就绪了,我们则需要判断是监听套接字_listensock就绪还是通过监听套接字获取的通信套接字就绪了,对这两种情况进行区分,并进行任务派发。最终实现多路转接。