Python并发编程 07 事件驱动模型、进程切换、进程阻塞、文件描述符、缓存I/O、selectors模块
文章目录
- 一、事件驱动模型
- 二、进程切换
- 三、进程阻塞
- 四、文件描述符
- 五、缓存I/O
- 1、缓存I/O概述
- 2、IO模型
- (1)阻塞(blocking) IO
- (2)非阻塞(nonblocking) IO
- (3)IO多路复用(I/O multiplexing)
- (4)异步(Asynchronous) IO
- 六、selectors模块
- 七、小实验:实现客户端并发文件上传与下载
一、事件驱动模型
事件包括:敲击键盘上某个按键、点击鼠标、系统内部定时器触发等。
事件驱动模型大致流程:
开始 → 初始化 → 等待 → 事件触发 → 执行该事件对应的操作。
下面以鼠标点击事件为例:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<p onclick="fun()">点我呀</p>
<script type="text/javascript">
function fun() {
alert('OK!')
}
</script>
</body>
</html>
当点击“点我呀”,会弹出弹框OK!。那么怎么获取鼠标点击呢?
如果创建一个线程,写个死循环来监测,CPU资源浪费。因为大部分时间不需要任何操作,但是CPU却一直在运行该死循环。如果阻塞该程序,则不会执行程序的其他部分。所以用事件驱动模型来解决该问题。
事件驱动模型大体思路:
①有一个事件(消息)队列
②鼠标按下时,往这个队列中增加一个点击事件
③有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数。
④事件一般都各自保存各自的处理函数指针,这样每个消息都有独立的处理函数。
事件驱动编程时一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时,使用回调机制来触发相应的处理。
二、进程切换
操作系统内核将一个正在CPU上运行的应用程序挂起,并恢复以前被挂起的某个应用程序,让其在CPU上执行。
步骤如下:
①保存处理机上下文,包括程序计数器和其他寄存器。
②更新PCB信息。
③把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
④选择另一个进程执行,并更新其PCB。
⑤更新内存管理的数据结构
⑥恢复处理机上下文,
总之,进程切换非常耗资源。
三、进程阻塞
正在执行的进程,由于期待的某个事件未发生,如请求资源失败,由系统自动执行阻塞原语,使自己由运行态变为阻塞态。进程的阻塞是进程的一种主动行为。当进程进入阻塞态,是不占用CPU资源的。
四、文件描述符
socket实际上就是一个文件描述符
文件描述符是一个用于表述指向文件的引用的抽象化概念。它在形式上是一个非负整数,实际上它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。
当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序填写往往会围绕着文件描述符展开。但文件描述符这一概念往往只适用于Linux这样的操作系统。
五、缓存I/O
1、缓存I/O概述
缓存I/O又被称作标准I/O。在Linux的缓存I/O机制中,数据会先被拷贝到操作系统内核缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。
2、IO模型
(1)阻塞(blocking) IO
阻塞IO整个过程只发了一次系统调用,属于同步的
服务端:
import socket
sk = socket.socket()
sk.bind(("127.0.0.1", 8000))
sk.listen(5)
while True:
conn, addr = sk.accept()
print(conn)
print(type(conn))
conn.send("hello client".encode("utf-8"))
data = conn.recv(1024)
print(data.decode("utf-8"))
客户端:
import socket
sk = socket.socket()
sk.connect(("127.0.0.1", 8000))
data = sk.recv(1024)
print(data.decode('utf-8'))
sk.send(b'hello server')
当客户端程序未启用时,服务端进程会一直阻塞在sk.accept()这里。
(2)非阻塞(nonblocking) IO
缺点是发了多次系统调用,还有就是对事件触发的响应不及时,因为是每隔一段时间,再去检查事件是否触发。而在这个时间间隔之中事件触发,也要等待检查时间到了,再去执行事件触发后的操作。
服务端:
import time
import socket
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(("127.0.0.1",8000))
sk.listen(5)
sk.setblocking(False)
# 默认是阻塞IO,False设置为非阻塞IO
print("waiting client connection ......")
while True:
try:
conn,addr = sk.accept() # 没有链接传入,这里不会再阻塞,而是引发异常
print("+++", addr)
client_message = conn.recv(1024) # 如果没收到数据,也会触发异常
print(str(client_message,"utf-8"))
conn.close()
except Exception as e:
print(e)
time.sleep(4) # 服务端可以在这4s中,干其他事,而不是阻塞等待链接。
客户端:
import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
while True:
sk.connect(('127.0.0.1',8000))
sk.sendall(bytes("hello","utf8"))
time.sleep(2)
break
(3)IO多路复用(I/O multiplexing)
select发送系统调用要的数据是:是否有人来连接,如果没有则一直监听。
水平触发:事件发生就一直在触发状态,发送信号。例如在高电平就一直处于触发状态。
边缘触发:只有在发生变化时,才会触发,发送信号。例如从低电平变化到高电平这一瞬间,才会触发。
select方法就是水平触发,只要它所监听的事件发生,就会申请系统调用,而内核态的数据没有被用户态接收,它会持续发出系统调用信号。
IO多路复用相比较阻塞IO,最大的优点是可以同时监听多个链接。
示例一
服务端:
import socket
import select
sk = socket.socket()
sk.bind(("127.0.0.1",8000))
sk.listen(5)
inp = [sk,]
while True:
r,w,e = select.select(inp,[],[],5)
# select.select(输入列表,输出列表,错误列表,监听时间)
# 监听时间到了,会执行后续代码。监听时间如果不写,则会一直监听,进程阻塞在这里
# 有链接,则sk就会发生变化。发生变化则r=[sk,]
for i in r:
# 下面三行去掉,会一直打印hello,这是因为select是水平触发
conn,addr = i.accept() # 把客户端传来的套接字对象从内核空间复制到用户空间,即conn
print(conn)
inp.append(conn)
print("hello")
print(">>>>")
客户端:
import socket
sk=socket.socket()
sk.connect(("127.0.0.1",8000))
while 1:
pass
示例二
服务端:
import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8801))
sk.listen(5)
inputs=[sk,]
while True:
r,w,e=select.select(inputs,[],[],5)
for obj in r:#[sk,]
if obj==sk:
conn,add=obj.accept()
print(conn)
inputs.append(conn)
else:
data_byte=obj.recv(1024)
print(str(data_byte,'utf8'))
inp=input('回答%s号客户>>>'%inputs.index(obj))
obj.sendall(bytes(inp,'utf8'))
print('>>',r)
客户端1:
import socket
sk = socket.socket()
sk.connect(('127.0.0.1', 8801))
while True:
inp = input(">>>>")
sk.sendall(bytes(inp, "utf8"))
data = sk.recv(1024)
print(str(data, 'utf8'))
客户端2代码同客户端1
示例二实现了并发。
IO多路复用常用方法有:
select、poll、epool,性能按顺序递增。另外select监听有最大链接数限制,而其他两个没有。select监听多个对象时,会去轮询遍历所有监听对象。epoll监听时,是哪个事件触发了,自动告知epoll,这样效率大幅提高了。
另外select在所有系统都有,windows系统没有poll和epoll,在Linux上,三个方法都有。
(4)异步(Asynchronous) IO
异步特点:全程无阻塞。
异步IO对用户的角度来看,是四个模型中效率最好的。但是内核实现起来,比较复杂,所以不太常用。
只有在一丁点阻塞都没有的情况,才叫异步。所以只有异步IO符合,而前三种方式都属于同步IO
六、selectors模块
selectors模块是实现IO多路复用的一个模块。
服务端:
import selectors
import socket
sel = selectors.DefaultSelector()
# 根据运行程序所在系统,选出一个IO多路复用的最好的方法,例如windows只能选择select
def accept(sock, mask):
conn, addr = sock.accept()
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
data = conn.recv(1000) # Should be ready
if not data: # 客户端conn断开时,data值为空
print('closing', conn)
sel.unregister(conn) # 监听列表中移除conn
conn.close()
else:
print('echoing', repr(data), 'to', conn)
conn.send(data)
sock = socket.socket()
sock.bind(('localhost', 8090))
sock.listen(100)
sock.setblocking(False)
# setblocking(False) 是将套接字设置为 非阻塞模式,这样调用 recv()、send() 和 accept() 等方法时,不阻塞
sel.register(sock, selectors.EVENT_READ, accept) # 将sock与accept绑定,将sock注册到监听列表中
# selectors.EVENT_READ 是 selectors 模块中定义的常量,表示 “可读事件”(即套接字可以读取数据的事件)。
# selectors.EVENT_READ的作用是:在 select() 方法监听的事件中,如果当 sock 可读时,accept 回调函数会被触发。
print("server.....")
while True:
events = sel.select() # 监听套接字列表,没有触发事件会阻塞
for key, mask in events:
callback = key.data # key.data为 register() 中绑定的函数
callback(key.fileobj, mask) # key.fileobj为被监听的套接字。mask为事件掩码,本代码未用到
客户端(可以多个客户端连接):
import socket
sk=socket.socket()
sk.connect(("127.0.0.1",8090))
while 1:
inp=input(">>>")
sk.send(inp.encode("utf8"))
data=sk.recv(1024)
print(data.decode("utf8"))
七、小实验:实现客户端并发文件上传与下载
服务端:
import os,time
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
import socket,selectors
class selectFtpServer:
def __init__(self):
self.dic = {}
self.sel = selectors.DefaultSelector()
self.create_socket()
self.handle()
def create_socket(self):
server = socket.socket()
server.bind(('127.0.0.1', 8000))
server.listen(5)
server.setblocking(False)
self.sel.register(server, selectors.EVENT_READ, self.accept)
print('服务端已开启,等待用户连接...')
def handle(self):
while True:
events = self.sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
def accept(self, sock, mask):
conn, addr = sock.accept()
print("from %s %s connected" %addr)
conn.setblocking(False)
self.sel.register(conn, selectors.EVENT_READ, self.read)
self.dic[conn] = {}
def read(self, conn, mask):
self.data = conn.recv(1024)
if not self.data: # conn 断开的情况
print("closing", conn)
self.dic.pop(conn)
self.sel.unregister(conn)
conn.close()
else:
if not self.dic[conn]:
cmd, filename, filesize = str(self.data, encoding="utf-8").split("|")
self.dic[conn] = {"cmd":cmd, "filename":filename, "filesize":int(filesize), "hasReceived":0}
if cmd == "put":
conn.send(bytes("OK", encoding="utf-8"))
if self.dic[conn]["cmd"] == "get":
file = os.path.join(BASE_DIR, "download", filename)
if os.path.exists(file):
fileSize = os.path.getsize(file)
self.dic[conn]["filesize"] = fileSize
send_info = "%s|%s" %("YES", fileSize)
conn.send(bytes(send_info, encoding="utf-8"))
else:
send_info = "%s|%s" %("NO", 0)
conn.send(bytes(send_info, encoding="utf-8"))
else:
cmd = self.dic[conn].get("cmd")
if hasattr(self, cmd):
func = getattr(self, cmd)
func(conn)
else:
print("error cmd!")
conn.close()
def put(self, conn):
fileName = self.dic[conn]["filename"]
fileSize = self.dic[conn]["filesize"]
path = os.path.join(BASE_DIR, "upload", fileName)
recv_data = self.data
self.dic[conn]["hasReceived"] += len(recv_data)
with open(path, "ab") as f:
f.write(recv_data)
if fileSize == self.dic[conn]["hasReceived"]:
self.dic[conn] = {}
print(f"{fileName}上传完毕!")
def get(self, conn):
fileName = self.dic[conn]["filename"]
fileSize = self.dic[conn]["filesize"]
path = os.path.join(BASE_DIR, "download", fileName)
if fileSize > self.dic[conn]["hasReceived"]:
with open(path, "rb") as f:
f.seek(self.dic[conn]["hasReceived"])
data = f.read(1024)
send_size = len(data)
conn.send(data)
self.dic[conn]["hasReceived"] += send_size
if fileSize == self.dic[conn]["hasReceived"]:
self.dic[conn] = {}
print(f"{fileName}下载完毕!")
if __name__ == '__main__':
selectFtpServer()
客户端(多个客户端代码相同):
import socket
import os,sys
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
class selectFtpCLient:
def __init__(self):
self.args = sys.argv
if len(self.args) > 1:
self.port = (self.args[1], int(self.args[2]))
else:
self.port = ("127.0.0.1", 8000)
self.create_socket()
self.command_fanout()
def create_socket(self):
self.sk = socket.socket()
self.sk.connect(self.port)
print("连接FTP服务器成功")
def command_fanout(self):
while True:
cmd = input(">>>").strip() # put a.avi
if cmd == 'exit':
break
cmd,file = cmd.split()
if hasattr(self, cmd):
func = getattr(self, cmd)
func(cmd, file) # put("put", "a.avi")
else:
print("调用错误!")
def put(self, cmd, file):
if os.path.isfile(file):
fileName = os.path.basename(file)
fileSize = os.path.getsize(file)
fileInfo = "%s|%s|%s" %(cmd, fileName, fileSize)
self.sk.send(bytes(fileInfo, encoding='utf-8'))
recvStatus = self.sk.recv(1024)
print('recvStatus', recvStatus.decode("utf-8"))
hasSend = 0
if str(recvStatus, encoding='utf-8') == "OK":
with open(file, "rb") as f:
while fileSize > hasSend:
contant = f.read(1024)
recv_size = len(contant)
self.sk.send(contant)
hasSend += recv_size
s = str(int(hasSend/fileSize*100)) + "%"
print(f"正在上传文件:{fileName},已经上传:{s}")
print("%s文件上传完毕" %(fileName,))
else:
print("文件不存在")
def get(self, cmd, file):
hasRecv = 0
info = "%s|%s|%s" %(cmd, file, "0")
self.sk.send(bytes(info, encoding="utf-8"))
fileInfo = self.sk.recv(1024)
print(fileInfo.decode("utf-8"))
sendStatus, fileSize = fileInfo.decode("utf-8").split("|")
fileSize = int(fileSize)
if sendStatus == "YES":
while True:
self.sk.send(bytes(info, encoding="utf-8"))
if fileSize > hasRecv:
data = self.sk.recv(1024)
hasRecv += len(data)
with open(file, "ab") as f:
f.write(data)
s = str(int(hasRecv/fileSize*100)) + "%"
print(f"正在下载文件:{file},已经下载:{s}","大小:", hasRecv,"total:",fileSize)
if fileSize == hasRecv:
print(f"{file}已经下载完爆!")
break
if __name__ == '__main__':
selectFtpCLient()