Python编程 - 协程
前言
上篇文章主要讲述了python的进程,进程池和进程与线程对比等知识,接下来这篇文章再唠唠python的协程,让我们继续往下看!
一、协程的使用
python 中的协程是一种用于处理并发任务的高效工具,它依赖于 asyncio
库以及 async
和 await
关键字来实现异步编程。协程与传统的多线程或多进程并发模型不同,它通过事件循环实现任务的调度,在单线程内并发执行多个任务,适用于 I/O 密集型任务,如网络请求、文件操作等。
(一)基本使用
协程的定义使用 async
关键字,调用协程函数不会立即执行,而是返回一个协程对象,只有通过 await
关键字来执行它。await
会暂停当前协程的执行,等待另一个协程完成后再继续。
示例:
import asyncio
# 定义一个协程函数
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 模拟耗时操作
print("World")
# 定义主函数
async def main():
await say_hello()
# 启动事件循环
asyncio.run(main())
代码解释:
-
async def say_hello()
定义了一个协程函数,它会在await asyncio.sleep(1)
处暂停 1 秒,模拟一个耗时任务。 -
asyncio.run(main())
启动了事件循环并执行协程。
(二)并发执行多个任务
协程的优势在于可以并发执行多个任务,避免顺序执行带来的阻塞。通过 asyncio.gather()
,可以同时运行多个协程,从而提高程序效率。
示例:
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2) # 模拟耗时操作
print("Task 1 finished")
async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
async def main():
# 并发运行两个任务
await asyncio.gather(task1(), task2())
asyncio.run(main())
代码解释:
-
asyncio.gather()
用于并发执行task1()
和task2()
。两个任务同时开始,而不会等待前一个任务完成再执行下一个。
(三)协程与异步I/O
协程在处理 I/O 密集型任务时表现尤为出色,例如网络请求、文件读取等。通过 aiohttp
等异步库,可以大大提高程序的响应速度。
示例:
import asyncio
import time
async def fetch_data(url):
print(f"Fetching data from {url}")
await asyncio.sleep(2) # 模拟网络请求
print(f"Received data from {url}")
async def main():
start = time.time()
# 并发执行多个网络请求
await asyncio.gather(fetch_data('url1'), fetch_data('url2'), fetch_data('url3'))
end = time.time()
print(f"Total time: {end - start} seconds")
asyncio.run(main())
代码解释:
-
asyncio.gather()
并发运行多个网络请求的协程任务。 -
由于
await asyncio.sleep(2)
是异步操作,因此多个请求会并发执行,从而显著减少总的执行时间。
(四)创建任务并独立执行
有时需要在不等待任务完成的情况下创建协程任务,可以使用 asyncio.create_task()
来创建一个独立执行的协程任务。
示例:
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
async def main():
# 创建任务,但不等待其完成
task = asyncio.create_task(task1())
print("Task 1 is running in the background")
await asyncio.sleep(1) # 主程序继续执行其他操作
print("Main task completed")
asyncio.run(main())
代码解释:
-
asyncio.create_task()
创建了一个独立的协程任务,它会在后台执行,而主程序可以继续执行其他操作。 -
task1()
的执行不会阻塞主任务的执行。
(五)协程中的异常处理
在协程中同样可以使用 try/except
进行异常处理,这样可以确保即使某个协程抛出异常,程序依然可以继续执行。
示例:
import asyncio
async def faulty_task():
try:
print("Faulty task started")
await asyncio.sleep(1)
raise ValueError("An error occurred!")
except ValueError as e:
print(f"Error caught: {e}")
async def main():
await faulty_task()
asyncio.run(main())
该示例展示了如何在协程中捕获并处理异常,避免程序因异常崩溃。
(六)超时控制
有时,某些任务可能会长时间运行或卡住,可以通过 asyncio.wait_for()
为协程任务设置超时时间,如果任务未在指定时间内完成,将抛出 asyncio.TimeoutError
异常。
示例:
import asyncio
async def long_task():
print("Long task started")
await asyncio.sleep(5)
print("Long task finished")
async def main():
try:
# 设置超时时间为 3 秒
await asyncio.wait_for(long_task(), timeout=3)
except asyncio.TimeoutError:
print("Task timed out!")
asyncio.run(main())
asyncio.wait_for()
设置任务的超时时间。如果 long_task()
在 3 秒内未完成,将抛出 TimeoutError
。
(七)总结
python 中的协程是一种高效处理并发任务的工具,特别适用于 I/O 密集型操作。通过 async/await
语法和 asyncio
库,可以在单线程环境下实现多任务并发,避免不必要的阻塞,极大提高程序的执行效率。
-
async
和await
是定义和调用协程的基础。 -
asyncio.gather()
和asyncio.create_task()
实现并发任务。 -
异常处理、超时控制、同步函数的异步化都可以在协程中灵活应用。
二、concurrent中的future对象
concurrent.futures
模块中,Future
对象是用于表示一个异步操作的结果,它可以帮助我们在多线程或多进程环境下跟踪任务的执行状态,并在任务完成后获取结果。Future
对象通常与线程池 ThreadPoolExecutor
或进程池 ProcessPoolExecutor
一起使用。
(一)概述
Future
对象是一个容器,用于存储异步任务的结果。它提供了多种方法和属性,用来检查任务的状态、获取任务的结果,或者等待任务完成。它的核心思想是:异步任务在后台执行,程序可以继续运行而不阻塞,而当我们需要结果时,可以通过 Future
对象访问该任务的执行状态和结果。
(二)使用场景
Future
对象一般与 concurrent.futures
模块中的线程池或进程池执行器executor一起使用,用来并发地执行多个任务。
ThreadPoolExecutor
示例
from concurrent.futures import ThreadPoolExecutor
def task(n):
print(f"处理任务 {n}")
return n * 2
# 使用线程池执行多个任务
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for future in futures:
print(f"任务结果: {future.result()}")
在上面的代码中,executor.submit()
提交任务并返回一个 Future
对象。通过调用 future.result()
,我们可以获取任务的结果。如果任务还未完成,result()
将会阻塞直到任务完成。
(三)Future
对象的属性与方法
Future
对象提供了几种方法和属性,用来跟踪和获取异步任务的状态和结果。
主要方法和属性
-
future.result(timeout=None)
:用于获取异步任务的结果。如果任务完成,立即返回结果;如果任务尚未完成,则会等待。如果设置了timeout
参数,则最多等待timeout
秒,超过时间将抛出TimeoutError
异常。如果任务在执行过程中抛出了异常,result()
也会重新抛出该异常。 -
future.done()
:返回True
表示任务已经完成(无论是成功完成还是抛出异常),否则返回False
。 -
future.cancel()
:用于尝试取消异步任务。如果任务未开始执行,则可以取消并返回True
;如果任务已经开始,则无法取消,返回False
。 -
future.cancelled()
:返回True
表示任务已经被取消,返回False
表示任务没有被取消。 -
future.running()
:返回True
表示任务正在执行,返回False
表示任务未执行或已经完成。 -
future.add_done_callback(fn)
:给Future
对象添加一个回调函数fn
,当任务完成时会调用该函数。回调函数会接收Future
对象作为参数。
示例:
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(n)
return n * 2
with ThreadPoolExecutor() as executor:
future = executor.submit(task, 3)
print(f"任务完成了吗? {future.done()}")
result = future.result() # 这里会阻塞直到任务完成
print(f"任务结果:{result}")
print(f"任务完成了吗? {future.done()}")
在这个例子中,future.done()
在任务开始执行时返回 False
,而当任务完成后再调用 done()
则返回 True
。
(四)Future
对象的回调机制
Future
对象支持回调机制,通过 add_done_callback()
方法,我们可以在任务完成时自动调用指定的回调函数。回调函数会接收当前 Future
对象作为参数。
示例:
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * 2
def callback(future):
print(f"回调:任务结果是 {future.result()}")
with ThreadPoolExecutor() as executor:
future = executor.submit(task, 3)
future.add_done_callback(callback)
在这个示例中,当任务完成时,回调函数会自动被调用,并且可以通过传递的 Future
对象来获取任务结果。
(五)as_completed
和 wait
concurrent.futures
模块还提供了 as_completed()
和 wait()
函数,便于在多个 Future
对象上等待和检查结果。
-
as_completed(futures)
:返回一个迭代器,当每个Future
对象完成时,它会按照完成的顺序返回。 -
wait(futures, timeout=None, return_when=ALL_COMPLETED)
:等待多个Future
对象的完成,支持超时和条件返回。
示例:
from concurrent.futures import ThreadPoolExecutor, as_completed
def task(n):
return n * 2
with ThreadPoolExecutor() as executor:
futures = [executor.submit(task, i) for i in range(5)]
for future in as_completed(futures):
print(f"任务结果: {future.result()}")
(六)总结
concurrent.futures
模块中的 Future
对象为我们提供了处理异步任务的方式。它可以通过线程池或进程池来并发执行任务,并允许我们轻松地获取任务的执行状态、结果以及异常处理。Future
对象的灵活性使它成为并发编程中不可或缺的工具,适用于 I/O 密集型任务、CPU 密集型任务等场景。
三、协程与线程和进程的交叉使用
在 Python 编程中,协程、线程和进程是三种常用的并发编程方式。它们各自适用于不同的场景,但在一些复杂的应用中,可能需要将它们交叉使用,以充分发挥它们各自的优势,实现更高效的并发处理。
(一)协程、线程和进程的区别
协程
-
轻量级并发:协程是由 Python 内部实现的用户级并发,基于事件循环。协程通过
async
和await
关键字实现异步非阻塞的 I/O 操作,适合处理 I/O 密集型任务,如网络请求、文件读写等。 -
单线程执行:协程通常在单个线程中执行,通过释放控制权 (
await
) 来提高程序的并发性,不会占用多个 CPU 核心。
线程
-
操作系统级并发:线程由操作系统调度,可以在同一个进程内运行多个线程。线程可以利用多核 CPU 来并发执行代码。
-
多线程共享内存:线程共享进程的内存资源,但这也带来了线程安全问题,因此需要使用锁(Lock)或其他同步机制来避免数据竞争。
-
适合 I/O 密集型任务:对于 I/O 密集型任务,多线程可以有效提高程序的响应速度。
进程
-
独立的内存空间:进程是完全独立的运行实体,拥有自己的内存空间和资源。通过
multiprocessing
模块可以在不同的 CPU 核心上并行运行进程。 -
进程间通信(IPC):进程间不能直接共享内存,因此需要使用管道、队列等方式进行通信。
-
适合 CPU 密集型任务:由于 Python 的全局解释器锁(GIL),在单个进程内无法同时运行多个 Python 字节码,但通过多进程可以避免 GIL 的影响,充分利用多核 CPU,适合 CPU 密集型任务如图像处理、大数据计算等。
(二)协/线/进程的交叉使用场景
- 协程与线程的交叉使用
协程可以在单线程中提供高效的 I/O 并发处理,但有时需要同时进行一些阻塞的同步操作,或者需要利用多核 CPU 进行并发计算时,可以将协程和线程结合使用。
示例:在协程中运行阻塞的同步代码
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
# 阻塞的同步任务
def blocking_task():
time.sleep(2)
return "任务完成"
# 使用协程调度异步任务
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
# 在线程池中运行同步阻塞任务
result = await loop.run_in_executor(pool, blocking_task)
print(result)
# 启动事件循环
asyncio.run(main())
在这个例子中,blocking_task
是一个同步任务,通过 ThreadPoolExecutor
在单独的线程中运行,从而避免阻塞事件循环。协程通过 await
来异步等待线程中的任务完成,这种方式结合了协程的异步优势和线程的多核并发处理能力。
- 协程与进程的交叉使用
在某些情况下,单线程中的协程可能无法满足 CPU 密集型任务的需求,因此可以结合进程来处理耗费 CPU 的任务。
示例:在协程中并发运行多进程任务
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
# CPU 密集型任务
def cpu_bound_task(n):
total = 0
for i in range(1000000 * n):
total += i
return total
# 使用协程调度异步任务
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
# 在进程池中运行 CPU 密集型任务
result = await loop.run_in_executor(pool, cpu_bound_task, 5)
print(f"计算结果: {result}")
# 启动事件循环
asyncio.run(main())
在这个例子中,cpu_bound_task
是一个 CPU 密集型任务,它在协程环境中通过 ProcessPoolExecutor
来并发执行。协程负责调度和等待进程的结果返回,从而避免事件循环被阻塞。
- 线程与进程的交叉使用
有时我们可能需要同时处理 I/O 密集型和 CPU 密集型任务,这时可以考虑将线程和进程结合使用。
示例:线程池和进程池的结合
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
# I/O 密集型任务
def io_task():
time.sleep(1)
print("I/O 密集型任务完成")
# CPU 密集型任务
def cpu_task(n):
total = 0
for i in range(1000000 * n):
total += i
print(f"CPU 密集型任务结果: {total}")
# 使用线程池和进程池并发执行任务
def main():
with ThreadPoolExecutor() as thread_pool, ProcessPoolExecutor() as process_pool:
# 在线程池中运行 I/O 密集型任务
for _ in range(3):
thread_pool.submit(io_task)
# 在进程池中运行 CPU 密集型任务
for _ in range(2):
process_pool.submit(cpu_task, 5)
if __name__ == "__main__":
main()
在这个示例中,io_task
在线程池中并发执行,而 cpu_task
则在进程池中运行。这种设计模式使得我们可以在同一个程序中同时处理 I/O 密集型任务和 CPU 密集型任务,从而最大化利用系统资源。
(三)总结
协程、线程和进程各有其优点和适用场景:
-
协程:适用于 I/O 密集型任务,尤其是大量网络请求、文件操作等场景,能够高效地进行异步非阻塞操作。
-
线程:适用于 I/O 密集型任务,可以在多核 CPU 上并发执行,但需要注意线程安全和锁的问题。
-
进程:适用于 CPU 密集型任务,能够充分利用多核 CPU,但进程间通信的开销较大。
四、总结
这篇文章主要讲的是协程的基本使用,原理,以及协程、线程和进程间的差别和交叉使用,根据不同的需求使用不同的功能,使得代码运行效率更高!