当前位置: 首页 > article >正文

Autogen_core源码:_cancellation_token.py

目录

    • _cancellation_token.py代码
    • 代码解释
      • 类的初始化
      • 取消操作
      • 检查取消状态
      • 添加回调函数
      • 关联异步`Future`对象
      • 总结
    • 代码示例
      • 示例 1:基本的取消操作
      • 示例 2:添加回调函数
      • 示例 3:检查令牌是否已取消

_cancellation_token.py代码

import threading
from asyncio import Future
from typing import Any, Callable, List


class CancellationToken:
    """A token used to cancel pending async calls"""

    def __init__(self) -> None:
        self._cancelled: bool = False
        self._lock: threading.Lock = threading.Lock()
        self._callbacks: List[Callable[[], None]] = []

    def cancel(self) -> None:
        """Cancel pending async calls linked to this cancellation token."""
        with self._lock:
            if not self._cancelled:
                self._cancelled = True
                for callback in self._callbacks:
                    callback()

    def is_cancelled(self) -> bool:
        """Check if the CancellationToken has been used"""
        with self._lock:
            return self._cancelled

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Attach a callback that will be called when cancel is invoked"""
        with self._lock:
            if self._cancelled:
                callback()
            else:
                self._callbacks.append(callback)

    def link_future(self, future: Future[Any]) -> Future[Any]:
        """Link a pending async call to a token to allow its cancellation"""
        with self._lock:
            if self._cancelled:
                future.cancel()
            else:

                def _cancel() -> None:
                    future.cancel()

                self._callbacks.append(_cancel)
        return future

代码解释

这段Python代码定义了一个名为CancellationToken的类,其主要功能是提供一种机制,用于取消挂起的异步调用。下面详细解释代码的逻辑和功能:

类的初始化

def __init__(self) -> None:
    self._cancelled: bool = False
    self._lock: threading.Lock = threading.Lock()
    self._callbacks: List[Callable[[], None]] = []
  • _cancelled:一个布尔类型的私有变量,用于标记该取消令牌是否已经被使用(即是否已经调用了cancel方法),初始值为False
  • _lock:一个线程锁对象,用于确保在多线程环境下对共享资源(如_cancelled_callbacks)的访问是线程安全的。
  • _callbacks:一个列表,用于存储当调用cancel方法时需要执行的回调函数。

取消操作

def cancel(self) -> None:
    """Cancel pending async calls linked to this cancellation token."""
    with self._lock:
        if not self._cancelled:
            self._cancelled = True
            for callback in self._callbacks:
                callback()
  • cancel方法用于取消与该取消令牌关联的所有挂起的异步调用。
  • 使用with self._lock语句确保在修改_cancelled状态和执行回调函数时不会出现竞态条件。
  • 只有当_cancelledFalse时,才会将其设置为True,并依次执行_callbacks列表中的所有回调函数。

检查取消状态

def is_cancelled(self) -> bool:
    """Check if the CancellationToken has been used"""
    with self._lock:
        return self._cancelled
  • is_cancelled方法用于检查该取消令牌是否已经被使用。
  • 使用with self._lock语句确保在读取_cancelled状态时不会出现竞态条件。
  • 返回_cancelled的值。

添加回调函数

def add_callback(self, callback: Callable[[], None]) -> None:
    """Attach a callback that will be called when cancel is invoked"""
    with self._lock:
        if self._cancelled:
            callback()
        else:
            self._callbacks.append(callback)
  • add_callback方法用于添加一个回调函数,当调用cancel方法时,该回调函数将被执行。
  • 使用with self._lock语句确保在检查_cancelled状态和修改_callbacks列表时不会出现竞态条件。
  • 如果_cancelledTrue,说明取消操作已经发生,直接执行回调函数;否则,将回调函数添加到_callbacks列表中。

关联异步Future对象

def link_future(self, future: Future[Any]) -> Future[Any]:
    """Link a pending async call to a token to allow its cancellation"""
    with self._lock:
        if self._cancelled:
            future.cancel()
        else:
            def _cancel() -> None:
                future.cancel()
            self._callbacks.append(_cancel)
    return future
  • link_future方法用于将一个异步Future对象与该取消令牌关联起来,以便可以取消该异步调用。
  • 使用with self._lock语句确保在检查_cancelled状态和修改_callbacks列表时不会出现竞态条件。
  • 如果_cancelledTrue,说明取消操作已经发生,直接取消Future对象;否则,定义一个内部函数_cancel,用于取消Future对象,并将其添加到_callbacks列表中。
  • 最后返回Future对象。

总结

CancellationToken类提供了一种机制,允许用户在需要时取消挂起的异步调用。通过添加回调函数和关联Future对象,当调用cancel方法时,所有与该取消令牌关联的操作都将被取消。同时,使用线程锁确保了在多线程环境下的线程安全。

代码示例

示例 1:基本的取消操作

import asyncio
from typing import Any, Callable, List
import threading
from asyncio import Future


class CancellationToken:
    """A token used to cancel pending async calls"""

    def __init__(self) -> None:
        self._cancelled: bool = False
        self._lock: threading.Lock = threading.Lock()
        self._callbacks: List[Callable[[], None]] = []

    def cancel(self) -> None:
        """Cancel pending async calls linked to this cancellation token."""
        with self._lock:
            if not self._cancelled:
                self._cancelled = True
                for callback in self._callbacks:
                    callback()

    def is_cancelled(self) -> bool:
        """Check if the CancellationToken has been used"""
        with self._lock:
            return self._cancelled

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Attach a callback that will be called when cancel is invoked"""
        with self._lock:
            if self._cancelled:
                callback()
            else:
                self._callbacks.append(callback)

    def link_future(self, future: Future[Any]) -> Future[Any]:
        """Link a pending async call to a token to allow its cancellation"""
        with self._lock:
            if self._cancelled:
                future.cancel()
            else:

                def _cancel() -> None:
                    future.cancel()

                self._callbacks.append(_cancel)
        return future


async def long_running_task():
    print("Task started")
    await asyncio.sleep(5)
    print("Task completed")


async def main():
    token = CancellationToken()
    task = asyncio.create_task(long_running_task())
    token.link_future(task)

    # 模拟一段时间后取消任务
    await asyncio.sleep(2)
    token.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")


await main()
Task started
Task was cancelled

示例 2:添加回调函数

def callback_function():
    print("Callback function was called")


async def main():
    token = CancellationToken()
    token.add_callback(callback_function)

    # 取消令牌
    token.cancel()

await main()
Callback function was called

示例 3:检查令牌是否已取消


async def main():
    token = CancellationToken()
    print(f"Is cancelled before cancel: {token.is_cancelled()}")
    token.cancel()
    print(f"Is cancelled after cancel: {token.is_cancelled()}")

await main()
Is cancelled before cancel: False
Is cancelled after cancel: True

http://www.kler.cn/a/528653.html

相关文章:

  • 14-9-1C++STL的set容器
  • go-zero学习笔记(一)
  • ROS-IMU
  • 最近最少使用算法(LRU最近最少使用)缓存替换算法
  • Windows系统本地部署deepseek 更改目录
  • Attention--人工智能领域的核心技术
  • F. Greetings
  • 深入理解--JVM 类加载机制详解
  • Baklib揭示内容中台在企业数字化转型中的关键作用与应用探索
  • hexo部署到github page时,hexo d后page里面绑定的个人域名消失的问题
  • Spring中ObjectProvider的妙用与实例解析
  • 小白怎样部署和使用本地大模型DeepSeek ?
  • vue虚拟列表优化前端性能
  • generator 生成器,enumerate,命名空间(笔记向)
  • 【大模型LLM面试合集】大语言模型架构_llama系列模型
  • Vue.js 比较 Composition API 和 Options API
  • vsnprintf() 将可变参数格式化输出到字符数组
  • 什么是门控循环单元?
  • 爬取鲜花网站数据
  • 使用 Docker(Podman) 部署 MongoDB 数据库及使用详解
  • 白话DeepSeek-R1论文(三)| DeepSeek-R1蒸馏技术:让小模型“继承”大模型的推理超能力
  • 为AI聊天工具添加一个知识系统 之82 详细设计之23 符号逻辑 正则表达式规则 之1
  • 如何实现滑动列表功能
  • 智慧园区综合管理系统如何实现多个维度的高效管理与安全风险控制
  • c++ list的front和pop_front的概念和使用案例
  • 【3】阿里面试题整理