Python----Python高级(并发编程:协程Coroutines,事件循环,Task对象,协程间通信,协程同步,将协程分布到线程池/进程池中)
一、协程
1.1、协程
协程,Coroutines,也叫作纤程(Fiber)
协程,全称是“协同程序”,用来实现任务协作。是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理。
当出现IO阻塞时,CPU一直等待IO返回,处于空转状态。这时候用协程,可以执行其他任务。当IO返回结果后,再回来处理数据。充分利用了IO等待的时间,提高了效率。
进程与线程是操作系统管理和调度的基本单位,而协程则是由程序员 实现的一种轻量级的、用户空间级别的多任务机制,通常不由操作系统直接提供支持。
1.2、协程的核心(控制流的让出和恢复)
1.每个协程有自己的执行栈,可以保存自己的执行现场
2.可以由用户程序按需创建协程(比如:遇到io操作)
3.协程“主动让出(yield)”执行权时候,会保存执行现场(保存中断时的寄存器上下文和栈),然后切换到其他协程
4.协程恢复执行(resume)时,根据之前保存的执行现场恢复到中断前的状态,继续执行,这样就通过协程实现了轻量的由用户态调度的多任务模型
1.3、协程的特点
1. 占用资源少:协程通常只需要少量的栈空间,这是因为它们采用协作式的多任务 处理机制,可以在固定的栈空间内通过状态保存和恢复来实现任务的切换,相比 多线程和多进程,协程占用的系统资源更少。
2. 切换开销小:协程的切换是在用户态进行的,不需要进行系统调用,也不涉及内 核态的上下文切换,因此其切换开销非常小,远远低于线程间的上下文切换。
3. 可暂停和可恢复的函数:协程允许函数在执行过程中主动暂停(通常是遇到I/O操 作或其他耗时操作时),并将控制权交还给调度器,以便其他协程可以运行。在 I/O操作或其他耗时操作完成后,该协程可以从暂停的地方继续执行,而不会阻塞 整个线程。这种特性使得协程非常适合于处理I/O密集型任务,可以在等待I/O操 作完成时释放CPU,从而提高程序的并发性能和资源利用率。
1.4、协程的优点
1.由于自身带有上下文和栈,无需线程上下文切换的开销,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级;
2.无需原子操作的锁定及同步的开销;
3.方便切换控制流,简化编程模型
4.单线程内就可以实现并发的效果,最大限度地利用cpu,且可扩展性高,成本低(注:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理)
asyncio协程是写爬虫比较好的方式。比多线程和多进程都好. 开辟新的线程和进程是非常耗时的。
1.5、协程的缺点
1.无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。
1.6、与进程和线程的比较
多进程是在操作系统层面实现的并行执行方式,每个进程拥有独立的内存空间和 系统资源,进程间通过进程间通信(IPC)机制(如管道、消息队列、共享内存 等)进行交互,这增加了通信的复杂性。多进程可以充分利用多核处理器的性 能,实现真正的并行计算。由于进程间的隔离性,系统的安全性和稳定性也得到 了提高。然而,进程间通信和同步的开销相对较高,且每个进程的创建和销毁通 常伴随着较大的资源开销。
多线程是在一个进程内部实现的并发执行方式,多个线程共享该进程的内存空间 和资源,这使得线程间通信和数据共享相对容易。但是,这也引入了线程安全问题,需要通过同步机制(如互斥锁、信号量、条件变量等)来避免数据冲突。多 线程的优点在于能够实现并发执行,但线程间的上下文切换开销相比进程较小, 但相比协程则较大,且需要谨慎处理线程安全问题。
协程是一种轻量级的线程,与多进程和多线程相比,它具有占用资源少、切换开 销小、可以实现高效异步执行等优点。协程通过非阻塞I/O操作来等待数据,当数 据就绪时自动恢复执行,从而提高了程序的执行效率和响应速度。然而,协程也 有其局限性,它只能在单个线程内执行,因此它对于CPU密集型任务来说并没有 什么好处。
二、实现协程的方法
在python中,实现协程的方法有以下几种:
1. 使用async/await关键字:python3.5及以后出现,到目前为止这是目前最主流的 实现协程的方法。
2. 使用yield关键字:使用yield关键字及其send方法可以实现协程的效果。
3. 使用asyncio.coroutine:在python3.4发布之后可以使用该装饰器与yield from 配合来实现协程,不过在python3.8弃用。
4. 使用第三方库:通过其他的第三方库也可以实现协程,如greenlet。
def consumer():
print("消费者准备接收数据。")
while True:
# 接收生产者发送的数据
data = (yield)
print("消费者接收到了数据:", data)
def producer(consumer_generator):
# 启动生成器,使它准备好接收数据
next(consumer_generator)
for i in range(5):
print("生产者发送数据:", i)
# 发送数据给消费者
consumer_generator.send(i)
# 终止生成器
consumer_generator.close()
if __name__ == '__main__':
# 创建消费者生成器
consumer_coroutine = consumer()
# 创建生产者
producer(consumer_coroutine)
'''
消费者准备接收数据。
生产者发送数据: 0
消费者接收到了数据: 0
生产者发送数据: 1
消费者接收到了数据: 1
生产者发送数据: 2
消费者接收到了数据: 2
生产者发送数据: 3
消费者接收到了数据: 3
生产者发送数据: 4
消费者接收到了数据: 4
'''
2.1、async
async关键字是Python异步编程的核心组成部分,用于定义协程函数。协程函数与 普通函数不同,它们在调用时不会执行函数里面的代码,而是会返回一个协程对象。
# 定义一个协程函数
async def func():
print('123')
# 直接调用协程函数会发出警告,并且函数内部的功能也不会执行
func()
想要运行函数体里面的代码,需要进行两个方面的准备:
1. 获取事件循环。
2. 将协程对象封装为Task对象并提交到事件循环中。
2.2、await
在Python中, await关键字用于挂起(暂停)异步函数的执行,直到被等待的协程 (coroutine)完成。这是异步编程中的一个关键概念,因为它允许程序在等待结果 的同时执行其他任务。
2.2.1、await的基本用法
1. 只能在异步函数内部使用: await关键字只能在一个使用了 异步函数内部使用。它不能在普通的同步函数中使用。
2. 等待协程: async def定义的 await后面通常跟一个协程对象(一个异步函数的调用)。当执行到 await时,当前协程会暂停执行,等待右侧的协程完成。
2.2.2、await的工作原理
1. 挂起与恢复: 当执行到 await时,当前协程会挂起,并让出控制权给事件循环 (event loop)。事件循环可以在这段时间内运行其他协程或处理其他事件。一 旦await后面的协程完成,事件循环会恢复执行原来的协程, 果就是协程的返回值。
2. 非阻塞: 尽管 await表达式的结 await看起来像是同步代码中的阻塞操作,但实际上它是非阻塞 的。这是因为事件循环负责协程之间的切换,从而实现并发。
import asyncio
import time
# 定义一个异步函数say_after,它接受延迟时间和要打印的消息作为参数
async def say_after(delay, what):
# 使用await关键字挂起当前协程,直到指定的延迟时间结束后再继续执行
await asyncio.sleep(delay)
# 打印消息
print(what)
# 定义主异步函数main
async def main():
# 记录开始时间
print(f"started at {time.strftime('%X')}")
# 调用say_after函数,等待1秒后打印'hello'
await say_after(1, 'hello')
# 调用say_after函数,等待2秒后打印'world'
# 注意:这里的执行不是并行的,而是顺序的,因为两个await语句是顺序执行的
await say_after(2, 'world')
# 记录结束时间
print(f"finished at {time.strftime('%X')}")
# 调用asyncio.run()来启动主协程
# 这将创建一个新的事件循环并运行main()直到完成
asyncio.run(main())
'''
started at xx:xx:xx
hello
world
finished at xx:xx:xx
'''
三、事件循环
事件循环是一种处理程序执行、事件和消息分发的机制。它不断地等待事件的发生, 当事件发生时,事件循环会将其分发给相应的处理程序进行处理。事件循环的核心是 一个循环,它会不断地检查是否有事件需要处理,如果有,就调用相应的回调函数来 处理这些事件。
其工作流程为:
1. 启动:创建并启动事件循环。
2. 注册事件:将各种事件(如网络套接字、文件描述符、定时器等)注册到事件循 环中。
3. 事件循环:进入一个循环,等待事件的发生,并处理这些事件。
4. 执行任务:当事件发生时,事件循环会调用相关的处理函数或恢复相应的协程。
5. 关闭:当所有任务完成后,关闭事件循环。
事件循环的创建随着Python版本的不同而不同,在Python3.7版本之前,事件循环需 要先使用 asyncio.get_event_loop()来获取循环,然后使用 run_until_complete()来执行任务。在Python3.7及以后的版本,直接使用 asyncio.run()来直接执行任务。
import asyncio
# 定义一个异步函数func1
async def func1():
print('start func1') # 打印信息,表示func1开始执行
await asyncio.sleep(2) # 模拟异步I/O操作,协程在这里挂起2秒钟
print('end func1') # 打印信息,表示func1执行结束
# 定义另一个异步函数func2
async def func2():
print('start func2') # 打印信息,表示func2开始执行
await asyncio.sleep(2) # 模拟异步I/O操作,协程在这里挂起2秒钟
print('end func2') # 打印信息,表示func2执行结束
asyncio.run(func1())
asyncio.run(func2())
'''
start func1
end func1
start func2
end func2
'''
四、Task对象
Task对象是 asyncio库中的一个实现,它用来在事件循环中安排协程的执行。一个 Task是对协程的一个封装,简单来说,协程本身并不会自动运行,当一个协程被封 装为一个Task对象并提交到事件循环中时,它才会在事件循环中被安排执行。当协程 执行完毕后, Task会提供协程的返回值或异常,并且相比协程对象, 更加丰富的方法供我们使用。
将协程对象封装为 Task对象拥有 Task是通过asyncio库中的函数进行的,但随着Python版本的不 同,其所用函数也不同。
在python3.7之前,Task的创建使用的是 asyncio.ensure_future()函数,通过该 函数将使用 async定义的协程函数所返回的协程对象提交到事件循环中。在 python3.7之后,创建Task对象的方法变得更加直接和明确,可以使用 asyncio.create_task()函数来创建,且python3.8版本之后,添加了name参数可 以为任务指定名称。这个函数接受一个协程对象作为参数,并返回一个新的Task对 象。
import asyncio
# 定义一个异步函数func1
async def func1():
print('start func1') # 打印信息,表示func1开始执行
await asyncio.sleep(2) # 模拟异步I/O操作,协程在这里挂起2秒钟
print('end func1') # 打印信息,表示func1执行结束
# 定义另一个异步函数func2
async def func2():
print('start func2') # 打印信息,表示func2开始执行
await asyncio.sleep(2) # 模拟异步I/O操作,协程在这里挂起2秒钟
print('end func2') # 打印信息,表示func2执行结束
# 定义主异步函数main,它将作为程序的入口点
async def main():
# 创建任务列表,使用asyncio.create_task来创建任务
tasks = [
asyncio.create_task(func1()), # 创建并调度func1作为异步任务
asyncio.create_task(func2())# 创建并调度func2作为异步任务
]
# 使用asyncio.wait等待所有任务完成
# asyncio.wait接收一个任务列表,并等待这些任务完成
done, pending = await asyncio.wait(tasks)
# 使用 asyncio.run() 来运行主函数
# asyncio.run()是Python 3.7引入的,它会创建一个新的事件循环,运行传入的协程,并在协程完成后关闭事件循环
# 等同于asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())
'''
start func1
start func2
end func1
end func2
'''
import asyncio
# 定义一个异步函数func1
async def func1():
print('start func1') # 打印信息,表示func1开始执行
await asyncio.sleep(2) # 模拟异步I/O操作,协程在这里挂起2秒钟
print('end func1') # 打印信息,表示func1执行结束
# 定义另一个异步函数func2
async def func2():
print('start func2') # 打印信息,表示func2开始执行
await asyncio.sleep(2) # 模拟异步I/O操作,协程在这里挂起2秒钟
print('end func2') # 打印信息,表示func2执行结束
# 定义主异步函数main,它将作为程序的入口点
async def main():
# 直接调用asyncio.gather不需要将协程对象先手动封装为Task对象
# 该函数会负责将它们作为任务调度到事件循环中
# asyncio.gather会返回一个包含所有结果的列表
await asyncio.gather(func1(), func2())
# 使用 asyncio.run() 来运行主函数
# asyncio.run()会创建一个新的事件循环,运行传入的协程,并在协程完成后关闭事件循环
asyncio.run(main())
'''
start func1
start func2
end func1
end func2
'''
五、协程间通信
与线程相似,协程之间的通信也只有消息队列一种,且拥有不同种类的消息队列。 在python中,协程所使用的消息队列在 asyncio.Queue库下,其中共存在三种类型 的队列,分别为标准的先进先出队列 Queue、先进后出队列 LifoQueue和优先级队 列PriorityQueue。
5.1、Queue
先进先出的原则
asyncio.Queue(maxsize=0)
maxsize:队列的最大尺寸,如果maxsize小于等于零,则队列尺寸是无限的。如果 是大于0的整数,则当队列达到maxsize时, await put()将阻塞至某个元素被 get()取出。
类方法 :
● Queue.qsize():返回队列中当前有几条消息。
● Queue.empty():如果队列为空,返回True,否则返回 False。
● Queue.full():如果队列已满(达到最大尺寸),返回 True,否则返回 False。
● Queue.put(item, block=True, timeout=None):将 item 放入队列。如果 block 是True是 None(默认),则在必要时阻塞至有空闲的插槽, 如果timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有可 用的插槽,将引发 queue.Full 异常。
● Queue.put_nowait(item):相当于 Queue.put(item, block=False)。如果队列已满,立即引发 queue.Full 异常。
● Queue.get(b1ock=True,timeout=None):从队列中移除并返回一个元素。如果 block是 True 且 timeout是 None(默认),则在必要时阻塞至队列中有项目可用。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有项目可用,将引发 queue.Empty 异常。
● Queue.get_nowait():相当于 Queue.get(block=False)。如果队列为空立即引发 queue.Empty 异常。
● Queue.task_done():指示之前入队的一个任务已经完成。由队列的消费者线程使用。每个Queue. get()调用之后,需要调用 Queue.task_done()告诉队列该任务处理完成。
● Queue.join():阻塞直到队列中的所有元素都被处理完。当元素添加到队列的 时候,未完成任务的计数就会增加,每当消费协程调用 task_done()表示这个元 素已经处理完毕,那么未完成计数就会减少。当未完成计数降到零的时候, join()阻塞被接触。
import asyncio
# 生产者协程,负责生成一系列数字并将它们放入队列中
async def producer(queue, n):
for i in range(1, n + 1): # 循环从1到n,生成数字
print(f'生产者生产了: {i}') # 打印当前生产的数字
await queue.put(i) # 将数字放入队列,如果队列已满,则阻塞直到有空位
await asyncio.sleep(1) # 模拟生产耗时,等待1秒钟
print('生产者完成生产。') # 所有数字生产完毕,打印完成消息
await queue.put(None) # 放入一个None作为结束信号,通知消费者没有更多数字
# 消费者协程,负责从队列中取出数字并打印它们
async def consumer(queue):
while True: # 无限循环,直到接收到结束信号
item = await queue.get() # 从队列中取出一个元素,如果队列为空,则阻塞直到有元素
if item is None: # 检查是否接收到结束信号
queue.task_done() # 通知队列,当前任务已经完成
break # 如果是结束信号,退出循环
print(f'消费者消费了: {item}') # 打印当前消费的数字
queue.task_done() # 通知队列,当前任务已经完成
print('消费者完成消费。') # 打印完成消费的消息
# 主协程,负责启动生产者和消费者协程,并等待它们完成
async def main():
queue = asyncio.Queue(5) # 创建一个队列实例,用于生产者和消费者之间的通信
# 创建生产者和消费者协程
producer_coro = producer(queue, 5) # 生产者协程,生产1到5的数字
consumer_coro = consumer(queue) # 消费者协程
# 使用asyncio.gather等待生产者和消费者协程完成
# gather允许同时运行多个协程,并在它们都完成时返回结果
await asyncio.gather(producer_coro, consumer_coro)
# 运行主协程,启动事件循环
asyncio.run(main())
'''
生产者生产了: 1
消费者消费了: 1
生产者生产了: 2
消费者消费了: 2
生产者生产了: 3
消费者消费了: 3
生产者生产了: 4
消费者消费了: 4
生产者生产了: 5
消费者消费了: 5
生产者完成生产。
消费者完成消费。
'''
5.2、LifoQueue
后进先出
asyncio.LifoQueue(maxsize=0)
maxsize:队列的最大尺寸。如果设置为小于或等于0的数,则队列的尺寸是无 限的。
常用方法:
● LifoQueue.put(item, block=True, timeout=None):将 如果 block 是 True 且 timeout 是 item 放入队列。 None(默认),则在必要时阻塞至有空闲 的插槽。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没 有可用的插槽,将引发完全异常。
● LifoQueue.put_nowait(item):相当于LifoQueue.put(item,b1ock=False)。如果队列已满,立即引发完全异常。
● LifoQueue.get(block=True,timeout=None):从队列中移除并返回一个元素。如果 block是True且timeout是 None(默认),则在必要时阻塞至队列中有项目可用。如果 timeout 是正数,将最多阻塞timeout秒,如果在这段时间内没有项目可用,将引发完全异常。
● LifoQueue.get_nowait():相当于 LifoQueue.get(block=False)。如果队列为空,立即引发完全异常。
● LifoQueue.qsize():返回队列中的项目数量
● LifoQueue.empty():如果队列为空,返回 True,否则返回 False。
● LifoQueue.full():如果队列已满(达到最大尺寸),返回 True,否则返回False 。
● LifoQueue.task_done(): 指示之前入队的一个任务已经完成,即 get出来的元素相关操作已经完成。由队列中的 get()端掌控,每次 get用于一个任务时,任务最后要调用 task_done()告诉队列,任务已经完成。
● LifoQueue.join():阻塞直到队列中的所有元素都被处理完。当元素添加到队 列的时候,未完成任务的计数就会增加,每当消费协程调用 task_done()表示这 个元素已经处理完毕,那么未完成计数就会减少。当未完成计数降到零的时候, join()阻塞被接触。
import asyncio
# 生产者协程,负责生成一系列数字并将它们放入队列中
async def producer(queue, n):
for i in range(1, n + 1): # 循环从1到n,生成数字
print(f'生产者生产了: {i}') # 打印当前生产的数字
await queue.put(i) # 将数字放入队列,如果队列已满,则阻塞直到有空位
await asyncio.sleep(1) # 模拟生产耗时,等待1秒钟
print('生产者完成生产。') # 所有数字生产完毕,打印完成消息
await queue.put(None) # 放入一个None作为结束信号,通知消费者没有更多数字
# 消费者协程,负责从队列中取出数字并打印它们
async def consumer(queue):
await asyncio.sleep(5)
while True: # 无限循环,直到接收到结束信号
item = await queue.get() # 从队列中取出一个元素,如果队列为空,则阻塞直到有元素
if item is None: # 检查是否接收到结束信号
queue.task_done() # 通知队列,当前任务已经完成
break # 如果是结束信号,退出循环
print(f'消费者消费了: {item}') # 打印当前消费的数字
queue.task_done() # 通知队列,当前任务已经完成
print('消费者完成消费。') # 打印完成消费的消息
# 主协程,负责启动生产者和消费者协程,并等待它们完成
async def main():
queue = asyncio.LifoQueue(10) # 创建一个队列实例,用于生产者和消费者之间的通信
# 创建生产者和消费者协程
producer_coro = producer(queue, 5) # 生产者协程,生产1到5的数字
consumer_coro = consumer(queue) # 消费者协程
# 使用asyncio.gather等待生产者和消费者协程完成
# gather允许同时运行多个协程,并在它们都完成时返回结果
await asyncio.gather(producer_coro, consumer_coro)
# 等待队列中的所有项目都被处理完毕
await queue.join()
print('所有任务都已处理完毕。')
# 运行主协程,启动事件循环
asyncio.run(main())
'''
生产者生产了: 1
生产者生产了: 2
生产者生产了: 3
生产者生产了: 4
生产者生产了: 5
消费者消费了: 5
消费者消费了: 4
消费者消费了: 3
消费者消费了: 2
消费者消费了: 1
生产者完成生产。
消费者完成消费。
所有任务都已处理完毕。
'''
5.3、PriorityQueue
实现优先级队列
asyncio.PriorityQueue(maxsize=0)
maxsize:队列的最大尺寸。如果设置为小于或等于0的数,则队列的尺寸是无 限的。
常用方法:
● PriorityQueue.put((priority, item), block=True, timeout=None):将 item 放入队列,并为其指定一个优先级 timeout 是 priority。如果 block 是 True 且 None(默认),则在必要时阻塞至有空闲的插槽。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有可用的插槽,将引发 完全异常。
● PriorityQueue.put_nowait((item, priority):相当于 PriorityQueue.put((item, priority), block=False)。如果队列已满,立 即引发完全异常。
● PriorityQueue.get(block=True, timeout=None):从队列中移除并返回一 个元素。如果 block 是 True 且 timeout 是 None(默认),则在必要时阻塞 至队列中有项目可用。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在 这段时间内没有项目可用,将引发完全异常。
● PriorityQueue.get_nowait():相当于 PriorityQueue.get(block=False)。如果队列为空,立即引发完全异常。
● PriorityQueue.qsize():返回队列中的项目数量。
● PriorityQueue.empty():如果队列为空,返回True,否则返回False。
● PriorityQueue.full():如果队列已满(达到最大尺寸),返回True,否则返回 False。
● PriorityQueue.task_done():指示之前入队的一个任务已经完成,即 来的元素相关操作已经完成。由队列中的 get()端掌控,每次get用于一个任务 时,任务最后要调用 get出 task_done()告诉队列,任务已经完成。
import asyncio
# 生产者协程,负责生成一系列数字(这里实际上是字典键值对)并将它们放入队列中
async def producer(queue, n):
fx = {4: 'd', 5: 'e', 2: 'b', 3: 'c', 1: 'a'} # 定义一个字典,包含数字和字母的映射
fx_tuples = [(key, value) for key, value in fx.items()] # 将字典转换为元组列表
for i in range(0, n): # 循环从0到n-1,但由于字典是无序的,这里的i仅用作索引限制
# 注意:如果n大于fx_tuples的长度,将会引发IndexError
print(f'生产者生产了: {fx_tuples[i]}') # 打印当前生产的元组
await queue.put(fx_tuples[i]) # 将元组放入队列,如果队列已满,则阻塞直到有空位
await asyncio.sleep(1) # 模拟生产耗时,等待1秒钟
print('生产者完成生产。') # 所有指定的元组生产完毕,打印完成消息
await queue.put(None) # 放入一个None作为结束信号,通知消费者没有更多元组
# 消费者协程,负责从队列中取出元组并打印它们
async def consumer(queue):
await asyncio.sleep(5) # 消费者在开始消费前等待5秒(模拟其他任务或延迟)
while True: # 无限循环,直到接收到结束信号
item = await queue.get() # 从队列中取出一个元素,如果队列为空,则阻塞直到有元素
if item is None: # 检查是否接收到结束信号
queue.task_done() # 通知队列,当前任务(即处理None作为结束信号的任务)已经完成
break # 如果是结束信号,退出循环
print(f'消费者消费了: {item}') # 打印当前消费的元组
queue.task_done() # 通知队列,当前任务(即处理一个元组的任务)已经完成
print('消费者完成消费。') # 打印完成消费的消息
# 主协程,负责启动生产者和消费者协程,并等待它们完成
async def main():
queue = asyncio.PriorityQueue(10) # 创建一个优先队列实例,用于生产者和消费者之间的通信,容量为10
# 创建生产者和消费者协程
producer_coro = producer(queue, 5) # 生产者协程,尝试生产前5个字典中的键值对(注意字典无序)
consumer_coro = consumer(queue) # 消费者协程
# 使用asyncio.gather等待生产者和消费者协程完成
# gather允许同时运行多个协程,并在它们都完成时返回结果(这里不关心具体返回值)
await asyncio.gather(producer_coro, consumer_coro)
# 等待队列中的所有项目都被处理完毕(即等待所有task_done()被调用)
await queue.join()
print('所有任务都已处理完毕。')
# 运行主协程,启动事件循环
asyncio.run(main())
'''
生产者生产了: (4, 'd')
生产者生产了: (5, 'e')
生产者生产了: (2, 'b')
生产者生产了: (3, 'c')
生产者生产了: (1, 'a')
消费者消费了: (1, 'a')
消费者消费了: (2, 'b')
消费者消费了: (3, 'c')
消费者消费了: (4, 'd')
消费者消费了: (5, 'e')
生产者完成生产。
消费者完成消费。
所有任务都已处理完毕。
'''
六、协程同步
与进程、线程类似,协程也有同步机制,包括Lock、Semaphore、Event、 Condition。
6.1、Lock
在协程中,可以使用Lock来确保同一时间只有一个协程可以访问某个资源。
asyncio.Lock()
其方法为:
acquire():获取锁。此方法会等待直至锁为unlocked,然后将其设为locked并 返回True。当有一个以上的协程在 acquire()中被阻塞则会等待解锁,最终只 有一个协程会被执行。锁的获取是公平的,被执行的协程将是第一个开始等待锁 的协程。
release():释放锁。当锁为locked时,将其设为unlocked并返回。如果锁为 unlocked,则会抛出异常。
locked():如果锁为locked则返回 True。
为了避免死锁,建议使用async with 来管理Lock。
import asyncio # 导入asyncio模块,提供异步编程的原语和工具
async def worker(lock, id): # 定义一个协程函数,接收一个锁和一个标识符
while True: # 无限循环,模拟持续工作的协程
async with lock: # 使用async with语句获取锁,确保同一时间只有一个协程执行这部分代码
print(f"Worker {id} is working") # 打印当前协程正在工作的消息
await asyncio.sleep(1) # 模拟I/O操作,挂起协程1秒钟
async def main(): # 定义主协程函数,用于启动和管理其他协程
lock = asyncio.Lock() # 创建一个锁,用于同步协程,防止它们同时执行某些代码块
# 创建两个协程,并将它们传递给asyncio.gather()函数
# asyncio.gather()用于并发运行多个协程,并等待它们全部完成
await asyncio.gather(worker(lock, 1), worker(lock, 2)) # 并发运行两个worker协程
asyncio.run(main()) # 运行主协程,启动事件循环并执行主协程
6.2、Semaphore
在协程中,可以使用Semaphore来控制对资源的访问数量。Semaphore会管理一个 内部计数器,该计数器会随每次 acquire调用递减并随每次 release调用递增,计 数器的值永远不会降到零以下。当 acquire发现其值为零时,它将保持阻塞直到有某 个任务调用了 release。
asyncio.Semaphore(value=1)
value:value参数用来为内部计数器赋初始值,默认为1。如果给定的值小于0则会 抛出异常。
其方法为:
acquire():获取一个信号量。如果内部计数器的值大于零,则将其减一并立即 返回True。如果其值为零,则会等待直到 release被调用。
release():释放一个信号量,将内部计数器的值加一。可以唤醒一个正在等待 获取信号量对象的任务。
locked():如果信号量对象无法被立即获取则返回True
建议使用async with来管理Semaphore。
import asyncio
async def car(semaphore, car_id):
"""模拟车辆进入停车场并离开的过程"""
print(f"Car {car_id} 正在等待停车位")
async with semaphore: # 获取信号量,相当于获取停车位
print(f"Car {car_id} 进入停车场了.")
await asyncio.sleep(2) # 模拟车辆在停车场内停留的时间
print(f"Car {car_id} 离开停车场了")
# 信号量在退出async with块时自动释放,相当于车辆离开停车场
async def main():
# 假设停车场只有3个停车位
parking_spaces = asyncio.Semaphore(3)
# 创建5个车辆协程
cars = [car(parking_spaces, i) for i in range(1, 6)]
# 并发运行所有车辆协程
await asyncio.gather(*cars)
asyncio.run(main())
'''
Car 1 正在等待停车位
Car 1 进入停车场了.
Car 2 正在等待停车位
Car 2 进入停车场了.
Car 3 正在等待停车位
Car 3 进入停车场了.
Car 4 正在等待停车位
Car 5 正在等待停车位
Car 1 离开停车场了
Car 2 离开停车场了
Car 3 离开停车场了
Car 4 进入停车场了.
Car 5 进入停车场了.
Car 4 离开停车场了
Car 5 离开停车场了
'''
6.3、Event
在python中使用Event允许一个协程通知一个或多个协程某个事件已经发生。Event 对象会管理一个内部标志,可通过 set方法将其设为True并通过 设为False。 clear方法将其重 wait方法会阻塞直至该标志被设为True。该标志初始时会被设为 False。
asyncio.Event()
其方法为:
wait():协程等待直至事件被设置。如果事件已被设置,则立即返回True,否则将阻塞直至另一个任务调用set()
set():设置事件。所有等待事件被设置的任务将被立即唤醒。
clear():清空(取消设置)事件。通过wait()进行等待的任务现在将会阻塞直至set()方法被再次调用。
is_set():如果事件已被设置则返回 True。
import asyncio
import random
async def producer(event, data):
"""生产者协程,它在准备好数据后设置事件"""
print(f"Producer is preparing data: {data}")
time = random.uniform(0.5, 2)
print(time)
await asyncio.sleep(time) # 模拟数据准备时间
print(f"Producer has prepared data: {data}")
event.set() # 设置事件,表示数据已经准备好了
print("Producer has notified the consumer.")
async def consumer(event):
"""消费者协程,它在事件被设置后开始消费数据"""
print("Consumer is waiting for data.")
await event.wait() # 等待事件被设置
print("Consumer has received the notification and is consuming data.")
# 模拟数据处理
await asyncio.sleep(random.uniform(0.5, 2))
print("Consumer has finished consuming data.")
async def main():
# 创建一个事件对象
event = asyncio.Event()
# 创建生产者和消费者协程
producer_coro = producer(event, "data1")
consumer_coro = consumer(event)
# 并发运行生产者和消费者协程
await asyncio.gather(producer_coro, consumer_coro)
asyncio.run(main())
'''
Producer is preparing data: data1
1.4196680751620212
Consumer is waiting for data.
Producer has prepared data: data1
Producer has notified the consumer.
Consumer has received the notification and is consuming data.
Consumer has finished consuming data.
'''
6.4、Condition
在python中允许协程等待某些条件成立,然后被通知恢复执行。在本质上, Condition 对象合并了 Event和 Lock 的功能。 多个 Condition 对象有可能共享一个 Lock,这允许关注于共享资源的特定状态的不同任务实现对共享资源的协同独占访 问。
asyncio.Condition(lock=None)
lock:lock参数必须为自己创建的 Lock对象或None,在后一种情况下会自动创建 一个新的Lock对象。
其方法为:
acquire():获取下层的锁。此方法会等待直至下层的锁为 unlocked,将其设为 locked 并返回 True。
notify(n=1):唤醒最多 n 个正在等待此条件的任务(默认为 1 个)。如果没 有任务正在等待则此方法为空操作。锁必须在此方法被调用前被获取并在随后被 快速释放。 如果通过一个 unlocked 锁调用则会引发异常。
locked():如果下层的锁已被获取则返回 True。
notify_all():唤醒所有正在等待此条件的任务。此方法的行为类似于 notify,但会唤醒所有正在等待的任务。锁必须在此方法被调用前被获取并在 随后被快速释放。 如果通过一个 unlocked 锁调用则会引发异常。
release():释放下层的锁。在未锁定的锁调用时,会引发异常。
wait():等待直至收到通知。当此方法被调用时如果调用方任务未获得锁,则 会引发异常。这个方法会释放下层的锁,然后保持阻塞直到被 notify()或 notify_all()调用所唤醒。 一旦被唤醒,Condition 会重新获取它的锁并且此 方法将返回 True。
wait_for(predicate):等待直到目标值变为 true。目标必须为一个可调用对 象,其结果将被解读为一个布尔值。
建议使用async with来管理Condition。
import asyncio
# 生产者函数,负责通知所有等待的消费者
async def producer(condition):
while True: # 无限循环,模拟持续的生产活动
await condition.acquire() # 获取条件变量的锁
condition.notify_all() # 通知所有等待的消费者
condition.release() # 释放条件变量的锁
await asyncio.sleep(1) # 暂停一秒,模拟生产活动的时间间隔
# 消费者函数,负责等待生产者的通知
async def consumer(condition, number):
while True: # 无限循环,模拟持续的消费活动
await condition.acquire() # 获取条件变量的锁
print(f'{number}正在等待condition') # 打印消费者正在等待的通知
await condition.wait() # 等待生产者的通知
print(f'{number}已释放condition') # 打印消费者收到通知后的消息
condition.release() # 释放条件变量的锁
# 主函数,负责启动生产者和消费者任务
async def main():
condition = asyncio.Condition() # 创建一个条件变量
# 创建任务列表,包括一个生产者和多个消费者
tasks = [
asyncio.create_task(producer(condition)), # 创建生产者任务
asyncio.create_task(consumer(condition, 1)), # 创建消费者任务,编号为1
asyncio.create_task(consumer(condition, 2)), # 创建消费者任务,编号为2
asyncio.create_task(consumer(condition, 3)), # 创建消费者任务,编号为3
asyncio.create_task(consumer(condition, 4)), # 创建消费者任务,编号为4
asyncio.create_task(consumer(condition, 5)), # 创建消费者任务,编号为5
]
# 等待所有任务完成,由于生产者是无限循环,这里实际上会无限等待
await asyncio.wait(tasks)
# 运行主函数,启动事件循环
asyncio.run(main())
七、将协程分布到线程池/进程池中
一般情况下,程序的异步开发要么使用协程,要么使用进程池或线程池,但是也会碰 到有一些情况需要既使用协程又使用进程池或线程池,而进程池、线程池 submit后 返回的 Future和协程的 Future又不是一回事,不能直接使用await,因此就需要进 行一个对象的转换。
在Python中,可以通过 asyncio.wrap_future()来将一个 concurrent.futures.Future转化为asyncio.Future,这样就可以去使用协程的相关内容了。
import asyncio
import concurrent.futures
import time
# 这是一个普通函数
def func1():
time.sleep(5)
print('in func1')
# 这是一个普通函数
def func2():
time.sleep(3)
print('in func2')
async def main():
# 创建一个进程池
with concurrent.futures.ProcessPoolExecutor() as pool:
# 使用进程池提交任务
future1 = pool.submit(func1)
future2 = pool.submit(func2)
# 将 concurrent.futures.Future 转换为 asyncio.Future
async_future1 = asyncio.wrap_future(future1)
async_future2 = asyncio.wrap_future(future2)
# 使用 asyncio 的 await 等待结果
result = await asyncio.gather(async_future1,async_future2)
print(f"The result is {result}")
# 注意:进程就需要放到主模块中去执行
if __name__ == '__main__':
asyncio.run(main())
'''
in func2
in func1
The result is [None, None]
'''
使用 loop.run_in_executor()直接转换
使用 asyncio.get_running_loop()时,如果当前没有正在运行的事件循环,就抛 出异常。而上面的 asyncio.get_event_loop()则是在当前没有正在运行的事件循 环的基础上,会创建一个新的事件循环。相对来说, asyncio.get_running_loop()更适合在协程或异步函数内部使用, asyncio.get_event_loop()适用于更广泛的情况。
loop.run_in_executor(executor, func, *args):
executor:一个执行器对象,通常是 concurrent.futures.ThreadPoolExecutor 或 concurrent.futures.ProcessPoolExecutor 的实例。它管理同步函数的执 行,如果不指定就默认创建一个线程池。
func:要执行的同步函数。
*args:传递给 func的位置参数。
import asyncio
import time
# 示例同步函数,模拟耗时操作
def slow_function1():
# 打印信息,表示函数开始执行
print("Function 1 is running")
# 模拟耗时操作,线程睡眠2秒
time.sleep(2)
# 打印信息,表示函数执行完毕
print("Function 1 is done")
# 返回函数执行结束的信息
return 'func1 end'
def slow_function2():
# 打印信息,表示函数开始执行
print("Function 2 is running")
# 模拟耗时操作,线程睡眠2秒
time.sleep(2)
# 打印信息,表示函数执行完毕
print("Function 2 is done")
# 返回函数执行结束的信息
return 'func2 end'
async def main():
# 获取当前正在运行的事件循环
loop = asyncio.get_running_loop()
print('before run')
# 使用线程池执行器并发运行两个同步函数
# run_in_executor的第一个参数为None,表示使用默认的线程池执行器
task1 = loop.run_in_executor(None, slow_function1)
task2 = loop.run_in_executor(None, slow_function2)
print('after run')
# 等待两个函数执行完成,并获取它们的返回值
result1 = await task1
result2 = await task2
print('after await')
# 打印两个函数的执行结果
print(f"Result of function 1: {result1}")
print(f"Result of function 2: {result2}")
if __name__ == '__main__':
# 记录程序开始执行的时间
start = time.time()
# 运行主函数
asyncio.run(main())
# asyncio.get_running_loop()
# 打印程序执行的总时间
print('total_time', time.time() - start)
'''
before run
Function 1 is running
Function 2 is running
after run
Function 1 is done
Function 2 is done
after await
Result of function 1: func1 end
Result of function 2: func2 end
total_time 2.0069408416748047
'''