目标跟踪之DeepSort算法(4)
目标跟踪之DeepSort
- 1 安装
- 1.1 代码下载与安装
- 1.2 DeepSort检测流程
- 1.3 模型初始化流程
- 2. 模型推理
- 2.1 模型推理代码解析
- 2.2 对预测结果跟踪代码解析
- 2.3 轨迹预测
- 2.4 轨迹跟踪
- 2.5 轨迹与特征匹配
- 2.6 计算轨迹与检测的特征余弦距离
- 2.7 用轨迹与检测的马氏距离更新cost_matrix矩阵
1 安装
参考:https://github.com/beeduchai/YOLOv8-DeepSORT-Object-Tracking
1.1 代码下载与安装
1 创建一个全新的虚拟环境
conda create -n yolov8_deepsort python=3.9
2 激活虚拟环境
conda activate yolov8_deepsort
3 在当前地址创建一个文件夹存放将要下载的YOLOv8-DeepSORT-Object-Tracking
mkdir my_yolov8_deepsort
4 跳转到my_yolov8_deepsort文件夹
cd my_yolov8_deepsort
5 下载代码
git clone https://github.com/MuhammadMoinFaisal/YOLOv8-DeepSORT-Object-Tracking.git
6 安装依赖库
pip install -e .
7 在https://drive.google.com/drive/folders/1kna8eWGrSfzaR6DtNJ8_GchGgPMv3VC8下载文件,将文件夹解压放在ultralytics/yolo/v8/detect目录下
8 在ultralytics/yolo/v8/detect目录下放一个视频,然后执行以下命令。首次执行时会自动去下载yolov8l.pt模型
python predict.py model=yolov8l.pt source="traffic.mp4" show=True
注释:如果出现错误,可能是库的版本不对,根据报错提示更改版本。
1.2 DeepSort检测流程
DeepSort检测流程:
- 模型初始化
- 模型推理
# ultralytics/yolo/v8/detect/predict.py
def predict(cfg):
    """Initialise the tracker, fill in config defaults and run the detector.

    ``cfg`` is the run configuration; the article notes that it comes from
    ultralytics/yolo/configs/default.yaml.
    """
    # Step 1: initialisation (tracker first, then configuration defaults).
    init_tracker()
    model_name = cfg.model or "yolov8n.pt"
    cfg.model = model_name
    cfg.imgsz = check_imgsz(cfg.imgsz, min_dim=2)  # validated image size
    source = cfg.source if cfg.source is not None else ROOT / "assets"
    cfg.source = source

    # Step 2: inference -- build the predictor and invoke it.
    predictor = DetectionPredictor(cfg)
    predictor()
1.3 模型初始化流程
- 实例化对象获取"deep_sort_pytorch/configs/deep_sort.yaml"的参数。
- 初始化跟踪器
2.1 实例化特征提取器
2.2 实例化匹配代价矩阵
2.3 实例化跟踪器
# ultralytics/yolo/v8/detect/predict.py
def init_tracker():
    """Build the module-level ``deepsort`` tracker from deep_sort.yaml."""
    global deepsort

    # 1. Load the DeepSORT parameters from the YAML configuration file.
    cfg_deep = get_config()
    cfg_deep.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")

    # 2. Instantiate the tracker with those parameters.
    params = cfg_deep.DEEPSORT
    deepsort = DeepSort(
        params.REID_CKPT,
        max_dist=params.MAX_DIST,
        min_confidence=params.MIN_CONFIDENCE,
        nms_max_overlap=params.NMS_MAX_OVERLAP,
        max_iou_distance=params.MAX_IOU_DISTANCE,
        max_age=params.MAX_AGE,
        n_init=params.N_INIT,
        nn_budget=params.NN_BUDGET,
        use_cuda=True,
    )
# ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/deep_sort.py
class DeepSort(object):
    """Wrapper that wires a feature extractor, a distance metric and a tracker."""

    def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, nms_max_overlap=1.0, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
        """Create the three components used for tracking.

        Parameters
        ----------
        model_path
            Passed to ``Extractor`` (the re-identification checkpoint).
        max_dist
            Used as the maximum cosine distance of the appearance metric.
        min_confidence
            Detections at or below this confidence are dropped before tracking.
        nms_max_overlap
            Stored on the instance; described as the maximum IoU for NMS.
        max_iou_distance, max_age, n_init
            Forwarded unchanged to ``Tracker``.
        nn_budget
            Forwarded to ``NearestNeighborDistanceMetric``.
        use_cuda
            Forwarded to ``Extractor``.
        """
        self.min_confidence = min_confidence  # minimum confidence of a detection
        self.nms_max_overlap = nms_max_overlap  # maximum IoU during NMS
        # 2.1 Instantiate the feature extractor: it embeds each detected object so
        # the cosine distance between tracks and detection boxes can be computed.
        self.extractor = Extractor(model_path, use_cuda=use_cuda)
        max_cosine_distance = max_dist  # upper bound on the appearance distance
        # 2.2 Instantiate the matching cost metric. It is built with "cosine",
        # so the appearance cost is a cosine distance (not a Euclidean one).
        metric = NearestNeighborDistanceMetric(
            "cosine", max_cosine_distance, nn_budget)
        # 2.3 Instantiate the tracker.
        self.tracker = Tracker(
            metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)
2. 模型推理
模型推理流程:
- 准备模型和数据,对数据进行预处理
1.1 数据进行预处理
- 数据带入模型得到预测结果
- 处理预测结果
3.1 处理预测结果
- 对预测结果跟踪
2.1 模型推理代码解析
# ultralytics/yolo/engine/predictor.py
@smart_inference_mode()
def __call__(self, source=None, model=None):
    """Run preprocessing, inference, postprocessing and result writing per batch.

    ``source`` and ``model`` are only used when setup has not been done yet
    (they are forwarded to ``self.setup``). Returns ``self.all_outputs``.
    """
    # 1. Prepare the model and the data, then preprocess the data
    self.run_callbacks("on_predict_start")
    model = self.model if self.done_setup else self.setup(source, model)
    model.eval()
    self.seen, self.windows, self.dt = 0, [], (ops.Profile(), ops.Profile(), ops.Profile())
    self.all_outputs = []
    for batch in self.dataset:
        self.run_callbacks("on_predict_batch_start")
        path, im, im0s, vid_cap, s = batch
        visualize = increment_path(self.save_dir / Path(path).stem, mkdir=True) if self.args.visualize else False
        with self.dt[0]:
            im = self.preprocess(im)
            if len(im.shape) == 3:
                im = im[None] # expand for batch dim
        # 2. Feed the data to the model to obtain predictions
        # Inference
        with self.dt[1]:
            preds = model(im, augment=self.args.augment, visualize=visualize)
        # 3. Process the predictions
        # postprocess
        with self.dt[2]:
            preds = self.postprocess(preds, im, im0s)
        for i in range(len(im)):
            if self.webcam:
                path, im0s = path[i], im0s[i]
            p = Path(path)
            # 4. Track the predictions (the article places the deepsort.update
            # call inside write_results)
            s += self.write_results(i, preds, (p, im, im0s))
            if self.args.show:
                self.show(p)
            if self.args.save:
                self.save_preds(vid_cap, i, str(self.save_dir / p.name))
        # Print time (inference-only)
        LOGGER.info(f"{s}{'' if len(preds) else '(no detections), '}{self.dt[1].dt * 1E3:.1f}ms")
        self.run_callbacks("on_predict_batch_end")
    # Print results
    t = tuple(x.t / self.seen * 1E3 for x in self.dt) # speeds per image
    LOGGER.info(
        f'Speed: %.1fms pre-process, %.1fms inference, %.1fms postprocess per image at shape {(1, 3, *self.imgsz)}'
        % t)
    if self.args.save_txt or self.args.save:
        s = f"\n{len(list(self.save_dir.glob('labels/*.txt')))} labels saved to {self.save_dir / 'labels'}" if self.args.save_txt else ''
        LOGGER.info(f"Results saved to {colorstr('bold', self.save_dir)}{s}")
    self.run_callbacks("on_predict_end")
    return self.all_outputs
# 1.1 数据进行预处理 ultralytics/yolo/v8/detect/predict.py
def preprocess(self, img):
    """Turn a numpy image batch into a scaled tensor on the model's device.

    The array becomes a tensor on ``self.model.device``, is cast to fp16 when
    ``self.model.fp16`` is set and to fp32 otherwise, and is divided by 255
    in place.
    """
    tensor = torch.from_numpy(img).to(self.model.device)
    if self.model.fp16:
        tensor = tensor.half()  # half precision
    else:
        tensor = tensor.float()  # single precision (fp32)
    tensor.div_(255)  # in-place scaling, as in the original `/=`
    return tensor
# 3.1 处理预测结果 ultralytics/yolo/v8/detect/predict.py
def postprocess(self, preds, img, orig_img):
    """Apply NMS to the raw predictions and rescale boxes to the original image.

    ``img`` is the network input (its shape from index 2 onward is the size
    the boxes are scaled from) and ``orig_img`` is the original frame, or a
    sequence of frames when ``self.webcam`` is set.
    """
    # 1. Non-maximum suppression.
    preds = ops.non_max_suppression(
        preds,
        self.args.conf,
        self.args.iou,
        agnostic=self.args.agnostic_nms,
        max_det=self.args.max_det,
    )
    for idx, det in enumerate(preds):
        if self.webcam:
            target_shape = orig_img[idx].shape
        else:
            target_shape = orig_img.shape
        # 2. Map the detected boxes back onto the original image.
        rescaled = ops.scale_boxes(img.shape[2:], det[:, :4], target_shape)
        det[:, :4] = rescaled.round()
    return preds
2.2 对预测结果跟踪代码解析
把检测框的中心点宽高、置信度、类别、原图输入 deepsort.update()中进行跟踪,跟踪代码流程如下:
- 提取特征。
- 轨迹预测。
- 轨迹跟踪。
# ultralytics/yolo/v8/detect/predict.py-->def write_results(self, idx, preds, batch)-->outputs = deepsort.update(xywhs, confss, oids, im0)--> self.tracker.predict() self.tracker.update(detections)
def update(self, bbox_xywh, confidences, oids, ori_img):
    """Feed one frame of detections to the tracker and return the live tracks.

    Parameters
    ----------
    bbox_xywh
        Detection boxes; passed to ``self._get_features`` and
        ``self._xywh_to_tlwh`` (centre/width/height format per the names).
    confidences, oids
        Per-detection confidence and object-class id, iterated in lockstep.
    ori_img
        Original frame; its first two shape entries are stored as
        ``self.height`` and ``self.width``.

    Returns
    -------
    ndarray or list
        An integer array with one row ``[x1, y1, x2, y2, track_id, oid]`` per
        reported track, or an empty list when no track is reported.
    """
    self.height, self.width = ori_img.shape[:2]

    # 1. Extract an appearance feature for every detection box.
    features = self._get_features(bbox_xywh, ori_img)
    bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
    # Keep only detections whose confidence exceeds the threshold; the index
    # `i` still refers to the unfiltered inputs.
    detections = [
        Detection(bbox_tlwh[i], conf, features[i], oid)
        for i, (conf, oid) in enumerate(zip(confidences, oids))
        if conf > self.min_confidence
    ]

    # 2. Track prediction.
    self.tracker.predict()
    # 3. Track update with the current detections.
    self.tracker.update(detections)

    # Report confirmed tracks that were updated within the last frame.
    outputs = []
    for track in self.tracker.tracks:
        if not track.is_confirmed() or track.time_since_update > 1:
            continue
        x1, y1, x2, y2 = self._tlwh_to_xyxy(track.to_tlwh())
        # The builtin `int` replaces the `np.int` alias removed in NumPy 1.24.
        outputs.append(
            np.array([x1, y1, x2, y2, track.track_id, track.oid], dtype=int))
    if len(outputs) > 0:
        outputs = np.stack(outputs, axis=0)
    return outputs
2.3 轨迹预测
ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/sort/kalman_filter.py
self.tracker.predict()–>track.predict(self.kf)–>def predict(self, mean, covariance)
轨迹预测流程:
- 计算过程噪声协方差矩阵(motion_cov)。
- 根据状态转移矩阵获取下一状态和其协方差。
def predict(self, mean, covariance):
    """Run the Kalman filter prediction step.

    Parameters
    ----------
    mean : ndarray
        The 8 dimensional mean vector of the object state at the previous
        time step.
    covariance : ndarray
        The 8x8 dimensional covariance matrix of the object state at the
        previous time step.

    Returns
    -------
    (ndarray, ndarray)
        The mean vector and covariance matrix of the predicted state.
    """
    scale = mean[3]  # every scaled noise term uses the state's 4th component
    pos_w = self._std_weight_position
    vel_w = self._std_weight_velocity

    # 1. Process-noise covariance: a diagonal matrix of squared standard
    # deviations (position terms first, then velocity terms).
    noise_std = np.r_[
        [pos_w * scale, pos_w * scale, 1e-2, pos_w * scale],
        [vel_w * scale, vel_w * scale, 1e-5, vel_w * scale],
    ]
    motion_cov = np.diag(np.square(noise_std))

    # 2. Propagate the state and its covariance through the motion matrix.
    transition = self._motion_mat
    predicted_mean = np.dot(transition, mean)
    predicted_cov = np.linalg.multi_dot(
        (transition, covariance, transition.T)) + motion_cov
    return predicted_mean, predicted_cov
2.4 轨迹跟踪
ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/sort/tracker.py
self.tracker.update(detections)
轨迹跟踪步骤:
- 轨迹与特征匹配
- 更新匹配轨迹、未匹配轨迹和未匹配检测
- 更新轨迹的特征
def update(self, detections):
    """Perform measurement update and track management.

    Parameters
    ----------
    detections : List[deep_sort.detection.Detection]
        A list of detections at the current time step.
    """
    # Run matching cascade. 1. Associate tracks with detections
    # (the article describes this as cascade matching plus IOU matching).
    matches, unmatched_tracks, unmatched_detections = \
        self._match(detections)
    # Update track set. 2. Handle matched tracks, unmatched tracks and
    # unmatched detections.
    for track_idx, detection_idx in matches:
        self.tracks[track_idx].update(
            self.kf, detections[detection_idx]) # update a matched track with its detection
    for track_idx in unmatched_tracks:
        self.tracks[track_idx].mark_missed() # flag tracks that found no detection
    for detection_idx in unmatched_detections:
        self._initiate_track(detections[detection_idx]) # start a new track for each unmatched detection
    self.tracks = [t for t in self.tracks if not t.is_deleted()] # drop tracks reported as deleted
    # Update distance metric. 3. Hand the tracks' collected features to the metric.
    active_targets = [t.track_id for t in self.tracks if t.is_confirmed()] # ids of confirmed tracks
    features, targets = [], [] # features of confirmed tracks and the matching track id per feature
    for track in self.tracks:
        if not track.is_confirmed():
            continue
        features += track.features
        targets += [track.track_id for _ in track.features]
        track.features = [] # cleared once the features have been collected
    self.metric.partial_fit(
        np.asarray(features), np.asarray(targets), active_targets)
2.5 轨迹与特征匹配
轨迹与特征匹配步骤:
- 把轨迹分为确认状态和非确认状态。
- 级联匹配。通过级联匹配得到确认轨迹与检测的匹配结果matches_a、未匹配轨迹unmatched_tracks_a、未匹配检测unmatched_detections。
- IOU匹配。把级联匹配步骤中只有一次未匹配的轨迹并入非确认轨迹用于IOU匹配;保留级联匹配步骤中未匹配的轨迹中连续2帧或2帧以上没有匹配的轨迹。
- 更新级联匹配和IOU匹配的结果。
def _match(self, detections):
    """Associate the current tracks with ``detections``.

    Returns ``(matches, unmatched_tracks, unmatched_detections)``: the merged
    results of the cascade matching and the IOU matching below.
    """
    def gated_metric(tracks, dets, track_indices, detection_indices):
        """Appearance cost matrix passed through ``gate_cost_matrix``."""
        features = np.array([dets[i].feature for i in detection_indices]) # features of the detection boxes
        targets = np.array([tracks[i].track_id for i in track_indices]) # ids of the tracks
        cost_matrix = self.metric.distance(features, targets) # appearance distance between tracks and detections
        cost_matrix = linear_assignment.gate_cost_matrix( # gate the cost matrix (Mahalanobis gating per the article)
            self.kf, cost_matrix, tracks, dets, track_indices,
            detection_indices)
        return cost_matrix
    # Split track set into confirmed and unconfirmed tracks.
    # 1 Separate tracks into confirmed and unconfirmed
    confirmed_tracks = [
        i for i, t in enumerate(self.tracks) if t.is_confirmed()]
    unconfirmed_tracks = [
        i for i, t in enumerate(self.tracks) if not t.is_confirmed()]
    # Associate confirmed tracks using appearance features.
    # 2 Cascade matching
    matches_a, unmatched_tracks_a, unmatched_detections = \
        linear_assignment.matching_cascade(
            gated_metric, self.metric.matching_threshold, self.max_age,
            self.tracks, detections, confirmed_tracks)
    # Associate remaining tracks together with unconfirmed tracks using IOU.
    # 3 IOU matching
    iou_track_candidates = unconfirmed_tracks + [
        k for k in unmatched_tracks_a if
        self.tracks[k].time_since_update == 1] # cascade-unmatched tracks with time_since_update == 1 join the unconfirmed tracks as IOU candidates
    unmatched_tracks_a = [
        k for k in unmatched_tracks_a if
        self.tracks[k].time_since_update != 1] # keep only the cascade-unmatched tracks whose time_since_update is not 1
    matches_b, unmatched_tracks_b, unmatched_detections = \
        linear_assignment.min_cost_matching(
            iou_matching.iou_cost, self.max_iou_distance, self.tracks,
            detections, iou_track_candidates, unmatched_detections) # IOU matching
    # 4 Merge the results of cascade matching and IOU matching
    matches = matches_a + matches_b
    unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
    return matches, unmatched_tracks, unmatched_detections
def _initiate_track(self, detection):
    """Append a new track built from ``detection`` and advance the id counter."""
    # Initial state mean and covariance come from the detection's xyah box.
    init_mean, init_cov = self.kf.initiate(detection.to_xyah())
    new_track = Track(
        init_mean,
        init_cov,
        self._next_id,
        self.n_init,
        self.max_age,
        detection.oid,
        detection.feature,
    )
    self.tracks.append(new_track)
    self._next_id += 1
# 2 级联匹配
级联匹配步骤:
2.1 获取(1 + level)次没有被匹配上的轨迹
2.2 计算轨迹特征和检测特征的代价矩阵
2.2.1 计算轨迹与检测的特征余弦距离
2.2.2 用轨迹与检测的马氏距离更新cost_matrix矩阵
def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    """Run the matching cascade.

    Tracks are matched level by level: level ``k`` (starting at 0) handles
    the tracks whose ``time_since_update`` equals ``k + 1``, and each level
    only sees the detections left unmatched by the previous levels.

    Parameters
    ----------
    distance_metric : Callable[[List[Track], List[Detection], List[int], List[int]], ndarray]
        Given the tracks, the detections, N track indices and M detection
        indices, returns the NxM cost matrix whose element (i, j) is the
        association cost between the i-th given track and the j-th given
        detection.
    max_distance : float
        Gating threshold. Associations with a larger cost are disregarded.
    cascade_depth : int
        Number of cascade levels; should be set to the maximum track age.
    tracks : List[track.Track]
        Predicted tracks at the current time step.
    detections : List[detection.Detection]
        Detections at the current time step.
    track_indices : Optional[List[int]]
        Indices into `tracks` to consider. Defaults to all tracks.
    detection_indices : Optional[List[int]]
        Indices into `detections` to consider. Defaults to all detections.

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Matched (track, detection) index pairs, unmatched track indices and
        unmatched detection indices.
    """
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    # Nothing has been matched yet, so every detection starts out unmatched.
    unmatched_detections = detection_indices
    matches = []
    for age in range(1, cascade_depth + 1):
        if len(unmatched_detections) == 0:
            break  # no detections left

        # 1. Tracks that have gone exactly `age` steps without an update.
        level_tracks = [
            k for k in track_indices if tracks[k].time_since_update == age]
        if level_tracks:
            # 2. Solve the assignment for this level with the supplied cost
            # function; `max_distance` is the acceptance threshold.
            level_matches, _, unmatched_detections = min_cost_matching(
                distance_metric, max_distance, tracks, detections,
                level_tracks, unmatched_detections)
            matches += level_matches

    matched_tracks = set(k for k, _ in matches)
    unmatched_tracks = list(set(track_indices) - matched_tracks)
    return matches, unmatched_tracks, unmatched_detections
2.6 计算轨迹与检测的特征余弦距离
先对检测特征和历史轨迹特征分别做归一化,再用1减去两者的矩阵乘积,得到余弦距离
def _cosine_distance(a, b, data_is_normalized=False):
"""Compute pair-wise cosine distance between points in `a` and `b`.
Parameters
----------
a : array_like
An NxM matrix of N samples of dimensionality M.
b : array_like
An LxM matrix of L samples of dimensionality M.
data_is_normalized : Optional[bool]
If True, assumes rows in a and b are unit length vectors.
Otherwise, a and b are explicitly normalized to lenght 1.
Returns
-------
ndarray
Returns a matrix of size len(a), len(b) such that element (i, j)
contains the squared distance between `a[i]` and `b[j]`.
"""
if not data_is_normalized:
a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True) # a是轨迹特征归一化
b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True) # b是检测特征归一化
return 1. - np.dot(a, b.T)
2.7 用轨迹与检测的马氏距离更新cost_matrix矩阵
根据公式计算马氏距离的平方:$d^2 = (z - \mu)^{T} S^{-1} (z - \mu)$,其中 $z$ 为检测框的测量值,$\mu$ 和 $S$ 为轨迹状态投影到测量空间后的均值和协方差。
def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    """Overwrite infeasible entries of the cost matrix using Kalman gating.

    Parameters
    ----------
    kf : The Kalman filter.
    cost_matrix : ndarray
        The NxM cost matrix, where entry (i, j) is the association cost
        between `tracks[track_indices[i]]` and
        `detections[detection_indices[j]]`. It is modified in place.
    tracks : List[track.Track]
        Predicted tracks at the current time step.
    detections : List[detection.Detection]
        Detections at the current time step.
    track_indices : List[int]
        Track indices corresponding to the rows of `cost_matrix`.
    detection_indices : List[int]
        Detection indices corresponding to the columns of `cost_matrix`.
    gated_cost : Optional[float]
        Value written into entries judged infeasible.
    only_position : Optional[bool]
        If True, gating uses 2 dimensions instead of 4 and the flag is
        forwarded to `kf.gating_distance`.

    Returns
    -------
    ndarray
        The modified cost matrix.
    """
    if only_position:
        gating_dim = 2
    else:
        gating_dim = 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]

    # One xyah measurement row per candidate detection.
    measurements = np.asarray(
        [detections[idx].to_xyah() for idx in detection_indices])

    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]
        # Gating distance of every measurement to this track's state.
        distances = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position)
        # Entries above the threshold get the (very large) gated cost.
        infeasible = distances > gating_threshold
        cost_matrix[row, infeasible] = gated_cost
    return cost_matrix
# 马氏距离
def gating_distance(self, mean, covariance, measurements,
                    only_position=False):
    """Compute the gating distance between a state distribution and measurements.

    A suitable distance threshold can be obtained from `chi2inv95`. If
    `only_position` is False, the chi-square distribution has 4 degrees of
    freedom, otherwise 2.

    Parameters
    ----------
    mean : ndarray
        Mean vector over the state distribution (8 dimensional).
    covariance : ndarray
        Covariance of the state distribution (8x8 dimensional).
    measurements : ndarray
        An Nx4 matrix of N measurements, each in format (x, y, a, h) where
        (x, y) is the bounding box center position, a the aspect ratio,
        and h the height.
    only_position : Optional[bool]
        If True, only the first two components (the box center) are used.

    Returns
    -------
    ndarray
        An array of length N whose i-th element is the squared Mahalanobis
        distance between (mean, covariance) and `measurements[i]`.
    """
    # Mean and covariance as returned by self.project for this track state.
    proj_mean, proj_cov = self.project(mean, covariance)
    if only_position:
        # Restrict the distance to the first two components.
        proj_mean = proj_mean[:2]
        proj_cov = proj_cov[:2, :2]
        measurements = measurements[:, :2]

    # With S = L L^T (L lower triangular; S must be positive definite),
    #   d S^{-1} d^T = (L^{-1} d^T)^T (L^{-1} d^T) = z^T z,
    # so solving L z = d^T avoids forming the inverse of S explicitly.
    lower = np.linalg.cholesky(proj_cov)
    residual = measurements - proj_mean
    whitened = scipy.linalg.solve_triangular(
        lower, residual.T, lower=True, check_finite=False,
        overwrite_b=True)
    return np.sum(whitened * whitened, axis=0)