自动驾驶---Parking端到端架构
1 背景
自动泊车也是智能驾驶低速功能中比较重要的一部分,低速功能其中还包括记忆泊车,代客泊车等。传统的泊车算法通常使用基于规则或者搜索优化的方案来实现。然而,由于算法的复杂设计,这些方法在复杂的泊车场景中效果有限,或者说上限不高。相比之下,基于神经网络的方法往往比基于规则的方法更直观、更通用。通过采集大量专家泊车轨迹数据并且通过基于学习的方法模拟人类策略,也可以有效地完成泊车任务。
传统的泊车算法通常基于多个模块,需要将整个停车过程分解为多个阶段,如环境感知、车位检测、定位、建图,路径规划以及控制等,复杂的模型架构导致其在紧凑的停车位或复杂场景中容易遇到困难。
为什么要先介绍泊车端到端呢?因为这篇论文非常详细地介绍了感知---规控两段式端到端的架构,非常清晰,并且论文作者也开源了代码,对于初入门端到端的读者更加友好,行车的两段式端到端也是类似的架构,只不过训练的数据集差别会比较大。
2 端到端架构
采用模仿学习来执行从RGB图像到路径规划的端到端规划,并且模仿人类驾驶的轨迹。本篇博客介绍的端到端方法利用了目标查询编码器来融合图像和目标特征,并且使用基于Transformer的解码器来自回归预测未来的路径点。整个泊车端到端架构如下所示,虽然看起来比较简单,但其中涉及到的环节还是比较多的。
上面这一套框架其实也适用于行车,只不过输入来源和训练的数据不同。在泊车功能上,关注更多的是环视摄像头的数据,而在行车功能上,更多使用的是其它的摄像头的信息,并且训练的主要是行车的人驾数据,像特斯拉一样的话(终极状态:行泊共用一个模型),那就是行泊的人驾数据一起训练。因为论文作者在该端到端方案的介绍上比较详细,有兴趣的读者可以了解模型的架构及整个端到端研发的流程,不管是行车,还是泊车都是比较有帮助的。
2.1 模型架构
模型架构看起来比较简单(如下图所示),但其中串的小模型比较多,也并不是一段式端到端,这一点需要读者注意。
- 数据预处理:首先将环视摄像头捕获的图像作为输入,并对图像进行必要的预处理,如裁剪、归一化等操作,以适应网络的输入要求。
- 特征提取与融合:这个地方涉及两个方面,如下。
- Camera Encoder:通过EfficientNet方法提取图像特征,再通过LSS方法提取深度信息,最终可得到BEV(鸟瞰图)表示形式。
- Target Encoder:结合BEV信息和停车位的目标信息进行编码,得到目标特征。
- 特征融合:然后,利用目标查询编码器将目标特征与图像特征进行融合(使用了Transormer中经典的交叉注意力机制),使网络能够更好地理解图像中的停车场景和目标信息。
- 轨迹预测:基于融合后的特征,使用基于 Transformer 的解码器以自回归的方式预测未来的航点。在预测过程中,解码器会根据之前预测的航点以及输入的特征信息,逐步生成后续的轨迹点,从而得到完整的停车轨迹。有以下两点优势:
- 由于轨迹点的顺序特性,采用基于Transformer解码器的自回归方法来生成轨迹点,能够更好地捕捉轨迹的动态变化和长期依赖关系,提高了轨迹预测的准确性;
- 通过交叉注意力机制,网络能够有效地将相机特征和目标特征结合起来,以生成更准确的路径规划。
- 控制执行:根据预测的航点,与车辆的控制器协作,将车辆操纵到指定的停车位中,直到车辆完全停放好。
整个代码也比较清晰,基本围绕上面描述的框架执行,主文件位于parking_model_real.py中,代码结构:
先看整个模型的主文件:
import torch
from torch import nn
from model_interface.model.bev_encoder import BevEncoder, BevQuery
from model_interface.model.gru_trajectory_decoder import GRUTrajectoryDecoder
from model_interface.model.lss_bev_model import LssBevModel
from model_interface.model.trajectory_decoder import TrajectoryDecoder
from utils.config import Configuration
class ParkingModelReal(nn.Module):
def __init__(self, cfg: Configuration):
super().__init__()
self.cfg = cfg
# Camera Encoder
self.lss_bev_model = LssBevModel(self.cfg)
self.image_res_encoder = BevEncoder(in_channel=self.cfg.bev_encoder_in_channel)
# Target Encoder
self.target_res_encoder = BevEncoder(in_channel=1)
# BEV Query
self.bev_query = BevQuery(self.cfg)
# Trajectory Decoder
self.trajectory_decoder = self.get_trajectory_decoder()
def forward(self, data):
# Encoder
bev_feature, pred_depth, bev_target = self.encoder(data, mode="train")
# Decoder
pred_traj_point = self.trajectory_decoder(bev_feature, data['gt_traj_point_token'].cuda())
return pred_traj_point, pred_depth, bev_target
def predict_transformer(self, data, predict_token_num):
# Encoder
bev_feature, pred_depth, bev_target = self.encoder(data, mode="predict")
# Auto Regressive Decoder
autoregressive_point = data['gt_traj_point_token'].cuda() # During inference, we regard BOS as gt_traj_point_token.
for _ in range(predict_token_num):
pred_traj_point = self.trajectory_decoder.predict(bev_feature, autoregressive_point)
autoregressive_point = torch.cat([autoregressive_point, pred_traj_point], dim=1)
return autoregressive_point, pred_depth, bev_target
def predict_gru(self, data):
# Encoder
bev_feature, _, _ = self.encoder(data, mode="predict")
# Decoder
autoregressive_point = self.trajectory_decoder(bev_feature).squeeze()
return autoregressive_point
def encoder(self, data, mode):
# Camera Encoder
images = data['image'].to(self.cfg.device, non_blocking=True)
intrinsics = data['intrinsics'].to(self.cfg.device, non_blocking=True)
extrinsics = data['extrinsics'].to(self.cfg.device, non_blocking=True)
bev_camera, pred_depth = self.lss_bev_model(images, intrinsics, extrinsics)
bev_camera_encoder = self.image_res_encoder(bev_camera, flatten=False)
# Target Encoder
target_point = data['fuzzy_target_point'] if self.cfg.use_fuzzy_target else data['target_point']
target_point = target_point.to(self.cfg.device, non_blocking=True)
bev_target = self.get_target_bev(target_point, mode=mode)
bev_target_encoder = self.target_res_encoder(bev_target, flatten=False)
# Feature Fusion
bev_feature = self.get_feature_fusion(bev_target_encoder, bev_camera_encoder)
bev_feature = torch.flatten(bev_feature, 2)
return bev_feature, pred_depth, bev_target
def get_target_bev(self, target_point, mode):
h, w = int((self.cfg.bev_y_bound[1] - self.cfg.bev_y_bound[0]) / self.cfg.bev_y_bound[2]), int((self.cfg.bev_x_bound[1] - self.cfg.bev_x_bound[0]) / self.cfg.bev_x_bound[2])
b = self.cfg.batch_size if mode == "train" else 1
# Get target point
bev_target = torch.zeros((b, 1, h, w), dtype=torch.float).to(self.cfg.device, non_blocking=True)
x_pixel = (h / 2 + target_point[:, 0] / self.cfg.bev_x_bound[2]).unsqueeze(0).T.int()
y_pixel = (w / 2 + target_point[:, 1] / self.cfg.bev_y_bound[2]).unsqueeze(0).T.int()
target_point = torch.cat([x_pixel, y_pixel], dim=1)
# Add noise
if self.cfg.add_noise_to_target and mode == "train":
noise_threshold = int(self.cfg.target_noise_threshold / self.cfg.bev_x_bound[2])
noise = (torch.rand_like(target_point, dtype=torch.float) * noise_threshold * 2 - noise_threshold).int()
target_point += noise
# Get target point tensor in the BEV view
for batch in range(b):
bev_target_batch = bev_target[batch][0]
target_point_batch = target_point[batch]
range_minmax = int(self.cfg.target_range / self.cfg.bev_x_bound[2])
bev_target_batch[target_point_batch[0] - range_minmax: target_point_batch[0] + range_minmax + 1,
target_point_batch[1] - range_minmax: target_point_batch[1] + range_minmax + 1] = 1.0
return bev_target
def get_feature_fusion(self, bev_target_encoder, bev_camera_encoder):
if self.cfg.fusion_method == "query":
bev_feature = self.bev_query(bev_target_encoder, bev_camera_encoder)
elif self.cfg.fusion_method == "plus":
bev_feature = bev_target_encoder + bev_camera_encoder
elif self.cfg.fusion_method == "concat":
concat_feature = torch.concatenate([bev_target_encoder, bev_camera_encoder], dim=1)
conv = nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1, bias=False).cuda()
bev_feature = conv(concat_feature)
else:
raise ValueError(f"Don't support fusion_method '{self.cfg.fusion_method}'!")
return bev_feature
def get_trajectory_decoder(self):
if self.cfg.decoder_method == "transformer":
trajectory_decoder = TrajectoryDecoder(self.cfg)
elif self.cfg.decoder_method == "gru":
trajectory_decoder = GRUTrajectoryDecoder(self.cfg)
else:
raise ValueError(f"Don't support decoder_method '{self.cfg.decoder_method}'!")
return trajectory_decoder
Parking Model里面的小模型(Camera Encoder,Target Encoder,BEV Query,Trajectory Decoder)代码也比较清晰,如下Trajectory Decoder所示,更多的是模型内部的结构,偏理论一些,比如embedding,mask,softmax等,都是Transformer里的基本元素,后面笔者也会单独针对小模型的结构再详细说明。
import torch
from torch import nn
from timm.models.layers import trunc_normal_
from utils.config import Configuration
class TrajectoryDecoder(nn.Module):
def __init__(self, cfg: Configuration):
super().__init__()
self.cfg = cfg
self.PAD_token = self.cfg.token_nums + self.cfg.append_token - 1
self.embedding = nn.Embedding(self.cfg.token_nums + self.cfg.append_token, self.cfg.tf_de_dim)
self.pos_drop = nn.Dropout(self.cfg.tf_de_dropout)
item_cnt = self.cfg.autoregressive_points
self.pos_embed = nn.Parameter(torch.randn(1, self.cfg.item_number*item_cnt + 2, self.cfg.tf_de_dim) * .02)
tf_layer = nn.TransformerDecoderLayer(d_model=self.cfg.tf_de_dim, nhead=self.cfg.tf_de_heads)
self.tf_decoder = nn.TransformerDecoder(tf_layer, num_layers=self.cfg.tf_de_layers)
self.output = nn.Linear(self.cfg.tf_de_dim, self.cfg.token_nums + self.cfg.append_token)
self.init_weights()
def init_weights(self):
for name, p in self.named_parameters():
if 'pos_embed' in name:
continue
if p.dim() > 1:
nn.init.xavier_uniform_(p)
trunc_normal_(self.pos_embed, std=.02)
def create_mask(self, tgt):
tgt_mask = (torch.triu(torch.ones((tgt.shape[1], tgt.shape[1]), device=self.cfg.device)) == 1).transpose(0, 1)
tgt_mask = tgt_mask.float().masked_fill(tgt_mask == 0, float('-inf')).masked_fill(tgt_mask == 1, float(0.0))
tgt_padding_mask = (tgt == self.PAD_token)
return tgt_mask, tgt_padding_mask
def decoder(self, encoder_out, tgt_embedding, tgt_mask, tgt_padding_mask):
encoder_out = encoder_out.transpose(0, 1)
tgt_embedding = tgt_embedding.transpose(0, 1)
pred_traj_points = self.tf_decoder(tgt=tgt_embedding,
memory=encoder_out,
tgt_mask=tgt_mask,
tgt_key_padding_mask=tgt_padding_mask)
pred_traj_points = pred_traj_points.transpose(0, 1)
return pred_traj_points
def forward(self, encoder_out, tgt):
tgt = tgt[:, :-1]
tgt_mask, tgt_padding_mask = self.create_mask(tgt)
tgt_embedding = self.embedding(tgt)
tgt_embedding = self.pos_drop(tgt_embedding + self.pos_embed)
pred_traj_points = self.decoder(encoder_out, tgt_embedding, tgt_mask, tgt_padding_mask)
pred_traj_points = self.output(pred_traj_points)
return pred_traj_points
def predict(self, encoder_out, tgt):
length = tgt.size(1)
padding_num = self.cfg.item_number * self.cfg.autoregressive_points + 2 - length
offset = 1
if padding_num > 0:
padding = torch.ones(tgt.size(0), padding_num).fill_(self.PAD_token).long().to('cuda')
tgt = torch.cat([tgt, padding], dim=1)
tgt_mask, tgt_padding_mask = self.create_mask(tgt)
tgt_embedding = self.embedding(tgt)
tgt_embedding = tgt_embedding + self.pos_embed
pred_traj_points = self.decoder(encoder_out, tgt_embedding, tgt_mask, tgt_padding_mask)
pred_traj_points = self.output(pred_traj_points)[:, length - offset, :]
pred_traj_points = torch.softmax(pred_traj_points, dim=-1)
pred_traj_points = pred_traj_points.argmax(dim=-1).view(-1, 1)
return pred_traj_points
2.2 训练
首先是数据采集,高校在这一点上比较吃亏,没有丰富的训练数据集,车企在这一方面有着先天的优势,每天手动泊车的驾驶员还是比较多的,因此搜集人驾泊车的数据会比较容易(行车数据的收集类似)。
论文中数据集是使用车载设备收集的。为了促进全面的视觉感知和轨迹,采用环视相机来捕获 RGB 图像。同时,集成了航位推算技术,利用传感器数据融合算法实现稳健和准确的车辆定位。数据是在各种停车场景中收集的,包括地下和地面车库,如下图所示,从不同的环境中收集数据有助于增强神经网络的泛化能力。
在训练过程中,使用环视摄像头图像(一般环视摄像头的数量就是 4 个)作为输入,目标停车位由停车结束时的点来确定;采集到的人驾数据轨迹序列点用于监督端到端预测结果。
2.3 评价标准
评价标准主要从两个层面来评价,分别是模型评价和实车测试结果评价。
(1)模型评价
-
Hausdorff Distance(豪斯多夫距离):用于衡量预测轨迹与真实轨迹之间的相似度。豪斯多夫距离越小,表示预测轨迹与真实轨迹越接近。
-
L2 Distance(L2距离):另一种衡量预测轨迹与真实轨迹之间差异的方法,L2距离越小,表示预测的准确性越高。
-
四次差异(Four. Diff.):该指标可能指的是预测轨迹与真实轨迹在四个不同维度上的差异,具体维度未在摘要中详述,但可以推测可能包括位置、方向等关键参数。
(2)实车评价
-
泊车成功率(PSR):论文中提到,在四个不同的真实车库中进行的实验结果表明,所提出的方法平均泊车成功率达到了87.8%。
-
无时段率(NSR):未能在指定停车位停车的比率。
-
违章停车率(PVR):违章停车率是指车辆略微超出指定停车位而不阻碍或阻碍相邻停车位的情况。
-
平均位置误差(APE):平均位置误差是成功停车时目标停车位置与自主车辆停止位置之间的平均距离。
-
平均方向误差(AOE):平均方向误差是成功停车时自主车辆的目标停车方向与停止方向之间的平均差。
-
平均停车分数(APS):平均停车分数是通过综合评估计算得出的,其中包括停车过程中的位置误差、方向误差和成功率。分数分布在 0 到 100 之间。
-
平均停车时间(APT):多次停车操作的平均停车持续时间。停车持续时间是从停车模式启动的那一刻开始计算的,直到车辆成功停在指定空间,或者停车过程因异常或故障而终止。
3 总结
论文中提到的端到端方法和高度优化的基于规则的泊车方法之间仍然存在性能差距,在复杂场景的泊车效率以及成功率可能不如基于规则的泊车成功率。
随着技术的发展,端到端肯定是未来自动驾驶或者具身智能的一个大方向。如何进一步提升端到端算法的性能,笔者相信在模型结构及数据训练上将会成为焦点。作为安全兜底或者作为端到端的备份,传统的规控还有其相对应的价值存在。
参考论文:《ParkingE2E: Camera-based End-to-end Parking Network, from Images to Planning》