Autogen_core: ClosureAgent使用与测试
目录
- 第一个示例
- 第二个示例
- 完成的功能
下面两个示例展示了如何使用 AutoGen 库中的
ClosureAgent
来创建和使用代理。
ClosureAgent
允许你使用闭包(即一个没有定义类的函数)来定义代理,并从运行时中提取值。代码中展示了两个示例:
第一个示例
import asyncio
from dataclasses import dataclass
from autogen_core import (
ClosureAgent,
ClosureContext,
DefaultSubscription,
DefaultTopicId,
MessageContext,
SingleThreadedAgentRuntime,
)
"""
The closure agent allows you to define an agent using a closure, or function without needing to define a class. It allows values to be extracted out of the runtime.
The closure can define the type of message which is expected, or `Any` can be used to accept any type of message.
"""
@dataclass
class MyMessage:
content: str
async def main():
queue = asyncio.Queue[MyMessage]()
async def output_result(_ctx: ClosureContext, message: MyMessage, ctx: MessageContext) -> None:
await queue.put(message)
runtime = SingleThreadedAgentRuntime()
await ClosureAgent.register_closure(
runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
)
runtime.start()
await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId())
await runtime.stop_when_idle()
result = await queue.get()
print(result)
await main()
MyMessage(content='Hello, world!')
这个示例定义了一个简单的代理,它接收一个消息并将其放入一个 asyncio 队列中。
- 定义了一个名为
MyMessage
的数据类,用于存储消息内容。 - 创建了一个名为
output_result
的异步函数,它接收一个ClosureContext
、一个MyMessage
和一个MessageContext
,然后将消息放入队列中。 - 创建了一个
SingleThreadedAgentRuntime
实例。 - 使用
register_closure
方法注册了output_result
函数作为一个代理。它订阅了默认的主题。 - 启动运行时,发布一个
MyMessage
消息,并在空闲时停止运行时。 - 从队列中获取结果并打印。
这个示例展示了如何注册一个闭包代理,并在运行时中发送和接收消息。
第二个示例
@dataclass
class Message:
content: str
async def test_register_receives_publish() -> None:
runtime = SingleThreadedAgentRuntime()
queue = asyncio.Queue[tuple[str, str]]()
async def log_message(closure_ctx: ClosureContext, message: Message, ctx: MessageContext) -> None:
key = closure_ctx.id.key
await queue.put((key, message.content))
await ClosureAgent.register_closure(runtime, "name", log_message, subscriptions=lambda: [DefaultSubscription()])
runtime.start()
await runtime.publish_message(Message("first message"), topic_id=DefaultTopicId())
await runtime.publish_message(Message("second message"), topic_id=DefaultTopicId())
await runtime.publish_message(Message("third message"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
assert queue.qsize() == 3
assert queue.get_nowait() == ("default", "first message")
assert queue.get_nowait() == ("default", "second message")
assert queue.get_nowait() == ("default", "third message")
assert queue.empty()
test_register_receives_publish()
<coroutine object test_register_receives_publish at 0x00000150ABEAA640>
这个示例定义了一个测试函数,用于测试注册代理、接收和发布消息的功能。
- 定义了一个名为
Message
的数据类,用于存储消息内容。 - 创建了一个名为
test_register_receives_publish
的异步测试函数。 - 在这个函数中,创建了一个
SingleThreadedAgentRuntime
实例和一个 asyncio 队列。 - 定义了一个名为
log_message
的异步函数,它接收一个ClosureContext
、一个Message
和一个MessageContext
,然后将代理的 ID 和消息内容作为一个元组放入队列中。 - 使用
register_closure
方法注册了log_message
函数作为一个代理,并订阅了默认的主题。 - 启动运行时,并发布三个
Message
消息。 - 等待运行时空闲后,检查队列中的消息数量和内容,确保有三个消息,并且它们的顺序和内容与发布的消息一致。
这个示例展示了如何测试代理的注册、消息的接收和发布功能。
完成的功能
这两个示例展示了如何使用 ClosureAgent
和 SingleThreadedAgentRuntime
来创建、注册和使用闭包代理。它们展示了如何发送和接收消息,以及如何测试代理的功能。这些示例是理解和学习如何使用 AutoGen 库来构建代理和消息传递系统的基础。
原文地址:https://blog.csdn.net/qq_41472205/article/details/145400326
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/526126.html 如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/526126.html 如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!