Autogen_core源码:_agent_runtime.py
目录
- _agent_runtime.py代码
- 代码解释
- 代码逻辑概述
- 具体功能解释
- 1. 发送消息
- 2. 发布消息
- 3. 注册代理工厂
- 4. 获取底层代理实例
- 5. 获取代理ID
- 6. 保存和加载运行时状态
- 7. 获取代理元数据
- 8. 保存和加载单个代理状态
- 9. 添加和移除订阅
- 10. 添加消息序列化器
- 总结
- 代码示例
- 示例 1:发送消息
- 示例 2:发布消息
- 示例 3:注册代理工厂
- 示例 4:获取代理实例
- 示例 5:保存和加载运行时状态
- 示例 6:添加和移除订阅
- 示例 7:添加消息序列化器
_agent_runtime.py代码
from __future__ import annotations
from collections.abc import Sequence
from typing import Any, Awaitable, Callable, Mapping, Protocol, Type, TypeVar, overload, runtime_checkable
from ._agent import Agent
from ._agent_id import AgentId
from ._agent_metadata import AgentMetadata
from ._agent_type import AgentType
from ._cancellation_token import CancellationToken
from ._serialization import MessageSerializer
from ._subscription import Subscription
from ._topic import TopicId
# Undeliverable - error
T = TypeVar("T", bound=Agent)
@runtime_checkable
class AgentRuntime(Protocol):
async def send_message(
self,
message: Any,
recipient: AgentId,
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> Any:
"""Send a message to an agent and get a response.
Args:
message (Any): The message to send.
recipient (AgentId): The agent to send the message to.
sender (AgentId | None, optional): Agent which sent the message. Should **only** be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None.
cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress . Defaults to None.
Raises:
CantHandleException: If the recipient cannot handle the message.
UndeliverableException: If the message cannot be delivered.
Other: Any other exception raised by the recipient.
Returns:
Any: The response from the agent.
"""
...
async def publish_message(
self,
message: Any,
topic_id: TopicId,
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> None:
"""Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.
No responses are expected from publishing.
Args:
message (Any): The message to publish.
topic (TopicId): The topic to publish the message to.
sender (AgentId | None, optional): The agent which sent the message. Defaults to None.
cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress. Defaults to None.
message_id (str | None, optional): The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique. and is recommended to be a UUID.
Raises:
UndeliverableException: If the message cannot be delivered.
"""
...
async def register_factory(
self,
type: str | AgentType,
agent_factory: Callable[[], T | Awaitable[T]],
*,
expected_class: type[T] | None = None,
) -> AgentType:
"""Register an agent factory with the runtime associated with a specific type. The type must be unique. This API does not add any subscriptions.
.. note::
This is a low level API and usually the agent class's `register` method should be used instead, as this also handles subscriptions automatically.
Example:
.. code-block:: python
from dataclasses import dataclass
from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage
@dataclass
class MyMessage:
content: str
class MyAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("My core agent")
@event
async def handler(self, message: UserMessage, context: MessageContext) -> None:
print("Event received: ", message.content)
async def my_agent_factory():
return MyAgent()
async def main() -> None:
runtime: AgentRuntime = ... # type: ignore
await runtime.register_factory("my_agent", lambda: MyAgent())
import asyncio
asyncio.run(main())
Args:
type (str): The type of agent this factory creates. It is not the same as agent class name. The `type` parameter is used to differentiate between different factory functions rather than agent classes.
agent_factory (Callable[[], T]): The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use `autogen_core.AgentInstantiationContext` to access variables like the current runtime and agent ID.
expected_class (type[T] | None, optional): The expected class of the agent, used for runtime validation of the factory. Defaults to None.
"""
...
# TODO: uncomment out the following type ignore when this is fixed in mypy: https://github.com/python/mypy/issues/3737
async def try_get_underlying_agent_instance(self, id: AgentId, type: Type[T] = Agent) -> T: # type: ignore[assignment]
"""Try to get the underlying agent instance by name and namespace. This is generally discouraged (hence the long name), but can be useful in some cases.
If the underlying agent is not accessible, this will raise an exception.
Args:
id (AgentId): The agent id.
type (Type[T], optional): The expected type of the agent. Defaults to Agent.
Returns:
T: The concrete agent instance.
Raises:
LookupError: If the agent is not found.
NotAccessibleError: If the agent is not accessible, for example if it is located remotely.
TypeError: If the agent is not of the expected type.
"""
...
@overload
async def get(self, id: AgentId, /, *, lazy: bool = ...) -> AgentId: ...
@overload
async def get(self, type: AgentType | str, /, key: str = ..., *, lazy: bool = ...) -> AgentId: ...
async def get(
self, id_or_type: AgentId | AgentType | str, /, key: str = "default", *, lazy: bool = True
) -> AgentId: ...
async def save_state(self) -> Mapping[str, Any]:
"""Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to :meth:`load_state`.
The structure of the state is implementation defined and can be any JSON serializable object.
Returns:
Mapping[str, Any]: The saved state.
"""
...
async def load_state(self, state: Mapping[str, Any]) -> None:
"""Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by :meth:`save_state`.
Args:
state (Mapping[str, Any]): The saved state.
"""
...
async def agent_metadata(self, agent: AgentId) -> AgentMetadata:
"""Get the metadata for an agent.
Args:
agent (AgentId): The agent id.
Returns:
AgentMetadata: The agent metadata.
"""
...
async def agent_save_state(self, agent: AgentId) -> Mapping[str, Any]:
"""Save the state of a single agent.
The structure of the state is implementation defined and can be any JSON serializable object.
Args:
agent (AgentId): The agent id.
Returns:
Mapping[str, Any]: The saved state.
"""
...
async def agent_load_state(self, agent: AgentId, state: Mapping[str, Any]) -> None:
"""Load the state of a single agent.
Args:
agent (AgentId): The agent id.
state (Mapping[str, Any]): The saved state.
"""
...
async def add_subscription(self, subscription: Subscription) -> None:
"""Add a new subscription that the runtime should fulfill when processing published messages
Args:
subscription (Subscription): The subscription to add
"""
...
async def remove_subscription(self, id: str) -> None:
"""Remove a subscription from the runtime
Args:
id (str): id of the subscription to remove
Raises:
LookupError: If the subscription does not exist
"""
...
def add_message_serializer(self, serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None:
"""Add a new message serialization serializer to the runtime
Note: This will deduplicate serializers based on the type_name and data_content_type properties
Args:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]): The serializer/s to add
"""
...
代码解释
这段Python代码定义了一个名为AgentRuntime
的协议(Protocol),该协议描述了一个代理运行时环境应具备的功能接口。下面详细解释代码逻辑和完成的功能:
代码逻辑概述
- 导入模块:导入了一系列必要的模块和类型,包括
collections.abc
中的Sequence
、typing
模块中的各种类型注解,以及自定义的一些模块如_agent
、_agent_id
等。 - 定义类型变量:定义了一个类型变量
T
,它必须是Agent
类或其子类。 - 定义
AgentRuntime
协议:使用@runtime_checkable
装饰器定义了一个AgentRuntime
协议,该协议包含了多个异步方法,这些方法描述了代理运行时环境应具备的功能。
具体功能解释
1. 发送消息
async def send_message(
self,
message: Any,
recipient: AgentId,
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> Any:
...
- 功能:向指定的代理发送消息并获取响应。
- 参数:
message
:要发送的消息。recipient
:消息的接收代理的ID。sender
:消息的发送代理的ID,若为None
表示消息是从外部直接发送到运行时的。cancellation_token
:用于取消正在进行的操作的令牌。message_id
:消息的唯一ID,若为None
则会生成一个新的ID。
- 异常:可能会抛出
CantHandleException
(接收者无法处理消息)、UndeliverableException
(消息无法送达)等异常。 - 返回值:接收代理的响应。
2. 发布消息
async def publish_message(
self,
message: Any,
topic_id: TopicId,
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> None:
...
- 功能:将消息发布到指定的主题,不期望得到响应。
- 参数:
message
:要发布的消息。topic_id
:消息要发布到的主题ID。sender
:消息的发送代理的ID,默认为None
。cancellation_token
:用于取消正在进行的操作的令牌。message_id
:消息的唯一ID,若为None
则会生成一个新的ID。
- 异常:可能会抛出
UndeliverableException
(消息无法送达)异常。
3. 注册代理工厂
async def register_factory(
self,
type: str | AgentType,
agent_factory: Callable[[], T | Awaitable[T]],
*,
expected_class: type[T] | None = None,
) -> AgentType:
...
- 功能:向运行时注册一个代理工厂,该工厂与特定类型关联,类型必须唯一。
- 参数:
type
:代理的类型,可以是字符串或AgentType
对象。agent_factory
:创建代理的工厂函数,返回一个代理实例或一个可等待的代理实例。expected_class
:代理的预期类,用于运行时验证工厂。
- 返回值:注册的代理类型。
4. 获取底层代理实例
async def try_get_underlying_agent_instance(self, id: AgentId, type: Type[T] = Agent) -> T:
...
- 功能:尝试根据代理ID获取底层代理实例。
- 参数:
id
:代理的ID。type
:代理的预期类型,默认为Agent
。
- 异常:可能会抛出
LookupError
(代理未找到)、NotAccessibleError
(代理不可访问)、TypeError
(代理类型不符合预期)等异常。 - 返回值:具体的代理实例。
5. 获取代理ID
@overload
async def get(self, id: AgentId, /, *, lazy: bool =...) -> AgentId:...
@overload
async def get(self, type: AgentType | str, /, key: str =..., *, lazy: bool =...) -> AgentId:...
async def get(
self, id_or_type: AgentId | AgentType | str, /, key: str = "default", *, lazy: bool = True
) -> AgentId:
...
- 功能:根据代理ID或类型获取代理ID。
- 参数:
id_or_type
:可以是代理ID、代理类型或类型字符串。key
:键,默认为"default"
。lazy
:是否懒加载,默认为True
。
- 返回值:代理ID。
6. 保存和加载运行时状态
async def save_state(self) -> Mapping[str, Any]:
...
async def load_state(self, state: Mapping[str, Any]) -> None:
...
save_state
功能:保存整个运行时的状态,包括所有托管代理的状态。- 返回值:保存的状态,是一个JSON可序列化的对象。
load_state
功能:加载之前保存的运行时状态。- 参数:
state
为之前保存的状态。
7. 获取代理元数据
async def agent_metadata(self, agent: AgentId) -> AgentMetadata:
...
- 功能:获取指定代理的元数据。
- 参数:
agent
为代理的ID。 - 返回值:代理的元数据。
8. 保存和加载单个代理状态
async def agent_save_state(self, agent: AgentId) -> Mapping[str, Any]:
...
async def agent_load_state(self, agent: AgentId, state: Mapping[str, Any]) -> None:
...
agent_save_state
功能:保存单个代理的状态。- 参数:
agent
为代理的ID。 - 返回值:保存的代理状态,是一个JSON可序列化的对象。
agent_load_state
功能:加载单个代理的状态。- 参数:
agent
为代理的ID,state
为之前保存的代理状态。
9. 添加和移除订阅
async def add_subscription(self, subscription: Subscription) -> None:
...
async def remove_subscription(self, id: str) -> None:
...
add_subscription
功能:向运行时添加一个新的订阅,运行时在处理发布的消息时会满足该订阅。- 参数:
subscription
为要添加的订阅。 remove_subscription
功能:从运行时移除一个订阅。- 参数:
id
为要移除的订阅的ID。 - 异常:若订阅不存在,会抛出
LookupError
异常。
10. 添加消息序列化器
def add_message_serializer(self, serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None:
...
- 功能:向运行时添加一个或多个消息序列化器,会根据
type_name
和data_content_type
属性对序列化器进行去重。 - 参数:
serializer
为要添加的序列化器或序列化器序列。
总结
AgentRuntime
协议定义了一个代理运行时环境的接口,包括消息发送和发布、代理工厂注册、状态保存和加载、订阅管理以及消息序列化器管理等功能,为实现具体的代理运行时环境提供了规范。
代码示例
示例 1:发送消息
import asyncio
from autogen_core import AgentRuntime,Agent, AgentId, TopicId,SingleThreadedAgentRuntime, Subscription
async def send_message_example(runtime: AgentRuntime):
recipient = AgentId("recipient_agent", "default")
message = {"content": "Hello, recipient!"}
try:
response = await runtime.send_message(message, recipient)
print("Response received:", response)
except Exception as e:
print(f"Error sending message: {e}")
async def main():
runtime: AgentRuntime = SingleThreadedAgentRuntime()
await send_message_example(runtime)
await main()
Error sending message: Recipient not found
示例 2:发布消息
async def publish_message_example(runtime: AgentRuntime):
topic = TopicId("example_topic","default")
message = {"content": "This is a published message!"}
sender = AgentId("sender_agent", "default")
try:
await runtime.publish_message(message, topic, sender=sender)
print("Message published successfully.")
except Exception as e:
print(f"Error publishing message: {e}")
async def main():
runtime: AgentRuntime =SingleThreadedAgentRuntime()
await publish_message_example(runtime)
await main()
Message published successfully.
示例 3:注册代理工厂
class MyAgent(Agent):
def __init__(self):
super().__init__("My Agent")
async def my_agent_factory():
return MyAgent()
async def register_factory_example(runtime: AgentRuntime):
agent_type = await runtime.register_factory("my_agent_type", my_agent_factory)
print(f"Agent factory registered for type: {agent_type}")
async def main():
runtime: AgentRuntime =SingleThreadedAgentRuntime()
await register_factory_example(runtime)
await main()
Agent factory registered for type: AgentType(type='my_agent_type')
示例 4:获取代理实例
async def get_agent_instance_example(runtime: AgentRuntime):
agent_id = AgentId("my_agent", "default")
try:
agent = await runtime.try_get_underlying_agent_instance(agent_id, Agent)
print(f"Agent instance retrieved: {agent}")
except Exception as e:
print(f"Error getting agent instance: {e}")
async def main():
runtime: AgentRuntime =SingleThreadedAgentRuntime()
await get_agent_instance_example(runtime)
await main()
Error getting agent instance: Agent with name my_agent not found.
示例 5:保存和加载运行时状态
async def save_load_state_example(runtime: AgentRuntime):
# 保存状态
state = await runtime.save_state()
print("Runtime state saved.")
# 加载状态
await runtime.load_state(state)
print("Runtime state loaded.")
async def main():
runtime: AgentRuntime =SingleThreadedAgentRuntime()
await save_load_state_example(runtime)
await main()
Runtime state saved.
Runtime state loaded.
示例 6:添加和移除订阅
async def subscription_example(runtime: AgentRuntime):
from autogen_core import TypeSubscription
subscription = TypeSubscription(topic_type="t1", agent_type="a1")
# 添加订阅
await runtime.add_subscription(subscription)
print("Subscription added.")
# 移除订阅
try:
await runtime.remove_subscription(subscription.id)
print("Subscription removed.")
except Exception as e:
print(f"Error removing subscription: {e}")
async def main():
runtime: AgentRuntime =SingleThreadedAgentRuntime() # 假设这里已经有一个 AgentRuntime 实例
await subscription_example(runtime)
await main()
Subscription added.
Subscription removed.
示例 7:添加消息序列化器
from autogen_core import AgentRuntime, MessageSerializer
from typing import Any
class MyMessageSerializer(MessageSerializer[Any]):
def serialize(self, message: Any) -> bytes:
# 实现序列化逻辑
pass
def deserialize(self, data: bytes) -> Any:
# 实现反序列化逻辑
pass
runtime: AgentRuntime =SingleThreadedAgentRuntime()
serializer = MyMessageSerializer()
runtime.add_message_serializer(serializer)
print("Message serializer added.")
Message serializer added.