当前位置: 首页 > article >正文

Autogen_core: Quickstart

代码

from dataclasses import dataclass
from typing import Callable

from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler


@dataclass
class Message:
    content: int


@default_subscription
class Modifier(RoutedAgent):
    def __init__(self, modify_val: Callable[[int], int]) -> None:
        super().__init__("A modifier agent.")
        self._modify_val = modify_val

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        val = self._modify_val(message.content)
        print(f"{'-'*80}\nModifier:\nModified {message.content} to {val}")
        await self.publish_message(Message(content=val), DefaultTopicId())  # type: ignore


@default_subscription
class Checker(RoutedAgent):
    def __init__(self, run_until: Callable[[int], bool]) -> None:
        super().__init__("A checker agent.")
        self._run_until = run_until

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        if not self._run_until(message.content):
            print(f"{'-'*80}\nChecker:\n{message.content} passed the check, continue.")
            await self.publish_message(Message(content=message.content), DefaultTopicId())
        else:
            print(f"{'-'*80}\nChecker:\n{message.content} failed the check, stopping.")

from autogen_core import AgentId, SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()

await Modifier.register(
    runtime,
    "modifier",
    lambda: Modifier(modify_val=lambda x: x - 1),
)

await Checker.register(
    runtime,
    "checker",
    lambda: Checker(run_until=lambda x: x <= 1),
)

runtime.start()
await runtime.send_message(Message(10), AgentId("checker", "default"))
await runtime.stop_when_idle()

代码解析

这段代码使用 autogen_core 库实现了一个基于代理(Agent)的异步消息传递系统,其中两个代理(Modifier 和 Checker)协作处理消息,形成一个递减计算的循环,直到满足停止条件。

  1. Message 数据类
@dataclass
class Message:
    content: int

定义了一个 Message 数据类,其中 content 是一个 int 类型的字段,用于存储数值。

  1. Modifier 代理
@default_subscription
class Modifier(RoutedAgent):
    def __init__(self, modify_val: Callable[[int], int]) -> None:
        super().__init__("A modifier agent.")
        self._modify_val = modify_val

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        val = self._modify_val(message.content)
        print(f"{'-'*80}\nModifier:\nModified {message.content} to {val}")
        await self.publish_message(Message(content=val), DefaultTopicId())  # type: ignore

继承 RoutedAgent,用于接收和发送 Message

  • self._modify_val 是一个可调用函数,用于修改 Messagecontent
  • handle_message
    • 取出 message.content,调用 self._modify_val 进行修改(这里是 x - 1)。
    • 打印修改前后的值。
    • 发布新的 Message,让下一个代理(Checker)处理。
  1. Checker 代理
@default_subscription
class Checker(RoutedAgent):
    def __init__(self, run_until: Callable[[int], bool]) -> None:
        super().__init__("A checker agent.")
        self._run_until = run_until

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        if not self._run_until(message.content):
            print(f"{'-'*80}\nChecker:\n{message.content} passed the check, continue.")
            await self.publish_message(Message(content=message.content), DefaultTopicId())
        else:
            print(f"{'-'*80}\nChecker:\n{message.content} failed the check, stopping.")

继承 RoutedAgent,用于检查 Messagecontent 是否满足终止条件。

  • self._run_until 是一个可调用函数,用于判断数值是否应该终止。
  • handle_message
    • 检查 message.content 是否满足终止条件。
    • 如果不满足,打印信息并继续发送消息给 Modifier
    • 如果满足,打印停止信息,不再发送消息。
  1. 运行时 SingleThreadedAgentRuntime
runtime = SingleThreadedAgentRuntime()

创建了一个单线程代理运行时,用于调度 ModifierChecker

  1. 注册代理
await Modifier.register(
    runtime,
    "modifier",
    lambda: Modifier(modify_val=lambda x: x - 1),
)

await Checker.register(
    runtime,
    "checker",
    lambda: Checker(run_until=lambda x: x <= 1),
)
  • 注册 Modifier
    • modify_val=lambda x: x - 1:每次将数值减 1。
  • 注册 Checker
    • run_until=lambda x: x <= 1:如果 x ≤ 1,则停止处理。
  1. 运行主逻辑
runtime.start()
await runtime.send_message(Message(10), AgentId("checker", "default"))
await runtime.stop_when_idle()
  • 启动代理运行时。
  • 发送初始消息 Message(10)Checker 代理:
    • AgentId("checker", "default") 指定目标代理为 Checker
  • 运行时保持活动,直到所有任务完成后自动停止。

执行流程

初始值 10,执行过程如下:

  1. Checker 代理接收到 Message(10)
  • run_until(10) → False,未满足停止条件。
  • Checker 继续传递 Message(10)Modifier
  1. Modifier 代理接收到 Message(10)
  • modify_val(10) → 9,将 10 变为 9。
  • Modifier 发送 Message(9)
  1. Checker 代理接收到 Message(9)
  • run_until(9) → False,继续传递 Message(9)Modifier
  1. 重复以上过程,数值不断递减:
Checker: 9 passed the check, continue.
Modifier: Modified 9 to 8
Checker: 8 passed the check, continue.
Modifier: Modified 8 to 7
...
Checker: 2 passed the check, continue.
Modifier: Modified 2 to 1
  1. Checker 代理接收到 Message(1):
  • run_until(1) → True,满足停止条件。
    打印 “1 failed the check, stopping.” 并停止流程。

输出结果:

--------------------------------------------------------------------------------
Checker:
10 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 10 to 9
--------------------------------------------------------------------------------
Checker:
9 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 9 to 8
--------------------------------------------------------------------------------
Checker:
8 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 8 to 7
--------------------------------------------------------------------------------
Checker:
7 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 7 to 6
--------------------------------------------------------------------------------
Checker:
6 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 6 to 5
--------------------------------------------------------------------------------
Checker:
5 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 5 to 4
--------------------------------------------------------------------------------
Checker:
4 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 4 to 3
--------------------------------------------------------------------------------
Checker:
3 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 3 to 2
--------------------------------------------------------------------------------
Checker:
2 passed the check, continue.
--------------------------------------------------------------------------------
Modifier:
Modified 2 to 1
--------------------------------------------------------------------------------
Checker:
1 failed the check, stopping.

类似例子

代码:

from dataclasses import dataclass
from typing import Callable

from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler

@dataclass
class Message:
    content: int

@default_subscription
class Doubler(RoutedAgent):
    def __init__(self, modify_val: Callable[[int], int]) -> None:
        super().__init__("A doubler agent.")
        self._modify_val = modify_val

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        val = self._modify_val(message.content)
        print(f"{'-'*80}\nDoubler:\nDoubled {message.content} to {val}")
        await self.publish_message(Message(content=val), DefaultTopicId())

@default_subscription
class ThresholdChecker(RoutedAgent):
    def __init__(self, threshold: int) -> None:
        super().__init__("A threshold checker agent.")
        self._threshold = threshold

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        if message.content < self._threshold:
            print(f"{'-'*80}\nThresholdChecker:\n{message.content} is below threshold, continue.")
            await self.publish_message(Message(content=message.content), DefaultTopicId())
        else:
            print(f"{'-'*80}\nThresholdChecker:\n{message.content} reached threshold, stopping.")

from autogen_core import AgentId, SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()

await Doubler.register(
    runtime,
    "doubler",
    lambda: Doubler(modify_val=lambda x: x * 2),
)

await ThresholdChecker.register(
    runtime,
    "threshold_checker",
    lambda: ThresholdChecker(threshold=100),
)

runtime.start()
await runtime.send_message(Message(2), AgentId("threshold_checker", "default"))
await runtime.stop_when_idle()

输出:

--------------------------------------------------------------------------------
ThresholdChecker:
2 is below threshold, continue.
--------------------------------------------------------------------------------
Doubler:
Doubled 2 to 4
--------------------------------------------------------------------------------
ThresholdChecker:
4 is below threshold, continue.
--------------------------------------------------------------------------------
Doubler:
Doubled 4 to 8
--------------------------------------------------------------------------------
ThresholdChecker:
8 is below threshold, continue.
--------------------------------------------------------------------------------
Doubler:
Doubled 8 to 16
--------------------------------------------------------------------------------
ThresholdChecker:
16 is below threshold, continue.
--------------------------------------------------------------------------------
Doubler:
Doubled 16 to 32
--------------------------------------------------------------------------------
ThresholdChecker:
32 is below threshold, continue.
--------------------------------------------------------------------------------
Doubler:
Doubled 32 to 64
--------------------------------------------------------------------------------
ThresholdChecker:
64 is below threshold, continue.
--------------------------------------------------------------------------------
Doubler:
Doubled 64 to 128
--------------------------------------------------------------------------------
ThresholdChecker:
128 reached threshold, stopping.

参考链接:https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/quickstart.html


http://www.kler.cn/a/521269.html

相关文章:

  • vulnhub靶场【kioptrix-2】靶机
  • 如何使用tushare pro获取股票数据——附爬虫代码以及tushare积分获取方式
  • Excel分区间统计分析(等步长、不等步长、多维度)
  • 瑞芯微方案:RV1126定制开发板方案定制
  • 【Elasticsearch 基础入门】Centos7下Elasticsearch 7.x安装与配置(单机)
  • 论文阅读(十六):利用线性链条件随机场模型检测阵列比较基因组杂交数据的拷贝数变异
  • DataSecOps的要点
  • 【JavaWeb学习Day13】
  • 基于Python的网易云音乐分析可视化系统的设计与实现
  • 实现一个安全且高效的图片上传接口:使用ASP.NET Core和SHA256哈希
  • Qt中Widget及其子类的相对位置移动
  • SQL 指南
  • LeetCode - Google 大模型校招10题 第1天 Attention 汇总 (3题)
  • Java后端之AOP
  • 深入 Rollup:从入门到精通(三)Rollup CLI命令行实战
  • linux设置mysql远程连接
  • GPT 本地运行输出界面简洁美观(命令行、界面、网页)
  • 基于Flask的旅游系统的设计与实现
  • jupyter版本所引起的扩展插件问题
  • 数组与链表