Autogen_core源码:_cancellation_token.py
目录
- _cancellation_token.py代码
- 代码解释
- 类的初始化
- 取消操作
- 检查取消状态
- 添加回调函数
- 关联异步`Future`对象
- 总结
- 代码示例
- 示例 1:基本的取消操作
- 示例 2:添加回调函数
- 示例 3:检查令牌是否已取消
_cancellation_token.py代码
import threading
from asyncio import Future
from typing import Any, Callable, List
class CancellationToken:
"""A token used to cancel pending async calls"""
def __init__(self) -> None:
self._cancelled: bool = False
self._lock: threading.Lock = threading.Lock()
self._callbacks: List[Callable[[], None]] = []
def cancel(self) -> None:
"""Cancel pending async calls linked to this cancellation token."""
with self._lock:
if not self._cancelled:
self._cancelled = True
for callback in self._callbacks:
callback()
def is_cancelled(self) -> bool:
"""Check if the CancellationToken has been used"""
with self._lock:
return self._cancelled
def add_callback(self, callback: Callable[[], None]) -> None:
"""Attach a callback that will be called when cancel is invoked"""
with self._lock:
if self._cancelled:
callback()
else:
self._callbacks.append(callback)
def link_future(self, future: Future[Any]) -> Future[Any]:
"""Link a pending async call to a token to allow its cancellation"""
with self._lock:
if self._cancelled:
future.cancel()
else:
def _cancel() -> None:
future.cancel()
self._callbacks.append(_cancel)
return future
代码解释
这段Python代码定义了一个名为CancellationToken
的类,其主要功能是提供一种机制,用于取消挂起的异步调用。下面详细解释代码的逻辑和功能:
类的初始化
def __init__(self) -> None:
self._cancelled: bool = False
self._lock: threading.Lock = threading.Lock()
self._callbacks: List[Callable[[], None]] = []
_cancelled
:一个布尔类型的私有变量,用于标记该取消令牌是否已经被使用(即是否已经调用了cancel
方法),初始值为False
。_lock
:一个线程锁对象,用于确保在多线程环境下对共享资源(如_cancelled
和_callbacks
)的访问是线程安全的。_callbacks
:一个列表,用于存储当调用cancel
方法时需要执行的回调函数。
取消操作
def cancel(self) -> None:
"""Cancel pending async calls linked to this cancellation token."""
with self._lock:
if not self._cancelled:
self._cancelled = True
for callback in self._callbacks:
callback()
cancel
方法用于取消与该取消令牌关联的所有挂起的异步调用。- 使用
with self._lock
语句确保在修改_cancelled
状态和执行回调函数时不会出现竞态条件。 - 只有当
_cancelled
为False
时,才会将其设置为True
,并依次执行_callbacks
列表中的所有回调函数。
检查取消状态
def is_cancelled(self) -> bool:
"""Check if the CancellationToken has been used"""
with self._lock:
return self._cancelled
is_cancelled
方法用于检查该取消令牌是否已经被使用。- 使用
with self._lock
语句确保在读取_cancelled
状态时不会出现竞态条件。 - 返回
_cancelled
的值。
添加回调函数
def add_callback(self, callback: Callable[[], None]) -> None:
"""Attach a callback that will be called when cancel is invoked"""
with self._lock:
if self._cancelled:
callback()
else:
self._callbacks.append(callback)
add_callback
方法用于添加一个回调函数,当调用cancel
方法时,该回调函数将被执行。- 使用
with self._lock
语句确保在检查_cancelled
状态和修改_callbacks
列表时不会出现竞态条件。 - 如果
_cancelled
为True
,说明取消操作已经发生,直接执行回调函数;否则,将回调函数添加到_callbacks
列表中。
关联异步Future
对象
def link_future(self, future: Future[Any]) -> Future[Any]:
"""Link a pending async call to a token to allow its cancellation"""
with self._lock:
if self._cancelled:
future.cancel()
else:
def _cancel() -> None:
future.cancel()
self._callbacks.append(_cancel)
return future
link_future
方法用于将一个异步Future
对象与该取消令牌关联起来,以便可以取消该异步调用。- 使用
with self._lock
语句确保在检查_cancelled
状态和修改_callbacks
列表时不会出现竞态条件。 - 如果
_cancelled
为True
,说明取消操作已经发生,直接取消Future
对象;否则,定义一个内部函数_cancel
,用于取消Future
对象,并将其添加到_callbacks
列表中。 - 最后返回
Future
对象。
总结
CancellationToken
类提供了一种机制,允许用户在需要时取消挂起的异步调用。通过添加回调函数和关联Future
对象,当调用cancel
方法时,所有与该取消令牌关联的操作都将被取消。同时,使用线程锁确保了在多线程环境下的线程安全。
代码示例
示例 1:基本的取消操作
import asyncio
from typing import Any, Callable, List
import threading
from asyncio import Future
class CancellationToken:
"""A token used to cancel pending async calls"""
def __init__(self) -> None:
self._cancelled: bool = False
self._lock: threading.Lock = threading.Lock()
self._callbacks: List[Callable[[], None]] = []
def cancel(self) -> None:
"""Cancel pending async calls linked to this cancellation token."""
with self._lock:
if not self._cancelled:
self._cancelled = True
for callback in self._callbacks:
callback()
def is_cancelled(self) -> bool:
"""Check if the CancellationToken has been used"""
with self._lock:
return self._cancelled
def add_callback(self, callback: Callable[[], None]) -> None:
"""Attach a callback that will be called when cancel is invoked"""
with self._lock:
if self._cancelled:
callback()
else:
self._callbacks.append(callback)
def link_future(self, future: Future[Any]) -> Future[Any]:
"""Link a pending async call to a token to allow its cancellation"""
with self._lock:
if self._cancelled:
future.cancel()
else:
def _cancel() -> None:
future.cancel()
self._callbacks.append(_cancel)
return future
async def long_running_task():
print("Task started")
await asyncio.sleep(5)
print("Task completed")
async def main():
token = CancellationToken()
task = asyncio.create_task(long_running_task())
token.link_future(task)
# 模拟一段时间后取消任务
await asyncio.sleep(2)
token.cancel()
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
await main()
Task started
Task was cancelled
示例 2:添加回调函数
def callback_function():
print("Callback function was called")
async def main():
token = CancellationToken()
token.add_callback(callback_function)
# 取消令牌
token.cancel()
await main()
Callback function was called
示例 3:检查令牌是否已取消
async def main():
token = CancellationToken()
print(f"Is cancelled before cancel: {token.is_cancelled()}")
token.cancel()
print(f"Is cancelled after cancel: {token.is_cancelled()}")
await main()
Is cancelled before cancel: False
Is cancelled after cancel: True