Autogen_core源码:_subscription.py
目录
- _subscription.py代码
- 代码解释
- 代码逻辑解释
- 1. 导入模块
- 2. 定义`Subscription`协议
- 3. 定义`id`属性
- 4. 定义`__eq__`方法
- 5. 定义`is_match`方法
- 6. 定义`map_to_agent`方法
- 7. 定义`UnboundSubscription`别名
- 功能总结
- 代码示例
- 1. Implementing the `Subscription` Protocol
- 2. Using the `UnboundSubscription` Helper Alias
- 3. Asynchronous `UnboundSubscription` Example
_subscription.py代码
from __future__ import annotations
from typing import Awaitable, Callable, Protocol, runtime_checkable
from ._agent_id import AgentId
from ._topic import TopicId
@runtime_checkable
class Subscription(Protocol):
"""Subscriptions define the topics that an agent is interested in."""
@property
def id(self) -> str:
"""Get the ID of the subscription.
Implementations should return a unique ID for the subscription. Usually this is a UUID.
Returns:
str: ID of the subscription.
"""
...
def __eq__(self, other: object) -> bool:
"""Check if two subscriptions are equal.
Args:
other (object): Other subscription to compare against.
Returns:
bool: True if the subscriptions are equal, False otherwise.
"""
if not isinstance(other, Subscription):
return False
return self.id == other.id
def is_match(self, topic_id: TopicId) -> bool:
"""Check if a given topic_id matches the subscription.
Args:
topic_id (TopicId): TopicId to check.
Returns:
bool: True if the topic_id matches the subscription, False otherwise.
"""
...
def map_to_agent(self, topic_id: TopicId) -> AgentId:
"""Map a topic_id to an agent. Should only be called if `is_match` returns True for the given topic_id.
Args:
topic_id (TopicId): TopicId to map.
Returns:
AgentId: ID of the agent that should handle the topic_id.
Raises:
CantHandleException: If the subscription cannot handle the topic_id.
"""
...
# Helper alias to represent the lambdas used to define subscriptions
UnboundSubscription = Callable[[], list[Subscription] | Awaitable[list[Subscription]]]
代码解释
这段Python代码定义了一个名为Subscription
的协议(Protocol),并创建了一个辅助别名UnboundSubscription
,下面详细解释其逻辑和功能。
代码逻辑解释
1. 导入模块
from __future__ import annotations
from typing import Awaitable, Callable, Protocol, runtime_checkable
from._agent_id import AgentId
from._topic import TopicId
from __future__ import annotations
:允许在类型注解中使用前向引用,即可以在定义类型时引用尚未定义的类或类型。Awaitable
、Callable
、Protocol
、runtime_checkable
:这些是Python标准库typing
模块中的类型和装饰器。Awaitable
用于表示可等待对象,Callable
用于表示可调用对象,Protocol
用于定义协议,runtime_checkable
用于将协议标记为可在运行时检查。AgentId
和TopicId
:从._agent_id
和._topic
模块中导入的自定义类型,分别表示代理ID和主题ID。
2. 定义Subscription
协议
@runtime_checkable
class Subscription(Protocol):
"""Subscriptions define the topics that an agent is interested in."""
@runtime_checkable
:将Subscription
协议标记为可在运行时检查,这意味着可以使用isinstance()
函数来检查一个对象是否实现了该协议。class Subscription(Protocol)
:定义了一个名为Subscription
的协议,协议类似于接口,它定义了一组方法和属性,但不提供具体的实现。
3. 定义id
属性
@property
def id(self) -> str:
"""Get the ID of the subscription.
Implementations should return a unique ID for the subscription. Usually this is a UUID.
Returns:
str: ID of the subscription.
"""
...
@property
:将id
方法转换为属性,使得可以像访问属性一样访问该方法。def id(self) -> str
:定义了一个抽象方法,要求实现该协议的类必须提供一个id
属性,该属性返回一个字符串类型的订阅ID。
4. 定义__eq__
方法
def __eq__(self, other: object) -> bool:
"""Check if two subscriptions are equal.
Args:
other (object): Other subscription to compare against.
Returns:
bool: True if the subscriptions are equal, False otherwise.
"""
if not isinstance(other, Subscription):
return False
return self.id == other.id
def __eq__(self, other: object) -> bool
:定义了__eq__
方法,用于比较两个Subscription
对象是否相等。if not isinstance(other, Subscription): return False
:如果other
不是Subscription
类型的对象,则返回False
。return self.id == other.id
:如果other
是Subscription
类型的对象,则比较两个对象的id
属性是否相等。
5. 定义is_match
方法
def is_match(self, topic_id: TopicId) -> bool:
"""Check if a given topic_id matches the subscription.
Args:
topic_id (TopicId): TopicId to check.
Returns:
bool: True if the topic_id matches the subscription, False otherwise.
"""
...
def is_match(self, topic_id: TopicId) -> bool
:定义了一个抽象方法,要求实现该协议的类必须提供一个is_match
方法,该方法接受一个TopicId
类型的参数,并返回一个布尔值,表示该主题ID是否与订阅匹配。
6. 定义map_to_agent
方法
def map_to_agent(self, topic_id: TopicId) -> AgentId:
"""Map a topic_id to an agent. Should only be called if `is_match` returns True for the given topic_id.
Args:
topic_id (TopicId): TopicId to map.
Returns:
AgentId: ID of the agent that should handle the topic_id.
Raises:
CantHandleException: If the subscription cannot handle the topic_id.
"""
...
def map_to_agent(self, topic_id: TopicId) -> AgentId
:定义了一个抽象方法,要求实现该协议的类必须提供一个map_to_agent
方法,该方法接受一个TopicId
类型的参数,并返回一个AgentId
类型的结果,表示应该处理该主题ID的代理ID。Raises: CantHandleException: If the subscription cannot handle the topic_id.
:如果订阅无法处理该主题ID,则抛出CantHandleException
异常。
7. 定义UnboundSubscription
别名
UnboundSubscription = Callable[[], list[Subscription] | Awaitable[list[Subscription]]]
UnboundSubscription
:定义了一个辅助别名,表示一个可调用对象,该对象不接受任何参数,返回一个Subscription
对象列表或一个可等待的Subscription
对象列表。
功能总结
这段代码的主要功能是定义了一个Subscription
协议,用于描述订阅的行为和属性。订阅表示一个代理对某些主题感兴趣,通过实现Subscription
协议的类可以创建具体的订阅对象,并使用这些对象来检查主题ID是否匹配、将主题ID映射到代理等。同时,代码还定义了一个辅助别名UnboundSubscription
,用于表示生成订阅列表的可调用对象。
代码示例
1. Implementing the Subscription
Protocol
from uuid import uuid4
from typing import Awaitable, Callable, Protocol, runtime_checkable,List
# Assume these are simple string-based representations for now
class AgentId(str):
pass
class TopicId(str):
pass
@runtime_checkable
class Subscription(Protocol):
@property
def id(self) -> str:
...
def __eq__(self, other: object) -> bool:
if not isinstance(other, Subscription):
return False
return self.id == other.id
def is_match(self, topic_id: TopicId) -> bool:
...
def map_to_agent(self, topic_id: TopicId) -> AgentId:
...
class ConcreteSubscription:
def __init__(self, agent_id: AgentId, topic_pattern: str):
self._id = str(uuid4())
self._agent_id = agent_id
self._topic_pattern = topic_pattern
@property
def id(self) -> str:
return self._id
def is_match(self, topic_id: TopicId) -> bool:
return topic_id.startswith(self._topic_pattern)
def map_to_agent(self, topic_id: TopicId) -> AgentId:
if self.is_match(topic_id):
return self._agent_id
raise ValueError("CantHandleException: Subscription cannot handle the topic_id.")
# Usage example
agent_id = AgentId("agent_1")
subscription = ConcreteSubscription(agent_id, "topic_prefix_")
topic_id = TopicId("topic_prefix_123")
if subscription.is_match(topic_id):
mapped_agent_id = subscription.map_to_agent(topic_id)
print(f"Topic {topic_id} is mapped to agent {mapped_agent_id}")
Topic topic_prefix_123 is mapped to agent agent_1
2. Using the UnboundSubscription
Helper Alias
UnboundSubscription = Callable[[], List[Subscription] | Awaitable[List[Subscription]]]
def create_subscriptions() -> List[Subscription]:
agent_id = AgentId("agent_1")
subscription1 = ConcreteSubscription(agent_id, "topic_prefix_1_")
subscription2 = ConcreteSubscription(agent_id, "topic_prefix_2_")
return [subscription1, subscription2]
unbound_sub: UnboundSubscription = create_subscriptions
subscriptions = unbound_sub()
for sub in subscriptions:
print(f"Subscription ID: {sub.id}")
Subscription ID: 9b25f5f0-b162-4e21-bf11-c777313b0110
Subscription ID: ebdcd6d7-79ec-42d9-8121-c89dc28d781c
3. Asynchronous UnboundSubscription
Example
import asyncio
async def create_subscriptions_async() -> List[Subscription]:
await asyncio.sleep(1) # Simulate some async operation
agent_id = AgentId("agent_1")
subscription1 = ConcreteSubscription(agent_id, "topic_prefix_1_")
subscription2 = ConcreteSubscription(agent_id, "topic_prefix_2_")
return [subscription1, subscription2]
unbound_sub_async: UnboundSubscription = create_subscriptions_async
async def main():
subscriptions = await unbound_sub_async()
for sub in subscriptions:
print(f"Subscription ID: {sub.id}")
await main()
Subscription ID: 644e7731-1bf8-4202-b7dc-58fe2feb8ca3
Subscription ID: 2cae55f8-4163-4dbe-8c26-30f7ec80b581
原文地址:https://blog.csdn.net/qq_41472205/article/details/145416446
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/528348.html 如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/528348.html 如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!