当前位置: 首页 > article >正文

Python并发编程 07 事件驱动模型、进程切换、进程阻塞、文件描述符、缓存I/O、selectors模块

文章目录

  • 一、事件驱动模型
  • 二、进程切换
  • 三、进程阻塞
  • 四、文件描述符
  • 五、缓存I/O
    • 1、缓存I/O概述
    • 2、IO模型
      • (1)阻塞(blocking) IO
      • (2)非阻塞(nonblocking) IO
      • (3)IO多路复用(I/O multiplexing)
      • (4)异步(Asynchronous) IO
  • 六、selectors模块
  • 七、小实验:实现客户端并发文件上传与下载

一、事件驱动模型

事件包括:敲击键盘上某个按键、点击鼠标、系统内部定时器触发等。
事件驱动模型大致流程:
开始 → 初始化 → 等待 → 事件触发 → 执行该事件对应的操作。
下面以鼠标点击事件为例:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<p onclick="fun()">点我呀</p>

<script type="text/javascript">
    function fun() {
          alert('OK!')
    }
</script>

</body>
</html>

当点击“点我呀”,会弹出弹框OK!。那么怎么获取鼠标点击呢?
如果创建一个线程,写个死循环来监测,CPU资源浪费。因为大部分时间不需要任何操作,但是CPU却一直在运行该死循环。如果阻塞该程序,则不会执行程序的其他部分。所以用事件驱动模型来解决该问题。

事件驱动模型大体思路:
①有一个事件(消息)队列
②鼠标按下时,往这个队列中增加一个点击事件
③有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数。
④事件一般都各自保存各自的处理函数指针,这样每个消息都有独立的处理函数。

事件驱动编程时一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时,使用回调机制来触发相应的处理。

二、进程切换

操作系统内核将一个正在CPU上运行的应用程序挂起,并恢复以前被挂起的某个应用程序,让其在CPU上执行。
步骤如下:
①保存处理机上下文,包括程序计数器和其他寄存器。
②更新PCB信息。
③把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
④选择另一个进程执行,并更新其PCB。
⑤更新内存管理的数据结构
⑥恢复处理机上下文,
总之,进程切换非常耗资源。

三、进程阻塞

正在执行的进程,由于期待的某个事件未发生,如请求资源失败,由系统自动执行阻塞原语,使自己由运行态变为阻塞态。进程的阻塞是进程的一种主动行为。当进程进入阻塞态,是不占用CPU资源的。

四、文件描述符

socket实际上就是一个文件描述符

文件描述符是一个用于表述指向文件的引用的抽象化概念。它在形式上是一个非负整数,实际上它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。

当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序填写往往会围绕着文件描述符展开。但文件描述符这一概念往往只适用于Linux这样的操作系统。

五、缓存I/O

1、缓存I/O概述

缓存I/O又被称作标准I/O。在Linux的缓存I/O机制中,数据会先被拷贝到操作系统内核缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

2、IO模型

(1)阻塞(blocking) IO

在这里插入图片描述
阻塞IO整个过程只发了一次系统调用,属于同步的


服务端:

import socket

sk = socket.socket()

sk.bind(("127.0.0.1", 8000))

sk.listen(5)

while True:
    conn, addr = sk.accept()
    print(conn)
    print(type(conn))


    conn.send("hello client".encode("utf-8"))
    data = conn.recv(1024)
    print(data.decode("utf-8"))

客户端:

import socket

sk = socket.socket()

sk.connect(("127.0.0.1", 8000))

data = sk.recv(1024)
print(data.decode('utf-8'))
sk.send(b'hello server')

当客户端程序未启用时,服务端进程会一直阻塞在sk.accept()这里。


(2)非阻塞(nonblocking) IO

在这里插入图片描述
缺点是发了多次系统调用,还有就是对事件触发的响应不及时,因为是每隔一段时间,再去检查事件是否触发。而在这个时间间隔之中事件触发,也要等待检查时间到了,再去执行事件触发后的操作。


服务端:

import time
import socket

sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(("127.0.0.1",8000))
sk.listen(5)

sk.setblocking(False)
# 默认是阻塞IO,False设置为非阻塞IO
print("waiting client connection ......")

while True:
    try:
        conn,addr = sk.accept() # 没有链接传入,这里不会再阻塞,而是引发异常
        print("+++", addr)
        client_message = conn.recv(1024) # 如果没收到数据,也会触发异常
        print(str(client_message,"utf-8"))
        conn.close()
    except Exception as e:
        print(e)
        time.sleep(4) # 服务端可以在这4s中,干其他事,而不是阻塞等待链接。

客户端:

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

while True:
    sk.connect(('127.0.0.1',8000))

    sk.sendall(bytes("hello","utf8"))
    time.sleep(2)
    break

(3)IO多路复用(I/O multiplexing)

在这里插入图片描述
select发送系统调用要的数据是:是否有人来连接,如果没有则一直监听。

水平触发:事件发生就一直在触发状态,发送信号。例如在高电平就一直处于触发状态。
边缘触发:只有在发生变化时,才会触发,发送信号。例如从低电平变化到高电平这一瞬间,才会触发。
select方法就是水平触发,只要它所监听的事件发生,就会申请系统调用,而内核态的数据没有被用户态接收,它会持续发出系统调用信号。

IO多路复用相比较阻塞IO,最大的优点是可以同时监听多个链接。

示例一
服务端:

import socket
import select

sk = socket.socket()
sk.bind(("127.0.0.1",8000))
sk.listen(5)
inp = [sk,]

while True:
    r,w,e = select.select(inp,[],[],5)
    # select.select(输入列表,输出列表,错误列表,监听时间)
    # 监听时间到了,会执行后续代码。监听时间如果不写,则会一直监听,进程阻塞在这里
    # 有链接,则sk就会发生变化。发生变化则r=[sk,]

    for i in r:
        # 下面三行去掉,会一直打印hello,这是因为select是水平触发
        conn,addr = i.accept()  # 把客户端传来的套接字对象从内核空间复制到用户空间,即conn
        print(conn)
        inp.append(conn)

        print("hello")

    print(">>>>")

客户端:

import socket

sk=socket.socket()

sk.connect(("127.0.0.1",8000))

while 1:
    pass

示例二
服务端:

import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8801))
sk.listen(5)
inputs=[sk,]
while True:
    r,w,e=select.select(inputs,[],[],5)

    for obj in r:#[sk,]
        if obj==sk:
            conn,add=obj.accept()
            print(conn)
            inputs.append(conn)
        else:
            data_byte=obj.recv(1024)
            print(str(data_byte,'utf8'))
            inp=input('回答%s号客户>>>'%inputs.index(obj))
            obj.sendall(bytes(inp,'utf8'))

    print('>>',r)

客户端1:

import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 8801))

while True:
    inp = input(">>>>")
    sk.sendall(bytes(inp, "utf8"))
    data = sk.recv(1024)
    print(str(data, 'utf8'))

客户端2代码同客户端1

示例二实现了并发。

IO多路复用常用方法有:
select、poll、epool,性能按顺序递增。另外select监听有最大链接数限制,而其他两个没有。select监听多个对象时,会去轮询遍历所有监听对象。epoll监听时,是哪个事件触发了,自动告知epoll,这样效率大幅提高了。
另外select在所有系统都有,windows系统没有poll和epoll,在Linux上,三个方法都有。

(4)异步(Asynchronous) IO

在这里插入图片描述
异步特点:全程无阻塞。
异步IO对用户的角度来看,是四个模型中效率最好的。但是内核实现起来,比较复杂,所以不太常用。

只有在一丁点阻塞都没有的情况,才叫异步。所以只有异步IO符合,而前三种方式都属于同步IO

六、selectors模块

selectors模块是实现IO多路复用的一个模块。

服务端:

import selectors
import socket

sel = selectors.DefaultSelector()
# 根据运行程序所在系统,选出一个IO多路复用的最好的方法,例如windows只能选择select

def accept(sock, mask):
    conn, addr = sock.accept()
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if not data:    # 客户端conn断开时,data值为空
        print('closing', conn)
        sel.unregister(conn)    # 监听列表中移除conn
        conn.close()
    else:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)

sock = socket.socket()
sock.bind(('localhost', 8090))
sock.listen(100)
sock.setblocking(False)
# setblocking(False) 是将套接字设置为 非阻塞模式,这样调用 recv()、send() 和 accept() 等方法时,不阻塞

sel.register(sock, selectors.EVENT_READ, accept) # 将sock与accept绑定,将sock注册到监听列表中
# selectors.EVENT_READ 是 selectors 模块中定义的常量,表示 “可读事件”(即套接字可以读取数据的事件)。
# selectors.EVENT_READ的作用是:在 select() 方法监听的事件中,如果当 sock 可读时,accept 回调函数会被触发。
print("server.....")

while True:
    events = sel.select()   # 监听套接字列表,没有触发事件会阻塞
    for key, mask in events:
        callback = key.data # key.data为 register() 中绑定的函数
        callback(key.fileobj, mask) # key.fileobj为被监听的套接字。mask为事件掩码,本代码未用到

客户端(可以多个客户端连接):

import socket

sk=socket.socket()

sk.connect(("127.0.0.1",8090))
while 1:
    inp=input(">>>")
    sk.send(inp.encode("utf8"))
    data=sk.recv(1024)
    print(data.decode("utf8"))

七、小实验:实现客户端并发文件上传与下载

服务端:

import os,time

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

import socket,selectors

class selectFtpServer:
    def __init__(self):
        self.dic = {}
        self.sel = selectors.DefaultSelector()
        self.create_socket()
        self.handle()

    def create_socket(self):
        server = socket.socket()
        server.bind(('127.0.0.1', 8000))
        server.listen(5)
        server.setblocking(False)
        self.sel.register(server, selectors.EVENT_READ, self.accept)
        print('服务端已开启,等待用户连接...')

    def handle(self):
        while True:
            events = self.sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)


    def accept(self, sock, mask):
        conn, addr = sock.accept()
        print("from %s %s connected" %addr)
        conn.setblocking(False)
        self.sel.register(conn, selectors.EVENT_READ, self.read)

        self.dic[conn] = {}

    def read(self, conn, mask):
        self.data = conn.recv(1024)
        if not self.data:    # conn 断开的情况
            print("closing", conn)
            self.dic.pop(conn)
            self.sel.unregister(conn)
            conn.close()
        else:
            if not self.dic[conn]:
                cmd, filename, filesize = str(self.data, encoding="utf-8").split("|")
                self.dic[conn] = {"cmd":cmd, "filename":filename, "filesize":int(filesize), "hasReceived":0}

                if cmd == "put":
                    conn.send(bytes("OK", encoding="utf-8"))

                if self.dic[conn]["cmd"] == "get":
                    file = os.path.join(BASE_DIR, "download", filename)

                    if os.path.exists(file):
                        fileSize = os.path.getsize(file)
                        self.dic[conn]["filesize"] = fileSize
                        send_info = "%s|%s" %("YES", fileSize)
                        conn.send(bytes(send_info, encoding="utf-8"))
                    else:
                        send_info = "%s|%s" %("NO", 0)
                        conn.send(bytes(send_info, encoding="utf-8"))
            else:
                cmd = self.dic[conn].get("cmd")
                if hasattr(self, cmd):
                    func = getattr(self, cmd)
                    func(conn)
                else:
                    print("error cmd!")
                    conn.close()

    def put(self, conn):
        fileName = self.dic[conn]["filename"]
        fileSize = self.dic[conn]["filesize"]
        path = os.path.join(BASE_DIR, "upload", fileName)
        recv_data = self.data
        self.dic[conn]["hasReceived"] += len(recv_data)

        with open(path, "ab") as f:
            f.write(recv_data)
        if fileSize == self.dic[conn]["hasReceived"]:
            self.dic[conn] = {}
            print(f"{fileName}上传完毕!")

    def get(self, conn):
        fileName = self.dic[conn]["filename"]
        fileSize = self.dic[conn]["filesize"]
        path = os.path.join(BASE_DIR, "download", fileName)

        if fileSize > self.dic[conn]["hasReceived"]:
            with open(path, "rb") as f:
                f.seek(self.dic[conn]["hasReceived"])
                data = f.read(1024)
            send_size = len(data)
            conn.send(data)
            self.dic[conn]["hasReceived"] += send_size

        if fileSize == self.dic[conn]["hasReceived"]:
            self.dic[conn] = {}
            print(f"{fileName}下载完毕!")

if __name__ == '__main__':
    selectFtpServer()

客户端(多个客户端代码相同):

import socket
import os,sys
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

class selectFtpCLient:
    def __init__(self):
        self.args = sys.argv
        if len(self.args) > 1:
            self.port = (self.args[1], int(self.args[2]))
        else:
            self.port = ("127.0.0.1", 8000)
        self.create_socket()
        self.command_fanout()

    def create_socket(self):
        self.sk = socket.socket()
        self.sk.connect(self.port)
        print("连接FTP服务器成功")

    def command_fanout(self):
        while True:
            cmd = input(">>>").strip()  # put a.avi
            if cmd == 'exit':
                break
            cmd,file = cmd.split()
            if hasattr(self, cmd):
                func = getattr(self, cmd)
                func(cmd, file) # put("put", "a.avi")
            else:
                print("调用错误!")

    def put(self, cmd, file):
        if os.path.isfile(file):
            fileName = os.path.basename(file)
            fileSize = os.path.getsize(file)
            fileInfo = "%s|%s|%s" %(cmd, fileName, fileSize)
            self.sk.send(bytes(fileInfo, encoding='utf-8'))
            recvStatus = self.sk.recv(1024)
            print('recvStatus', recvStatus.decode("utf-8"))
            hasSend = 0
            if str(recvStatus, encoding='utf-8') == "OK":
                with open(file, "rb") as f:
                    while fileSize > hasSend:
                        contant = f.read(1024)
                        recv_size = len(contant)
                        self.sk.send(contant)
                        hasSend += recv_size
                        s = str(int(hasSend/fileSize*100)) + "%"
                        print(f"正在上传文件:{fileName},已经上传:{s}")
                    print("%s文件上传完毕" %(fileName,))
        else:
            print("文件不存在")

    def get(self, cmd, file):
        hasRecv = 0
        info = "%s|%s|%s" %(cmd, file, "0")
        self.sk.send(bytes(info, encoding="utf-8"))

        fileInfo = self.sk.recv(1024)
        print(fileInfo.decode("utf-8"))
        sendStatus, fileSize = fileInfo.decode("utf-8").split("|")
        fileSize = int(fileSize)

        if sendStatus == "YES":
            while True:
                self.sk.send(bytes(info, encoding="utf-8"))

                if fileSize > hasRecv:
                    data = self.sk.recv(1024)
                    hasRecv += len(data)
                    with open(file, "ab") as f:
                        f.write(data)
                    s = str(int(hasRecv/fileSize*100)) + "%"
                    print(f"正在下载文件:{file},已经下载:{s}","大小:", hasRecv,"total:",fileSize)
                if fileSize == hasRecv:
                    print(f"{file}已经下载完爆!")
                    break

if __name__ == '__main__':
    selectFtpCLient()

http://www.kler.cn/a/515546.html

相关文章:

  • 逐笔成交逐笔委托Level2高频数据下载和分析:20250122
  • 数据结构——实验一·线性表
  • 【Postgres_Python】使用python脚本批量创建和导入多个PG数据库
  • Maven的下载安装配置
  • idea新增java快捷键代码片段
  • 13.接口类和抽象类的区别
  • Vue3+Element Plus 实现 el-table 表格组件滚动是否触底监听判断
  • 父级perspective与子元素transform:perspective的区别
  • 在vue3中使用datav完整引入时卡在加载页面的解决方法
  • 【10.2】队列-设计循环队列
  • FFmpeg音视频采集
  • 数据结构——实验二·栈
  • 2025美赛倒计时,数学建模五类模型40+常用算法及算法手册汇总
  • 【2024年华为OD机试】 (E卷,100分) - 整数编码(JavaScriptJava PythonC/C++)
  • 4.C++中的循环语句
  • 【Mac】Python相关知识经验
  • 什么是网络爬虫?Python爬虫到底怎么学?
  • TDengine 与上海电气工业互联网平台完成兼容性认证
  • PySide6的简单介绍
  • elk 安装
  • 深度学习-91-大语言模型LLM之基于langchain的模型IO的提示模板
  • 【测开】利用界面化操作存储步骤数据,为 Selenium 自动化测试提效赋能(一)
  • ubuntu k8s 1.31
  • 学习ASP.NET Core的身份认证(基于JwtBearer的身份认证9)
  • WPF5-x名称空间
  • 数据结构基础之《(16)—链表题目》