
DeepOcSort

Bases: BaseTracker

DeepOCSort Tracker: A multi-object tracker that combines appearance-based (ReID) and motion-based association.

Parameters:

    reid_weights (Path): Path to the model weights for ReID (Re-Identification). Required.
    device (torch.device): Device on which to run the model (e.g., 'cpu' or 'cuda'). Required.
    half (bool): Whether to use half-precision (fp16) for faster inference on compatible devices. Required.
    per_class (bool): Whether to perform per-class tracking; if True, tracks are maintained separately for each object class. Default: False.
    det_thresh (float): Detection confidence threshold; detections below this threshold are ignored. Default: 0.3.
    max_age (int): Maximum number of frames to keep a track alive without any detections. Default: 30.
    min_hits (int): Minimum number of hits required to confirm a track. Default: 3.
    iou_threshold (float): Intersection over Union (IoU) threshold for data association. Default: 0.3.
    delta_t (int): Time delta for velocity estimation in the Kalman filter. Default: 3.
    asso_func (str): Association function for data association; options include "iou" for IoU-based association. Default: 'iou'.
    inertia (float): Weight for inertia in motion modeling; higher values make tracks less responsive to changes. Default: 0.2.
    w_association_emb (float): Weight for the embedding-based association score. Default: 0.5.
    alpha_fixed_emb (float): Fixed alpha for updating embeddings; controls the contribution of new versus old embeddings in the ReID model. Default: 0.95.
    aw_param (float): Parameter for adaptive weighting between association costs. Default: 0.5.
    embedding_off (bool): Whether to turn off embedding-based association. Default: False.
    cmc_off (bool): Whether to turn off camera motion compensation (CMC). Default: False.
    aw_off (bool): Whether to turn off adaptive weighting. Default: False.
    Q_xy_scaling (float): Scaling factor for the Kalman filter process noise covariance of the position coordinates. Default: 0.01.
    Q_s_scaling (float): Scaling factor for the Kalman filter process noise covariance of the scale coordinate. Default: 0.0001.
    **kwargs (dict): Additional arguments for future extensions or parameters. Default: {}.
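
A minimal instantiation sketch. The import path follows the source location given below; the ReID checkpoint filename is an assumed example, not a requirement:

from pathlib import Path

import torch

from boxmot.trackers.deepocsort.deepocsort import DeepOcSort

# Sketch only: the ReID weight filename below is illustrative
tracker = DeepOcSort(
    reid_weights=Path("osnet_x0_25_msmt17.pt"),  # assumed example checkpoint
    device=torch.device("cpu"),
    half=False,
)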
Source code in boxmot/trackers/deepocsort/deepocsort.py
class DeepOcSort(BaseTracker):
    """
    DeepOCSort Tracker: A multi-object tracker that combines appearance-based (ReID) and motion-based association.

    Args:
        reid_weights (Path): Path to the model weights for ReID (Re-Identification).
        device (torch.device): Device on which to run the model (e.g., 'cpu' or 'cuda').
        half (bool): Whether to use half-precision (fp16) for faster inference on compatible devices.
        per_class (bool, optional): Whether to perform per-class tracking. If True, tracks are maintained separately for each object class.
        det_thresh (float, optional): Detection confidence threshold. Detections below this threshold will be ignored.
        max_age (int, optional): Maximum number of frames to keep a track alive without any detections.
        min_hits (int, optional): Minimum number of hits required to confirm a track.
        iou_threshold (float, optional): Intersection over Union (IoU) threshold for data association.
        delta_t (int, optional): Time delta for velocity estimation in Kalman Filter.
        asso_func (str, optional): Association function to use for data association. Options include "iou" for IoU-based association.
        inertia (float, optional): Weight for inertia in motion modeling. Higher values make tracks less responsive to changes.
        w_association_emb (float, optional): Weight for the embedding-based association score.
        alpha_fixed_emb (float, optional): Fixed alpha for updating embeddings. Controls the contribution of new and old embeddings in the ReID model.
        aw_param (float, optional): Parameter for adaptive weighting between association costs.
        embedding_off (bool, optional): Whether to turn off the embedding-based association.
        cmc_off (bool, optional): Whether to turn off camera motion compensation (CMC).
        aw_off (bool, optional): Whether to turn off adaptive weighting.
        Q_xy_scaling (float, optional): Scaling factor for the process noise covariance in the Kalman Filter for position coordinates.
        Q_s_scaling (float, optional): Scaling factor for the process noise covariance in the Kalman Filter for scale coordinates.
        **kwargs: Additional arguments for future extensions or parameters.
    """

    def __init__(
        self,
        reid_weights: Path,
        device: torch.device,
        half: bool,
        per_class: bool = False,
        det_thresh: float = 0.3,
        max_age: int = 30,
        min_hits: int = 3,
        iou_threshold: float = 0.3,
        delta_t: int = 3,
        asso_func: str = "iou",
        inertia: float = 0.2,
        w_association_emb: float = 0.5,
        alpha_fixed_emb: float = 0.95,
        aw_param: float = 0.5,
        embedding_off: bool = False,
        cmc_off: bool = False,
        aw_off: bool = False,
        Q_xy_scaling: float = 0.01,
        Q_s_scaling: float = 0.0001,
        **kwargs: dict,
    ):
        super().__init__(max_age=max_age, per_class=per_class, asso_func=asso_func)
        """
        Sets key parameters for SORT
        """
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.det_thresh = det_thresh
        self.delta_t = delta_t
        self.asso_func = asso_func
        self.inertia = inertia
        self.w_association_emb = w_association_emb
        self.alpha_fixed_emb = alpha_fixed_emb
        self.aw_param = aw_param
        self.per_class = per_class
        self.Q_xy_scaling = Q_xy_scaling
        self.Q_s_scaling = Q_s_scaling
        KalmanBoxTracker.count = 1

        self.model = ReidAutoBackend(
            weights=reid_weights, device=device, half=half
        ).model
        # "similarity transforms using feature point extraction, optical flow, and RANSAC"
        self.cmc = get_cmc_method("sof")()
        self.embedding_off = embedding_off
        self.cmc_off = cmc_off
        self.aw_off = aw_off

    @BaseTracker.setup_decorator
    @BaseTracker.per_class_decorator
    def update(
        self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None
    ) -> np.ndarray:
        """
        Params:
          dets - a numpy array of detections in the format [[x1,y1,x2,y2,score,cls],[x1,y1,x2,y2,score,cls],...]
        Requires: this method must be called once for each frame, even with empty detections
        (use np.empty((0, 6)) for frames without detections).
        Returns an array of tracked objects in the format [x1, y1, x2, y2, id, conf, cls, det_ind].
        NOTE: The number of objects returned may differ from the number of detections provided.
        """
        self.check_inputs(dets, img, embs)

        self.frame_count += 1
        self.height, self.width = img.shape[:2]

        scores = dets[:, 4]
        dets = np.hstack([dets, np.arange(len(dets)).reshape(-1, 1)])
        assert dets.shape[1] == 7
        remain_inds = scores > self.det_thresh
        dets = dets[remain_inds]

        # appearance descriptor extraction
        if self.embedding_off or dets.shape[0] == 0:
            dets_embs = np.ones((dets.shape[0], 1))
        elif embs is not None:
            dets_embs = embs[remain_inds]
        else:
            # (Ndets x D): D is the embedding dimension, e.g., 512, 1024, or 2048
            dets_embs = self.model.get_features(dets[:, 0:4], img)

        # CMC
        if not self.cmc_off:
            transform = self.cmc.apply(img, dets[:, :4])
            for trk in self.active_tracks:
                trk.apply_affine_correction(transform)

        trust = (dets[:, 4] - self.det_thresh) / (1 - self.det_thresh)
        af = self.alpha_fixed_emb
        # Ranges over [self.alpha_fixed_emb, 1]; approaches 1 as detector confidence decreases
        dets_alpha = af + (1 - af) * (1 - trust)

        # get predicted locations from existing trackers.
        trks = np.zeros((len(self.active_tracks), 5))
        trk_embs = []
        to_del = []
        ret = []
        for t, trk in enumerate(trks):
            pos = self.active_tracks[t].predict()[0]
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            if np.any(np.isnan(pos)):
                to_del.append(t)
            else:
                trk_embs.append(self.active_tracks[t].get_emb())
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))

        if len(trk_embs) > 0:
            trk_embs = np.vstack(trk_embs)
        else:
            trk_embs = np.array(trk_embs)

        for t in reversed(to_del):
            self.active_tracks.pop(t)

        velocities = np.array(
            [trk.velocity if trk.velocity is not None else np.array((0, 0)) for trk in self.active_tracks])
        last_boxes = np.array([trk.last_observation for trk in self.active_tracks])
        k_observations = np.array(
            [k_previous_obs(trk.observations, trk.age, self.delta_t) for trk in self.active_tracks])

        """
            First round of association
        """
        # Embedding similarity cost matrix: (M detections x N tracks)
        if self.embedding_off or dets.shape[0] == 0 or trk_embs.shape[0] == 0:
            stage1_emb_cost = None
        else:
            stage1_emb_cost = dets_embs @ trk_embs.T
        matched, unmatched_dets, unmatched_trks = associate(
            dets[:, 0:5],
            trks,
            self.asso_func,
            self.iou_threshold,
            velocities,
            k_observations,
            self.inertia,
            img.shape[1],  # w
            img.shape[0],  # h
            stage1_emb_cost,
            self.w_association_emb,
            self.aw_off,
            self.aw_param,
        )
        for m in matched:
            self.active_tracks[m[1]].update(dets[m[0], :])
            self.active_tracks[m[1]].update_emb(dets_embs[m[0]], alpha=dets_alpha[m[0]])

        """
            Second round of association by OCR (observation-centric recovery)
        """
        if unmatched_dets.shape[0] > 0 and unmatched_trks.shape[0] > 0:
            left_dets = dets[unmatched_dets]
            left_dets_embs = dets_embs[unmatched_dets]
            left_trks = last_boxes[unmatched_trks]
            left_trks_embs = trk_embs[unmatched_trks]

            iou_left = self.asso_func(left_dets, left_trks)
            # TODO: may perform better without this
            emb_cost_left = left_dets_embs @ left_trks_embs.T
            if self.embedding_off:
                emb_cost_left = np.zeros_like(emb_cost_left)
            iou_left = np.array(iou_left)
            if iou_left.max() > self.iou_threshold:
                """
                NOTE: by using a lower threshold, e.g., self.iou_threshold - 0.1, you may
                get a higher performance especially on MOT17/MOT20 datasets. But we keep it
                uniform here for simplicity
                """
                rematched_indices = linear_assignment(-iou_left)
                to_remove_det_indices = []
                to_remove_trk_indices = []
                for m in rematched_indices:
                    det_ind, trk_ind = unmatched_dets[m[0]], unmatched_trks[m[1]]
                    if iou_left[m[0], m[1]] < self.iou_threshold:
                        continue
                    self.active_tracks[trk_ind].update(dets[det_ind, :])
                    self.active_tracks[trk_ind].update_emb(
                        dets_embs[det_ind], alpha=dets_alpha[det_ind]
                    )
                    to_remove_det_indices.append(det_ind)
                    to_remove_trk_indices.append(trk_ind)
                unmatched_dets = np.setdiff1d(
                    unmatched_dets, np.array(to_remove_det_indices)
                )
                unmatched_trks = np.setdiff1d(
                    unmatched_trks, np.array(to_remove_trk_indices)
                )

        for m in unmatched_trks:
            self.active_tracks[m].update(None)

        # create and initialise new trackers for unmatched detections
        for i in unmatched_dets:
            trk = KalmanBoxTracker(
                dets[i],
                delta_t=self.delta_t,
                emb=dets_embs[i],
                alpha=dets_alpha[i],
                Q_xy_scaling=self.Q_xy_scaling,
                Q_s_scaling=self.Q_s_scaling,
                max_obs=self.max_obs,
            )
            self.active_tracks.append(trk)
        i = len(self.active_tracks)
        for trk in reversed(self.active_tracks):
            if trk.last_observation.sum() < 0:
                d = trk.get_state()[0]
            else:
                """
                this is optional to use the recent observation or the kalman filter prediction,
                we didn't notice significant difference here
                """
                d = trk.last_observation[:4]
            if (trk.time_since_update < 1) and (
                trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits
            ):
                # track IDs start at 1, as the MOT benchmark requires positive IDs
                ret.append(
                    np.concatenate(
                        (d, [trk.id], [trk.conf], [trk.cls], [trk.det_ind])
                    ).reshape(1, -1)
                )
            i -= 1
            # remove dead tracklet
            if trk.time_since_update > self.max_age:
                self.active_tracks.pop(i)
        if len(ret) > 0:
            return np.concatenate(ret)
        return np.array([])
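
Each returned row concatenates the box with track metadata, as built in the np.concatenate call above. A sketch of consuming the output, with the column layout inferred from that call:

# tracks has shape (M, 8) when non-empty
tracks = tracker.update(dets, img)
if tracks.size:
    boxes = tracks[:, 0:4]   # x1, y1, x2, y2
    ids = tracks[:, 4]       # track IDs (start at 1)
    confs = tracks[:, 5]     # detection confidences
    classes = tracks[:, 6]   # object classes
    det_inds = tracks[:, 7]  # indices into the input detections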

update(dets, img, embs=None)

Params: dets - a numpy array of detections in the format [[x1,y1,x2,y2,score,cls],...]. Requires: this method must be called once for each frame, even with empty detections (use np.empty((0, 6)) for frames without detections). Returns an array of tracked objects in the format [x1, y1, x2, y2, id, conf, cls, det_ind]. NOTE: The number of objects returned may differ from the number of detections provided.
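
A minimal per-frame loop sketch, reusing the tracker instantiated above. It assumes frames come from cv2.VideoCapture and a hypothetical get_detections helper that returns an (N, 6) array of [x1, y1, x2, y2, conf, cls] rows:

import cv2
import numpy as np

cap = cv2.VideoCapture("video.mp4")  # illustrative input path
while True:
    ok, frame = cap.read()
    if not ok:
        break
    dets = get_detections(frame)  # hypothetical detector returning (N, 6)
    if dets is None or len(dets) == 0:
        dets = np.empty((0, 6))   # update() must be called even on empty frames
    tracks = tracker.update(dets, frame)  # (M, 8): x1, y1, x2, y2, id, conf, cls, det_ind
cap.release()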

Source code in boxmot/trackers/deepocsort/deepocsort.py (shown above as part of the class source).