【Robocasa】Code Review
文章目录
- Overview
- algo
- Initialization
- Important Class Methods
- Train Loop
- Test Time
- Configs
- demo
- Config Factory
- Config Structure
- Config Locking
- 默认锁定状态
- 配置修改的上下文管理器
- dataset
- 示例数据集对象
- 参数说明
- model
- 基础模块
- EncoderCore
- VisualCore
- ScanCore
- 随机化器 (Randomizers)
- 观察编码器和解码器 (Observation Encoder and Decoder)
- tutorials
- Config
- 1. 使用配置 json(推荐)
- 2. 在代码中构建配置对象
- 实现自定义算法
- 实现 Config 类
- 实现 Algo 类
- dataset
- 数据集内容与可视化
- 查看 HDF5 数据集结构
- 可视化数据集轨迹
- hyperparam_scan & generator
- Step 1:创建基础配置 json
- Step 2: Create Config Generator
- Step 3: Set Hyperparameter Values
- 设置固定值
- 定义超参数扫描值
- Step 4: 运行超参数助手脚本
- observation
- 观测组件:
- 观测流:
- 修改和添加自定义模态:
- pretrained_representations
- 术语
- 示例
- Tensor Collections
- TensorUtils
- Using Pretrained Models
- 评估训练过的策略
Overview
Overview
robomimic 框架由多个模块化组件组成,这些组件相互协作来训练和评估策略:
- 实验配置:配置对象定义了训练运行的所有设置
- 数据:一个 hdf5 数据集被加载到数据加载器中,数据加载器为算法提供小批量数据
- 训练:一个算法对象训练一组模型(包括策略)
- 评估:通过执行一组回合来在环境中评估策略
- 日志记录:实验统计信息、模型检查点和视频被保存到磁盘
这些模块由 robomimic 目录结构进行封装:
examples
:示例代码,帮助更好地理解代码库中的模块化组件robomimic/algo
:策略学习算法的实现robomimic/config
:默认的算法配置robomimic/envs
:环境的封装,用于评估回合robomimic/exps/templates
:实验的配置模板robomimic/models
:网络实现robomimic/scripts
:主要的仓库脚本robomimic/utils
:一组实用工具,包括用于加载数据集的 SequenceDataset 类,以及用于操作嵌套张量字典的 TensorUtils
algo
Initialization
创建 Algo
类实例的标准入口点是 algo/algo.py
中的 algo_factory
函数。这个函数使用了一个从算法名称(例如 "bc"
)到特殊的 algo_config_to_class
函数的映射,该函数负责读取 algo_config
(配置中的 config.algo
部分),并返回适当的算法类名称以实例化,同时传递所需的额外关键字参数。这是必要的,因为算法实际上可能有多个子类,具有不同的功能 —— 例如,BC 算法有 BC_GMM
类用于训练 GMM 策略,BC_RNN
类用于训练 RNN 策略。
因此,每个算法文件(例如 algo/bc.py
)实现了一个 algo_config_to_class
函数。该函数应使用 register_algo_factory_func
装饰器,并带有算法名称(例如 "bc"
)——这将把该函数注册到 algo_factory
使用的注册表中。算法名称应与对应算法的配置类中的 ALGO_NAME
属性匹配(对于 BC 来说,这是在 configs/bc_config.py
中的 BCConfig
类)。以下是 algo/bc.py
中的实现:
@register_algo_factory_func("bc")
def algo_config_to_class(algo_config):
"""
Maps algo config to the BC algo class to instantiate, along with additional algo kwargs.
Args:
algo_config (Config instance): algo config
Returns:
algo_class: subclass of Algo
algo_kwargs (dict): dictionary of additional kwargs to pass to algorithm
"""
# note: we need the check below because some configs import BCConfig and exclude
# some of these options
gaussian_enabled = ("gaussian" in algo_config and algo_config.gaussian.enabled)
gmm_enabled = ("gmm" in algo_config and algo_config.gmm.enabled)
vae_enabled = ("vae" in algo_config and algo_config.vae.enabled)
rnn_enabled = algo_config.rnn.enabled
transformer_enabled = algo_config.transformer.enabled
if gaussian_enabled:
if rnn_enabled:
raise NotImplementedError
elif transformer_enabled:
raise NotImplementedError
else:
algo_class, algo_kwargs = BC_Gaussian, {}
elif gmm_enabled:
if rnn_enabled:
algo_class, algo_kwargs = BC_RNN_GMM, {}
elif transformer_enabled:
algo_class, algo_kwargs = BC_Transformer_GMM, {}
else:
algo_class, algo_kwargs = BC_GMM, {}
elif vae_enabled:
if rnn_enabled:
raise NotImplementedError
elif transformer_enabled:
raise NotImplementedError
else:
algo_class, algo_kwargs = BC_VAE, {}
else:
if rnn_enabled:
algo_class, algo_kwargs = BC_RNN, {}
elif transformer_enabled:
algo_class, algo_kwargs = BC_Transformer, {}
else:
algo_class, algo_kwargs = BC, {}
return algo_class, algo_kwargs
Important Class Methods
在本节中,我们列出了每个 Algo
子类需要实现或重写的重要类方法,并按其通常在初始化、训练或测试时调用进行分类。
- 初始化
_create_networks(self)
- 在类初始化时调用 - 应该构建网络并将其放入
self.nets
的ModuleDict
中。
- 在类初始化时调用 - 应该构建网络并将其放入
- 训练
process_batch_for_training(self, batch)
- 接受从数据加载器中采样的一批数据,并过滤出算法所需的相关部分。还应将批次数据发送到正确的设备(CPU 或 GPU)。
train_on_batch(self, batch, epoch, validate=False)
- 接受一个处理后的批次,并在该批次数据上训练所有网络,考虑到当前的epoch编号和这是训练批次还是验证批次。这是训练的主要逻辑所在(例如,网络的前向和反向传播)。应返回一个包含重要训练统计信息的字典(例如,批次上的损失、梯度范数等)。
log_info(self, info)
- 接受
train_on_batch
的输出,并返回一个处理后的字典,用于TensorBoard日志记录。
- 接受
set_train(self)
- 为训练准备网络模块。默认情况下,仅调用
self.nets.train()
,但某些算法可能希望始终将一部分网络设置为评估模式(例如,BCQ的目标网络)。在这种情况下,它们应重写此方法。
- 为训练准备网络模块。默认情况下,仅调用
on_epoch_end(self, epoch)
- 在每个训练epoch结束时调用。通常包括调整学习率调度器(如果正在使用)。
serialize(self)
- 返回包含当前模型参数的状态字典。用于生成代理检查点。默认情况下,返回
self.nets.state_dict()
- 通常仅在像HBC和IRIS这样的层次算法中需要重写,以从子算法中收集状态字典。
- 返回包含当前模型参数的状态字典。用于生成代理检查点。默认情况下,返回
- 测试
set_eval(self)
- 为评估准备网络模块。默认情况下,仅调用
self.nets.eval()
,但像HBC和IRIS这样的层次算法会重写此方法,以调用其子算法的set_eval
。
- 为评估准备网络模块。默认情况下,仅调用
deserialize(self, model_dict)
serialize
的反向操作 - 加载模型权重。用于测试时恢复模型权重。
get_action(self, obs_dict, goal_dict=None)
- 测试时调用的主要方法,给定观察值返回一个或多个动作。
reset(self)
- 在每个回合开始时调用,以清除内部代理状态,开始新的回合。例如,
BC_RNN
会重置步数计数器和隐藏状态。
- 在每个回合开始时调用,以清除内部代理状态,开始新的回合。例如,
Train Loop
我们在 examples/simple_train_loop.py
中重现了一个简化版的训练循环,以展示 Algo
实例的方法在训练过程中的使用。
# @model 应该是 Algo 类的实例,用于训练
# @data_loader 应该是 torch.utils.data.DataLoader 的实例,用于采样批次
# 训练 50 个 epoch,每个 epoch 进行 100 次梯度更新
num_epochs = 50
gradient_steps_per_epoch = 100
# 确保模型处于训练模式
model.set_train()
for epoch in range(1, num_epochs + 1): # epoch 从 1 开始
# 获取数据加载器的迭代器 - 它会生成批次
data_loader_iter = iter(data_loader)
# 记录损失
losses = []
for _ in range(gradient_steps_per_epoch):
# 从数据加载器中加载下一批次
try:
batch = next(data_loader_iter)
except StopIteration:
# 数据加载器没有更多批次 - 重置并返回第一批次
data_loader_iter = iter(data_loader)
batch = next(data_loader_iter)
# 处理批次数据以进行训练
input_batch = model.process_batch_for_training(batch)
# 前向和反向传播
info = model.train_on_batch(batch=input_batch, epoch=epoch, validate=False)
# 记录损失
step_log = model.log_info(info)
losses.append(step_log["Loss"])
# 保存模型
model_params = model.serialize()
model_dict = dict(model=model.serialize())
torch.save(model_dict, "/path/to/ckpt.pth")
# 在每个 epoch 结束后,模型需要执行的操作
model.on_epoch_end(epoch)
Test Time
我们重现了 utils/file_utils.py
中 policy_from_checkpoint
函数的部分逻辑,以展示如何使用 Algo
方法加载模型。
# 加载检查点
ckpt_dict = maybe_dict_from_checkpoint(ckpt_path="/path/to/ckpt.pth")
algo_name = ckpt_dict["algo_name"]
config, _ = config_from_checkpoint(algo_name=algo_name, ckpt_dict=ckpt_dict)
# 创建 Algo 实例
model = algo_factory(
algo_name,
config,
obs_key_shapes=ckpt_dict["shape_metadata"]["all_shapes"],
ac_dim=ckpt_dict["shape_metadata"]["ac_dim"],
device=device,
)
# 加载权重
model.deserialize(ckpt_dict["model"])
model.set_eval()
# 将模型包装为 RolloutPolicy
model = RolloutPolicy(model)
Configs
demo
配置是 config/config.py
中定义的 Config
类的实例。其实现主要基于 addict,使得操作嵌套字典变得非常方便。配置本质上是嵌套字典(类似于加载的 json 对象),通过使用属性可以轻松访问和设置嵌套的键。下面是一个示例:
from rob
omimic.config import Config
# 创建嵌套字典并读取值的常规方法
c = dict()
c["experiment"] = dict()
c["experiment"]["save"] = dict()
c["experiment"]["save"]["enabled"] = True
print("save enabled: {}".format(c["experiment"]["save"]["enabled"]))
# 可以使用点语法来创建嵌套字典
c = Config()
c.experiment.save.enabled = True
# 也可以使用点语法访问属性
print("save enabled: {}".format(c.experiment.save.enabled))
在 Config
对象和 JSON 之间进行转换也非常简单,这在将配置对象保存到磁盘时非常方便(例如,在生成新的配置 JSON 用于训练,或在保存模型检查点时保存配置),以及从 JSON 加载配置时。
# 将配置转储为 JSON 字符串
json_string = c.dump()
# 将配置转储到 JSON 文件
c.dump(filename="c.json")
# 从 JSON 加载配置
import json
json_dic = json.load(json_string)
c = Config(json_dic)
所有算法配置类(每种算法类型一个类)都是 BaseConfig
类的子类(定义在 config/base_config.py
中),而 BaseConfig
类又是通用的 Config
类的子类。算法配置类保存在一个全局注册表中(更多细节稍后说明)。generate_config_templates.py
脚本利用上述功能,通过为每个算法类实例化默认配置对象,轻松生成模板配置 JSON 文件。每次修改默认设置或添加新配置设置时,都应该运行此脚本。 下面是代码片段。
import os
import json
import robomimic
from robomimic.config import get_all_registered_configs
# 将模板配置 JSON 存储在此目录
target_dir = os.path.join(robomimic.__path__[0], "exps/templates/")
# 遍历注册的算法配置类
all_configs = get_all_registered_configs()
for algo_name in all_configs:
# 为此算法创建配置类
c = all_configs[algo_name]()
# 转储为 JSON
json_path = os.path.join(target_dir, "{}.json".format(algo_name))
c.dump(filename=json_path)
接下来,我们将通过讨论 BaseConfig
类,介绍算法配置类的一般结构。BaseConfig
类比通用的 Config
类具有更具体的功能,专门用于本仓库中的使用。
Config Factory
BaseConfig
类具有一个名为 ALGO_NAME
的属性和类方法,所有子类都必须填写该属性——它应与每个配置的算法名称对应,并且应与每个算法实现文件顶部通过 register_algo_factory_func
注册的算法名称匹配。例如,对于 BCConfig
,ALGO_NAME = "bc"
。此属性非常重要,因为它确保所有继承自 BaseConfig
的配置都能注册到 REGISTERED_CONFIGS
全局注册表中(ALGO_NAME
作为键将类注册到注册表中)。这也允许 config_factory
函数轻松创建适当的配置类——这是在代码库中创建配置对象的标准入口点。虽然在训练过程中也可以使用配置的 JSON 文件加载配置(如文档中的快速开始部分所讨论),但 config_factory
函数仍然用于创建初始配置对象,随后该对象会被更新(见下面的示例)。
import json
from robomimic.config import config_factory
# 使用默认值创建算法的基础配置
config = config_factory("bc")
# 使用配置 JSON 更新默认设置
with open("/path/to/config.json", "r") as f:
ext_config_json = json.load(f)
config.update(ext_config_json)
在测试时,当从检查点加载模型时,配置通过读取检查点中的 JSON 字符串来恢复,然后使用该字符串实例化配置。以下是一个示例(略微修改自 config_from_checkpoint
,位于 utils/file_utils.py
中):
import robomimic.utils.file_utils as FileUtils
ckpt_dict = FileUtils.load_dict_from_checkpoint("path/to/ckpt.pth")
config_json = ckpt_dict["config"]
config = config_factory(ckpt_dict["algo_name"], dic=json.loads(config_json))
Config Structure
BaseConfig
类用于组织和结构化算法和实验的配置。它的每个方法都对应训练和评估流程中的一个重要部分。我们将详细介绍 BaseConfig
类的四个重要部分:
-
experiment_config(self)
- 功能:该方法负责配置实验设置,涉及以下参数:
- 训练运行的 名称。
- 是否进行 日志记录。
- 是否在训练过程中 保存模型,以及保存频率。
- 是否在训练过程中 渲染视频。
- 训练过程中 执行rollout的频率。
- 默认实现:
BaseConfig
类提供了该方法的默认实现,通常不需要子类重写,因为它涵盖了大多数算法的通用需求。
- 功能:该方法负责配置实验设置,涉及以下参数:
-
train_config(self)
- 功能:该方法配置与训练过程相关的设置,涉及以下内容:
- 用于训练的 数据集。
- 与 数据加载器 相关的设置,包括批大小、是否打乱数据以及其他数据处理选项。
- 默认实现:
BaseConfig
类为训练配置提供了默认设置,通常适用于大多数场景。
- 功能:该方法配置与训练过程相关的设置,涉及以下内容:
-
algo_config(self)
- 功能:该方法配置
config.algo
属性,负责配置与 算法相关的设置,这些设置决定了算法在训练和测试时的行为,具体包括:- 算法的 超参数(例如学习率、批大小等)。
- 与 模型结构 相关的设置。
- 算法内部机制的配置(例如探索策略或动作策略等)。
- 需要重写:与前两个方法不同,
algo_config
方法 必须 在每个子类中实现,因为每个算法有不同的参数,它们会影响算法的训练和评估。
- 功能:该方法配置
-
observation_config(self)
- 功能:该方法配置与观察(输入)相关的设置,填充
config.observation
属性,决定:- 算法使用的 观察模态(例如视觉、位置、环境传感器数据等)。
- 这些观察信息如何被 网络编码 或处理。
- 默认实现:
BaseConfig
类提供了该方法的默认实现,通常不需要重写。但是某些算法可能需要定制观察配置,尤其是当算法使用多个或复杂的输入模态时(例如图像输入、传感器数据等)。
- 功能:该方法配置与观察(输入)相关的设置,填充
Config Locking
为了防止意外修改配置,每个 Config
对象实现了两级锁定机制:key-locked 和 all-locked。具体而言,Config
对象通过以下方式进行锁定:
-
key-locked 状态
- 通过调用
config.lock_keys()
,将Config
对象设置为 key-locked 状态。在 key-locked 状态下,Config
对象 不允许添加新键,但允许修改已有键的值。
- 通过调用
-
all-locked 状态
- 通过调用
config.lock()
,将Config
对象设置为 all-locked 状态。在 all-locked 状态下,Config
对象 既不允许添加新键,也不允许修改现有键的值。
- 通过调用
默认锁定状态
所有继承自 BaseConfig
构造函数的 Config
对象,默认处于 key-locked 状态。构造完成后,建议通过调用 config.lock()
将 Config
对象转为 all-locked 状态。
配置修改的上下文管理器
Config
对象还实现了上下文管理器函数,允许在特定范围内方便且受控地修改配置:
-
value_unlocked()
该上下文范围允许修改现有键的值,即使Config
对象处于 all-locked 状态。 -
unlocked()
该上下文范围允许 添加新键,以及修改现有键的值,不管当前的锁定状态如何。
dataset
robomimic.utils.dataset.SequenceDataset
类扩展了 PyTorch 默认的 torch.utils.data.Dataset
类,以便与我们的演示 数据集 进行交互。该类支持按索引访问演示子序列(而不是单独的状态),并支持按需获取和内存缓存。本文将介绍 SequenceDataset
接口的关键概念。请参阅官方 PyTorch 文档 和简短的 示例,了解如何使用 Dataset
和 DataLoader
接口构建训练管道。同时,参见 已发布的数据集部分,获取有关下载与本仓库兼容的数据集的指导。
示例数据集对象
dataset = SequenceDataset(
hdf5_path=dataset_path, # 数据集的路径
obs_keys=( # 我们希望出现在批次中的观察数据
"robot0_eef_pos",
"robot0_eef_quat",
"image",
"object",
),
dataset_keys=( # 可选地指定更多的键,若它们也应出现在批次中
"actions",
"rewards",
"dones",
),
seq_length=10, # 长度为10的时间序列
pad_seq_length=True, # 填充轨迹中的最后一个观察数据,以确保所有序列都被采样
hdf5_cache_mode="all", # 将数据集缓存到内存中,以避免重复的文件 I/O
hdf5_normalize_obs=False, # 不对观察数据进行标准化
filter_by_attribute=None, # 可选地提供一个筛选键,用于数据筛选
)
参数说明
hdf5_path
- 指定包含训练演示数据的 HDF5 文件的绝对路径或相对路径。请参见 datasets 页面,了解预期的数据结构。
obs_keys
- 一个字符串列表,指定从数据集中读取哪些观察模态。通常从配置文件中读取:我们的实现会从
config.observation.modalities.obs.low_dim
和config.observation.modalities.obs.rgb
中汇总观察键。
- 一个字符串列表,指定从数据集中读取哪些观察模态。通常从配置文件中读取:我们的实现会从
dataset_keys
- 要从演示中读取的非观察数据的键。通常包括
actions
、rewards
、dones
等。
- 要从演示中读取的非观察数据的键。通常包括
seq_length
- 要获取的演示子序列的长度。
pad_seq_length
- 是否允许获取超出序列末尾的子序列。例如,给定长度为 10 的演示和
seq_length=10
,如果设置pad_seq_length=True
,则允许数据集对象通过重复最后一帧 5 次来访问__get_item(index=5)__
位置的子序列。
- 是否允许获取超出序列末尾的子序列。例如,给定长度为 10 的演示和
hdf5_cache_mode
- 可选地将数据集缓存到内存中以加快访问速度。数据集支持三种缓存模式:
["all", "low_dim", None]
。all
:将整个数据集加载到 RAM 中。此模式可以最大化数据加载速度,但会占用较大的内存空间。推荐用于较小的数据集或低维度观察数据。low_dim
:仅将低维度观察数据加载到 RAM 中。始终使用此模式,因为加载低维数据会产生一定的开销。低维观察数据在config.observation.modalities.obs.low_dim
中指定。None
:按需获取数据。
- 可选地将数据集缓存到内存中以加快访问速度。数据集支持三种缓存模式:
hdf5_normalize_obs
- 如果为
True
,通过计算每个观察数据的均值和标准差(在每个维度和模态下),并将每个维度的均值和方差归一化,来对观察数据进行标准化。
- 如果为
filter_by_attribute
- 如果提供,则使用给定的筛选键查找要加载的演示子集。有关更多信息,请参见 筛选键 文档。
model
robomimic 实现了一套可重用的网络模块,具有不同的抽象层级,使得创建新的策略模型变得更加容易。
基础模块
为了在处理多个模态时支持自动的形状解析,所有基本模块(如 MLP 和 ConvNets)都继承自 robomimic.models.base_nets.Module
,该类要求实现抽象方法 output_shape(self, input_shape=None)
。此函数根据外部的 input_shape
或内部实例变量来解析模块的输出形状(不包括批次维度)。要实现新的基础模块,只需继承 robomimic.models.Module
或 robomimic.models.ConvBase
(如果是添加卷积网络),并实现抽象函数。下面是一个 ResNet-18 基础模块的示例实现:
from robomimic.models.base_nets import ConvBase
class ResNet18Conv(ConvBase):
...
def output_shape(self, input_shape):
"""
计算从输入到此模块输出的形状的函数。
参数:
input_shape (int 的可迭代对象): 输入的形状,不包括批次维度。
某些模块可能不需要此参数,如果它们的输出不依赖于输入的大小,
或者假设输入是固定大小的。
返回:
out_shape ([int]): 对应输出形状的整数列表
"""
assert(len(input_shape) == 3)
out_h = int(math.ceil(input_shape[1] / 32.))
out_w = int(math.ceil(input_shape[2] / 32.))
return [512, out_h, out_w]
EncoderCore
我们创建了 EncoderCore
抽象类,用于封装任何旨在编码特定类型观测模态的网络(例如:VisualCore
用于 RGB 和深度观测,ScanCore
用于激光扫描器观测等)。当从 EncoderCore
子类化新的编码器类时,它将自动在 robomimic 内部注册,从而允许用户在其配置中直接以字符串形式引用自定义的编码器类。例如,如果用户指定了一个自定义的基于 EncoderCore
的类 MyCustomRGBEncoder
来编码 RGB 观测,他们可以直接在配置中设置如下:
config.observation.encoder.rgb.core_class = "MyCustomRGBEncoder"
config.observation.encoder.rgb.core_kwargs = ...
任何需要传递给编码器构造函数的相应关键字参数应在配置中的 core_kwargs
中指定。有关创建自定义编码器的更多信息,请参阅我们的示例脚本。
下面,我们提供了用于编码 RGB 和深度观测(VisualCore
)以及范围扫描器观测(ScanCore
)的具体 EncoderCore
类的描述。
VisualCore
我们提供了一个 VisualCore
模块,用于构建自定义的视觉架构。VisualCore
包括一个主干网络,用于特征化图像输入 —— 通常是一个 ConvBase
模块 —— 以及一个池化模块,用于将特征张量减少为固定大小的向量表示。下面是一个基于 ResNet18Conv
主干和 SpatialSoftmax
(论文)池化模块构建的 VisualCore
示例。
from robomimic.models.obs_core import VisualCore
from robomimic.models.base_nets import ResNet18Conv, SpatialSoftmax
vis_net = VisualCore(
input_shape=(3, 224, 224),
core_class="ResNet18Conv", # 使用 ResNet18 作为视觉核心的主干网络
core_kwargs={"pretrained": False, "input_coord_conv": False}, # ResNet18Conv 类的参数
pool_class="SpatialSoftmax", # 使用空间 Softmax 作为池化模块
pool_kwargs={"num_kp": 32}, # SpatialSoftmax 的参数 --- 使用 32 个关键点
flatten=True, # 将空间 Softmax 层的输出展平
feature_dimension=64, # 通过线性层将展平后的特征投影到 64 维向量
)
新的视觉主干网络和池化类可以通过继承 ConvBase
来添加。
ScanCore
我们提供了一个 ScanCore
模块,用于构建自定义的激光扫描器架构。ScanCore
包括一个 1D 卷积主干网络 (Conv1dBase
),用于特征化高维 1D 输入,以及一个池化模块,用于将特征张量减少为固定大小的向量表示。下面是一个使用 SpatialSoftmax
(论文)池化模块的 ScanCore
网络示例。
from robomimic.models.obs_core import ScanCore
from robomimic.models.base_nets import SpatialSoftmax
vis_net = ScanCore(
input_shape=(1, 120),
conv_kwargs={
"out_channels": [32, 64, 64],
"kernel_size": [8, 4, 2],
"stride": [4, 2, 1],
}, # 为每个 Conv1d 层传递的 kwarg 设置
conv_activation="relu", # 在每个 Conv1d 层之间使用 ReLU 激活
pool_class="SpatialSoftmax", # 使用空间 Softmax 作为池化模块
pool_kwargs={"num_kp": 32}, # SpatialSoftmax 的参数 --- 使用 32 个关键点
flatten=True, # 将空间 Softmax 层的输出展平
feature_dimension=64, # 通过线性层将展平后的特征投影到 64 维向量
)
随机化器 (Randomizers)
随机化器是 Module
类,它在训练过程中(以及可选的评估过程中)对网络输入进行扰动。一个 Randomizer
实现了 forward_in
和 forward_out
函数,分别用于处理神经网络模块的输入和输出。例如,CropRandomizer
实例的 forward_in
函数通过对输入图像进行随机裁剪来扰动图像(如下图所示)。如果 CropRandomizer
配置为对每个输入图像进行多个随机裁剪(n_crops > 1
),它将将所有随机裁剪的图像送入图像网络,并通过对 n_crops
维度的输出进行平均池化来减少网络输出。
Randomizer
模块通常与 ObservationEncoder
一起使用 —— 详情请参见下一节。可以通过继承 Randomizer
类并实现必要的抽象函数来实现额外的随机化器类。
可视化随机化输入: 若要可视化原始和随机化后的图像输入,请在 robomimic/macros.py
中设置 VISUALIZE_RANDOMIZER = True
。
观察编码器和解码器 (Observation Encoder and Decoder)
ObservationEncoder
和 ObservationDecoder
是处理观察字典输入和输出的基本构建模块。它们设计用于接收多个观测模态的输入(例如,包含图像和机器人本体感觉信号的字典),并输出包含动作和子目标等预测值的字典。下面是如何通过 register_obs_key
函数手动创建 ObservationEncoder
实例的示例。
from robomimic.models.base_nets import MLP
from robomimic.models.obs_core import VisualCore, CropRandomizer
from robomimic.models.obs_nets import ObservationEncoder, ObservationDecoder
obs_encoder = ObservationEncoder(feature_activation=torch.nn.ReLU)
# 有两种方式构建用于处理输入模态的网络。
# 假设我们正在处理形状为 (3, 224, 224) 的图像输入。
camera1_shape = [3, 224, 224]
image_randomizer = CropRandomizer(input_shape=camera2_shape, crop_height=200, crop_width=200)
# 我们将使用可配置的图像处理骨干网络 VisualCore 来处理输入图像模态
net_class = "VisualCore" # 该类在 models/base_nets.py 中定义
# VisualCore 网络的参数
net_kwargs = {
"input_shape": camera1_shape,
"core_class": "ResNet18Conv", # 使用 ResNet18 作为 VisualCore 的骨干网络
"core_kwargs": {"pretrained": False, "input_coord_conv": False},
"pool_class": "SpatialSoftmax", # 使用空间 Softmax 来规范化模型输出
"pool_kwargs": {"num_kp": 32}
}
# 注册处理该模态的网络
obs_encoder.register_obs_key(
name="camera1",
shape=camera1_shape,
net_class=net_class,
net_kwargs=net_kwargs,
randomizer=image_randomizer
)
# 我们还可以在编码器中混合低维度的观察,例如本体感觉信号
proprio_shape = [12]
net = MLP(input_dim=12, output_dim=32, layer_dims=(128,), output_activation=None)
obs_encoder.register_obs_key(
name="proprio",
shape=proprio_shape,
net=net
)
默认情况下,每个模态网络应将输入观测流减少为一个固定大小的向量。ObservationEncoder
的 forward
函数输出的结果是所有这些向量的拼接。拼接的顺序是确定的,并与注册模态时的顺序相同。ObservationGroupEncoder
进一步支持对嵌套的观测组进行编码,例如 obs
、goal
和 subgoal
。这使得构建目标条件和/或子目标条件的策略模型成为可能。
然而,当创建策略模型时,列举所有模态可能会变得繁琐。创建 ObservationEncoder
的标准入口是通过 robomimic.models.obs_nets
中的 observation_encoder_factory
函数。该函数将列举所有观测模态,并根据 config.observation
配置部分提供的配置初始化所有模态网络。
ObservationDecoder
类相对简单。它只是一个单输入、多输出头的 MLP。以下代码片段创建了一个 ObservationDecoder
,它将观测编码器的输出作为输入,并输出两个动作预测。
obs_decoder = ObservationDecoder(
input_feat_dim=obs_encoder.output_shape()[0],
decode_shapes=OrderedDict({"action_pos": (3,), "action_orn": (4,)})
)
tutorials
Config
Robomimic 使用一个集中式的 配置系统 来指定各个层次的 (超)参数。下面介绍了两种配置和启动训练的方式。
警告!不要修改默认配置!
不要直接修改默认配置文件,如 config/bc_config.py
,特别是在使用版本控制(如 git)管理代码时。修改这些文件会改变默认设置,很容易忘记这些更改,或者无意中提交这些更改,使其成为新的默认配置。
更多关于配置对象的信息,请参考 配置文档,以及关于超参数搜索的教程 超参数扫描教程。
1. 使用配置 json(推荐)
指定训练参数的推荐方式是通过 --config
参数将配置 json 传递给主训练脚本 train.py
。数据集可以通过设置配置 json 中 train
部分的 data
属性来指定,或者通过 --dataset
参数来指定。下面的例子运行了一个 BC 算法的默认模板 json。这是启动训练任务的推荐方式。
$ python train.py --config ../exps/templates/bc.json --dataset ../../tests/assets/test.hdf5
请参见 超参数助手文档,了解如何轻松生成 json 配置文件来启动训练任务。
2. 在代码中构建配置对象
启动训练的另一种方式是创建一个默认配置(例如通过 config = config_factory(algo_name="bc")
),在 Python 代码中修改配置,然后调用训练函数,像 examples/train_bc_rnn.py
脚本中那样。
import robomimic
import robomimic.utils.torch_utils as TorchUtils
from robomimic.config import config_factory
from robomimic.scripts.train import train
# 创建默认 BC 配置
config = config_factory(algo_name="bc")
# 在这里设置你想要更新的配置项
config.experiment.name = "bc_rnn_example"
config.train.data = "/path/to/dataset.hdf5"
config.train.output_dir = "/path/to/desired/output_dir"
config.train.batch_size = 256
config.train.num_epochs = 500
config.algo.gmm.enabled = False
# 获取 torch 设备
device = TorchUtils.get_torch_device(try_to_use_cuda=True)
# 启动训练任务
train(config, device=device)
实现自定义算法
本教程提供了在 robomimic 中实现自定义算法的示例。我们选择实现最近提出的 TD3-BC 算法。
包括以下步骤:
- 实现 TD3-BC 的自定义
Config
类。 - 实现 TD3-BC 的自定义
Algo
类。
实现 Config 类
我们将在 config/td3_bc_config.py
中实现 Config
类。我们实现一个名为 TD3_BCConfig
的配置类,继承自 BaseConfig
。特别地,我们设置类变量 ALGO_NAME = "td3_bc"
以将此配置注册到该算法名称下。我们实现 algo_config
函数来填充 config.algo
中所需的键 —— 它与 BCQConfig
的实现非常相似。以下是部分代码:
class TD3_BCConfig(BaseConfig):
ALGO_NAME = "td3_bc"
def algo_config(self):
# 优化参数
self.algo.optim_params.critic.learning_rate.initial = 3e-4 # critic 学习率
self.algo.optim_params.critic.learning_rate.decay_factor = 0.1 # 学习率衰减因子(如果存在 epoch 调度)
self.algo.optim_params.critic.learning_rate.epoch_schedule = [] # 学习率衰减的 epoch
self.algo.optim_params.critic.regularization.L2 = 0.00 # L2 正则化强度
self.algo.optim_params.critic.start_epoch = -1 # 开始训练 critic 的 epoch 数(-1 表示立即开始)
self.algo.optim_params.critic.end_epoch = -1 # 结束 critic 训练的 epoch 数(-1 表示立即结束)
self.algo.optim_params.actor.learning_rate.initial = 3e-4 # actor 学习率
self.algo.optim_params.actor.learning_rate.decay_factor = 0.1 # 学习率衰减因子(如果存在 epoch 调度)
self.algo.optim_params.actor.learning_rate.epoch_schedule = [] # 学习率衰减的 epoch
self.algo.optim_params.actor.regularization.L2 = 0.00 # L2 正则化强度
self.algo.optim_params.actor.start_epoch = -1 # 开始训练 actor 的 epoch 数(-1 表示立即开始)
self.algo.optim_params.actor.end_epoch = -1 # 结束 actor 训练的 epoch 数(-1 表示立即结束)
# alpha 值 - 用于权衡 critic 损失与 BC 损失
self.algo.alpha = 2.5
# 目标网络相关参数
self.algo.discount = 0.99 # 折扣因子
self.algo.n_step = 1 # 使用 n 步回报进行 TD 更新
self.algo.target_tau = 0.005 # 目标网络更新速率
self.algo.infinite_horizon = False # 如果为 True,按 1 / (1 - 折扣因子) 规范化终止奖励,以视为无限期
# ================== Critic 网络配置 ===================
self.algo.critic.use_huber = False # 使用 Huber 损失而非 L2 损失
self.algo.critic.max_gradient_norm = None # 对 critic 的 L2 梯度裁剪(None 表示不进行裁剪)
self.algo.critic.value_bounds = None # 可选的 2 元组,用于确保值估计的上下边界
# critic 集成参数(TD3 技巧)
self.algo.critic.ensemble.n = 2 # critic 集成中的 Q 网络数量
self.algo.critic.ensemble.weight = 1.0 # 用于混合 min 和 max 作为目标 Q 值的权重
self.algo.critic.layer_dims = (256, 256, 256) # critic MLP 的大小
# ================== Actor 网络配置 ===================
# 更新 actor 和目标网络的频率,每进行一次 critic 梯度更新时更新一次 actor
self.algo.actor.update_freq = 2
# 用于形成 Q 更新目标动作的探索噪声 - 被截断的高斯噪声
self.algo.actor.noise_std = 0.2 # 动作中加入的零均值高斯噪声的标准差
self.algo.actor.noise_clip = 0.5 # 噪声在每个维度上的截断范围
self.algo.actor.layer_dims = (256, 256, 256) # actor MLP 的大小
通常,我们只需要实现 algo_config
函数以填充 config.algo
中算法所需的键,但我们还更新了 experiment_config
函数和 observation_config
函数,使得从论文中的 gym
环境中重现实验更加容易。具体细节请参阅源文件。
最后,我们在 config/__init__.py
文件中添加了 from robomimic.config.td3_bc_config import TD3_BCConfig
,以确保该 Config
子类被 robomimic
注册。
实现 Algo 类
我们将在 algo/td3_bc.py
中实现 Algo
类。如 算法文档 所述,首先需要实现 algo_config_to_class
方法 —— 这一步相对简单,因为我们没有多个变体。我们特别注意确保将此函数注册为与我们为配置定义的算法名称相同("td3_bc"
)。
@register_algo_factory_func("td3_bc")
def algo_config_to_class(algo_config):
"""
将算法配置映射到 TD3_BC 算法类以进行实例化,并传递附加的算法关键字参数。
参数:
algo_config (Config 实例): 算法配置
返回:
algo_class: Algo 的子类
algo_kwargs (dict): 一个字典,包含附加的关键字参数,传递给算法
"""
# 目前只有一个 TD3_BC 的变体
return TD3_BC, {}
接下来,我们将描述如何实现 算法文档 中概述的方法。由于它们的实现与 BCQ
实现非常相似,我们省略了一些方法。我们首先定义类并实现 _create_networks
。代码使用了辅助函数 _create_critics
和 _create_actor
来创建 critic 和 actor 网络,类似于 BCQ
的实现。
class TD3_BC(PolicyAlgo, ValueAlgo):
def _create_networks(self):
"""
创建网络并将它们放入 @self.nets 中。
"""
self.nets = nn.ModuleDict()
self._create_critics()
self._create_actor()
# 在训练开始时同步目标网络
with torch.no_grad():
for critic_ind in range(len(self.nets["critic"])):
TorchUtils.hard_update(
source=self.nets["critic"][critic_ind],
target=self.nets["critic_target"][critic_ind],
)
TorchUtils.hard_update(
source=self.nets["actor"],
target=self.nets["actor_target"],
)
self.nets = self.nets.float().to(self.device)
def _create_critics(self):
critic_class = ValueNets.ActionValueNetwork
critic_args = dict(
obs_shapes=self.obs_shapes,
ac_dim=self.ac_dim,
mlp_layer_dims=self.algo_config.critic.layer_dims,
value_bounds=self.algo_config.critic.value_bounds,
goal_shapes=self.goal_shapes,
**ObsNets.obs_encoder_args_from_config(self.obs_config.encoder),
)
# Q 网络集成和目标集成
self.nets["critic"] = nn.ModuleList()
self.nets["critic_target"] = nn.ModuleList()
for _ in range(self.algo_config.critic.ensemble.n):
critic = critic_class(**critic_args)
self.nets["critic"].append(critic)
critic_target = critic_class(**critic_args)
self.nets["critic_target"].append(critic_target)
def _create_actor(self):
actor_class = PolicyNets.ActorNetwork
actor_args = dict(
obs_shapes=self.obs_shapes,
goal_shapes=self.goal_shapes,
ac_dim=self.ac_dim,
mlp_layer_dims=self.algo_config.actor.layer_dims,
**ObsNets.obs_encoder_args_from_config(self.obs_config.encoder),
)
self.nets["actor"] = actor_class(**actor_args)
self.nets["actor_target"] = actor_class(**actor_args)
接下来,我们描述 train_on_batch
函数,它实现了主要的训练逻辑。该函数使用 _train_critic_on_batch
辅助函数训练 critic,然后使用 _train_actor_on_batch
辅助函数训练 actor(根据 config.algo.actor.update_freq
配置变量,actor 的训练速度较慢,和原作者的实现相同)。最后,使用 TorchUtils.soft_update
将目标网络的参数向当前网络的参数稍微调整。
def train_on_batch(self, batch, epoch, validate=False):
"""
在单个数据批次上进行训练。
参数:
batch (dict): 包含从数据加载器中采样的 torch.Tensors,并且通过 @process_batch_for_training 进行过滤的字典。
epoch (int): 纪元号 - 某些算法需要进行分阶段训练和早期停止时使用。
validate (bool): 如果为 True,则不执行任何学习更新。
返回:
info (dict): 包含与日志相关的输入、输出和损失的字典。
"""
with TorchUtils.maybe_no_grad(no_grad=validate):
info = PolicyAlgo.train_on_batch(self, batch, epoch, validate=validate)
# 训练 Critic
no_critic_backprop = validate or (not self._check_epoch(net_name="critic", epoch=epoch))
with TorchUtils.maybe_no_grad(no_grad=no_critic_backprop):
critic_info = self._train_critic_on_batch(
batch=batch,
epoch=epoch,
no_backprop=no_critic_backprop,
)
info.update(critic_info)
# 以较低的频率更新 Actor 和目标网络
if not no_critic_backprop:
# 仅在 Critic 训练梯度步骤时更新计数器
self.actor_update_counter += 1
do_actor_update = (self.actor_update_counter % self.algo_config.actor.update_freq == 0)
# 训练 Actor
no_actor_backprop = validate or (not self._check_epoch(net_name="actor", epoch=epoch))
no_actor_backprop = no_actor_backprop or (not do_actor_update)
with TorchUtils.maybe_no_grad(no_grad=no_actor_backprop):
actor_info = self._train_actor_on_batch(
batch=batch,
epoch=epoch,
no_backprop=no_actor_backprop,
)
info.update(actor_info)
if not no_actor_backprop:
# 为了与原始实现匹配,仅在 Actor 梯度步骤时更新目标网络
with torch.no_grad():
# 更新目标 Critic 网络
for critic_ind in range(len(self.nets["critic"])):
TorchUtils.soft_update(
source=self.nets["critic"][critic_ind],
target=self.nets["critic_target"][critic_ind],
tau=self.algo_config.target_tau,
)
# 更新目标 Actor 网络
TorchUtils.soft_update(
source=self.nets["actor"],
target=self.nets["actor_target"],
tau=self.algo_config.target_tau,
)
return info
下面,我们展示了训练 critic 的辅助函数,明确了 Bellman 备份如何用于构造 TD 损失。TD 损失的目标 Q 值是通过与 TD3 相同的方法获得的。
def _train_critic_on_batch(self, batch, epoch, no_backprop=False):
info = OrderedDict()
# 批次变量
s_batch = batch["obs"]
a_batch = batch["actions"]
r_batch = batch["rewards"]
ns_batch = batch["next_obs"]
goal_s_batch = batch["goal_obs"]
# 如果未完成,设为 1,否则设为 0
done_mask_batch = 1. - batch["dones"]
info["done_masks"] = done_mask_batch
# Bellman 备份生成 Q 目标
q_targets = self._get_target_values(
next_states=ns_batch,
goal_states=goal_s_batch,
rewards=r_batch,
dones=done_mask_batch,
)
info["critic/q_targets"] = q_targets
# 使用这个目标训练所有 critic
for critic_ind, critic in enumerate(self.nets["critic"]):
critic_loss = self._compute_critic_loss(
critic=critic,
states=s_batch,
actions=a_batch,
goal_states=goal_s_batch,
q_targets=q_targets,
)
info["critic/critic{}_loss".format(critic_ind + 1)] = critic_loss
if not no_backprop:
critic_grad_norms = TorchUtils.backprop_for_loss(
net=self.nets["critic"][critic_ind],
optim=self.optimizers["critic"][critic_ind],
loss=critic_loss,
max_grad_norm=self.algo_config.critic.max_gradient_norm,
)
info["critic/critic{}_grad_norms".format(critic_ind + 1)] = critic_grad_norms
return info
def _get_target_values(self, next_states, goal_states, rewards, dones):
"""
获取用于训练 Q 函数的 TD 损失目标值的辅助函数。
"""
with torch.no_grad():
# 通过目标 actor 和噪声获取下一个动作
next_target_actions = self.nets["actor_target"](next_states, goal_states)
noise = (
torch.randn_like(next_target_actions) * self.algo_config.actor.noise_std
).clamp(-self.algo_config.actor.noise_clip, self.algo_config.actor.noise_clip)
next_actions = (next_target_actions + noise).clamp(-1.0, 1.0)
# TD3 技巧,通过所有 Q 预测值的最大值和最小值进行组合,得到单一的目标值
all_value_targets = self.nets["critic_target"][0](next_states, next_actions, goal_states).reshape(-1, 1)
max_value_targets = all_value_targets
min_value_targets = all_value_targets
for critic_target in self.nets["critic_target"][1:]:
all_value_targets = critic_target(next_states, next_actions, goal_states).reshape(-1, 1)
max_value_targets = torch.max(max_value_targets, all_value_targets)
min_value_targets = torch.min(min_value_targets, all_value_targets)
value_targets = self.algo_config.critic.ensemble.weight * min_value_targets + \
(1. - self.algo_config.critic.ensemble.weight) * max_value_targets
q_targets = rewards + dones * self.discount * value_targets
return q_targets
def _compute_critic_loss(self, critic, states, actions, goal_states, q_targets):
"""
计算估计的 Q 值和目标 Q 值之间的损失的辅助函数。
"""
q_estimated = critic(states, actions, goal_states)
if self.algo_config.critic.use_huber:
critic_loss = nn.SmoothL1Loss()(q_estimated, q_targets)
else:
critic_loss = nn.MSELoss()(q_estimated, q_targets)
return critic_loss
接下来,我们展示了训练 actor 的辅助函数,actor 通过 TD3(DDPG)和 BC 损失的加权组合来进行训练。
def _train_actor_on_batch(self, batch, epoch, no_backprop=False):
info = OrderedDict()
# Actor 损失(使用 DDPG 损失和 BC 损失的加权组合更新)
s_batch = batch["obs"]
a_batch = batch["actions"]
goal_s_batch = batch["goal_obs"]
# lambda 加权系数是超参数 (alpha) 和 Q 值归一化的组合
actor_actions = self.nets["actor"](s_batch, goal_s_batch)
Q_values = self.nets["critic"][0](s_batch, actor_actions, goal_s_batch)
lam = self.algo_config.alpha / Q_values.abs().mean().detach()
actor_loss = -lam * Q_values.mean() + nn.MSELoss()(actor_actions, a_batch)
info["actor/loss"] = actor_loss
if not no_backprop:
actor_grad_norms = TorchUtils.backprop_for_loss(
net=self.nets["actor"],
optim=self.optimizers["actor"],
loss=actor_loss,
)
info["actor/grad_norms"] = actor_grad_norms
return info
最后,我们描述了 get_action
方法的实现——该方法在测试时用于回滚操作。实现非常简单——只需查询 actor 获取动作。
def get_action(self, obs_dict, goal_dict=None):
"""
获取策略的动作输出。
参数:
obs_dict (dict): 当前观察值
goal_dict (dict): (可选)目标
返回:
action (torch.Tensor): 动作张量
"""
assert not self.nets.training
return self.nets["actor"](obs_dict=obs_dict, goal_dict=goal_dict)
最后,我们将 from robomimic.algo.td3_bc import TD3_BC
添加到 algo/__init__.py
文件中,确保这个 Algo
子类被 robomimic
注册。
到此为止!完整实现可以查看 algo/td3_bc.py
,并与 algo/bcq.py
进行对比,了解这两个实现的相似之处。
现在,我们可以运行 generate_config_templates.py
脚本来生成新算法的 json 配置模板,并在我们期望的数据集上进行训练。
# 生成 ../exps/templates/td3_bc.json 配置模板
$ python generate_config_templates.py
# 开始训练
$ python train.py --config ../exps/templates/td3_bc.json --dataset /path/to/walker2d_medium_expert.hdf5
dataset
数据集内容与可视化
本教程展示如何查看 robomimic hdf5 数据集的内容。
查看 HDF5 数据集结构
注意:HDF5 数据集结构。
点击此处查看每个 hdf5 数据集的预期结构。
该仓库提供了一个简单的实用脚本(get_dataset_info.py
),用于查看 hdf5 数据集的结构和一些统计信息。脚本会显示以下信息:
- 关于轨迹的统计信息(数量、平均长度等)
- 数据集中的 filter keys
- 数据集中的 环境元数据,这些元数据用于构建与数据采集时相同的仿真环境
- 第一条示范数据的结构
传递 --verbose
参数可以打印出每个 filter key 下的示范键列表,以及所有示范的完整数据集结构。下面是一个示例,使用的是仓库中提供的一个小型 hdf5 数据集 tests/assets/test.hdf5
。
$ python get_dataset_info.py --dataset ../../tests/assets/test.hdf5
任何希望编写自定义代码与 robomimic 数据集配合使用的用户,都应查看 jupyter notebook,该 notebook 展示了与 robomimic hdf5 数据集交互的多个有用 Python 代码片段。
可视化数据集轨迹
注意:这些示例兼容任何 robomimic 数据集。
本节中的示例使用的是仓库中提供的小型 hdf5 数据集 tests/assets/test.hdf5
,但你可以使用任何 robomimic hdf5 数据集运行这些示例。如果使用的是默认数据集,请确保 robosuite 在 offline_study
分支上运行 – 这是确保回放脚本正确运行的必要条件。
使用 playback_dataset.py
脚本轻松查看数据集轨迹。
# 对前5条轨迹,逐一加载环境模拟器状态,并渲染“agentview”和“robot0_eye_in_hand”相机的视频到 /tmp/playback_dataset.mp4
$ python playback_dataset.py --dataset ../../tests/assets/test.hdf5 --render_image_names agentview robot0_eye_in_hand --video_path /tmp/playback_dataset.mp4 --n 5
# 直接可视化数据集中的图像观测数据。这对于没有仿真器进行渲染的真实机器人数据集特别有用。
$ python playback_dataset.py --dataset ../../tests/assets/test.hdf5 --use-obs --render_image_names agentview_image --video_path /tmp/obs_trajectory.mp4
# 在环境中播放数据集中的动作,验证记录的动作是否合理。
$ python playback_dataset.py --dataset ../../tests/assets/test.hdf5 --use-actions --render_image_names agentview --video_path /tmp/playback_dataset_with_actions.mp4
# 仅可视化初始示范帧。
$ python playback_dataset.py --dataset ../../tests/assets/test.hdf5 --first --render_image_names agentview --video_path /tmp/dataset_task_inits.mp4
hyperparam_scan & generator
我们提供了 ConfigGenerator
类,位于 utils/hyperparam_utils.py
,以便轻松设置并在超参数上进行扫描。
这是使用本仓库启动多个训练运行的首选方式。
按照以下步骤运行您自己的超参数扫描:
- 创建基础配置 json
- 创建配置生成器
- 设置超参数值
- 运行超参数助手脚本
Step 1:创建基础配置 json
第一步是从基础配置 json 开始。常见的做法是将 exps/templates
中的一个模板(如 exps/templates/bc.json
)复制到一个新文件夹中(该文件夹将生成额外的配置 json)。
$ cp ../exps/templates/bc.json /tmp/gen_configs/base.json
Relevant settings in base json file
如果配置中有不参与扫描且与模板中的默认值没有不同的部分,可以根据需要省略它们。
我们修改了 /tmp/gen_configs/base.json
,添加了基础实验名称("bc_rnn_hyper"
)并指定了数据集路径("/tmp/test.hdf5"
)。
$ cat /tmp/gen_configs/base.json
Step 2: Create Config Generator
下一步是创建一个 ConfigGenerator
对象,该对象会根据每个唯一的超参数组合生成新的配置文件。我们在 scripts/hyperparam_helper.py
中提供了一个示例,接下来我们将一步步介绍如何使用这个脚本。
首先,我们定义一个 make_generator
函数来创建一个 ConfigGenerator
对象。之后,我们将进行设置超参数值的操作。
import robomimic
import robomimic.utils.hyperparam_utils as HyperparamUtils
def make_generator(config_file, script_file):
"""
实现此函数以设置你自己的超参数扫描!
"""
generator = HyperparamUtils.ConfigGenerator(
base_config_file=config_file, script_file=script_file
)
# 接下来:设置并扫描超参数
generator.add_param(...) # 设置 / 扫描 超参数1
generator.add_param(...) # 设置 / 扫描 超参数2
generator.add_param(...) # 设置 / 扫描 超参数3
...
return generator
def main(args):
# 创建配置生成器
generator = make_generator(
config_file=args.config, # 步骤1中的基础配置文件
script_file=args.script # 步骤4中解释的脚本
)
# 生成json文件和脚本
generator.generate()
...
Step 3: Set Hyperparameter Values
接下来,我们使用 generator.add_param
函数来设置超参数值,该函数接受以下参数:
key
: (string) 配置键的完整名称,用于扫描name
: (string) 该键的简写名称values
: (list) 要扫描的值的列表value_names
(list)(可选)与values
中每个值相关联的简写名称group
: (integer) 超参数组标识符。具有相同组的超参数将一起扫描。不同组的超参数将作为笛卡尔积扫描
设置固定值
回到我们的示例,我们首先设置一些固定的超参数值。
我们本可以直接修改基础配置文件,但选择在生成器函数中设置这些值。
在本例中,我们希望使用 BC-RNN 算法并设置 RNN horizon 为 10。这要求我们设置 config.train.seq_length = 10
和 config.algo.rnn.enabled = True
。
# 使用 RNN,并设置 horizon 为 10
generator.add_param(
key="algo.rnn.enabled",
name="",
group=0,
values=[True],
)
generator.add_param(
key="train.seq_length",
name="",
group=0,
values=[10],
)
generator.add_param(
key="algo.rnn.horizon",
name="",
group=0,
values=[10],
)
空的超参数名称
将 name=""
留空确保这些超参数的值不会决定实验名称。
仅在你扫描单个值时使用这种方式!
定义超参数扫描值
现在我们定义我们的扫描。我们想扫描以下超参数:
- policy learning rate,范围为 [1e-3, 1e-4]
- 是否使用 GMM 策略
- 是否使用 RNN 维度为 400 且 MLP 尺寸为 (1024, 1024),或者 RNN 维度为 1000 且没有 MLP
注意,学习率位于 group
1,GMM 启用参数位于 group
2,而 RNN 维度和 MLP 层维度都位于 group
3。
一起扫描超参数
我们将 RNN 维度和 MLP 层维度放在同一组中,以确保这些参数一起变化(RNN 维度 400 总是与 MLP 层维度 (1024, 1024) 配对,RNN 维度 1000 总是与空的 MLP 配对)。
# 学习率 - 1e-3, 1e-4
generator.add_param(
key="algo.optim_params.policy.learning_rate.initial",
name="plr",
group=1,
values=[1e-3, 1e-4],
)
# GMM 是否启用
generator.add_param(
key="algo.gmm.enabled",
name="gmm",
group=2,
values=[True, False],
value_names=["t", "f"],
)
# RNN 维度 400 + MLP 尺寸 (1024, 1024) vs. RNN 维度 1000 + 空的 MLP 尺寸
generator.add_param(
key="algo.rnn.hidden_dim",
name="rnnd",
group=3,
values=[
400,
1000,
],
)
generator.add_param(
key="algo.actor_layer_dims",
name="mlp",
group=3,
values=[
[1024, 1024],
[],
],
value_names=["1024", "0"],
)
Step 4: 运行超参数助手脚本
最后,我们运行超参数助手脚本(其中包含我们之前定义的函数)。
$ python hyperparam_helper.py --config /tmp/gen_configs/base.json --script /tmp/gen_configs/out.sh
所有生成的配置文件已被添加到 /tmp/gen_configs
中,并且还生成了一个有用的 Bash 脚本,可以用来启动你的训练任务。
$ cat /tmp/gen_configs/out.sh
#!/bin/bash
python train.py --config /tmp/gen_configs/bc_rnn_hyper_plr_0.001_gmm_t_rnnd_400_mlp_1024.json
python train.py --config /tmp/gen_configs/bc_rnn_hyper_plr_0.001_gmm_t_rnnd_1000_mlp_0.json
python train.py --config /tmp/gen_configs/bc_rnn_hyper_plr_0.001_gmm_f_rnnd_400_mlp_1024.json
python train.py --config /tmp/gen_configs/bc_rnn_hyper_plr_0.001_gmm_f_rnnd_1000_mlp_0.json
python train.py --config /tmp/gen_configs/bc_rnn_hyper_plr_0.0001_gmm_t_rnnd_400_mlp_1024.json
python train.py --config /tmp/gen_configs/bc_rnn_hyper_plr_0.0001_gmm_t_rnnd_1000_mlp_0.json
python train.py --config /tmp/gen_configs/bc_rnn_hyper_plr_0.0001_gmm_f_rnnd_400_mlp_1024.json
python train.py --config /tmp/gen_configs/bc_rnn_hyper_plr_0.0001_gmm_f_rnnd_1000_mlp_0.json
observation
robomimic 提供了一个灵活的框架来处理多模态观测。以下是观测数据的管理方式以及如何修改或添加自定义模态的说明:
观测组件:
-
键(Keys):单独的数据元素(例如
rgb_wrist
、eef_pos
、joint_vel
),代表来自环境的观测数据。 -
模态(Modalities):观测数据的类别。默认支持四种模态:
low_dim
:低维状态(例如关节位置或速度)。rgb
:RGB 图像数据。depth
:深度图像数据。scan
:扫描数组(例如 LiDAR 数据)。
-
组(Groups):由多个模态和每个模态的键组成的集合,通常表示环境的不同方面。例如,
obs
组包含常规的观测数据,而goal
和subgoal
组则包含与目标相关的模态。
观测流:
- 每个键会被映射到其对应的模态类。
- 同一模态下的所有观测数据会被串联,并通过该模态的
ObservationEncoder
进行处理。 - 所有模态处理后的观测数据会被串联在一起,传递给学习模型。
修改和添加自定义模态:
要修改或添加观测模态,可以按以下步骤进行:
- 配置编码器:在
Config
类中修改相应模态的encoder
参数,指定如何处理数据。 - 自定义模态:对于自定义模态(例如语义分割、光流等),可以参考提供的 示例脚本,定义新的模态及其处理方式。
通过扩展支持的模态或创建自定义模态,你可以使 robomimic 适应任何形式的多模态输入,满足你的任务需求。
pretrained_representations
Robomimic 支持多种预训练视觉表征,并提供了集成功能,以便将观测编码器调整为所需的预训练视觉表征编码器。
术语
首先,让我们澄清在使用不同预训练视觉表征时的语义区别:
- 骨干类(Backbone Classes):指各种预训练视觉编码器。例如,
R3MConv
和MVPConv
是用于使用 R3M 和 MVP 预训练表征的骨干类。 - 模型类(Model Classes):指每个选定骨干类中的不同大小的预训练模型。例如,
R3MConv
有三个模型类——resnet18
、resnet34
和resnet50
;而MVPConv
有五个模型类——vits-mae-hoi
、vits-mae-in
、vits-sup-in
、vitb-mae-egosoup
和vitl-256-mae-egosoup
。
示例
使用预训练视觉表征非常简单。每个预训练编码器由其 backbone_class
、model_class
和是否 freeze
表征(冻结表示或微调)来定义。请注意,您可能需要参考预训练表征的原始库以获取安装说明。
如果你是通过代码指定配置(如 examples/train_bc_rnn.py
中所示),以下是使用预训练表征的代码示例:
# R3M
config.observation.encoder.rgb.core_kwargs.backbone_class = 'R3MConv' # R3M backbone for image observations (unused if no image observations)
config.observation.encoder.rgb.core_kwargs.backbone_kwargs.r3m_model_class = 'resnet18' # R3M model class (resnet18, resnet34, resnet50)
config.observation.encoder.rgb.core_kwargs.backbone_kwargs.freeze = True # whether to freeze network during training or allow finetuning
config.observation.encoder.rgb.core_kwargs.pool_class = None # no pooling class for pretraining model
# MVP
config.observation.encoder.rgb.core_kwargs.backbone_class = 'MVPConv' # MVP backbone for image observations (unused if no image observations)
config.observation.encoder.rgb.core_kwargs.backbone_kwargs.mvp_model_class = 'vitb-mae-egosoup' # MVP model class (vits-mae-hoi, vits-mae-in, vits-sup-in, vitb-mae-egosoup, vitl-256-mae-egosoup)
config.observation.encoder.rgb.core_kwargs.backbone_kwargs.freeze = True # whether to freeze network during training or allow finetuning
config.observation.encoder.rgb.core_kwargs.pool_class = None # no pooling class for pretraining model
或者,如果您使用的是配置 json,则可以在 json 中设置适当的键。
Tensor Collections
TensorUtils
大多数 robomimic 模型都在嵌套的张量字典上进行操作,既包括输入张量,也包括训练标签。我们提供了一系列实用工具,用于处理这些字典,位于 robomimic.utils.tensor_utils
中。例如,给定一个 numpy 字典的观察值:
import numpy as np
x = {
'image': np.random.randn(3, 224, 224),
'proprio': {
'eef_pos': np.random.randn(3),
'eef_rot': np.random.randn(3)
}
}
例如,我们可以使用 robomimic.utils.tensor_utils
将它们转换为 PyTorch 张量,添加批次维度,并将它们发送到 GPU:
import torch
import robomimic.utils.tensor_utils as TensorUtils
# 将嵌套字典或列表或元组中的所有 numpy 数组转换为 torch 张量
x = TensorUtils.to_tensor(x)
# 为字典中的所有张量添加一个批次维度
x = TensorUtils.to_batch(x)
# 将所有嵌套张量发送到 GPU(如果可用)
x = TensorUtils.to_device(x, torch.device("cuda:0"))
库还支持对嵌套字典进行非平凡的形状操作。例如:
# 在 dim=1 处创建一个新维度,并将维度大小扩展为 10
x = TensorUtils.unsqueeze_expand_at(x, size=10, dim=1)
# x["rgb"].shape == torch.Size([1, 10, 3, 224, 224])
# 将第 0 维重复 10 次
x = TensorUtils.repeat_by_expand_at(x, repeats=10, dim=0)
# x["rgb"].shape == torch.Size([10, 10, 3, 224, 224])
# 按某个索引收集序列维度(dim=1)
x = TensorUtils.gather_sequence(x_seq, indices=torch.arange(10))
# x["rgb"].shape == torch.Size([10, 3, 224, 224])
此外,map_tensor
允许将任意函数应用于嵌套字典或张量列表中的所有张量,并返回相同的嵌套结构。
x = TensorUtils.map_tensor(x, your_func)
robomimic.utils.obs_utils
实现了一套用于预处理不同观察模态的工具函数,例如图像处理,并提供了确定观察类型的功能,以便创建适合的编码器网络架构。以下是一些重要的函数。
-
initialize_obs_utils_with_obs_specs(obs_modality_specs)
该函数初始化一个全局注册表,用于映射观察键名和观察模态之间的关系,例如哪些是低维的,哪些是RGB图像。例如,给定以下格式的
obs_modality_specs
:
{
"obs": {
"low_dim": ["robot0_eef_pos", "robot0_eef_quat"],
"rgb": ["agentview_image", "robot0_eye_in_hand"],
},
"goal": {
"low_dim": ["robot0_eef_pos"],
"rgb": ["agentview_image"]
}
}
该函数会在观察名称(例如 'agentview_image'
)和观察模态(例如 'rgb'
)之间创建映射。该注册表存储在 OBS_MODALITIES_TO_KEYS
中,并可以全局访问。诸如 key_is_obs_modality()
等实用函数依赖于此全局注册表来确定观察模态。
-
process_obs(obs_dict)
该函数对传入的观察字典进行预处理,以便输入到神经网络。例如,图像观察会被转换为
float
格式,重新缩放到[0-1]
之间,并转换为[C, H, W]
格式。 -
unprocess_obs(obs_dict)
该函数会撤销
process_obs
应用的预处理转换。此功能用于将图像转换回uint8
格式,以便高效存储。 -
normalize_obs(obs_dict, obs_normalization_stats)
该函数通过计算每个观察的均值和标准差(在每个维度和观察键中),并在每个维度内对单位均值和方差进行归一化,来对观察数据进行归一化处理。
Using Pretrained Models
本教程展示了如何使用预训练模型检查点。
本教程接下来的内容将展示如何使用实用脚本加载并执行训练过的策略。如果你希望通过交互式笔记本操作,请参考 jupyter notebook,该教程展示了如何从模型库下载检查点,使用 pytorch 加载检查点,并执行策略。
评估训练过的策略
保存在 models
目录中的策略检查点可以通过 run_trained_agent.py
脚本进行评估:
# 50 次滚动,最大时间步长 400,并将 agentview 和 wrist camera 图像渲染成视频
$ python run_trained_agent.py --agent /path/to/model.pth --n_rollouts 50 --horizon 400 --seed 0 --video_path /path/to/output.mp4 --camera_names agentview robot0_eye_in_hand
# 将滚动结果写入新的数据集 hdf5 文件
python run_trained_agent.py --agent /path/to/model.pth --n_rollouts 50 --horizon 400 --seed 0 --dataset_path /path/to/output.hdf5 --dataset_obs
# 将滚动结果(无显式观察)写入 hdf5 文件
python run_trained_agent.py --agent /path/to/model.pth --n_rollouts 50 --horizon 400 --seed 0 --dataset_path /path/to/output.hdf5