网络通信与并发编程(八)I/O模型
I/O模型
文章目录
- I/O模型
- 一、基本概念回顾
- 二、阻塞I/O
- 三、非阻塞I/O
- 四、多路复用I/O
- 五、异步I/O
- 六、总结
一、基本概念回顾
首先我们需要回顾一下同步、异步、阻塞和非阻塞的概念。
同步指程序在执行某个任务时,会等待该任务完成,然后才能继续执行后续的代码。在同步操作中,程序的执行流程是线性的,一个任务完成后,才会执行下一个任务。
异步指程序在执行某个任务时,不会等待该任务完成,而是继续执行后续的代码。当该异步功能完成后,通过状态、通知或回调来通知调用者。
阻塞:当一个进程/线程执行一个需要等待的操作时,它会停止执行,直到该操作完成。这意味着进程/线程会被挂起,直到所需的操作完成。
非阻塞:当一个进程/线程执行一个需要等待的操作时,它会继续执行后续的代码,而不是停止等待。这意味着进程/线程可以同时进行其他工作,而不需要等待某个操作完成。
同步与异步针对的是函数/任务的调用而言的,阻塞与非阻塞针对的是进程/线程的。同步就是当一个进程发起一个函数/任务调用的时候,一直等到函数/任务完成,而进程继续处于激活状态;阻塞是当请求不能满足的时候就将进程挂起,也就是进程是非激活的。
二、阻塞I/O
阻塞型I/O是指系统调用不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。之前编写的socket程序基本都是阻塞I/O,阻塞I/O的基本流程如下图所示:
阻塞I/O分为wait和copy两个阶段。wait阶段用户发送read请求后kernel会等待数据传入,此时用户的进程会一直处于阻塞状态;当kernel收到数据以后进入copy阶段,kernel拷贝数据到用户内存,并返回结果解除进程的阻塞状态。
总结一下阻塞I/O的wait和copy两个阶段进程都处于阻塞状态,只有等kernel拷贝完数据以后阻塞状态才会解除。所以当使用阻塞I/O操作进行通信时,很可能会出现进程一直阻塞在原地无法响应其他请求的情况非常影响通信的效率。针对这种情况如若使用多线程,当网络的请求量较大时可以在一定程度上缓解阻塞I/O产生的效率问题,但是庞大的进程/线程量会造成操作系统资源的大量占用影响操作系统对外界的响应效率。如果改为使用线程池来解决上述问题,的确可以解决大量占用操作系统资源的问题,但是线程池的产生的进程/线程数量有限,当请求的量比较大时依然存在严重的效率问题。
为了从根本上解决上述的问题,我们需要使用非阻塞I/O的方法。
三、非阻塞I/O
非阻塞I/O的基本流程如下图所示:
非阻塞I/O中当用户发送read请求以后,如果kernel中还没有数据,kernel会直接返回error,用户接收到以后就知道kernel中还没有数据,从而用户进程不会被阻塞,可以继续执行后面的任务。当用户发送read请求并且kernel中以及由数据以后,kernel会拷贝数据给用户被返回相应的值,此阶段内用户线进程需要接收完kernel拷贝的数据后才能继续执行后面的任务,因此用户线程/进程是阻塞的。
总结一下非阻塞I/O在wait阶段是非阻塞的,在copy阶段是阻塞的。
用socket实现的非阻塞I/O通信如下:(服务端)
from socket import *
import time
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
#设置为非阻塞I/O
server.setblocking(False)
#用于存放建立的链接通道
conn_l=[]
while True:
try:
print('总连接数[%s]' % len(conn_l))
#非阻塞I/O下没有建立链接请求会报BlockingIOError
conn,addr=server.accept()
conn_l.append(conn)
except BlockingIOError:
#存放待删除的链接通道
del_l=[]
#使用for语句实现与所有的通信链接挨个通信
for conn in conn_l:
try:
data=conn.recv(1024)
if len(data) == 0:
del_l.append(conn)
continue
conn.send(data.upper())
#非阻塞I/O下没有收到客户端发送的信息会报错
except BlockingIOError:
pass
#客户端断开链接后需要清除列表中存放的链接通道
except ConnectionResetError:
del_l.append(conn)
for conn in del_l:
conn_l.remove(conn)
上述的非阻塞I/O通信虽然可以提高通信的效率,然后却存在循环向操作系统发送请求致使cpu占用率大大提升、当建立的通信链接较多时通信会存在明显延时两个问题。
因而非阻塞I/O并非是一直理想的通信方式,针对非阻塞I/O的问题需要使用多路复用I/O来解决。
四、多路复用I/O
复用指的是让一个进程可以服务于多个客户端。
多路复用I/O的基本流程如下图所示:
用户首先发送select请求,整个进程会被select阻塞住,此时kernel会监听所有select负责的socket,当由socket中收到数据了用户再发送read请求拷贝操作系统的数据。
总结多路复用I/O通信中wait阶段交给了select托管,进程再该阶段会被阻塞,而copy阶段进程也会被阻塞。
针对上面的分析,我们可以看出多路复用I/O也是两个阶段阻塞,它和阻塞I/O差不多,但是多路复用I/O的特定在于select可以同时处理多个链接,查看链接中是否收到数据。因此当通信中处理的链接量较少时,多路复用I/O的效率肯定是比不过阻塞I/O+多线程/线程池的,但是当通信中处理的链接数较多时多路复用I/O的优点就能被体现出来了。
注意点:
- 在编写多路复用I/O的代码时,socket端一般会设为非阻塞模式,多路复用I/O在wait阶段的阻塞是由select引发的而不是socket引发的。
from socket import *
import time
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)
#存放待发送的数据
data_dic={}
#read_list用于存放待接收数据的链接
read_list=[server,]
#write_list存放待发送数据的链接
write_list=[]
print('start....')
while True:
#将read_list,write_list中的链接托管给select,当由链接处于收到请求、可发送数据时会返回给r1和w1
#select.select(read_list,write_list,[],0.5) 最后一位参数表示延时时间,select向操作系统发送请求后等待的时间,超时后会直接执行后面的代码
rl,wl,_=select.select(read_list,write_list,[])
#遍历收到建立通信链接/信息的对象
for sk in rl:
if sk == server:
conn,addr=sk.accept()
read_list.append(conn)
else:
data=sk.recv(1024)
write_list.append(sk)
data_dic[sk]=data
#遍历准备发送数据的链接
for sk in wl:
sk.send(data_dic[sk].upper())
data_dic.pop(sk)
write_list.remove(sk)
上述代码的优点:模型只使用一个线程来同时处理多个客户端的通信问题,占用的资源少,消耗的cpu也少。
上述代码的缺点:当通信中存在的请求数非常大时,select挨个询问socket依然会花费不少时间。针对这个问题可以改用epoll代替select,可以有效的节省消耗的时间,然而epoll的跨平台性比较差,如果程序需要跨平台运行可能会存在一定的问题。
select、poll和epoll三种实现多路复用的方式在不同平台有不同的支持情况,我们可以使用selectors模块自动选择当前平台下最适合的多路复用方式。
from socket import *
import selectors
sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
#selectors.EVENT_READ表示指定读操作,read为读操作的回调函数
#register函数会将conn传给selectors监听
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
#关闭传入的链接
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()
server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
#设置socket的接口为非阻塞
server_fileobj.setblocking(False)
#accept为读操作的回调函数
sel.register(server_fileobj,selectors.EVENT_READ,accept)
while True:
#检测所有监听的对象是否处于readable状态
events=sel.select()
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet/read
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)/read(server_fileobj,1)
五、异步I/O
异步I/O的基本流程如下:
用户发送read请求后kernel会立即返回给用户,因此在wait阶段用户进程也不会阻塞。当kernel中收到数据后会将数据拷贝给用户并在完成以后发送signal告知用户数据已经拷贝完成。
六、总结
基于上面的介绍,我们可以看到阻塞I/O与非阻塞I/O的区别在于wait阶段进程是否被阻塞;而同步I/O与异步I/O的区别在于copy(真正的I/O操作阶段)阶段是否被阻塞。因此上面介绍的阻塞I/O、非阻塞I/O、多路复用I/O都属于同步I/O。