ultralytics实现DeepSort之级联匹配
前面博客中说过,通过Market-1501数据集的训练后可以得到特征提取模型,这个模型最终的输出值为(bs,num_class)
,但在DeepSort算法应用中的输出结果并非如此,其输出的是特征信息。
特征提取
通过训练后的特征提取模型即可完成特征提取的功能,最终输出结果为特征,其维度为(16,512)
具体过程如下:
- 利用
feature_extractor_test.pt
提取图像中的检测框,这个feature_extractor_test.pt中保存的是标注信息(纯标注信息)该过程是模拟检测模型的目标检测过程
- 将这些标注框裁剪,变为一个个的标注框内容,共16个,效果参考如下,当然并不是真的裁剪
- 使用Net加载训练好的权重文件,提取特征,这里调用网络时传入了reid参数,代表去除分类头,只要特征提取结果
self.net = Net(num_classes=num_classes, reid=reid)
if self.reid:
x = x.div(x.norm(p=2,dim=1,keepdim=True)) # x.norm=(x1^p+...+xn^p)^(1/p)
return x
# classifier
x = self.classifier(x)
完整代码如下:
import torch
import torchvision.transforms as transforms
import numpy as np
import cv2
import logging
from model import Net # 执行 predict.py 用这个
# from model import Net # 执行 feature_extractor.py用这个
#把传入的n个小图统一尺寸,变成可以送入神经网络的一个批次数据n*3*128*64,
#然后再用一个已经训练好的神经网络,把n*3*128*64变成n*512的特征
class Extractor(object):
def __init__(self, model_path, use_cuda=True,num_classes=751,reid=True):
self.net = Net(num_classes=num_classes, reid=reid)
self.device = "cuda" if torch.cuda.is_available() and use_cuda else "cpu"
state_dict = torch.load(model_path, map_location=lambda storage, loc: storage)['net_dict']
self.net.load_state_dict(state_dict)
logger = logging.getLogger("root.tracker")
logger.info("Loading weights from {}... Done!".format(model_path))
self.net.to(self.device)
self.size = (64, 128)#图像统一resize为:h128 w64
self.norm = transforms.Compose([#组合操作
# [0,255]的数据变张量, 例如原来是128*64*3 变为3*128*64 还要归一化到[0,1]
transforms.ToTensor(),
# rgb三个通道的数据用公式(img-mean)/std将数据归一化到[-1,1]
# 其中,均值和方差分别如下 mean=[0.485, 0.456, 0.406] and std=[0.229, 0.224, 0.225]
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
def _preprocess(self, im_crops):
"""
TODO:
1. to float with scale from 0 to 1
2. resize to (64, 128) as Market1501 dataset did
3. concatenate to a numpy array
3. to torch Tensor
4. normalize
"""
def _resize(im, size):
return cv2.resize(im.astype(np.float32)/255., size)
#先统一变成 128*64*3 再变成3*128*64,再变成1*3*128*64, 然后拼接为n*3*128*64
im_batch = torch.cat([self.norm(_resize(im, self.size)).unsqueeze(0) for im in im_crops], dim=0).float()
return im_batch
#当把类的实例名字作为一个函数进行调用时,默认调用此函数
def __call__(self, im_crops):
im_batch = self._preprocess(im_crops) #im_batch n*3*128*64
with torch.no_grad():
im_batch = im_batch.to(self.device)
features = self.net(im_batch) #n*512
return features.cpu().numpy()
def _xywh_to_xyxy( bbox_xywh):
x, y, w, h = bbox_xywh
x1 = int(x - w / 2)
x2 = int(x + w / 2)
y1 = int(y - h / 2)
y2 = int(y + h / 2)
return x1, y1, x2, y2
if __name__ == '__main__':
img=cv2.imread("feature_extractor_test.jpg")
bbox_xywh =torch.load('feature_extractor_test.pt')
im_crops = []
# 从原图org_img抠出n个小图,这n个小图是识别出来的矩形框里面的内容
for box in bbox_xywh:
x1, y1, x2, y2 = _xywh_to_xyxy(box)
im = img[y1:y2, x1:x2]
im_crops.append(im)
cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.imshow('modeltest', img)
cv2.waitKey(0)
cv2.destroyAllWindows()
extractor = Extractor("checkpoint/ckpt.t7")
feature = extractor(im_crops)
print(feature.shape)
至此,我们便完成了DeepSort算法中的Deep阶段了,即特征提取模块的训练,需要注意的是由于我们的数据集使用的是行人,所以它也只能用于行人目标追踪。
DeepSort方法流程
为了更好的学习DeepSort方法,我们依旧采用Debug的方式来查看其实现流程。先来了解其基本流程:
-
初始化Tracks:在视频的第一帧中,基于目标检测器的输出,为每个检测到的目标创建一个新的Tracks,并将其状态设为
Unconfirmed
。此时,由于是第一帧,所以所有的Tracks
都处于unconfirmed
状态。 -
预测Tracks:使用卡尔曼滤波器预测Tracks在下一帧中的位置和速度。
-
目标检测(Detections):在每一帧中,目标检测器识别出该帧中所有目标的检测框。
-
IOU匹配:计算上一帧预测的Tracks与当前帧Detections之间的IOU,并基于此构建代价矩阵。
-
匹配与更新:
使用匈牙利算法对代价矩阵进行优化匹配,以最小化匹配的总代价。
对于匹配成功的Tracks,使用卡尔曼滤波器进行状态更新(这里的状态更新指的是位置)。
对于未匹配的Tracks(Unmatched Tracks),直接删除这些失配的Tracks(如果Tracks是确认态,则需要连续达到一定次数(默认30次)才能删除)。
对于未匹配的Detections(Unmatched Detections),将这些失配的Detections初始化为新的Tracks。
-
循环执行(2)-(5)步骤,直到出现确认态(confirmed)的Tracks或者视频帧结束。
-
级联匹配:
对于Confirmed状态的Tracks,利用外观特征进行更精确的级联匹配。
第一种是Tracks匹配,通过卡尔曼滤波更新相应的Tracks变量。
第二种和第三种是Detections和Tracks失配,将之前的不确定态的Tracks和失配的Tracks与Unmatched Detections逐一进行IOU匹配,再根据匹配度计算代价矩阵。
-
线性匹配结果:
将(7)中得到的所有代价矩阵作为匈牙利算法的输入,得到线性匹配的结果。
第一种是Tracks失配(Unmatched Tracks),直接删除这些失配的Tracks(如果Tracks是确认态,则需要连续达到一定次数(默认30次)才能删除)。
第二种是Detections失配(Unmatched Detections),将这些失配的Detections初始化为新的Tracks。
第三种是检测框和预测的框框成功配对,表示前一帧和后一帧的追踪成功,通过卡尔曼滤波更新相应的Tracks变量。
-
循环执行(7)-(8)步骤,直到视频结束。
-
输出结果:在整个视频帧处理过程中,维护和更新Tracks,最终输出每个目标的跟踪结果,包括目标ID、位置、速度等信息。
DeepSort解析
DeepSort算法不仅使用IOU作为匹配条件,同时也利用目标的特征信息来进行匹配,但DeepSort中的Sort模块实际上与先前的Sort算法在功能上并没有区别,依旧使用卡尔曼滤波来进行轨迹预测,依旧采用匈牙利匹配实现检测框与轨迹的匹配。那么具体有哪些不同呢,这里我们看一下DeepSort算法的构成:
DeepSort初始化方法
DeepSort算法初始化定义如下,可以看到其内包含的内容主要有置信度阈值,特征提取器(即我们先前Market-1501
数据集训练的模型),特征计算器 metric(用于计算轨迹特征与检测框特征的相似度)以及轨迹跟踪器。
class DeepSort(object):
def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, nms_max_overlap=1.0, max_iou_distance=0.7,
max_age=70, n_init=3, nn_budget=100, use_cuda=True,num_classes=751,reid=True):
self.min_confidence = min_confidence # 置信度阈值 yolov8的检测结果,挑出来要置信度大的检测
self.nms_max_overlap = nms_max_overlap # 和非极大值抑制有关
# 特征提取器 用于:把n个小图变成统一尺寸送入神经网络,得到n*512的特征
self.extractor = Extractor(model_path, use_cuda=use_cuda,num_classes=num_classes,reid=reid)
max_cosine_distance = max_dist # 用于级联匹配是要计算余弦距离,如果大于该阈值,则匹配不上
nn_budget = 100 # 每个轨迹保存特征最多的个数
metric = NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget) #特征向量之间计算相似度用的一个类
# 轨迹跟踪管理器 管理着所有轨迹
self.tracker = Tracker(metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)
DeepSort提取特征方法
上面所说的特征提取便是通过下面的方法来实现的,将目标检测器(如YOLO、RT-DETR
)的检测框提取出来,随后利用self.extractor
进行特征提取
def _get_features(self, bbox_xywh, ori_img):
im_crops = []
# 从原图org_img抠出n个小图,这n个小图是识别出来的矩形框里面的内容
for box in bbox_xywh:
x1,y1,x2,y2 = self._xywh_to_xyxy(box)
im = ori_img[y1:y2,x1:x2]
im_crops.append(im)
if im_crops:
#从原图org_img抠出n个小图,把n个小图变成统一尺寸送入神经网络,得到n*512的特征
features = self.extractor(im_crops) # 相当于调用类Extractor里面的__call__函数
else:
features = np.array([])
return features
def _xywh_to_xyxy(self, bbox_xywh):
x, y, w, h = bbox_xywh
x1 = max(int(x - w / 2), 0)
x2 = min(int(x + w / 2), self.width - 1)
y1 = max(int(y - h / 2), 0)
y2 = min(int(y + h / 2), self.height - 1)
return x1, y1, x2, y2
DeepSort轨迹更新方法
随后便是update方法了,代码在deepsort.py中,该部分会通过获取检测器的检测结果以及卡尔曼滤波的预测结果,从而更新轨迹,详细如下:
def update(self, bbox_xywh, confidences, oids, ori_img):
self.height, self.width = ori_img.shape[:2] #1080 1920
# generate detections
# 得到ori_img这幅图上的特征 n*512
features = self._get_features(bbox_xywh, ori_img) # features n*512
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh) # 把中心xy变成左上tl
# detections是当前这幅图ori_img检查出来的n个对象,组成一个列表 [Detection(tlwh,置信度,512个特征)],筛选掉小于min_confidence的检测结果
detections = [Detection(bbox_tlwh[i], conf, features[i],oid) for i, (conf,oid) in enumerate(zip(confidences,oids)) if conf > self.min_confidence]
# run on non-maximum supression
boxes = np.array([d.tlwh for d in detections]) #没用
scores = np.array([d.confidence for d in detections]) #没用
# update tracker
self.tracker.predict() # 所有轨迹向前预测一步
self.tracker.update(detections) # 所有轨迹根据最新检测detections进行更新
# output bbox identities
outputs = []
for track in self.tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
continue
box = track.to_tlwh()
x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
track_id = track.track_id
track_oid = track.oid
outputs.append(np.array([x1, y1, x2, y2, track_id, track_oid], dtype=np.int))
if len(outputs) > 0:
outputs = np.stack(outputs, axis=0)
return outputs
第一帧匹配结果
检测出的目标共有16个
由于是第一帧,所以追踪器中没有轨迹,最终结果会将所有的检测结果当作轨迹存储进去
第二帧追踪结果
在第二帧的追踪中,依旧有16个检测结果
第三帧追踪结果
在第三帧中,检测到17个目标
但此时的轨迹却只有16个
匹配方法
该匹配方法包含级联匹配与IOU的匹配,IOU匹配先前我们已经解释过了,那么这个级联匹配到底是什么呢?
为什么叫级联匹配,主要是它的匹配过程是一个循环。即从missing age=0的轨迹,每一帧都匹配上,没有丢失过的轨迹到missing age=30的轨迹,丢失轨迹的最大时间30帧的轨迹,挨个的和检测结果进行匹配。也就是说,对于没有丢失过的轨迹赋予优先匹配的权利,而丢失的最久的轨迹最后匹配。
当然代码中先进行一级匹配(特征匹配)再进行二级匹配(IOU匹配),这也是一种级联匹配
def _match(self, detections):
def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
# 通过最近邻或者余弦距离计算出成本矩阵或者称为代价矩阵
# self.metric是nn_matching.py里面类NearestNeighborDistanceMetric的实例,本类初始化时传入
# self.metric=NearestNeighborDistanceMetric("cosine", 0.2, 100)
cost_matrix = self.metric.distance(features, targets)
# 如果发现二者的距离比用kf预测的距离大,说明二者肯定不匹配,就把对应的成本矩阵值变成一个很大的数
cost_matrix = linear_assignment.gate_cost_matrix(#cost_matrix 行对应轨迹 列对应检测
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
return cost_matrix
# Split track set into confirmed and unconfirmed tracks.
# 区分开已确定的轨迹confirmed_tracks和其它未确定的轨迹unconfirmed_tracks
confirmed_tracks = [ #已经确认的轨迹索引值
i for i, t in enumerate(self.tracks) if t.is_confirmed()]
unconfirmed_tracks = [#未确认的轨迹索引值也就是暂定的轨迹tentative
i for i, t in enumerate(self.tracks) if not t.is_confirmed()]
# 本函数功能一:级联匹配
# 级联匹配是检测detections和confirmed的轨迹进行匹配。
# 因为级联匹配是和前面几个级依次进行匹配,所以级联匹配设计目的就是和confirmed的轨迹匹配。
matches_a, unmatched_tracks_a, unmatched_detections = \
linear_assignment.matching_cascade(
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)
# 本函数功能二:IOU匹配 对级联匹配中还没有匹配成功的目标再进行IoU匹配
# 是以下两个之间的匹配:
# 一是:暂定的轨迹unconfirmed_tracks 加上年纪为1的轨迹组成 做IOU的轨迹候选 iou_track_candidates
# 二是:级联匹配每匹配成功的检测 unmatched_detections
iou_track_candidates = unconfirmed_tracks + [
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update == 1] # 自从更新做了1次预测,说明本track上一次匹配上了做了update
unmatched_tracks_a = [ #年纪大>1的轨迹不能用作做IOU。因为多次没匹配上,和当前帧检测结果在时间上差的远,
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update != 1]
# 调用IoU匹配
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)
# 3 整理输出结果
matches = matches_a + matches_b # 组合两部分匹配
unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
return matches, unmatched_tracks, unmatched_detections
级联匹配具体实现
def matching_cascade(
distance_metric, max_distance, cascade_depth, tracks, detections,
track_indices=None, detection_indices=None):
# 分配track_indices和detection_indices两个列表
if track_indices is None: #tracks里面confirmed索引 注意 track_indices=[], 和track_indices=None不是一回事
track_indices = list(range(len(tracks)))
if detection_indices is None:#detections的索引值
detection_indices = list(range(len(detections)))
# 未匹配检测先初始化为所有,后续会随着匹配逐渐减少
unmatched_detections = detection_indices
# 初始化匹配对 matches,先初始化为空集,每做一级匹配,如果匹配成功了,就收集进来
matches = []
# 由小到大依次对每个level的tracks做匹配
for level in range(cascade_depth):
# 如果没有detections,退出循环
if len(unmatched_detections) == 0: # No detections left 剩下都没有被匹配的detections
break
# time_since_update=1 说明自从更新做了1次预测,说明本track上一次匹配上了做了update 所以这些轨迹先去匹配
# time_since_update=2 说明自从更新做了2次预测,说明本track上一次没匹配上,上上次匹配上了做了更新,所以这些轨迹去匹配的优先级其次
# 这样最多做70回, 也就是说,一条轨迹如果不更新,寿命最多70回,就再也接续不上了
# time_since_update=70 说明自从更新做了70次预测,说明已经有69次没匹配上了也没做更新,这轮做完就不再往前找了。
track_indices_l = [ #第level轮候选track
k for k in track_indices
if tracks[k].time_since_update == 1 + level
]
# 如果第level轮没有候选track,就进行到下一轮去找
if len(track_indices_l) == 0: # Nothing to match at this level
continue
# 调用min_cost_matching进行匹配
# 函数含义:本轮可以匹配的轨迹track_indices_l, 还没有匹配上的检测unmatched_detections进行匹配
# 结果:matches_l是匹配对的set (track索引,detections索引), unmatched_detections是detections里面没用匹配的索引
# 注意,这里unmatched_detections集合随着匹配的进行会一点点变小
matches_l, _, unmatched_detections = \
min_cost_matching(
distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l #收集匹配对
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
#返回结果:
# 匹配对matches (track索引,detections索引),
# 未匹配的轨迹unmatched_tracks
# 未匹配的检测unmatched_detections
return matches, unmatched_tracks, unmatched_detections
至于这个特征图的损失是如何计算的呢?
def _match(self, detections):
def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
# 通过最近邻或者余弦距离计算出成本矩阵或者称为代价矩阵
# self.metric是nn_matching.py里面类NearestNeighborDistanceMetric的实例,本类初始化时传入
# self.metric=NearestNeighborDistanceMetric("cosine", 0.2, 100)
cost_matrix = self.metric.distance(features, targets)
# 如果发现二者的距离比用kf预测的距离大,说明二者肯定不匹配,就把对应的成本矩阵值变成一个很大的数
cost_matrix = linear_assignment.gate_cost_matrix(#cost_matrix 行对应轨迹 列对应检测
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
return cost_matrix
def distance(self, features, targets):
cost_matrix = np.zeros((len(targets), len(features)))#行索引是轨迹 列索引是当前这帧图像的检测
for i, target in enumerate(targets):
cost_matrix[i, :] = self._metric(self.samples[target], features) #每个轨迹返回的代价为17,若有16个轨迹,则得到的代价矩阵为(16,17)
return cost_matrix
#计算两个特征的差距
def _nn_cosine_distance(x, y): #传入的x为一个轨迹中保存的历史特征,维度为(7,512) y为当前检测框特征,维度为(17,512),17代表检测结果个数,最终计算的代价为7,17,然后再取最小值,得到17
distances = _cosine_distance(x, y)
return distances.min(axis=0)
其余的则是判断状态,然后对轨迹进行删除等操作,这些均与Sort方法类似,这里就不再赘述了。
总结
DeepSort算法引入的特征提取模块,并在匹配时首先使用特征作为匹配指标,因此,这使得DeepSort算法能够在目标遮挡后再次出现时能够重新匹配成功(ReID),当然这也意味着其速度上较不使用特征提取方法的跟踪算法,(如ByteTrack)速度慢了一些。