Autogen_core:Concurrent Agents
目录
- 1. Single Message & Multiple Processors
- 代码
- 解释
- 导入库和模块
- 定义任务数据类
- 定义处理器代理类
- 运行时环境和代理注册
- 发布消息并停止运行时
- 输出解释
- 2. Multiple messages & Multiple Processors
- 代码
- 解释
- 1. 定义主题类型和主题ID
- 2. 定义UrgentProcessor类
- 3. 定义NormalProcessor类
- 4. 运行时环境的设置和启动
- 5. 发布任务消息
- 6. 停止运行时环境
- 输出解释
- 3. Collecting Results
- 代码
- 解释
- 实现原理
- 4. Direct Messages
- 代码
- 解释
- 类 `WorkerAgent`
- 类 `DelegatorAgent`
- 运行时环境 `SingleThreadedAgentRuntime`
1. Single Message & Multiple Processors
代码
import asyncio
from dataclasses import dataclass
from autogen_core import (
AgentId,
ClosureAgent,
ClosureContext,
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
TopicId,
TypeSubscription,
default_subscription,
message_handler,
type_subscription,
)
@dataclass
class Task:
task_id: str
@dataclass
class TaskResponse:
task_id: str
result: str
@default_subscription
class Processor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"{self._description} starting task {message.task_id}")
await asyncio.sleep(2) # Simulate work
print(f"{self._description} finished task {message.task_id}")
runtime = SingleThreadedAgentRuntime()
await Processor.register(runtime, "agent_1", lambda: Processor("Agent 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agent 2"))
runtime.start()
await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Agent 1 starting task task-1
Agent 2 starting task task-1
Agent 1 finished task task-1
Agent 2 finished task task-1
解释
这段代码是一个使用异步编程和消息传递机制实现的简单任务处理系统。它使用了 autogen_core
库来创建和管理代理(agents),并通过消息传递来实现任务的分发和处理。下面是对代码的详细解释:
导入库和模块
import asyncio
from dataclasses import dataclass
from autogen_core import (
AgentId,
ClosureAgent,
ClosureContext,
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
TopicId,
TypeSubscription,
default_subscription,
message_handler,
type_subscription,
)
这里导入了一些必要的模块和类,包括异步编程库 asyncio
和数据类库 dataclasses
,以及 autogen_core
库中的一些核心组件。
定义任务数据类
@dataclass
class Task:
task_id: str
@dataclass
class TaskResponse:
task_id: str
result: str
定义了两个数据类 Task
和 TaskResponse
,分别用于表示任务和任务响应。每个任务都有一个唯一的 task_id
,而任务响应则包含任务的 task_id
和处理结果 result
。
定义处理器代理类
@default_subscription
class Processor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"{self._description} starting task {message.task_id}")
await asyncio.sleep(2) # Simulate work
print(f"{self._description} finished task {message.task_id}")
定义了一个名为 Processor
的代理类,继承自 RoutedAgent
。这个类使用 @default_subscription
装饰器来标记默认的订阅行为。Processor
类中定义了一个异步方法 on_task
,该方法使用 @message_handler
装饰器来标记它是一个消息处理函数。当接收到 Task
类型的消息时,这个方法会被调用,模拟处理任务的过程(通过 asyncio.sleep(2)
模拟工作延迟),并打印任务开始和结束的信息。
运行时环境和代理注册
runtime = SingleThreadedAgentRuntime()
await Processor.register(runtime, "agent_1", lambda: Processor("Agent 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agent 2"))
runtime.start()
创建了一个单线程的代理运行时环境 SingleThreadedAgentRuntime
,并注册了两个 Processor
代理实例,分别命名为 "agent_1"
和 "agent_2"
。然后启动运行时环境。
发布消息并停止运行时
await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
向默认主题发布了一个 Task
消息,任务 ID 为 "task-1"
。然后等待运行时环境在所有任务完成后停止。
输出解释
Agent 1 starting task task-1
Agent 2 starting task task-1
Agent 1 finished task task-1
Agent 2 finished task task-1
从输出可以看到,两个代理都收到了任务并开始处理,然后都成功完成了任务。这表明消息传递机制和任务处理逻辑工作正常。
这段代码展示了如何使用异步编程和消息传递机制来实现一个简单的任务分发和处理系统。
2. Multiple messages & Multiple Processors
代码
TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")
@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Urgent processor starting task {message.task_id}")
await asyncio.sleep(1) # Simulate work
print(f"Urgent processor finished task {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Results by Urgent Processor")
await self.publish_message(task_response, topic_id=task_results_topic_id)
@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Normal processor starting task {message.task_id}")
await asyncio.sleep(3) # Simulate work
print(f"Normal processor finished task {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Results by Normal Processor")
await self.publish_message(task_response, topic_id=task_results_topic_id)
runtime = SingleThreadedAgentRuntime()
await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Urgent Processor"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Normal Processor"))
runtime.start()
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
解释
这段代码是一个基于异步消息处理的任务调度系统示例,使用了Python编程语言。代码中定义了两个处理不同优先级任务的处理器(UrgentProcessor和NormalProcessor),并通过一个单线程的运行时环境(SingleThreadedAgentRuntime)来管理和执行这些任务。下面是对代码的详细解释:
1. 定义主题类型和主题ID
TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")
TASK_RESULTS_TOPIC_TYPE
定义了一个主题类型,用于发布任务结果。task_results_topic_id
是一个主题ID,指定了任务结果的发布主题。
2. 定义UrgentProcessor类
@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Urgent processor starting task {message.task_id}")
await asyncio.sleep(1) # Simulate work
print(f"Urgent processor finished task {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Results by Urgent Processor")
await self.publish_message(task_response, topic_id=task_results_topic_id)
@type_subscription(topic_type="urgent")
装饰器用于订阅“urgent”类型的消息。UrgentProcessor
类继承自RoutedAgent
,用于处理紧急任务。on_task
方法是一个消息处理函数,当接收到任务消息时执行。它模拟了1秒的工作时间,然后发布任务结果。
3. 定义NormalProcessor类
@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Normal processor starting task {message.task_id}")
await asyncio.sleep(3) # Simulate work
print(f"Normal processor finished task {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Results by Normal Processor")
await self.publish_message(task_response, topic_id=task_results_topic_id)
@type_subscription(topic_type="normal")
装饰器用于订阅“normal”类型的消息。NormalProcessor
类继承自RoutedAgent
,用于处理普通任务。on_task
方法也是一个消息处理函数,模拟了3秒的工作时间,然后发布任务结果。
4. 运行时环境的设置和启动
runtime = SingleThreadedAgentRuntime()
await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Urgent Processor"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Normal Processor"))
runtime.start()
- 创建了一个单线程的运行时环境
SingleThreadedAgentRuntime
。 - 使用
register
方法注册了UrgentProcessor
和NormalProcessor
,并指定了它们的名称和实例化函数。 - 调用
runtime.start()
启动运行时环境。
5. 发布任务消息
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
- 发布了两个任务消息,一个普通任务和一个紧急任务,分别对应不同的主题类型。
6. 停止运行时环境
await runtime.stop_when_idle()
- 当所有任务处理完毕后,停止运行时环境。
输出解释
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
- 输出显示,紧急任务先开始并完成,然后普通任务开始并完成。这符合预期,因为紧急任务的处理时间较短,应该优先完成。
3. Collecting Results
代码
queue = asyncio.Queue[TaskResponse]()
async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:
await queue.put(message)
runtime.start()
CLOSURE_AGENT_TYPE = "collect_result_agent"
await ClosureAgent.register_closure(
runtime,
CLOSURE_AGENT_TYPE,
collect_result,
subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
)
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
while not queue.empty():
print(await queue.get())
TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')
TaskResponse(task_id='normal-1', result='Results by Normal Processor')
解释
这段代码是一个使用异步编程(asyncio)处理任务并收集结果的示例。它展示了如何通过一个队列来收集不同类型的任务结果,并在所有任务完成后打印这些结果。下面是对代码的详细解释:
-
导入模块和定义队列:
queue = asyncio.Queue[TaskResponse]()
这里定义了一个异步队列
queue
,用于存储任务结果。TaskResponse
是一个假设存在的类,用于表示任务的结果。 -
定义收集结果的异步函数:
async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None: await queue.put(message)
这个函数
collect_result
用于接收任务结果并将其放入队列中。它接收三个参数:_agent
(代理上下文),message
(任务结果),和ctx
(消息上下文)。 -
启动运行时环境:
runtime.start()
启动一个运行时环境
runtime
,用于管理任务的执行。 -
注册收集结果的代理:
CLOSURE_AGENT_TYPE = "collect_result_agent" await ClosureAgent.register_closure( runtime, CLOSURE_AGENT_TYPE, collect_result, subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)], )
这里注册了一个名为
collect_result_agent
的代理,用于订阅任务结果的主题(TASK_RESULTS_TOPIC_TYPE
),并将收集到的结果传递给collect_result
函数。 -
发布任务消息:
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default")) await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
发布两个任务消息,一个是普通任务(
normal-1
),另一个是紧急任务(urgent-1
)。 -
停止运行时环境:
await runtime.stop_when_idle()
当所有任务都完成后,停止运行时环境。
-
打印队列中的任务结果:
while not queue.empty(): print(await queue.get())
这个循环会不断从队列中获取任务结果并打印,直到队列为空。
实现原理
- 异步队列:
asyncio.Queue
是一个线程和协程安全的队列,用于在异步任务之间传递数据。 - 代理注册和订阅:通过注册代理并订阅特定主题,可以实现对不同类型任务的监听和处理。
- 消息发布和接收:任务消息通过
publish_message
发布,并由注册的代理接收和处理。
4. Direct Messages
代码
class WorkerAgent(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
print(f"{self.id} starting task {message.task_id}")
await asyncio.sleep(2) # Simulate work
print(f"{self.id} finished task {message.task_id}")
return TaskResponse(task_id=message.task_id, result=f"Results by {self.id}")
class DelegatorAgent(RoutedAgent):
def __init__(self, description: str, worker_type: str):
super().__init__(description)
self.worker_instances = [AgentId(worker_type, f"{worker_type}-1"), AgentId(worker_type, f"{worker_type}-2")]
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
print(f"Delegator received task {message.task_id}.")
subtask1 = Task(task_id="task-part-1")
subtask2 = Task(task_id="task-part-2")
worker1_result, worker2_result = await asyncio.gather(
self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])
)
combined_result = f"Part 1: {worker1_result.result}, " f"Part 2: {worker2_result.result}"
task_response = TaskResponse(task_id=message.task_id, result=combined_result)
return task_response
runtime = SingleThreadedAgentRuntime()
await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent("Worker Agent"))
await DelegatorAgent.register(runtime, "delegator", lambda: DelegatorAgent("Delegator Agent", "worker"))
runtime.start()
delegator = AgentId("delegator", "default")
response = await runtime.send_message(Task(task_id="main-task"), recipient=delegator)
print(f"Final result: {response.result}")
await runtime.stop_when_idle()
Delegator received task main-task.
worker/worker-1 starting task task-part-1
worker/worker-2 starting task task-part-2
worker/worker-1 finished task task-part-1
worker/worker-2 finished task task-part-2
Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2
解释
这段代码是一个基于异步消息传递的简单任务分配系统,由两个主要组件组成:WorkerAgent
和 DelegatorAgent
。下面是对代码的详细解释:
类 WorkerAgent
- 功能:
WorkerAgent
类表示一个工作代理,它能够接收任务并处理这些任务。 - 方法
on_task
:使用@message_handler
装饰器标记的方法,用于处理接收到的Task
消息。方法内部模拟了任务处理过程(通过asyncio.sleep(2)
模拟工作延迟),并返回一个TaskResponse
消息,其中包含任务结果。
类 DelegatorAgent
- 功能:
DelegatorAgent
类表示一个任务分配代理,它接收任务并将子任务分配给多个WorkerAgent
实例。 - 构造函数:初始化时,创建两个
WorkerAgent
实例的 ID,并保存在worker_instances
列表中。 - 方法
on_task
:处理接收到的Task
消息,将任务分解为两个子任务,并使用asyncio.gather
并发地将这些子任务发送给两个不同的WorkerAgent
实例。然后,它将两个子任务的结果组合成一个最终结果,并返回一个TaskResponse
消息。
运行时环境 SingleThreadedAgentRuntime
- 功能:
SingleThreadedAgentRuntime
是一个单线程的代理运行时环境,用于注册和运行代理。 - 注册代理:使用
register
方法注册WorkerAgent
和DelegatorAgent
。 - 启动运行时:调用
runtime.start()
启动代理运行时环境。 - 发送消息:通过
runtime.send_message
方法向DelegatorAgent
发送一个Task
消息,并等待响应。 - 停止运行时:当所有任务完成后,调用
runtime.stop_when_idle()
停止运行时环境。
这段代码展示了如何使用异步消息传递和任务分配来实现分布式任务处理。
参考链接:https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/design-patterns/concurrent-agents.html