当前位置: 首页 > article >正文

Autogen_core源码:_subscription.py

目录

    • _subscription.py代码
    • 代码解释
      • 代码逻辑解释
        • 1. 导入模块
        • 2. 定义`Subscription`协议
        • 3. 定义`id`属性
        • 4. 定义`__eq__`方法
        • 5. 定义`is_match`方法
        • 6. 定义`map_to_agent`方法
        • 7. 定义`UnboundSubscription`别名
      • 功能总结
    • 代码示例
      • 1. Implementing the `Subscription` Protocol
      • 2. Using the `UnboundSubscription` Helper Alias
      • 3. Asynchronous `UnboundSubscription` Example

_subscription.py代码

from __future__ import annotations

from typing import Awaitable, Callable, Protocol, runtime_checkable

from ._agent_id import AgentId
from ._topic import TopicId


@runtime_checkable
class Subscription(Protocol):
    """Subscriptions define the topics that an agent is interested in."""

    @property
    def id(self) -> str:
        """Get the ID of the subscription.

        Implementations should return a unique ID for the subscription. Usually this is a UUID.

        Returns:
            str: ID of the subscription.
        """
        ...

    def __eq__(self, other: object) -> bool:
        """Check if two subscriptions are equal.

        Args:
            other (object): Other subscription to compare against.

        Returns:
            bool: True if the subscriptions are equal, False otherwise.
        """
        if not isinstance(other, Subscription):
            return False

        return self.id == other.id

    def is_match(self, topic_id: TopicId) -> bool:
        """Check if a given topic_id matches the subscription.

        Args:
            topic_id (TopicId): TopicId to check.

        Returns:
            bool: True if the topic_id matches the subscription, False otherwise.
        """
        ...

    def map_to_agent(self, topic_id: TopicId) -> AgentId:
        """Map a topic_id to an agent. Should only be called if `is_match` returns True for the given topic_id.

        Args:
            topic_id (TopicId): TopicId to map.

        Returns:
            AgentId: ID of the agent that should handle the topic_id.

        Raises:
            CantHandleException: If the subscription cannot handle the topic_id.
        """
        ...


# Helper alias to represent the lambdas used to define subscriptions
UnboundSubscription = Callable[[], list[Subscription] | Awaitable[list[Subscription]]]

代码解释

这段Python代码定义了一个名为Subscription的协议(Protocol),并创建了一个辅助别名UnboundSubscription,下面详细解释其逻辑和功能。

代码逻辑解释

1. 导入模块
from __future__ import annotations
from typing import Awaitable, Callable, Protocol, runtime_checkable
from._agent_id import AgentId
from._topic import TopicId
  • from __future__ import annotations:允许在类型注解中使用前向引用,即可以在定义类型时引用尚未定义的类或类型。
  • AwaitableCallableProtocolruntime_checkable:这些是Python标准库typing模块中的类型和装饰器。Awaitable用于表示可等待对象,Callable用于表示可调用对象,Protocol用于定义协议,runtime_checkable用于将协议标记为可在运行时检查。
  • AgentIdTopicId:从._agent_id._topic模块中导入的自定义类型,分别表示代理ID和主题ID。
2. 定义Subscription协议
@runtime_checkable
class Subscription(Protocol):
    """Subscriptions define the topics that an agent is interested in."""
  • @runtime_checkable:将Subscription协议标记为可在运行时检查,这意味着可以使用isinstance()函数来检查一个对象是否实现了该协议。
  • class Subscription(Protocol):定义了一个名为Subscription的协议,协议类似于接口,它定义了一组方法和属性,但不提供具体的实现。
3. 定义id属性
    @property
    def id(self) -> str:
        """Get the ID of the subscription.

        Implementations should return a unique ID for the subscription. Usually this is a UUID.

        Returns:
            str: ID of the subscription.
        """
       ...
  • @property:将id方法转换为属性,使得可以像访问属性一样访问该方法。
  • def id(self) -> str:定义了一个抽象方法,要求实现该协议的类必须提供一个id属性,该属性返回一个字符串类型的订阅ID。
4. 定义__eq__方法
    def __eq__(self, other: object) -> bool:
        """Check if two subscriptions are equal.

        Args:
            other (object): Other subscription to compare against.

        Returns:
            bool: True if the subscriptions are equal, False otherwise.
        """
        if not isinstance(other, Subscription):
            return False

        return self.id == other.id
  • def __eq__(self, other: object) -> bool:定义了__eq__方法,用于比较两个Subscription对象是否相等。
  • if not isinstance(other, Subscription): return False:如果other不是Subscription类型的对象,则返回False
  • return self.id == other.id:如果otherSubscription类型的对象,则比较两个对象的id属性是否相等。
5. 定义is_match方法
    def is_match(self, topic_id: TopicId) -> bool:
        """Check if a given topic_id matches the subscription.

        Args:
            topic_id (TopicId): TopicId to check.

        Returns:
            bool: True if the topic_id matches the subscription, False otherwise.
        """
       ...
  • def is_match(self, topic_id: TopicId) -> bool:定义了一个抽象方法,要求实现该协议的类必须提供一个is_match方法,该方法接受一个TopicId类型的参数,并返回一个布尔值,表示该主题ID是否与订阅匹配。
6. 定义map_to_agent方法
    def map_to_agent(self, topic_id: TopicId) -> AgentId:
        """Map a topic_id to an agent. Should only be called if `is_match` returns True for the given topic_id.

        Args:
            topic_id (TopicId): TopicId to map.

        Returns:
            AgentId: ID of the agent that should handle the topic_id.

        Raises:
            CantHandleException: If the subscription cannot handle the topic_id.
        """
       ...
  • def map_to_agent(self, topic_id: TopicId) -> AgentId:定义了一个抽象方法,要求实现该协议的类必须提供一个map_to_agent方法,该方法接受一个TopicId类型的参数,并返回一个AgentId类型的结果,表示应该处理该主题ID的代理ID。
  • Raises: CantHandleException: If the subscription cannot handle the topic_id.:如果订阅无法处理该主题ID,则抛出CantHandleException异常。
7. 定义UnboundSubscription别名
UnboundSubscription = Callable[[], list[Subscription] | Awaitable[list[Subscription]]]
  • UnboundSubscription:定义了一个辅助别名,表示一个可调用对象,该对象不接受任何参数,返回一个Subscription对象列表或一个可等待的Subscription对象列表。

功能总结

这段代码的主要功能是定义了一个Subscription协议,用于描述订阅的行为和属性。订阅表示一个代理对某些主题感兴趣,通过实现Subscription协议的类可以创建具体的订阅对象,并使用这些对象来检查主题ID是否匹配、将主题ID映射到代理等。同时,代码还定义了一个辅助别名UnboundSubscription,用于表示生成订阅列表的可调用对象。

代码示例

1. Implementing the Subscription Protocol

from uuid import uuid4
from typing import Awaitable, Callable, Protocol, runtime_checkable,List

# Assume these are simple string-based representations for now
class AgentId(str):
    pass

class TopicId(str):
    pass

@runtime_checkable
class Subscription(Protocol):
    @property
    def id(self) -> str:
       ...

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Subscription):
            return False
        return self.id == other.id

    def is_match(self, topic_id: TopicId) -> bool:
       ...

    def map_to_agent(self, topic_id: TopicId) -> AgentId:
       ...

class ConcreteSubscription:
    def __init__(self, agent_id: AgentId, topic_pattern: str):
        self._id = str(uuid4())
        self._agent_id = agent_id
        self._topic_pattern = topic_pattern

    @property
    def id(self) -> str:
        return self._id

    def is_match(self, topic_id: TopicId) -> bool:
        return topic_id.startswith(self._topic_pattern)

    def map_to_agent(self, topic_id: TopicId) -> AgentId:
        if self.is_match(topic_id):
            return self._agent_id
        raise ValueError("CantHandleException: Subscription cannot handle the topic_id.")


# Usage example
agent_id = AgentId("agent_1")
subscription = ConcreteSubscription(agent_id, "topic_prefix_")
topic_id = TopicId("topic_prefix_123")
if subscription.is_match(topic_id):
    mapped_agent_id = subscription.map_to_agent(topic_id)
    print(f"Topic {topic_id} is mapped to agent {mapped_agent_id}")
Topic topic_prefix_123 is mapped to agent agent_1

2. Using the UnboundSubscription Helper Alias

UnboundSubscription = Callable[[], List[Subscription] | Awaitable[List[Subscription]]]

def create_subscriptions() -> List[Subscription]:
    agent_id = AgentId("agent_1")
    subscription1 = ConcreteSubscription(agent_id, "topic_prefix_1_")
    subscription2 = ConcreteSubscription(agent_id, "topic_prefix_2_")
    return [subscription1, subscription2]


unbound_sub: UnboundSubscription = create_subscriptions
subscriptions = unbound_sub()
for sub in subscriptions:
    print(f"Subscription ID: {sub.id}")
Subscription ID: 9b25f5f0-b162-4e21-bf11-c777313b0110
Subscription ID: ebdcd6d7-79ec-42d9-8121-c89dc28d781c

3. Asynchronous UnboundSubscription Example

import asyncio
async def create_subscriptions_async() -> List[Subscription]:
    await asyncio.sleep(1)  # Simulate some async operation
    agent_id = AgentId("agent_1")
    subscription1 = ConcreteSubscription(agent_id, "topic_prefix_1_")
    subscription2 = ConcreteSubscription(agent_id, "topic_prefix_2_")
    return [subscription1, subscription2]


unbound_sub_async: UnboundSubscription = create_subscriptions_async

async def main():
    subscriptions = await unbound_sub_async()
    for sub in subscriptions:
        print(f"Subscription ID: {sub.id}")


await main()
Subscription ID: 644e7731-1bf8-4202-b7dc-58fe2feb8ca3
Subscription ID: 2cae55f8-4163-4dbe-8c26-30f7ec80b581
原文地址:https://blog.csdn.net/qq_41472205/article/details/145416446
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/528348.html

相关文章:

  • https的原理
  • Cesium+Vue3教程(011):打造数字城市
  • 网络工程师 (12)软件开发与测试
  • CNN的各种知识点(三):有关于VGG16 的结构展开的问题(1)
  • 【C++篇】哈希表
  • Maya的id贴图
  • Linux网络 HTTP cookie 与 session
  • html的字符实体和颜色表示
  • Web3技术详解
  • Notepad++消除生成bak文件
  • ROS-IMU
  • python小知识-typing注解你的程序
  • Flutter开发环境配置
  • 【Uniapp-Vue3】解决uni-popup弹窗在安全区显示透明问题
  • Linux——ext2文件系统(一)
  • 使用 Redis Streams 实现高性能消息队列
  • 2025 AI行业变革:从DeepSeek V3到o3-mini的技术演进
  • 蓝桥杯刷题DAY2:二维前缀和 一维前缀和 差分数组
  • leetcode 2563. 统计公平数对的数目
  • x86-64数据传输指令