当前位置: 首页 > article >正文

autogen 自定义agent (1)

目录

    • 第一个自定义agent:CountDownAgent
    • 代码运行逻辑
      • 1. 创建 CountDownAgent 代理
      • 2. 处理消息
      • 3. 运行 CountDownAgent
    • 另一种调用方式
    • 类似的agent: CountUpAgent

第一个自定义agent:CountDownAgent

from typing import AsyncGenerator, List, Sequence, Tuple

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core import CancellationToken


class CountDownAgent(BaseChatAgent):
    def __init__(self, name: str, count: int = 3):
        super().__init__(name, "A simple agent that counts down.")
        self._count = count

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        # Calls the on_messages_stream.
        response: Response | None = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                response = message
        assert response is not None
        return response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        inner_messages: List[AgentEvent | ChatMessage] = []
        for i in range(self._count, 0, -1):
            msg = TextMessage(content=f"{i}...", source=self.name)
            inner_messages.append(msg)
            yield msg
        # The response is returned at the end of the stream.
        # It contains the final message and all the inner messages.
        yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass


async def run_countdown_agent() -> None:
    # Create a countdown agent.
    countdown_agent = CountDownAgent("countdown")

    # Run the agent with a given task and stream the response.
    async for message in countdown_agent.on_messages_stream([], CancellationToken()):
        if isinstance(message, Response):
            print(message.chat_message.content)
        else:
            print(message.content)


# Use asyncio.run(run_countdown_agent()) when running in a script.
await run_countdown_agent()
3...
2...
1...
Done!

代码运行逻辑

1. 创建 CountDownAgent 代理

  • 该代理继承自 BaseChatAgent,它有一个 _count 变量(默认 3),用于控制倒计时次数。
  • 代理的 produced_message_types 属性表明,它只会生成 TextMessage 类型的消息。

2. 处理消息

  • on_messages() 方法是主处理逻辑:

    • 它异步调用 on_messages_stream() 方法,收集最终的 Response 并返回。
  • on_messages_stream() 方法:

    • 遍历 _count 变量,从 count 开始倒数,每次发送 TextMessage(如 “3…”、“2…”)。
      最终返回 Response 对象,其 chat_message 为 “Done!”,且 inner_messages 包含所有倒计时消息。

3. 运行 CountDownAgent

  • run_countdown_agent() 创建 CountDownAgent,并调用 on_messages_stream() 处理空输入 messages=[]。
  • async for 遍历代理生成的消息,逐步打印倒计时消息,最终打印 “Done!”。

另一种调用方式

async def run_countdown_agent() -> None:
    # Create a countdown agent.
    countdown_agent = CountDownAgent("countdown")

    # Run the agent with a given task and stream the response.
    res = await countdown_agent.on_messages([], CancellationToken())
    print(res)

# Use asyncio.run(run_countdown_agent()) when running in a script.
await run_countdown_agent()
Response(chat_message=TextMessage(source='countdown', models_usage=None, content='Done!', type='TextMessage'), inner_messages=[TextMessage(source='countdown', models_usage=None, content='3...', type='TextMessage'), TextMessage(source='countdown', models_usage=None, content='2...', type='TextMessage'), TextMessage(source='countdown', models_usage=None, content='1...', type='TextMessage')])

类似的agent: CountUpAgent

from typing import AsyncGenerator, List, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core import CancellationToken


class CountUpAgent(BaseChatAgent):
    def __init__(self, name: str, count: int = 3):
        super().__init__(name, "A simple agent that counts up.")
        self._count = count

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        response: Response | None = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                response = message
        assert response is not None
        return response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        inner_messages: List[AgentEvent | ChatMessage] = []
        for i in range(1, self._count + 1):
            msg = TextMessage(content=f"{i}...", source=self.name)
            inner_messages.append(msg)
            yield msg
        yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass


async def run_countup_agent() -> None:
    countup_agent = CountUpAgent("countup")
    async for message in countup_agent.on_messages_stream([], CancellationToken()):
        if isinstance(message, Response):
            print(message.chat_message.content)
        else:
            print(message.content)

# Use asyncio.run(run_countup_agent()) when running in a script.
await run_countup_agent()
1...
2...
3...
Done!

http://www.kler.cn/a/527599.html

相关文章:

  • 计算机网络之计算机网络的分类
  • 一文读懂 Faiss:开启高维向量高效检索的大门
  • Docker小游戏 | 使用Docker部署2048网页小游戏
  • DeepSeek的崛起与全球科技市场的震荡
  • 供应链系统设计-供应链中台系统设计(十二)- 清结算中心设计篇(一)
  • 商密测评题库详解:商用密码应用安全性评估从业人员考核题库详细解析(9)
  • 基于排队理论的物联网发布/订阅通信系统建模与优化
  • 第二讲:类与对象(上)
  • deepseek大模型本机部署
  • OSCP:常见文件传输方法
  • OSCP 渗透测试:网络抓包工具的使用指南
  • Java多线程——对象的共享
  • DeepSeek本地部署(windows)
  • 软件测试(认识测试)
  • 无人机图传模块 wfb-ng openipc-fpv,4G
  • 【易理解】04_什么是try-catch-throw语句?
  • socket编程短平快
  • 计算机网络一点事(24)
  • 漏洞扫描工具之xray
  • 【视频+图文讲解】HTML基础2-html骨架与基本语法
  • OpenCV:Harris、Shi-Tomasi角点检测
  • 【小白学AI系列】NLP 核心知识点(六)Softmax函数介绍
  • 如何优化轮式移动机器人的运动稳定性?
  • 仿真设计|基于51单片机的低频信号控制系统仿真
  • PostgreSQL图插件AGE
  • DeepSeek-R1 论文解读 —— 强化学习大语言模型新时代来临?