IO多路复用,服务器,广播与组播
目录
multil_io
IO多路复用
1、阻塞IO ===》最常用 默认设置
2、非阻塞IO ===》在阻塞IO的基础上调整其为不再阻塞等待。
3.信号驱动io
4.并发
5.1、IO多路复用 select
select-fifo
Select-tcp
5.1、IO多路复用 epoll
epoll_create
参数说明
返回值
错误码
主要用途
epoll_ctl
参数说明
epoll_event 结构体
返回值
错误码
操作类型详解
注意事项
epoll_wait
参数说明
返回值
错误码
注意事项
epoll使用fifo
epoll超时用法fifo
Epoll-tcp
select poll epoll的区别
服务器,允许多个进程连接
Tcp-fork
ser
cli
Tcp-pthread
广播与组播
广播
组播
multil_io
IO多路复用
定义:单线程或单进程同时监测若干个文件描述符是否可以执行IO操作的能力
作用:
应用程序通常需要处理来自多条事件流中的事件,比如我现在用的电脑,需要同时处理键盘鼠标的输入、中断信号等等事件,再比如web服务器如nginx,需要同时处理来来自N个客户端的事件。
逻辑控制流在时间上的重叠叫做 并发
而CPU单核在同一时刻只能做一件事情,一种解决办法是对CPU进行时分复用(多个事件流将CPU切割成多个时间片,不同事件流的时间片交替进行)。在计算机系统中,我们用线程或者进程来表示一条执行流,通过不同的线程或进程在操作系统内部的调度,来做到对CPU处理的时分复用。这样多个事件流就可以并发进行,不需要一个等待另一个太久,在用户看起来他们似乎就是并行在做一样。
使用并发处理的成本:
线程/进程创建成本
CPU切换不同线程/进程成本 Context Switch
多线程的资源竞争
有没有一种可以在单线程/进程中处理多个事件流的方法呢?一种答案就是IO多路复用。
因此IO多路复用解决的本质问题是在用更少的资源完成更多的事。
IO模型
1、阻塞IO
2、非阻塞IO EAGAIN 忙等待 errno
3、信号驱动IO SIGIO 用的相对少(了解)
4、并行模型 进程,线程
5、IO多路复用 select、poll、epoll
1、阻塞IO ===》最常用 默认设置
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_w = open("myfifo1",O_WRONLY);
if(-1 == fd_w)
{
perror("open");
return 1;
}
while(1)
{
char buf[128]="hello, fifo test";
write(fd_w,buf,strlen(buf));
sleep(3);
}
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_r = open("myfifo1",O_RDONLY);
if(-1 == fd_r)
{
perror("open");
return 1;
}
while(1)
{
char buf[128]={0};
read(fd_r,buf,sizeof(buf));
printf("fifo:%s\n",buf);
bzero(buf,sizeof(buf));
fgets(buf,sizeof(buf),stdin);
printf("terminal:%s\n",buf);
}
return 0;
}
2、非阻塞IO ===》在阻塞IO的基础上调整其为不再阻塞等待。
在程序执行阶段调整文件的执行方式为非阻塞:
===》fcntl() ===>动态调整文件的阻塞属性
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
功能:修改指定文件的属性信息。
参数:fd 要调整的文件描述符
cmd 要调整的文件属性宏名称
... 可变长的属性值参数。
返回值:成功 不一定,看cmd
失败 -1;
eg:修改文件的非阻塞属性:
int flag ;
flag = fcntl(fd,F_GETFL,0); ///获取fd文件的默认属性到flag变量中。
flag = flag | O_NONBLOCK; ///将变量的值调整并添加非阻塞属性
fcntl(fd,F_SETFL,flag); ///将新属性flag设置到fd对应的文件生效。
以上代码执行后的阻塞IO将变成非阻塞方式。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_w = open("myfifo1",O_WRONLY);
if(-1 == fd_w)
{
perror("open");
return 1;
}
while(1)
{
char buf[128]="hello, fifo test";
write(fd_w,buf,strlen(buf));
sleep(3);
}
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_r = open("myfifo1",O_RDONLY);
if(-1 == fd_r)
{
perror("open");
return 1;
}
int flag = fcntl(fd_r,F_GETFL);
fcntl(fd_r,F_SETFL,flag|O_NONBLOCK);
flag = fcntl( 0,F_GETFL);
fcntl(0,F_SETFL,flag|O_NONBLOCK);
while(1)
{
char buf[128]={0};
if(read(fd_r,buf,sizeof(buf))>0)
{
printf("fifo:%s\n",buf);
}
bzero(buf,sizeof(buf));
if(fgets(buf,sizeof(buf),stdin))
{
printf("terminal:%s\n",buf);
}
}
return 0;
}
3.信号驱动io
文件描述符需要追加 O_ASYNC 标志。
设备有io事件可以执行时,内核发送SIGIO信号。
1.追加标志
int flag ;
flag = fcntl(fd,F_GETFL,0);
fcntl(fd,F_SETFL,flag | O_ASYNC); 2.设置信号接收者
fcntl(fd,F_SETOWN,getpid());//常用设置
3.对信号进行捕获
signal(SIGIO,myhandle);//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_w = open("myfifo1",O_WRONLY);
if(-1 == fd_w)
{
perror("open");
return 1;
}
while(1)
{
char buf[128]="hello, fifo test";
write(fd_w,buf,strlen(buf));
sleep(3);
}
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
int fd_r;
void handle(int num)
{
char buf[128]={0};
read(fd_r,buf,sizeof(buf));
printf("fifo:%s\n",buf);
return ;
}
int main(int argc, char *argv[])
{
signal(SIGIO,handle);
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
fd_r = open("myfifo1",O_RDONLY);
if(-1 == fd_r)
{
perror("open");
return 1;
}
//给管道设置信号驱动
int flag = fcntl(fd_r,F_GETFL);
fcntl(fd_r,F_SETFL,flag| O_ASYNC);
//如果有写管道,本进程作为sigio信号的接收者
fcntl(fd_r,F_SETOWN,getpid());
while(1)
{
char buf[128]={0};
bzero(buf,sizeof(buf));
fgets(buf,sizeof(buf),stdin);
printf("terminal:%s\n",buf);
}
return 0;
}
4.并发
1.进程
2.线程
进程:
线程:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
void* th1(void* arg)
{
int fd_r = *(int*)arg;
while(1)
{
char buf[128]={0};
read(fd_r,buf,sizeof(buf));
printf("fifo:%s\n",buf);
}
return NULL;
}
void* th2(void* arg)
{
while(1)
{
char buf[128]={0};
bzero(buf,sizeof(buf));
fgets(buf,sizeof(buf),stdin);
printf("terminal:%s\n",buf);
}
return NULL;
}
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_r = open("myfifo1",O_RDONLY);
if(-1 == fd_r)
{
perror("open");
return 1;
}
pthread_t tid1,tid2;
pthread_create(&tid1,NULL,th1,&fd_r);
pthread_create(&tid2,NULL,th2,NULL);
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_w = open("myfifo1",O_WRONLY);
if(-1 == fd_w)
{
perror("open");
return 1;
}
while(1)
{
char buf[128]="hello, fifo test";
write(fd_w,buf,strlen(buf));
sleep(3);
}
return 0;
}
5.1、IO多路复用 select
IO 多路复用 ===》并发服务器 ===》TCP协议 3、select循环服务器 ===> 用select函数来动态检测有数据流动的文件描述符
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);
功能:完成指定描述符集合中有效描述符的动态检测。
该函数具有阻塞等待功能,在函数执行完毕后
目标测试集合中将只保留最后有数据的描述符。
参数:nfds 描述符的上限值,一般是链接后描述符的最大值+1;
readfds 只读描述符集
writefds 只写描述符集
exceptfds 异常描述符集
以上三个参数都是 fd_set * 的描述符集合类型
timeout 检测超时 如果是NULL表示一直检测不超时 。
返回值:超时 0
失败 -1
成功 >0
为了配合select函数执行,有如下宏函数:
void FD_CLR(int fd, fd_set *set);
功能:将指定的set集合中编号为fd的描述符号删除。
int FD_ISSET(int fd, fd_set *set);
功能:判断值为fd的描述符是否在set集合中,
如果在则返回真,否则返回假。
void FD_SET(int fd, fd_set *set);
功能:将指定的fd描述符,添加到set集合中。
void FD_ZERO(fd_set *set);
功能:将指定的set集合中所有描述符删除。
select-fifo
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_r = open("myfifo1",O_RDONLY);
if(-1 == fd_r)
{
perror("open");
return 1;
}
//1 create set
fd_set rd_set,tmp_set; //read set
FD_ZERO(&rd_set);
FD_ZERO(&tmp_set);
// 2. add fd
FD_SET(0,&tmp_set);
FD_SET(fd_r,&tmp_set);
while(1)
{
//6.clean flag
rd_set = tmp_set;
char buf[128]={0};
//3 wait event
select(fd_r+1,&rd_set,NULL,NULL,NULL );
//4 找到对应的fd
if(FD_ISSET(fd_r,&rd_set))
{
read(fd_r,buf,sizeof(buf));
printf("fifo:%s\n",buf);
}
if(FD_ISSET(0,&rd_set))
{
bzero(buf,sizeof(buf));
fgets(buf,sizeof(buf),stdin);
printf("terminal:%s\n",buf);
}
}
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_w = open("myfifo1",O_WRONLY);
if(-1 == fd_w)
{
perror("open");
return 1;
}
while(1)
{
char buf[128]="hello, fifo test";
write(fd_w,buf,strlen(buf));
sleep(3);
}
return 0;
}
Select-tcp
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <time.h>
/* According to POSIX.1-2001, POSIX.1-2008 */
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
typedef struct sockaddr* (SA);
int main(int argc, char *argv[])
{
//监听套接字
int listfd = socket(AF_INET,SOCK_STREAM,0 );
if(-1 ==listfd)
{
perror("socket");
exit(1);
}
struct sockaddr_in ser,cli;
bzero(&ser,sizeof(ser));
bzero(&cli,sizeof(cli));
ser.sin_family = AF_INET;
ser.sin_port = htons(50000);
ser.sin_addr.s_addr =inet_addr("127.0.0.1");
//man 7 socket
int on = 1;
setsockopt(listfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on));
setsockopt(listfd,SOL_SOCKET,SO_REUSEPORT,&on,sizeof(on));
int ret = bind(listfd,(SA)&ser,sizeof(ser));
if(-1 ==ret)
{
perror("bind");
exit(1);
}
//建立连接的排队数
listen(listfd,3);
socklen_t len = sizeof(cli);
//1 create set
fd_set rd_set,tmp_set;
FD_ZERO(&rd_set);
FD_ZERO(&tmp_set);
//2.add fd
FD_SET(listfd,&tmp_set);
int maxfd = listfd;
while(1)
{
rd_set = tmp_set;
select(maxfd+1,&rd_set,NULL,NULL,NULL);
int i = 0 ;
//通讯套接字
for(i = 0 ;i<maxfd+1;i++)
{
if(FD_ISSET(i,&rd_set) && i ==listfd)
{
int conn = accept(listfd,(SA)&cli,&len);
if(-1 == conn)
{
perror("accept");
// exit(1);
continue;
}
FD_SET(conn,&tmp_set);
if(conn>maxfd)
maxfd = conn;
}
if(FD_ISSET(i,&rd_set) && i!=listfd)
{
int conn = i ;
char buf[512]={0};
int rd_ret = recv(conn,buf,sizeof(buf),0);
if(rd_ret<=0)
{
FD_CLR(conn,&tmp_set);
close(conn);
printf("cli offline\n");
break;
}
//printf("cli:%s\n",buf);
time_t tm;
time(&tm);
sprintf(buf,"%s %s",buf,ctime(&tm));
send(conn,buf,strlen(buf),0);
}
}
}
close(listfd);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
typedef struct sockaddr* (SA);
int main(int argc, char *argv[])
{
int conn= socket(AF_INET,SOCK_STREAM,0);
if(-1 == conn)
{
perror("socket");
exit(1);
}
struct sockaddr_in ser;
bzero(&ser,sizeof(ser));
ser.sin_family = AF_INET;
ser.sin_port = htons(50000);
ser.sin_addr.s_addr =inet_addr("127.0.0.1");
int ret = connect(conn,(SA)&ser,sizeof(ser));
if(-1 == ret)
{
perror("connect");
exit(1);
}
int i =5;
struct timeval tv;
tv.tv_sec = 3;
tv.tv_usec = 0 ;
setsockopt(conn,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)); //recv == -1 timeout
while(1)
{
char buf[512]="hello,this tcp test";
send(conn,buf,strlen(buf),0);
bzero(buf,sizeof(buf));
int ret = recv(conn,buf,sizeof(buf),0);
if(ret==0)
{
printf("ser close\n");
break;
}
if(ret<=0)
{
printf("time out,contineu\n");
}
printf("ser:%s\n",buf);
sleep(1);
}
close(conn);
return 0;
}
5.1、IO多路复用 epoll
epoll
是 Linux 提供的一种高效的 I/O 多路复用机制,适用于处理大量并发连接。主要用于事件通知,以高效处理多个文件描述符的读写操作。
epoll_create
int epoll_create(int size);
参数说明
-
size
: 实际上被忽略,历史上用于指定事件表的初始大小。在现代 Linux 内核中,这个值通常设置为 1。
返回值
-
成功: 返回一个非负整数,表示
epoll
实例的文件描述符。 -
失败: 返回 -1,并设置
errno
以指示错误原因。
错误码
-
EMFILE
: 进程的打开文件描述符数量达到上限。 -
ENFILE
: 系统的打开文件描述符数量达到上限。 -
ENOMEM
: 内存不足,无法分配资源。
主要用途
-
创建
epoll
实例: 用于管理 I/O 事件的监视。 -
初始化事件表: 为
epoll
设置一个用于跟踪文件描述符的事件表。
epoll_ctl
epoll_ctl
函数用于控制 epoll
实例的行为,可以用来添加、修改或删除要监视的文件描述符及其事件。这是 epoll
机制中的核心函数之一,通常配合 epoll_create
和 epoll_wait
使用。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数说明
-
epfd
:epoll_create
返回的epoll
文件描述符。表示你要操作的epoll
实例。 -
op
: 操作类型。可以是以下之一:-
EPOLL_CTL_ADD
: 添加一个新的文件描述符到epoll
实例中。 -
EPOLL_CTL_MOD
: 修改已经存在的文件描述符的事件监听设置。 -
EPOLL_CTL_DEL
: 从epoll
实例中删除一个文件描述符。
-
-
fd
: 需要添加、修改或删除的文件描述符。 -
event
: 指向epoll_event
结构体的指针,用于指定文件描述符的事件类型及其他信息。
epoll_event
结构体
struct epoll_event {
uint32_t events; // 事件类型
epoll_data_t data; // 事件相关的数据
};
-
events
: 需要监听的事件类型。常用的事件类型包括:-
EPOLLIN
: 可读事件。 -
EPOLLOUT
: 可写事件。 -
EPOLLERR
: 错误事件。 -
EPOLLHUP
: 挂断事件。 -
EPOLLET
: 边缘触发(可选)。
-
-
data
: 用户定义的数据,可以是文件描述符、指针等。
返回值
-
成功: 返回 0。
-
失败: 返回 -1,并设置
errno
以指示错误原因。
错误码
-
EBADF
:epfd
不是有效的文件描述符。 -
EINVAL
: 参数op
无效,或fd
不在epfd
所管理的epoll
实例中。 -
ENOENT
:EPOLL_CTL_MOD
操作中指定的文件描述符不存在。 -
ENOMEM
: 内存不足,无法完成操作。
操作类型详解
-
EPOLL_CTL_ADD
:-
将
fd
添加到epoll
实例中,以便监视其事件。 -
event
参数指定了需要监视的事件类型。
-
-
EPOLL_CTL_MOD
:-
修改已存在文件描述符的事件监听设置。
-
event
参数更新文件描述符fd
的事件类型。
-
-
EPOLL_CTL_DEL
:-
从
epoll
实例中删除文件描述符。 -
文件描述符
fd
不再接受epoll
实例的事件通知。
-
注意事项
-
边缘触发(
EPOLLET
): 使用边缘触发模式时,需要使用非阻塞 I/O,否则可能会出现问题。边缘触发模式只有在状态变化时才会通知,而水平触发模式会持续通知直到事件处理完成。 -
事件处理: 在
epoll_wait
返回后,你需要根据events
中的标志来处理事件,例如读取数据、处理错误等。
epoll_wait
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数说明
-
epfd
:epoll_create
返回的epoll
文件描述符。 -
events
: 指向epoll_event
结构体数组的指针,用于存储触发的事件。 -
maxevents
:events
数组的大小,即最大可以返回的事件数。 -
timeout
: 等待事件的超时时间(以毫秒为单位)。可以是以下值之一:-
-1
: 无限等待,直到有事件发生。 -
0
: 非阻塞模式,立即返回,不管是否有事件发生。 -
正整数: 等待指定的时间后返回,如果在此时间内没有事件发生,则返回。
-
返回值
-
成功: 返回实际触发的事件数(
0
到maxevents
之间)。 -
失败: 返回 -1,并设置
errno
以指示错误原因。
错误码
-
EBADF
:epfd
不是有效的文件描述符。 -
EINTR
: 调用被信号中断。 -
EINVAL
: 参数maxevents
或timeout
无效。
注意事项
-
事件处理: 在处理
epoll_wait
返回的事件时,根据events
中的标志来确定需要处理的事件类型。 -
超时处理: 使用适当的
timeout
设置来平衡响应时间和系统资源的使用。 -
非阻塞 I/O: 如果使用边缘触发模式(
EPOLLET
),确保使用非阻塞 I/O 来避免潜在的阻塞问题
epoll使用fifo
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <sys/epoll.h>
int add_fd(int epfd,int fd)
{
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
int ret = epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);
if(-1 == ret)
{
perror("add fd");
}
return ret;
}
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_r = open("myfifo1",O_RDONLY);
if(-1 == fd_r)
{
perror("open");
return 1;
}
struct epoll_event rev[2]={0};
//1 create set
int epfd = epoll_create(2);
if(-1 == epfd)
{
perror("epoll_create");
return 1;
}
//2 add fd
add_fd(epfd,0);
add_fd(epfd,fd_r);
while(1)
{
char buf[128]={0};
int ep_ret = epoll_wait(epfd,rev,2,-1);
int i = 0 ;
for(i = 0;i<ep_ret;i++)
{
if(fd_r == rev[i].data.fd )
{
read(fd_r,buf,sizeof(buf));
printf("fifo:%s\n",buf);
}
else if(0 == rev[i].data.fd)
{
bzero(buf,sizeof(buf));
fgets(buf,sizeof(buf),stdin);
printf("terminal:%s\n",buf);
}
}
}
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_w = open("myfifo1",O_WRONLY);
if(-1 == fd_w)
{
perror("open");
return 1;
}
while(1)
{
char buf[128]="hello, fifo test";
write(fd_w,buf,strlen(buf));
sleep(3);
}
return 0;
}
epoll超时用法fifo
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <sys/epoll.h>
int add_fd(int epfd,int fd)
{
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
int ret = epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);
if(-1 == ret)
{
perror("add fd");
}
return ret;
}
int main(int argc, char *argv[])
{
int ret = mkfifo("myfifo1",0666);
if(-1 == ret)
{
if( EEXIST!= errno )
{
perror("mkfifo");
return 1;
}
}
int fd_r = open("myfifo1",O_RDONLY);
if(-1 == fd_r)
{
perror("open");
return 1;
}
struct epoll_event rev[2]={0};
//1 create set
int epfd = epoll_create(2);
if(-1 == epfd)
{
perror("epoll_create");
return 1;
}
//2 add fd
add_fd(epfd,0);
add_fd(epfd,fd_r);
while(1)
{
char buf[128]={0};
int ep_ret = epoll_wait(epfd,rev,2,1000*2);
if(ep_ret>0)
{
int i = 0 ;
for(i = 0;i<ep_ret;i++)
{
if(fd_r == rev[i].data.fd )
{
read(fd_r,buf,sizeof(buf));
printf("fifo:%s\n",buf);
}
else if(0 == rev[i].data.fd)
{
bzero(buf,sizeof(buf));
fgets(buf,sizeof(buf),stdin);
printf("terminal:%s\n",buf);
}
}
}
else if (0 == ep_ret)
{
printf("timeout...\n");
continue;
}
else
{
perror("epoll_wait");
break;
}
}
return 0;
}
Epoll-tcp
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/epoll.h>
typedef struct sockaddr* (SA);
int add_fd(int epfd,int fd)
{
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
int ret = epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);
if(-1 == ret)
{
perror("add fd");
}
return ret;
}
int del_fd(int epfd,int fd)
{
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
int ret = epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&ev);
if(-1 == ret)
{
perror("add fd");
}
return ret;
}
int main(int argc, char *argv[])
{
//监听套接字
int listfd = socket(AF_INET,SOCK_STREAM,0 );
if(-1 ==listfd)
{
perror("socket");
exit(1);
}
struct sockaddr_in ser,cli;
bzero(&ser,sizeof(ser));
bzero(&cli,sizeof(cli));
ser.sin_family = AF_INET;
ser.sin_port = htons(50000);
ser.sin_addr.s_addr =inet_addr("127.0.0.1");
//man 7 socket
int on = 1;
setsockopt(listfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on));
setsockopt(listfd,SOL_SOCKET,SO_REUSEPORT,&on,sizeof(on));
int ret = bind(listfd,(SA)&ser,sizeof(ser));
if(-1 ==ret)
{
perror("bind");
exit(1);
}
//建立连接的排队数
listen(listfd,3);
socklen_t len = sizeof(cli);
struct epoll_event rev[10]={0};
//1 create set
int epfd = epoll_create(10);
if(-1 == epfd)
{
perror("epoll_create");
return 1;
}
// 2 .add fd
add_fd(epfd,listfd);
while(1)
{
//3 wait event
int ep_ret = epoll_wait(epfd,rev,10,-1);
int i = 0 ;
//4 find fd handle
for(i = 0 ;i<ep_ret;i++)
{
if(rev[i].data.fd == listfd)
{ //通讯套接字
int conn = accept(listfd,(SA)&cli,&len);
if(-1 == conn)
{
perror("accept");
continue;
}
add_fd(epfd,conn);
}
else
{
int conn = rev[i].data.fd;
char buf[512]={0};
int rd_ret = recv(conn,buf,sizeof(buf),0);
if(rd_ret<=0)
{
del_fd(epfd,conn);
close(conn);
break;
}
time_t tm;
time(&tm);
sprintf(buf,"%s %s",buf,ctime(&tm));
send(conn,buf,strlen(buf),0);
}
}
}
close(listfd);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
typedef struct sockaddr* (SA);
int main(int argc, char *argv[])
{
int conn= socket(AF_INET,SOCK_STREAM,0);
if(-1 == conn)
{
perror("socket");
exit(1);
}
struct sockaddr_in ser;
bzero(&ser,sizeof(ser));
ser.sin_family = AF_INET;
ser.sin_port = htons(50000);
ser.sin_addr.s_addr =inet_addr("127.0.0.1");
int ret = connect(conn,(SA)&ser,sizeof(ser));
if(-1 == ret)
{
perror("connect");
exit(1);
}
int i =5;
struct timeval tv;
tv.tv_sec = 3;
tv.tv_usec = 0 ;
setsockopt(conn,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)); //recv == -1 timeout
while(1)
{
char buf[512]="hello,this tcp test";
send(conn,buf,strlen(buf),0);
bzero(buf,sizeof(buf));
int ret = recv(conn,buf,sizeof(buf),0);
if(ret==0)
{
printf("ser close\n");
break;
}
if(ret<=0)
{
printf("time out,contineu\n");
}
printf("ser:%s\n",buf);
sleep(1);
}
close(conn);
return 0;
}
select poll epoll的区别
-
select
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
select的调用一般要注意几点:
① readfds等是指针结果参数,会被函数修改,所以一般会另外定义一个allread_fdset,保持全部要监听读的句柄,将它的拷贝传递给select函数,返回可读的句柄集合,类型fdset支持赋值运算符=;
② 要注意计算nfds,当新增监听句柄时比较容易修改,当减少监听句柄时较麻烦些,如果要精确修改需要遍历或者采用最大堆等数据结构维护这个句柄集,以方便的找到第二大的句柄,或者干脆在减少监听句柄时不管nfds;
③ timeout如果为NULL表示阻塞等,如果timeout指向的时间为0(struct timeval的两个变量都为0),表示非阻塞,否则表示select的超时时间;
④ select返回-1表示错误,返回0表示超时时间到没有监听到的事件发生,返回正数表示监听到的所有事件数(包括可读,可写,异常),通常在处理事件时 会利用这个返回值来提高效率,避免不必要的事件触发检查。(比如总共只有一个事件,已经在可读集合中处理了一个事件,则可写和异常就没必要再去遍历句柄集 判断是否发生事件了);
⑤ Linux的实现中select返回时会将timeout修改为剩余时间,所以重复使用timeout需要注意。
select的缺点在于:
① 由于描述符集合set的限制,每个set最多只能监听FD_SETSIZE(在Linux上是1024)个句柄(不同机器可能不一样);
② 返回的可读集合是个fdset类型,需要对所有的监听读句柄一一进行FD_ISSET的测试来判断是否可读;
③ nfds的存在就是为了解决select的效率问题(select遍历nfds个文件描述符,判断每个描述符是否是自己关心的,对关心的描述符判断是否发生事件)。但是解决不彻底,比如如果只监听0和1000两个句柄,select需要遍历1001个句柄来检查事件。
-
epoll
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epoll 解决了select和poll的几个性能上的缺陷:①不限制监听的描述符个数(poll也是),只受进程打开描述符总数的限制;②监听性能不随着监听描述 符数的增加而增加,是O(1)的,不再是轮询描述符来探测事件,而是由描述符主动上报事件;(底层:中断,应用层:发信号)③使用共享内存的方式,不在用户和内核之间反复传递监听的描述 符信息;④返回参数中就是触发事件的列表,不用再遍历输入事件表查询各个事件是否被触发。
epoll显著提高性能的前提是:监听大量描述符,并且每次触发事件的描述符文件非常少。
epoll的另外区别是:①epoll创建了描述符,记得close;②支持水平触发和边沿触发。
服务器,允许多个进程连接
Tcp-fork
ser
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <time.h>
#include <signal.h>
#include <sys/wait.h>
typedef struct sockaddr* (SA);
void handle(int num)
{
wait(NULL);
}
int main(int argc, char *argv[])
{
signal(SIGCHLD,handle);
//监听套接字
int listfd = socket(AF_INET,SOCK_STREAM,0 );
if(-1 ==listfd)
{
perror("socket");
exit(1);
}
struct sockaddr_in ser,cli;
bzero(&ser,sizeof(ser));
bzero(&cli,sizeof(cli));
ser.sin_family = AF_INET;
ser.sin_port = htons(50000);
ser.sin_addr.s_addr =inet_addr("127.0.0.1");
//man 7 socket
int on = 1;
setsockopt(listfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on));
setsockopt(listfd,SOL_SOCKET,SO_REUSEPORT,&on,sizeof(on));
int ret = bind(listfd,(SA)&ser,sizeof(ser));
if(-1 ==ret)
{
perror("bind");
exit(1);
}
//建立连接的排队数
listen(listfd,3);
socklen_t len = sizeof(cli);
while(1)
{
//通讯套接字
int conn = accept(listfd,(SA)&cli,&len);
if(-1 == conn)
{
perror("accept");
//exit(1);
continue;
}
pid_t pid = fork();
if(0 == pid)
{
while(1)
{
close(listfd);
char buf[512]={0};
int rd_ret = recv(conn,buf,sizeof(buf),0);
if(rd_ret<=0)
{
printf("cli off line\n");
close(conn);
//break;
exit(1);
}
printf("cli:%s\n",buf);
time_t tm;
time(&tm);
sprintf(buf,"%s %s",buf,ctime(&tm));
send(conn,buf,strlen(buf),0);
}
}
else if (pid>0)
{
close(conn);
}
else
{
printf("fork");
continue;
}
}
close(listfd);
return 0;
}
cli
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
typedef struct sockaddr* (SA);
int main(int argc, char *argv[])
{
int conn= socket(AF_INET,SOCK_STREAM,0);
if(-1 == conn)
{
perror("socket");
exit(1);
}
struct sockaddr_in ser;
bzero(&ser,sizeof(ser));
ser.sin_family = AF_INET;
ser.sin_port = htons(50000);
ser.sin_addr.s_addr =inet_addr("127.0.0.1");
int ret = connect(conn,(SA)&ser,sizeof(ser));
if(-1 == ret)
{
perror("connect");
exit(1);
}
int i =5;
struct timeval tv;
tv.tv_sec = 3;
tv.tv_usec = 0 ;
setsockopt(conn,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)); //recv == -1 timeout
while(1)
{
char buf[512]="hello,this tcp test";
send(conn,buf,strlen(buf),0);
bzero(buf,sizeof(buf));
int ret = recv(conn,buf,sizeof(buf),0);
if(ret==0)
{
printf("ser close\n");
break;
}
if(ret<=0)
{
printf("time out,contineu\n");
}
printf("ser:%s\n",buf);
sleep(1);
}
close(conn);
return 0;
}
Tcp-pthread
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <time.h>
#include <signal.h>
#include <sys/wait.h>
#include <pthread.h>
typedef struct sockaddr* (SA);
void* th(void* arg)
{
pthread_detach(pthread_self());
int conn =* (int*)arg;
//sem_post();
while(1)
{
char buf[512]={0};
int rd_ret = recv(conn,buf,sizeof(buf),0);
if(rd_ret<=0)
{
printf("cli off line\n");
close(conn);
break;
}
printf("cli:%s\n",buf);
time_t tm;
time(&tm);
sprintf(buf,"%s %s",buf,ctime(&tm));
send(conn,buf,strlen(buf),0);
}
return NULL;
}
int main(int argc, char *argv[])
{
//监听套接字
int listfd = socket(AF_INET,SOCK_STREAM,0 );
if(-1 ==listfd)
{
perror("socket");
exit(1);
}
struct sockaddr_in ser,cli;
bzero(&ser,sizeof(ser));
bzero(&cli,sizeof(cli));
ser.sin_family = AF_INET;
ser.sin_port = htons(50000);
ser.sin_addr.s_addr =inet_addr("127.0.0.1");
//man 7 socket
int on = 1;
setsockopt(listfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on));
setsockopt(listfd,SOL_SOCKET,SO_REUSEPORT,&on,sizeof(on));
int ret = bind(listfd,(SA)&ser,sizeof(ser));
if(-1 ==ret)
{
perror("bind");
exit(1);
}
//建立连接的排队数
listen(listfd,3);
socklen_t len = sizeof(cli);
while(1)
{
//通讯套接字
int conn = accept(listfd,(SA)&cli,&len);
if(-1 == conn)
{
perror("accept");
//exit(1);
continue;
}
pthread_t tid;
pthread_create(&tid,NULL,th,&conn);
//sem_wait();
// join();
// 确保th中,把conn保存到局部变量中
usleep(1000*5);
}
close(listfd);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
typedef struct sockaddr* (SA);
int main(int argc, char *argv[])
{
int conn= socket(AF_INET,SOCK_STREAM,0);
if(-1 == conn)
{
perror("socket");
exit(1);
}
struct sockaddr_in ser;
bzero(&ser,sizeof(ser));
ser.sin_family = AF_INET;
ser.sin_port = htons(50000);
ser.sin_addr.s_addr =inet_addr("127.0.0.1");
int ret = connect(conn,(SA)&ser,sizeof(ser));
if(-1 == ret)
{
perror("connect");
exit(1);
}
int i =5;
struct timeval tv;
tv.tv_sec = 3;
tv.tv_usec = 0 ;
setsockopt(conn,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)); //recv == -1 timeout
while(1)
{
char buf[512]="hello,this tcp test";
send(conn,buf,strlen(buf),0);
bzero(buf,sizeof(buf));
int ret = recv(conn,buf,sizeof(buf),0);
if(ret==0)
{
printf("ser close\n");
break;
}
if(ret<=0)
{
printf("time out,contineu\n");
}
printf("ser:%s\n",buf);
sleep(1);
}
close(conn);
return 0;
}
广播与组播
广播
recv
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h> /* See NOTES */
#include <unistd.h>
#include <arpa/inet.h>
typedef struct sockaddr* (SA);
int main(int argc, char **argv) {
int sockfd = socket(AF_INET,SOCK_DGRAM,0);
if(-1 == sockfd)
{
perror("socket");
exit(1);
}
struct sockaddr_in ser, cli;
ser.sin_family = AF_INET;
ser.sin_port = htons(9999);
ser.sin_addr.s_addr =0;
int ret = bind(sockfd, (SA)&ser, sizeof(ser));
if (-1 == ret) {
perror("bind");
exit(1);
}
socklen_t len = sizeof(cli);
while(1)
{
char buf[512]={0};
recvfrom(sockfd,buf,sizeof(buf),0,(SA)&cli,&len);
printf("%s\n",buf);
}
return 0;
}
send
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h> /* See NOTES */
#include <unistd.h>
#include <arpa/inet.h>
#include <string.h>
typedef struct sockaddr* (SA);
int main(int argc, char **argv)
{
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (-1 == sockfd) {
perror("socket");
exit(1);
}
//man 7 socket 208
socklen_t on =1;
setsockopt(sockfd,SOL_SOCKET,SO_BROADCAST,&on,sizeof(on));
struct sockaddr_in all;
all.sin_family = AF_INET;
all.sin_port = htons(9999);
all.sin_addr.s_addr = inet_addr("192.168.0.255");
while(1)
{
char buf[]="this is udp boardcast test...";
sendto(sockfd,buf,strlen(buf),0,(SA)&all,sizeof(all));
sleep(1);
}
return 0;
}
组播
recv
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h> /* See NOTES */
#include <unistd.h>
typedef struct sockaddr *(SA);
#define MUTIL_ADDR "235.1.2.3"
int main(int argc, char **argv) {
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (-1 == sockfd) {
perror("socket");
exit(1);
}
//本机 地址
struct sockaddr_in local, sendaddr;
local.sin_family = AF_INET;
local.sin_port = htons(9999);
// ip is 0
local.sin_addr.s_addr = INADDR_ANY;
int ret = bind(sockfd,(SA)&local,sizeof(local));
if(-1 ==ret)
{
perror("bind");
exit(1);
}
struct ip_mreqn multiaddr;
multiaddr.imr_multiaddr.s_addr = inet_addr(MUTIL_ADDR);
multiaddr.imr_address.s_addr = INADDR_ANY;
// 0 to indicate any interface.
multiaddr.imr_ifindex = 0;
//把自己的地址加入组播地址
//man 7 ip 89
setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &multiaddr,sizeof(multiaddr));
char sendaddrbuf[64] = {0};
socklen_t len = sizeof(sendaddr);
while (1) {
char buf[512] = {0};
recvfrom(sockfd, buf, sizeof(buf), 0, (SA)&sendaddr, &len);
printf("%s:%d %s\n",
inet_ntop(AF_INET, &sendaddr.sin_addr, sendaddrbuf,
sizeof(sendaddrbuf)),
ntohs(sendaddr.sin_port), buf);
sprintf(buf, "%s %s", buf, "aaa");
// send mutil cast
sendto(sockfd, buf, strlen(buf), 0, (SA)&sendaddr, sizeof(sendaddr));
sleep(1);
}
return 0;
}
send
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h> /* See NOTES */
#include <unistd.h>
#include <arpa/inet.h>
#include <string.h>
typedef struct sockaddr* (SA);
#define MUTIL_ADDR "235.1.2.3"
int main(int argc, char **argv)
{
int sockfd = socket(AF_INET,SOCK_DGRAM,0);
if(-1 == sockfd)
{
perror("socket");
exit(1);
}
struct sockaddr_in cli,sendaddr;
cli.sin_family = AF_INET;
cli.sin_port = htons(9999);
cli.sin_addr.s_addr =inet_addr(MUTIL_ADDR);
char sendaddrbuf[64]={0};
socklen_t len = sizeof(cli);
while(1)
{
char buf[512]="this is udp mulitcast test";
//send mutil cast
sendto(sockfd,buf,strlen(buf),0,(SA)&cli,sizeof(cli));
bzero(buf,sizeof(buf));
recvfrom(sockfd,buf,sizeof(buf),0,(SA)&sendaddr,&len);
printf("%s:%d %s\n",inet_ntop(AF_INET,&sendaddr.sin_addr,sendaddrbuf,sizeof(sendaddrbuf)),ntohs(sendaddr.sin_port),buf);
}
return 0;
}