当前位置: 首页 > article >正文

Autogen_core 测试代码:test_cancellation.py

目录

      • 第一段代码:定义 `LongRunningAgent` 类
        • 主要逻辑:
        • 完成的功能:
      • 第二段代码:定义 `NestingLongRunningAgent` 类
        • 主要逻辑:
        • 完成的功能:
      • 第三段代码:测试取消功能
        • 主要逻辑:
        • 完成的功能:
      • 第四段代码:测试嵌套取消功能(仅外层调用)
        • 主要逻辑:
        • 完成的功能:

第一段代码:定义 LongRunningAgent

import asyncio
from dataclasses import dataclass

import pytest
from autogen_core import (
    AgentId,
    AgentInstantiationContext,
    CancellationToken,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    message_handler,
)

@dataclass
class MessageType: ...

# Note for future reader:
# To do cancellation, only the token should be interacted with as a user
# If you cancel a future, it may not work as you expect.
class LongRunningAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("A long running agent")
        self.called = False
        self.cancelled = False

    @message_handler
    async def on_new_message(self, message: MessageType, ctx: MessageContext) -> MessageType:
        self.called = True
        sleep = asyncio.ensure_future(asyncio.sleep(100))
        ctx.cancellation_token.link_future(sleep)
        try:
            await sleep
            return MessageType()
        except asyncio.CancelledError:
            self.cancelled = True
            raise


class NestingLongRunningAgent(RoutedAgent):
    def __init__(self, nested_agent: AgentId) -> None:
        super().__init__("A nesting long running agent")
        self.called = False
        self.cancelled = False
        self._nested_agent = nested_agent

    @message_handler
    async def on_new_message(self, message: MessageType, ctx: MessageContext) -> MessageType:
        self.called = True
        response = self.send_message(message, self._nested_agent, cancellation_token=ctx.cancellation_token)
        try:
            val = await response
            assert isinstance(val, MessageType)
            return val
        except asyncio.CancelledError:
            self.cancelled = True
            raise

这段代码定义了一个名为 LongRunningAgent 的类,它继承自 RoutedAgent。这个类的主要功能是处理长时间运行的任务,并支持取消操作。

主要逻辑:
  1. 初始化:在 __init__ 方法中,设置了代理的名称,并初始化了两个标志 calledcancelled,用于跟踪任务是否被调用和是否被取消。

  2. 消息处理:通过装饰器 @message_handler 定义了一个异步方法 on_new_message,用于处理接收到的消息。

    • 设置长时间运行任务:使用 asyncio.sleep(100) 创建一个长时间运行的任务,并将其封装为 asyncio.Future 对象。

    • 链接取消令牌:通过 ctx.cancellation_token.link_future(sleep) 将取消令牌与长时间运行的任务链接起来,这样当令牌被取消时,任务也会被取消。

    • 等待任务完成:使用 await sleep 等待任务完成。

    • 处理取消:如果任务在等待过程中被取消(即接收到 asyncio.CancelledError),则设置 cancelled 标志为 True 并重新抛出异常。

完成的功能:
  • 定义了一个能够处理长时间运行任务的代理。

  • 支持通过取消令牌取消长时间运行的任务。

第二段代码:定义 NestingLongRunningAgent

async def test_cancellation_with_token() -> None:
    runtime = SingleThreadedAgentRuntime()

    await LongRunningAgent.register(runtime, "long_running", LongRunningAgent)
    agent_id = AgentId("long_running", key="default")
    token = CancellationToken()
    response = asyncio.create_task(runtime.send_message(MessageType(), recipient=agent_id, cancellation_token=token))
    assert not response.done()

    while runtime.unprocessed_messages_count == 0:
        await asyncio.sleep(0.01)

    await runtime._process_next()  # type: ignore

    token.cancel()

    with pytest.raises(asyncio.CancelledError):
        await response

    assert response.done()
    long_running_agent = await runtime.try_get_underlying_agent_instance(agent_id, type=LongRunningAgent)
    assert long_running_agent.called
    assert long_running_agent.cancelled
await test_cancellation_with_token()

这段代码定义了一个名为 NestingLongRunningAgent 的类,它也继承自 RoutedAgent。这个类的主要功能是嵌套调用另一个代理,并处理长时间运行的任务。

主要逻辑:
  1. 初始化:在 __init__ 方法中,设置了代理的名称,并初始化了两个标志 calledcancelled,同时接收一个嵌套代理的 AgentId

  2. 消息处理:通过装饰器 @message_handler 定义了一个异步方法 on_new_message,用于处理接收到的消息。

    • 调用嵌套代理:使用 self.send_message 向嵌套代理发送消息,并将取消令牌传递给嵌套代理。

    • 等待响应:使用 await response 等待嵌套代理的响应。

    • 处理取消:如果嵌套代理在等待过程中被取消(即接收到 asyncio.CancelledError),则设置 cancelled 标志为 True 并重新抛出异常。

完成的功能:
  • 定义了一个能够嵌套调用另一个代理的代理。

  • 支持通过取消令牌取消嵌套代理的任务。

第三段代码:测试取消功能

async def test_nested_cancellation_only_outer_called() -> None:
    runtime = SingleThreadedAgentRuntime()

    await LongRunningAgent.register(runtime, "long_running", LongRunningAgent)
    await NestingLongRunningAgent.register(
        runtime,
        "nested",
        lambda: NestingLongRunningAgent(AgentId("long_running", key=AgentInstantiationContext.current_agent_id().key)),
    )

    long_running_id = AgentId("long_running", key="default")
    nested_id = AgentId("nested", key="default")
    token = CancellationToken()
    response = asyncio.create_task(runtime.send_message(MessageType(), nested_id, cancellation_token=token))
    assert not response.done()

    while runtime.unprocessed_messages_count == 0:
        await asyncio.sleep(0.01)

    await runtime._process_next()  # type: ignore
    token.cancel()

    with pytest.raises(asyncio.CancelledError):
        await response

    assert response.done()
    nested_agent = await runtime.try_get_underlying_agent_instance(nested_id, type=NestingLongRunningAgent)
    assert nested_agent.called
    assert nested_agent.cancelled
    long_running_agent = await runtime.try_get_underlying_agent_instance(long_running_id, type=LongRunningAgent)
    assert long_running_agent.called is False
    assert long_running_agent.cancelled is False

await test_nested_cancellation_only_outer_called()

这段代码定义了一个异步函数 test_cancellation_with_token,用于测试 LongRunningAgent 的取消功能。

主要逻辑:
  1. 初始化运行时环境:创建一个 SingleThreadedAgentRuntime 实例。

  2. 注册代理:注册 LongRunningAgent

  3. 创建取消令牌:创建一个 CancellationToken 实例。

  4. 发送消息:使用 runtime.send_message 向代理发送消息,并将取消令牌传递给代理。

  5. 等待代理开始处理消息:使用 while 循环等待代理开始处理消息。

  6. 取消任务:调用 token.cancel() 取消任务。

  7. 检查结果:检查任务是否被取消,并验证代理的 calledcancelled 标志。

完成的功能:
  • 测试 LongRunningAgent 的取消功能。

  • 验证代理在取消操作后,能够正确地抛出 asyncio.CancelledError 异常。

  • 验证代理的 calledcancelled 标志是否正确设置。

第四段代码:测试嵌套取消功能(仅外层调用)


async def test_nested_cancellation_inner_called() -> None:
    runtime = SingleThreadedAgentRuntime()

    await LongRunningAgent.register(runtime, "long_running", LongRunningAgent)
    await NestingLongRunningAgent.register(
        runtime,
        "nested",
        lambda: NestingLongRunningAgent(AgentId("long_running", key=AgentInstantiationContext.current_agent_id().key)),
    )

    long_running_id = AgentId("long_running", key="default")
    nested_id = AgentId("nested", key="default")

    token = CancellationToken()
    response = asyncio.create_task(runtime.send_message(MessageType(), nested_id, cancellation_token=token))
    assert not response.done()

    while runtime.unprocessed_messages_count == 0:
        await asyncio.sleep(0.01)

    await runtime._process_next()  # type: ignore
    # allow the inner agent to process
    await runtime._process_next()  # type: ignore
    token.cancel()

    with pytest.raises(asyncio.CancelledError):
        await response

    assert response.done()
    nested_agent = await runtime.try_get_underlying_agent_instance(nested_id, type=NestingLongRunningAgent)
    assert nested_agent.called
    assert nested_agent.cancelled
    long_running_agent = await runtime.try_get_underlying_agent_instance(long_running_id, type=LongRunningAgent)
    assert long_running_agent.called
    assert long_running_agent.cancelled
    
await test_nested_cancellation_inner_called()

这段代码定义了一个异步函数 test_nested_cancellation_only_outer_called,用于测试 NestingLongRunningAgent 的取消功能,仅取消外层代理。

主要逻辑:
  1. 初始化运行时环境:创建一个 SingleThreadedAgentRuntime 实例。

  2. 注册代理:注册 LongRunningAgentNestingLongRunningAgent

  3. 创建取消令牌:创建一个 CancellationToken 实例。

  4. 发送消息:使用 runtime.send_message 向嵌套代理发送消息,并将取消令牌传递给代理。

  5. 等待代理开始处理消息:使用 while 循环等待代理开始处理消息。

  6. 取消任务:调用 token.cancel() 取消任务。

  7. 检查结果:检查任务是否被取消,并验证代理的 calledcancelled 标志。

完成的功能:
  • 嵌套代理的取消传播
    测试当一个父任务(NestingLongRunningAgent)被取消时,其内部的子任务(LongRunningAgent)是否也会被正确取消。

    • 父任务(nested)启动后触发子任务(long_running)。

    • 通过 CancellationToken 取消父任务,观察子任务是否同步取消。

  • 异步任务的状态验证
    确保任务取消后,所有相关代理(Agent)的状态正确更新:

    • called:标记代理是否被调用。

    • cancelled:标记代理是否被取消。


http://www.kler.cn/a/526735.html

相关文章:

  • 0 基础学运维:解锁 K8s 云计算运维工程师成长密码
  • rust学习-rust中的保留字
  • 数据结构 队列
  • Golang笔记——常用库context和runtime
  • python3+TensorFlow 2.x(三)手写数字识别
  • 蓝牙技术在物联网中的应用有哪些
  • Electron工具Electron Fiddle
  • 【TypeScript】TypeScript 运算符
  • AI 的安全性与合规性:实践中的最佳安全策略
  • 【Block总结】PKI 模块,无膨胀多尺度卷积,增强特征提取的能力|即插即用
  • 【华为OD-E卷 - 分积木 100分(python、java、c++、js、c)】
  • Autogen_core: test_code_executor.py
  • 算法---快速排序
  • Python3 【集合】避坑指南:常见错误解析
  • 【解决方案】MuMu模拟器移植系统进度条卡住98%无法打开
  • 快速提升网站收录:避免常见SEO误区
  • 深入解析JPA实体监听器的使用与实践
  • AI学习指南HuggingFace篇-Hugging Face 的环境搭建
  • 自由学习记录(33)
  • INCOSE需求编写指南-附录 B: 首字母缩略词和缩写
  • 详解python的单例模式
  • DeepSeek-V3技术报告解读
  • Cocos Creator 3.8 2D 游戏开发知识点整理
  • Sqoop支持ORC文件格式
  • AI大模型开发原理篇-4:神经概率语言模型NPLM
  • 【C++题解】1055. 求满足条件的整数个数