13.3 Linux_网络编程_多路复用I/O接入多客户端
select函数
函数声明
int select(int nfds,
fd_set *readfds,fd_set *writefds,fd_set *exceptfds,
struct timeval *timeout);
返回值:成功返回有数据的文件描述符,失败返回-1
nfds:最大的文件描述符,值为readfds,writefds,exceptfds中最大的文件描述符+1
readfds:可读集合,常用
writefds:可写集合,不常用
exceptfds:异常集合,不常用
timeout:超时时间。NULL永久阻塞,0非阻塞
struct timeval结构体:
struct timeval {
time_t tv_sec; //s
suseconds_t tv_usec; //us
};
fd_set结构体:
fd_set就是一个整型数,每一个bit位代表一个文件描述符fd。
当fd可操作时,对应的位就会自动置1,读取之后需要手动清零。操作函数如下:
//将文件描述符从集合中删除
void FD_CLR(int fd, fd_set *set);
//查看文件描述符是否存在于集合当中
int FD_ISSET(int fd, fd_set *set);
//添加文件描述符,添加后就可以检测该文件描述符
void FD_SET(int fd, fd_set *set);
//初始化集合
void FD_ZERO(fd_set *set);
示例代码
代码功能:
这是一个服务器端的socket,使用select以阻塞的方式去监听多个文件描述符的数据。当监听到数据后,代表数据已经到达,这时不论是调用accept还是read都是不会阻塞的,即:不会有阻塞等待数据到来的过程,会有一小段阻塞拷贝数据的过程。
当监听到的文件描述符是fd时,说明是有客户端需要接入,这时就执行accept相关操作;当监听到不为fd时,这代表是客户端发来数据,需要读取客户端的数据,那么使用循环去轮询各个客户端的套接字文件描述符,从而与客户端进行通信。
代码实现:
server.c代码:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/select.h>
#define BACKLOG 5 //最大接入客户端数量
int socket_init(char** argv);
int main(int argc ,char** argv){
int fd;
//判断参数有效性
if(argc != 3){
printf("param err\n");
printf("%s<ip><port>\n",argv[0]);
return -1;
}
printf("ip = %s\n",argv[1]);
printf("port = %s\n",argv[2]);
//初始化socket:TCP,IPv4,本地回环测试
fd = socket_init(argv);
//接受客户端链接
int newFd;
struct sockaddr_in newAddr;
socklen_t newAddrlen;
char buf[100] = {0};
fd_set readfds,readfdsTmp;
int i,nfds;
//1.清空可读可写集合,并将服务器的fd添加进可读集合
FD_ZERO(&readfds);
FD_ZERO(&readfdsTmp);
FD_SET(fd,&readfds);
nfds = fd + BACKLOG + 1;
while(1){
readfdsTmp = readfds;
//2.以阻塞方式监听全部文件描述符,如果有文件描述符可以写入,则返回
if(select(nfds,&readfdsTmp,NULL,NULL,NULL) == -1){
perror("select");
exit(-1);
}
//3.根据监听到的文件描述符进行相应的操作
if(FD_ISSET(fd,&readfdsTmp)){//监听到了服务器的fd,代表有新的客户端接入
if((newFd = accept(fd,(struct sockaddr*)&newAddr,&newAddrlen)) < 0){
perror("accept");
return -1;
}
printf("[%s,%d]connect\n",inet_ntoa(newAddr.sin_addr),ntohs(newAddr.sin_port));
printf("newFd = %d\n",newFd);
FD_SET(newFd,&readfds);//将新的socket加入监听列表
}
else{
//printf("Debug:fd = %d\n",fd);
for(i=fd+1;i<nfds;i++){//监听到了客户端的fd,处理相应客户端信息,这里的难点是客户端fd的范围如何确定
//printf("Debug:i=%d\n",i);
if(FD_ISSET(i,&readfdsTmp)){
if(read(i,buf,sizeof(buf)) <= 0){
close(i);
FD_CLR(i,&readfds);
printf("fd=%d closed\n",i);
}else{
if(getpeername(i,(struct sockaddr*)&newAddr,&newAddrlen) == -1){//与客户端链接存在问题
perror("getpeername");
}else{
printf("[%s,%d]data:%s",inet_ntoa(newAddr.sin_addr),ntohs(newAddr.sin_port),buf);
write(i,"server\n",strlen("server\n"));
memset(buf,0,sizeof(buf));
}
}
}
}
}
}
close(fd);
return 0;
}
int socket_init(char** argv){
int fd;
struct sockaddr_in addr;
//1.创建socket
if((fd=socket(AF_INET,SOCK_STREAM,0))<0){//IPv4,TCP协议
perror("socket");
exit(-1);
}
//2.绑定IP、端口号
addr.sin_family = AF_INET; //IPv4
addr.sin_port = htons(atoi(argv[2])); //端口号,要转化为大端子节序
addr.sin_addr.s_addr = inet_addr(argv[1]); //IP地址:0表示在本网络上的本主机,即:自己
if(bind(fd,(struct sockaddr*)&addr,sizeof(struct sockaddr_in)) == -1){
perror("bind");
exit(-1);
}
//3.监听socket
if(listen(fd,BACKLOG) == -1){ //允许最多接入5个客户端
perror("listen");
exit(-1);
}
return fd;
}
代码运行结果如下:
poll函数
函数声明
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds:进行复用的配置
nfds:文件描述符的个数
timeout:超时时间,单位ms。0:非阻塞,负数:永久阻塞
struct pollfd结构体:
struct pollfd {
int fd; //文件描述符
short events; //请求的事件
short revents; //返回的事件(该参数由系统设置)
};
events值:
- POLLIN -- 有数据可读(常用)
- POLLPRI -- 有紧急数据需要读取
- POLLOUT -- 文件可写
示例代码
存在问题:
为什么两个客户端同时退出后,再次接入只有第一个可以接入,当第一个输入数据后,第二个才能接入。
为什么退出服务器时,会多次触发poll解除阻塞(与select问题一样)
server.c 代码:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <poll.h>
#define BACKLOG 5 //最大接入客户端数量
int socket_init(char** argv);
int main(int argc ,char** argv){
int fd;
//判断参数有效性
if(argc != 3){
printf("param err\n");
printf("%s<ip><port>\n",argv[0]);
return -1;
}
printf("ip = %s\n",argv[1]);
printf("port = %s\n",argv[2]);
//初始化socket:TCP,IPv4,本地回环测试
fd = socket_init(argv);
//接受客户端链接
int newFd;
struct sockaddr_in newAddr;
socklen_t newAddrlen;
char buf[100] = {0};
struct pollfd fds[BACKLOG+1] = {0}; //所要监听的序列: BACKLOG个客户端 + 1个服务器
nfds_t nfds = 1; //初始化时只有服务器的socket文件描述符
int i,j;
fds[0].fd = fd;
fds[0].events = POLLIN;
while(1){
//for(i=0;i<nfds;i++){
// printf("fds[%d]=%d\n",i,fds[i].fd);
//}
//1.以阻塞方式,等待数据到来
printf("while again\n");
printf("poll %d\n",poll(fds,nfds,-1));
for(i=0;i<nfds;i++){
if(fds[i].fd == fd && fds[i].revents & POLLIN){ //监听到服务器有可读数据
if((newFd = accept(fd,(struct sockaddr*)&newAddr,&newAddrlen)) < 0){
perror("accept");
return -1;
}
//将新的客户端加入到序列中
fds[nfds].fd = newFd;
fds[nfds].events = POLLIN;
nfds++;
printf("client fd = %d\n",newFd);
printf("[%s,%d]connect\n",inet_ntoa(newAddr.sin_addr),ntohs(newAddr.sin_port));
}else if(fds[i].revents & POLLIN){//监听到客户端有可读数据
//printf("Debug:client fd = %d\n",fds[i].fd);
if(getpeername(fds[i].fd,(struct sockaddr*)&newAddr,&newAddrlen) == -1){//与客户端链接存在问题
printf("fd = %d closed\n",fds[i].fd);
close(fds[i].fd); //关闭socket
for(j=i;j<(nfds-1);j++){//从序列中删除该socket
fds[j] = fds[j+1];
}
nfds--;
i--; //i的fd被i+1的fd覆盖了,所以下次访问i就是访问下一个fd
}else{
printf("before read\n");
printf("read num = %ld\n",read(fds[i].fd,buf,sizeof(buf)));
printf("[%s,%d]data:%s\n",inet_ntoa(newAddr.sin_addr),ntohs(newAddr.sin_port),buf);
write(fds[i].fd,"server\n",strlen("server\n"));
memset(buf,0,sizeof(buf));
}
}
}
}
close(fd);
return 0;
}
int socket_init(char** argv){
int fd;
struct sockaddr_in addr;
//1.创建socket
if((fd=socket(AF_INET,SOCK_STREAM,0))<0){//IPv4,TCP协议
perror("socket");
exit(-1);
}
//2.绑定IP、端口号
addr.sin_family = AF_INET; //IPv4
addr.sin_port = htons(atoi(argv[2])); //端口号,要转化为大端子节序
addr.sin_addr.s_addr = inet_addr(argv[1]); //IP地址:0表示在本网络上的本主机,即:自己
if(bind(fd,(struct sockaddr*)&addr,sizeof(struct sockaddr_in)) == -1){
perror("bind");
exit(-1);
}
//3.监听socket
if(listen(fd,BACKLOG) == -1){ //允许最多接入5个客户端
perror("listen");
exit(-1);
}
return fd;
}
epoll函数
函数声明
//创建epoll句柄
int epoll_create(int size);//size已经不再使用
//epoll句柄控制
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//等待epoll文件描述符上的I/O事件
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
epoll_create返回文件描述符,失败返回-1
epfd:epoll的文件描述符,即epoll_create的返回值
op:动作
op值 | 含义 |
EPOLL_CTL_ADD | 注册新的fd到epfd中 |
EPOLL_CTL_MOD | 修改已注册fd的监听事件 |
EPOLL_CTL_DEL | 从epfd中删除一个fd |
fd:要进行监听的fd
event:要监听什么事件
events:传入的是struct epoll_event类型的数组,会自动设置其值
maxevents:数组元素的个数
timeout:超时事件,单位ms。0:非阻塞,-1:永久阻塞
struct epoll_event结构体:
struct epoll_event {
uint32_t events; //事件
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
使用该结构体时,events是以宏的形式出现,data中只关心fd的赋值。
events宏 | 含义 |
EPOLLIN | 监听是否有数据可读 |
EPOLLOUT | 监听是否可写入数据 |
示例代码
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#define BACKLOG 5 //最大接入客户端数量
int socket_init(char** argv);
int main(int argc ,char** argv){
int fd;
//判断参数有效性
if(argc != 3){
printf("param err\n");
printf("%s<ip><port>\n",argv[0]);
return -1;
}
printf("ip = %s\n",argv[1]);
printf("port = %s\n",argv[2]);
//初始化socket:TCP,IPv4,本地回环测试
fd = socket_init(argv);
//接受客户端链接
int newFd;
struct sockaddr_in newAddr;
socklen_t newAddrlen;
char buf[100] = {0};
int epfd,i,ret;
struct epoll_event tmp;
struct epoll_event events[BACKLOG+1] = {0};// BACKLOG个客户端 + 1个服务器
//1.创建epoll句柄
if((epfd = epoll_create(1)) < 0){ //参数废弃,传入1即可
perror("epoll_create");
return -1;
}
//2.将服务器socket的fd加入序列
tmp.events = EPOLLIN;//监听是否有可读的数据
tmp.data.fd = fd; //监听的是服务器的socket
if(epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&tmp) < 0){
perror("epoll_ctl");
return -1;
}
while(1){
//printf("Debug:while again\n");
//3.阻塞方式监听全部I/O
epoll_wait(epfd,events,BACKLOG+1,-1);//当监测到时会自动设置events的值
for(i=0;i<(BACKLOG+1);i++){
//printf("Debug:i=%d\n",i);
if(events[i].data.fd == fd){//被设置的fd是服务器
if((newFd = accept(fd,(struct sockaddr*)&newAddr,&newAddrlen)) < 0){
perror("accept");
return -1;
}
//将新的客户端加入序列
tmp.events = EPOLLIN; //监听是否有可读的数据
tmp.data.fd = newFd; //监听的是新接入的客户端
if(epoll_ctl(epfd,EPOLL_CTL_ADD,newFd,&tmp) < 0){
perror("epoll_ctl");
return -1;
}
printf("client fd = %d\n",newFd);
printf("[%s,%d]connect\n",inet_ntoa(newAddr.sin_addr),ntohs(newAddr.sin_port));
}else if(events[i].data.fd != 0){//被设置的fd是客户端
//printf("Debug:before read\n");
//printf("Debug:events[i].data.fd = %d\n",events[i].data.fd);
ret = read(events[i].data.fd,buf,sizeof(buf));
//printf("Debug:after read\n");
if(ret <=0){
printf("fd = %d closed\n",events[i].data.fd);
//将客户端从序列中移除
if(epoll_ctl(epfd,EPOLL_CTL_DEL,events[i].data.fd,NULL) < 0){
perror("epoll_ctl");
return -1;
}
//memset(&events[i],0,sizeof(events[0]));不能使用memset去清除events元素
close(events[i].data.fd);
}else{
if(getpeername(events[i].data.fd,(struct sockaddr*)&newAddr,&newAddrlen) == -1){//与客户端链接存在问题
perror("getpeername");
}else{
printf("[%s,%d]%s\n",inet_ntoa(newAddr.sin_addr),ntohs(newAddr.sin_port),buf);
}
}
}
}
}
close(fd);
return 0;
}
int socket_init(char** argv){
int fd;
struct sockaddr_in addr;
//1.创建socket
if((fd=socket(AF_INET,SOCK_STREAM,0))<0){//IPv4,TCP协议
perror("socket");
exit(-1);
}
//2.绑定IP、端口号
addr.sin_family = AF_INET; //IPv4
addr.sin_port = htons(atoi(argv[2])); //端口号,要转化为大端子节序
addr.sin_addr.s_addr = inet_addr(argv[1]); //IP地址:0表示在本网络上的本主机,即:自己
if(bind(fd,(struct sockaddr*)&addr,sizeof(struct sockaddr_in)) == -1){
perror("bind");
exit(-1);
}
//3.监听socket
if(listen(fd,BACKLOG) == -1){ //允许最多接入5个客户端
perror("listen");
exit(-1);
}
return fd;
}