autogen 自定义agent (1)
目录
- 第一个自定义agent:CountDownAgent
- 代码运行逻辑
- 1. 创建 CountDownAgent 代理
- 2. 处理消息
- 3. 运行 CountDownAgent
- 另一种调用方式
- 类似的agent: CountUpAgent
第一个自定义agent:CountDownAgent
from typing import AsyncGenerator, List, Sequence, Tuple
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core import CancellationToken
class CountDownAgent(BaseChatAgent):
def __init__(self, name: str, count: int = 3):
super().__init__(name, "A simple agent that counts down.")
self._count = count
@property
def produced_message_types(self) -> Sequence[type[ChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
# Calls the on_messages_stream.
response: Response | None = None
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
response = message
assert response is not None
return response
async def on_messages_stream(
self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
inner_messages: List[AgentEvent | ChatMessage] = []
for i in range(self._count, 0, -1):
msg = TextMessage(content=f"{i}...", source=self.name)
inner_messages.append(msg)
yield msg
# The response is returned at the end of the stream.
# It contains the final message and all the inner messages.
yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
pass
async def run_countdown_agent() -> None:
# Create a countdown agent.
countdown_agent = CountDownAgent("countdown")
# Run the agent with a given task and stream the response.
async for message in countdown_agent.on_messages_stream([], CancellationToken()):
if isinstance(message, Response):
print(message.chat_message.content)
else:
print(message.content)
# Use asyncio.run(run_countdown_agent()) when running in a script.
await run_countdown_agent()
3...
2...
1...
Done!
代码运行逻辑
1. 创建 CountDownAgent 代理
- 该代理继承自 BaseChatAgent,它有一个 _count 变量(默认 3),用于控制倒计时次数。
- 代理的 produced_message_types 属性表明,它只会生成 TextMessage 类型的消息。
2. 处理消息
-
on_messages() 方法是主处理逻辑:
- 它异步调用 on_messages_stream() 方法,收集最终的 Response 并返回。
-
on_messages_stream() 方法:
- 遍历 _count 变量,从 count 开始倒数,每次发送 TextMessage(如 “3…”、“2…”)。
最终返回 Response 对象,其 chat_message 为 “Done!”,且 inner_messages 包含所有倒计时消息。
- 遍历 _count 变量,从 count 开始倒数,每次发送 TextMessage(如 “3…”、“2…”)。
3. 运行 CountDownAgent
- run_countdown_agent() 创建 CountDownAgent,并调用 on_messages_stream() 处理空输入 messages=[]。
- async for 遍历代理生成的消息,逐步打印倒计时消息,最终打印 “Done!”。
另一种调用方式
async def run_countdown_agent() -> None:
# Create a countdown agent.
countdown_agent = CountDownAgent("countdown")
# Run the agent with a given task and stream the response.
res = await countdown_agent.on_messages([], CancellationToken())
print(res)
# Use asyncio.run(run_countdown_agent()) when running in a script.
await run_countdown_agent()
Response(chat_message=TextMessage(source='countdown', models_usage=None, content='Done!', type='TextMessage'), inner_messages=[TextMessage(source='countdown', models_usage=None, content='3...', type='TextMessage'), TextMessage(source='countdown', models_usage=None, content='2...', type='TextMessage'), TextMessage(source='countdown', models_usage=None, content='1...', type='TextMessage')])
类似的agent: CountUpAgent
from typing import AsyncGenerator, List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core import CancellationToken
class CountUpAgent(BaseChatAgent):
def __init__(self, name: str, count: int = 3):
super().__init__(name, "A simple agent that counts up.")
self._count = count
@property
def produced_message_types(self) -> Sequence[type[ChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
response: Response | None = None
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
response = message
assert response is not None
return response
async def on_messages_stream(
self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
inner_messages: List[AgentEvent | ChatMessage] = []
for i in range(1, self._count + 1):
msg = TextMessage(content=f"{i}...", source=self.name)
inner_messages.append(msg)
yield msg
yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
pass
async def run_countup_agent() -> None:
countup_agent = CountUpAgent("countup")
async for message in countup_agent.on_messages_stream([], CancellationToken()):
if isinstance(message, Response):
print(message.chat_message.content)
else:
print(message.content)
# Use asyncio.run(run_countup_agent()) when running in a script.
await run_countup_agent()
1...
2...
3...
Done!