2. 进阶关卡-Laagent:从零搭建你的Multi-Agent
Lagent:从零搭建你的 Multi-Agent
环境准备
开发机选择 30% A100,镜像选择为 Cuda12.2-conda。
# 创建环境
conda create -n lagent python=3.10 -y
# 激活环境
conda activate lagent
# 安装 torch
conda install pytorch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 pytorch-cuda=12.1 -c pytorch -c nvidia -y
# 安装其他依赖包
pip install termcolor==2.4.0
pip install streamlit==1.39.0
pip install class_registry==2.1.2
pip install datasets==3.1.0
通过源码安装lagent:
# 创建目录以存放代码
mkdir -p /root/agent_camp4
cd /root/agent_camp4
git clone https://github.com/InternLM/lagent.git
cd lagent && git checkout e304e5d && pip install -e . && cd ..
pip install griffe==0.48.0
任务(完成此任务即完成闯关)
- 使用 Lagent 复现文档中 “制作一个属于自己的Agent” 和 “Multi-Agents博客写作系统的搭建”两部分内容,记录复现过程并截图。
- 将你的Agent部署到 Hugging Face 或 ModelScope 平台,应用名包含 Lagent 关键词(优秀学员必做)
制作一个属于自己的Agent
参考https://github.com/InternLM/Tutorial/blob/camp4/docs/L2/Agent/readme.md 进行复现, 关键步骤如下:
- 在laegnt/actions文件夹下面创建一个天气查询的工具程序。
conda activate lagent
cd /root/agent_camp4/lagent/lagent/actions
touch weather_query.py
代码内容如下:
import os
import requests
from lagent.actions.base_action import BaseAction, tool_api
from lagent.schema import ActionReturn, ActionStatusCode
class WeatherQuery(BaseAction):
def __init__(self):
super().__init__()
self.api_key = os.getenv("weather_token")
print(self.api_key)
if not self.api_key:
raise EnvironmentError("未找到环境变量 'token'。请设置你的和风天气 API Key 到 'weather_token' 环境变量中,比如export weather_token='xxx' ")
@tool_api
def run(self, location: str) -> dict:
"""
查询实时天气信息。
Args:
location (str): 要查询的地点名称、LocationID 或经纬度坐标(如 "101010100" 或 "116.41,39.92")。
Returns:
dict: 包含天气信息的字典
* location: 地点名称
* weather: 天气状况
* temperature: 当前温度
* wind_direction: 风向
* wind_speed: 风速(公里/小时)
* humidity: 相对湿度(%)
* report_time: 数据报告时间
"""
try:
# 如果 location 不是坐标格式(例如 "116.41,39.92"),则调用 GeoAPI 获取 LocationID
if not ("," in location and location.replace(",", "").replace(".", "").isdigit()):
# 使用 GeoAPI 获取 LocationID
geo_url = f"https://geoapi.qweather.com/v2/city/lookup?location={location}&key={self.api_key}"
geo_response = requests.get(geo_url)
geo_data = geo_response.json()
if geo_data.get("code") != "200" or not geo_data.get("location"):
raise Exception(f"GeoAPI 返回错误码:{geo_data.get('code')} 或未找到位置")
location = geo_data["location"][0]["id"]
# 构建天气查询的 API 请求 URL
weather_url = f"https://devapi.qweather.com/v7/weather/now?location={location}&key={self.api_key}"
response = requests.get(weather_url)
data = response.json()
# 检查 API 响应码
if data.get("code") != "200":
raise Exception(f"Weather API 返回错误码:{data.get('code')}")
# 解析和组织天气信息
weather_info = {
"location": location,
"weather": data["now"]["text"],
"temperature": data["now"]["temp"] + "°C",
"wind_direction": data["now"]["windDir"],
"wind_speed": data["now"]["windSpeed"] + " km/h",
"humidity": data["now"]["humidity"] + "%",
"report_time": data["updateTime"]
}
return {"result": weather_info}
except Exception as exc:
return ActionReturn(
errmsg=f"WeatherQuery 异常:{exc}",
state=ActionStatusCode.HTTP_ERROR
)
- 设置和风天气 API key
export weather_token='your_token_here'
- 在
/root/agent_camp4/lagent/lagent/actions/__init__.py
中加入下面的代码,用以初始化WeatherQuery方法:
from .weather_query import WeatherQuery
__all__ = [
'BaseAction', 'ActionExecutor', 'AsyncActionExecutor', 'InvalidAction',
'FinishAction', 'NoAction', 'BINGMap', 'AsyncBINGMap', 'ArxivSearch',
'AsyncArxivSearch', 'GoogleSearch', 'AsyncGoogleSearch', 'GoogleScholar',
'AsyncGoogleScholar', 'IPythonInterpreter', 'AsyncIPythonInterpreter',
'IPythonInteractive', 'AsyncIPythonInteractive',
'IPythonInteractiveManager', 'PythonInterpreter', 'AsyncPythonInterpreter',
'PPT', 'AsyncPPT', 'WebBrowser', 'AsyncWebBrowser', 'BaseParser',
'JsonParser', 'TupleParser', 'tool_api', 'WeatherQuery' # 这里
]
- 修改 Web Demo 脚本来集成自定义的 WeatherQuery 插件。
- from lagent.actions import ArxivSearch
+ from lagent.actions import ArxivSearch, WeatherQuery
- # 初始化插件列表
- action_list = [
- ArxivSearch(),
- ]
+ action_list = [
+ ArxivSearch(),
+ WeatherQuery(),
+ ]
- 启动web程序测试
streamlit run agent_api_web_demo.py。
Multi-Agents博客写作系统的搭建
conda activate lagent
cd /root/agent_camp4/lagent/examples
touch multi_agents_api_web_demo.py
将下面的代码填入multi_agents_api_web_demo.py:
import os
import asyncio
import json
import re
import requests
import streamlit as st
from lagent.agents import Agent
from lagent.prompts.parsers import PluginParser
from lagent.agents.stream import PLUGIN_CN, get_plugin_prompt
from lagent.schema import AgentMessage
from lagent.actions import ArxivSearch
from lagent.hooks import Hook
from lagent.llms import GPTAPI
YOUR_TOKEN_HERE = os.getenv("token")
if not YOUR_TOKEN_HERE:
raise EnvironmentError("未找到环境变量 'token',请设置后再运行程序。")
# Hook类,用于对消息添加前缀
class PrefixedMessageHook(Hook):
def __init__(self, prefix, senders=None):
"""
初始化Hook
:param prefix: 消息前缀
:param senders: 指定发送者列表
"""
self.prefix = prefix
self.senders = senders or []
def before_agent(self, agent, messages, session_id):
"""
在代理处理消息前修改消息内容
:param agent: 当前代理
:param messages: 消息列表
:param session_id: 会话ID
"""
for message in messages:
if message.sender in self.senders:
message.content = self.prefix + message.content
class AsyncBlogger:
"""博客生成类,整合写作者和批评者。"""
def __init__(self, model_type, api_base, writer_prompt, critic_prompt, critic_prefix='', max_turn=2):
"""
初始化博客生成器
:param model_type: 模型类型
:param api_base: API 基地址
:param writer_prompt: 写作者提示词
:param critic_prompt: 批评者提示词
:param critic_prefix: 批评消息前缀
:param max_turn: 最大轮次
"""
self.model_type = model_type
self.api_base = api_base
self.llm = GPTAPI(
model_type=model_type,
api_base=api_base,
key=YOUR_TOKEN_HERE,
max_new_tokens=4096,
)
self.plugins = [dict(type='lagent.actions.ArxivSearch')]
self.writer = Agent(
self.llm,
writer_prompt,
name='写作者',
output_format=dict(
type=PluginParser,
template=PLUGIN_CN,
prompt=get_plugin_prompt(self.plugins)
)
)
self.critic = Agent(
self.llm,
critic_prompt,
name='批评者',
hooks=[PrefixedMessageHook(critic_prefix, ['写作者'])]
)
self.max_turn = max_turn
async def forward(self, message: AgentMessage, update_placeholder):
"""
执行多阶段博客生成流程
:param message: 初始消息
:param update_placeholder: Streamlit占位符
:return: 最终优化的博客内容
"""
step1_placeholder = update_placeholder.container()
step2_placeholder = update_placeholder.container()
step3_placeholder = update_placeholder.container()
# 第一步:生成初始内容
step1_placeholder.markdown("**Step 1: 生成初始内容...**")
message = self.writer(message)
if message.content:
step1_placeholder.markdown(f"**生成的初始内容**:\n\n{message.content}")
else:
step1_placeholder.markdown("**生成的初始内容为空,请检查生成逻辑。**")
# 第二步:批评者提供反馈
step2_placeholder.markdown("**Step 2: 批评者正在提供反馈和文献推荐...**")
message = self.critic(message)
if message.content:
# 解析批评者反馈
suggestions = re.search(r"1\. 批评建议:\n(.*?)2\. 推荐的关键词:", message.content, re.S)
keywords = re.search(r"2\. 推荐的关键词:\n- (.*)", message.content)
feedback = suggestions.group(1).strip() if suggestions else "未提供批评建议"
keywords = keywords.group(1).strip() if keywords else "未提供关键词"
# Arxiv 文献查询
arxiv_search = ArxivSearch()
arxiv_results = arxiv_search.get_arxiv_article_information(keywords)
# 显示批评内容和文献推荐
message.content = f"**批评建议**:\n{feedback}\n\n**推荐的文献**:\n{arxiv_results}"
step2_placeholder.markdown(f"**批评和文献推荐**:\n\n{message.content}")
else:
step2_placeholder.markdown("**批评内容为空,请检查批评逻辑。**")
# 第三步:写作者根据反馈优化内容
step3_placeholder.markdown("**Step 3: 根据反馈改进内容...**")
improvement_prompt = AgentMessage(
sender="critic",
content=(
f"根据以下批评建议和推荐文献对内容进行改进:\n\n"
f"批评建议:\n{feedback}\n\n"
f"推荐文献:\n{arxiv_results}\n\n"
f"请优化初始内容,使其更加清晰、丰富,并符合专业水准。"
),
)
message = self.writer(improvement_prompt)
if message.content:
step3_placeholder.markdown(f"**最终优化的博客内容**:\n\n{message.content}")
else:
step3_placeholder.markdown("**最终优化的博客内容为空,请检查生成逻辑。**")
return message
def setup_sidebar():
"""设置侧边栏,选择模型。"""
model_name = st.sidebar.text_input('模型名称:', value='internlm2.5-latest')
api_base = st.sidebar.text_input(
'API Base 地址:', value='https://internlm-chat.intern-ai.org.cn/puyu/api/v1/chat/completions'
)
return model_name, api_base
def main():
"""
主函数:构建Streamlit界面并处理用户交互
"""
st.set_page_config(layout='wide', page_title='Lagent Web Demo', page_icon='🤖')
st.title("多代理博客优化助手")
model_type, api_base = setup_sidebar()
topic = st.text_input('输入一个话题:', 'Self-Supervised Learning')
generate_button = st.button('生成博客内容')
if (
'blogger' not in st.session_state or
st.session_state['model_type'] != model_type or
st.session_state['api_base'] != api_base
):
st.session_state['blogger'] = AsyncBlogger(
model_type=model_type,
api_base=api_base,
writer_prompt="你是一位优秀的AI内容写作者,请撰写一篇有吸引力且信息丰富的博客内容。",
critic_prompt="""
作为一位严谨的批评者,请给出建设性的批评和改进建议,并基于相关主题使用已有的工具推荐一些参考文献,推荐的关键词应该是英语形式,简洁且切题。
请按照以下格式提供反馈:
1. 批评建议:
- (具体建议)
2. 推荐的关键词:
- (关键词1, 关键词2, ...)
""",
critic_prefix="请批评以下内容,并提供改进建议:\n\n"
)
st.session_state['model_type'] = model_type
st.session_state['api_base'] = api_base
if generate_button:
update_placeholder = st.empty()
async def run_async_blogger():
message = AgentMessage(
sender='user',
content=f"请撰写一篇关于{topic}的博客文章,要求表达专业,生动有趣,并且易于理解。"
)
result = await st.session_state['blogger'].forward(message, update_placeholder)
return result
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run_async_blogger())
if __name__ == '__main__':
main()
运行streamlit run multi_agents_api_web_demo.py
,启动Web服务 输入话题,比如Semi-Supervised Learning
,最终输出的博客结果如下:
总共经历了如下三个阶段:
- 写作者根据用户输入生成初稿,批评者对初稿进行评估
- 批评者对初稿进行评估,提供改进建议和文献推荐(通过关键词触发 Arxiv 文献搜索)。
- 写作者根据批评意见对内容进行改进。
闯关材料提交(完成任务并且提交材料视为闯关成功)
- 闯关作业总共分为一个任务,一个任务完成视作闯关成功。
- 请将作业发布到知乎、CSDN等任一社交媒体,将作业链接提交到以下问卷,助教老师批改后将获得 100 算力点奖励!!!
- 提交地址:https://aicarrier.feishu.cn/share/base/form/shrcnUqshYPt7MdtYRTRpkiOFJd
参考资料
- Lagent工具文档
- 闯关文档