当前位置: 首页 > article >正文

Autogen_core:Concurrent Agents

目录

    • 1. Single Message & Multiple Processors
      • 代码
      • 解释
        • 导入库和模块
        • 定义任务数据类
        • 定义处理器代理类
        • 运行时环境和代理注册
        • 发布消息并停止运行时
        • 输出解释
    • 2. Multiple messages & Multiple Processors
      • 代码
      • 解释
        • 1. 定义主题类型和主题ID
        • 2. 定义UrgentProcessor类
        • 3. 定义NormalProcessor类
        • 4. 运行时环境的设置和启动
        • 5. 发布任务消息
        • 6. 停止运行时环境
        • 输出解释
    • 3. Collecting Results
      • 代码
      • 解释
        • 实现原理
    • 4. Direct Messages
      • 代码
      • 解释
        • 类 `WorkerAgent`
        • 类 `DelegatorAgent`
        • 运行时环境 `SingleThreadedAgentRuntime`

1. Single Message & Multiple Processors

代码


import asyncio
from dataclasses import dataclass

from autogen_core import (
    AgentId,
    ClosureAgent,
    ClosureContext,
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    default_subscription,
    message_handler,
    type_subscription,
)
@dataclass
class Task:
    task_id: str


@dataclass
class TaskResponse:
    task_id: str
    result: str
@default_subscription
class Processor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"{self._description} starting task {message.task_id}")
        await asyncio.sleep(2)  # Simulate work
        print(f"{self._description} finished task {message.task_id}")
runtime = SingleThreadedAgentRuntime()

await Processor.register(runtime, "agent_1", lambda: Processor("Agent 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agent 2"))

runtime.start()

await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())

await runtime.stop_when_idle()
Agent 1 starting task task-1
Agent 2 starting task task-1
Agent 1 finished task task-1
Agent 2 finished task task-1 

解释

这段代码是一个使用异步编程和消息传递机制实现的简单任务处理系统。它使用了 autogen_core 库来创建和管理代理(agents),并通过消息传递来实现任务的分发和处理。下面是对代码的详细解释:

导入库和模块
import asyncio
from dataclasses import dataclass

from autogen_core import (
    AgentId,
    ClosureAgent,
    ClosureContext,
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    default_subscription,
    message_handler,
    type_subscription,
)

这里导入了一些必要的模块和类,包括异步编程库 asyncio 和数据类库 dataclasses,以及 autogen_core 库中的一些核心组件。

定义任务数据类
@dataclass
class Task:
    task_id: str

@dataclass
class TaskResponse:
    task_id: str
    result: str

定义了两个数据类 TaskTaskResponse,分别用于表示任务和任务响应。每个任务都有一个唯一的 task_id,而任务响应则包含任务的 task_id 和处理结果 result

定义处理器代理类
@default_subscription
class Processor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"{self._description} starting task {message.task_id}")
        await asyncio.sleep(2)  # Simulate work
        print(f"{self._description} finished task {message.task_id}")

定义了一个名为 Processor 的代理类,继承自 RoutedAgent。这个类使用 @default_subscription 装饰器来标记默认的订阅行为。Processor 类中定义了一个异步方法 on_task,该方法使用 @message_handler 装饰器来标记它是一个消息处理函数。当接收到 Task 类型的消息时,这个方法会被调用,模拟处理任务的过程(通过 asyncio.sleep(2) 模拟工作延迟),并打印任务开始和结束的信息。

运行时环境和代理注册
runtime = SingleThreadedAgentRuntime()

await Processor.register(runtime, "agent_1", lambda: Processor("Agent 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agent 2"))

runtime.start()

创建了一个单线程的代理运行时环境 SingleThreadedAgentRuntime,并注册了两个 Processor 代理实例,分别命名为 "agent_1""agent_2"。然后启动运行时环境。

发布消息并停止运行时
await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())

await runtime.stop_when_idle()

向默认主题发布了一个 Task 消息,任务 ID 为 "task-1"。然后等待运行时环境在所有任务完成后停止。

输出解释
Agent 1 starting task task-1
Agent 2 starting task task-1
Agent 1 finished task task-1
Agent 2 finished task task-1

从输出可以看到,两个代理都收到了任务并开始处理,然后都成功完成了任务。这表明消息传递机制和任务处理逻辑工作正常。

这段代码展示了如何使用异步编程和消息传递机制来实现一个简单的任务分发和处理系统。

2. Multiple messages & Multiple Processors

代码

TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")


@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Urgent processor starting task {message.task_id}")
        await asyncio.sleep(1)  # Simulate work
        print(f"Urgent processor finished task {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Results by Urgent Processor")
        await self.publish_message(task_response, topic_id=task_results_topic_id)


@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Normal processor starting task {message.task_id}")
        await asyncio.sleep(3)  # Simulate work
        print(f"Normal processor finished task {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Results by Normal Processor")
        await self.publish_message(task_response, topic_id=task_results_topic_id)

runtime = SingleThreadedAgentRuntime()

await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Urgent Processor"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Normal Processor"))

runtime.start()

await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))

await runtime.stop_when_idle()        
        
        
        
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1

解释

这段代码是一个基于异步消息处理的任务调度系统示例,使用了Python编程语言。代码中定义了两个处理不同优先级任务的处理器(UrgentProcessor和NormalProcessor),并通过一个单线程的运行时环境(SingleThreadedAgentRuntime)来管理和执行这些任务。下面是对代码的详细解释:

1. 定义主题类型和主题ID
TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")
  • TASK_RESULTS_TOPIC_TYPE 定义了一个主题类型,用于发布任务结果。
  • task_results_topic_id 是一个主题ID,指定了任务结果的发布主题。
2. 定义UrgentProcessor类
@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Urgent processor starting task {message.task_id}")
        await asyncio.sleep(1)  # Simulate work
        print(f"Urgent processor finished task {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Results by Urgent Processor")
        await self.publish_message(task_response, topic_id=task_results_topic_id)
  • @type_subscription(topic_type="urgent") 装饰器用于订阅“urgent”类型的消息。
  • UrgentProcessor 类继承自 RoutedAgent,用于处理紧急任务。
  • on_task 方法是一个消息处理函数,当接收到任务消息时执行。它模拟了1秒的工作时间,然后发布任务结果。
3. 定义NormalProcessor类
@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Normal processor starting task {message.task_id}")
        await asyncio.sleep(3)  # Simulate work
        print(f"Normal processor finished task {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Results by Normal Processor")
        await self.publish_message(task_response, topic_id=task_results_topic_id)
  • @type_subscription(topic_type="normal") 装饰器用于订阅“normal”类型的消息。
  • NormalProcessor 类继承自 RoutedAgent,用于处理普通任务。
  • on_task 方法也是一个消息处理函数,模拟了3秒的工作时间,然后发布任务结果。
4. 运行时环境的设置和启动
runtime = SingleThreadedAgentRuntime()

await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Urgent Processor"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Normal Processor"))

runtime.start()
  • 创建了一个单线程的运行时环境 SingleThreadedAgentRuntime
  • 使用 register 方法注册了 UrgentProcessorNormalProcessor,并指定了它们的名称和实例化函数。
  • 调用 runtime.start() 启动运行时环境。
5. 发布任务消息
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
  • 发布了两个任务消息,一个普通任务和一个紧急任务,分别对应不同的主题类型。
6. 停止运行时环境
await runtime.stop_when_idle()
  • 当所有任务处理完毕后,停止运行时环境。
输出解释
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
  • 输出显示,紧急任务先开始并完成,然后普通任务开始并完成。这符合预期,因为紧急任务的处理时间较短,应该优先完成。

3. Collecting Results

代码

queue = asyncio.Queue[TaskResponse]()


async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:
    await queue.put(message)


runtime.start()

CLOSURE_AGENT_TYPE = "collect_result_agent"
await ClosureAgent.register_closure(
    runtime,
    CLOSURE_AGENT_TYPE,
    collect_result,
    subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
)

await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))

await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
while not queue.empty():
    print(await queue.get())
TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')
TaskResponse(task_id='normal-1', result='Results by Normal Processor')

解释

这段代码是一个使用异步编程(asyncio)处理任务并收集结果的示例。它展示了如何通过一个队列来收集不同类型的任务结果,并在所有任务完成后打印这些结果。下面是对代码的详细解释:

  1. 导入模块和定义队列

    queue = asyncio.Queue[TaskResponse]()
    

    这里定义了一个异步队列 queue,用于存储任务结果。TaskResponse 是一个假设存在的类,用于表示任务的结果。

  2. 定义收集结果的异步函数

    async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:
        await queue.put(message)
    

    这个函数 collect_result 用于接收任务结果并将其放入队列中。它接收三个参数:_agent(代理上下文),message(任务结果),和 ctx(消息上下文)。

  3. 启动运行时环境

    runtime.start()
    

    启动一个运行时环境 runtime,用于管理任务的执行。

  4. 注册收集结果的代理

    CLOSURE_AGENT_TYPE = "collect_result_agent"
    await ClosureAgent.register_closure(
        runtime,
        CLOSURE_AGENT_TYPE,
        collect_result,
        subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
    )
    

    这里注册了一个名为 collect_result_agent 的代理,用于订阅任务结果的主题(TASK_RESULTS_TOPIC_TYPE),并将收集到的结果传递给 collect_result 函数。

  5. 发布任务消息

    await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
    await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
    

    发布两个任务消息,一个是普通任务(normal-1),另一个是紧急任务(urgent-1)。

  6. 停止运行时环境

    await runtime.stop_when_idle()
    

    当所有任务都完成后,停止运行时环境。

  7. 打印队列中的任务结果

    while not queue.empty():
        print(await queue.get())
    

    这个循环会不断从队列中获取任务结果并打印,直到队列为空。

实现原理
  • 异步队列asyncio.Queue 是一个线程和协程安全的队列,用于在异步任务之间传递数据。
  • 代理注册和订阅:通过注册代理并订阅特定主题,可以实现对不同类型任务的监听和处理。
  • 消息发布和接收:任务消息通过 publish_message 发布,并由注册的代理接收和处理。

4. Direct Messages

代码

class WorkerAgent(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
        print(f"{self.id} starting task {message.task_id}")
        await asyncio.sleep(2)  # Simulate work
        print(f"{self.id} finished task {message.task_id}")
        return TaskResponse(task_id=message.task_id, result=f"Results by {self.id}")


class DelegatorAgent(RoutedAgent):
    def __init__(self, description: str, worker_type: str):
        super().__init__(description)
        self.worker_instances = [AgentId(worker_type, f"{worker_type}-1"), AgentId(worker_type, f"{worker_type}-2")]

    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
        print(f"Delegator received task {message.task_id}.")

        subtask1 = Task(task_id="task-part-1")
        subtask2 = Task(task_id="task-part-2")

        worker1_result, worker2_result = await asyncio.gather(
            self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])
        )

        combined_result = f"Part 1: {worker1_result.result}, " f"Part 2: {worker2_result.result}"
        task_response = TaskResponse(task_id=message.task_id, result=combined_result)
        return task_response
    
runtime = SingleThreadedAgentRuntime()

await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent("Worker Agent"))
await DelegatorAgent.register(runtime, "delegator", lambda: DelegatorAgent("Delegator Agent", "worker"))

runtime.start()

delegator = AgentId("delegator", "default")
response = await runtime.send_message(Task(task_id="main-task"), recipient=delegator)

print(f"Final result: {response.result}")
await runtime.stop_when_idle()


Delegator received task main-task.
worker/worker-1 starting task task-part-1
worker/worker-2 starting task task-part-2
worker/worker-1 finished task task-part-1
worker/worker-2 finished task task-part-2
Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2

解释

这段代码是一个基于异步消息传递的简单任务分配系统,由两个主要组件组成:WorkerAgentDelegatorAgent。下面是对代码的详细解释:

WorkerAgent
  • 功能WorkerAgent 类表示一个工作代理,它能够接收任务并处理这些任务。
  • 方法 on_task:使用 @message_handler 装饰器标记的方法,用于处理接收到的 Task 消息。方法内部模拟了任务处理过程(通过 asyncio.sleep(2) 模拟工作延迟),并返回一个 TaskResponse 消息,其中包含任务结果。
DelegatorAgent
  • 功能DelegatorAgent 类表示一个任务分配代理,它接收任务并将子任务分配给多个 WorkerAgent 实例。
  • 构造函数:初始化时,创建两个 WorkerAgent 实例的 ID,并保存在 worker_instances 列表中。
  • 方法 on_task:处理接收到的 Task 消息,将任务分解为两个子任务,并使用 asyncio.gather 并发地将这些子任务发送给两个不同的 WorkerAgent 实例。然后,它将两个子任务的结果组合成一个最终结果,并返回一个 TaskResponse 消息。
运行时环境 SingleThreadedAgentRuntime
  • 功能SingleThreadedAgentRuntime 是一个单线程的代理运行时环境,用于注册和运行代理。
  • 注册代理:使用 register 方法注册 WorkerAgentDelegatorAgent
  • 启动运行时:调用 runtime.start() 启动代理运行时环境。
  • 发送消息:通过 runtime.send_message 方法向 DelegatorAgent 发送一个 Task 消息,并等待响应。
  • 停止运行时:当所有任务完成后,调用 runtime.stop_when_idle() 停止运行时环境。

这段代码展示了如何使用异步消息传递和任务分配来实现分布式任务处理。

参考链接:https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/design-patterns/concurrent-agents.html


http://www.kler.cn/a/524043.html

相关文章:

  • 使用Python爬虫获取1688商品拍立淘API接口(item_search_img)的实战指南
  • 大一计算机的自学总结:异或运算
  • 单路由及双路由端口映射指南
  • 智慧园区系统分类及其在提升企业管理效率中的创新应用探讨
  • OpenBMC:编译
  • [免费]基于Python的Django博客系统【论文+源码+SQL脚本】
  • 出现 Error processing condition on org.springframework.cloud.openfeign 解决方法
  • 线程局部存储tls的原理和使用
  • C++ 中用于控制输出格式的操纵符——setw 、setfill、setprecision、fixed
  • 智能化加速标准和协议的更新并推动验证IP(VIP)在芯片设计中的更广泛应用
  • vim交换文件的工作原理
  • 知网爬虫,作者、摘要、题目、发表期刊等主要内容的获取
  • 文章分类列表查询功能
  • 詳細講一下RN(React Native)中的列表組件FlatList和SectionList
  • 第25章 项目启航前的密谈
  • 基于容器本地化开发与交付的实践
  • 【开源免费】基于SpringBoot+Vue.JS在线考试学习交流网页平台(JAVA毕业设计)
  • ProGen生成功能蛋白序列
  • 蓝桥杯python语言基础(3)——循环结构
  • Linux 非阻塞IO
  • 《Memory Barriers a Hardware View for Software Hackers》阅读笔记
  • 【Linux】Linux C比较两个 IPv6 网关地址是否相等,包括前缀
  • SpringBoot-Vue整合百度地图
  • Attention Free Transformer (AFT)-2020论文笔记
  • 适配器模式——C++实现
  • 人工智能在医疗领域的应用有哪些?