Skip to content

Low-level API

Auto-generated reference for the building blocks used by the high-level facade. Use these when you want to compose the detector, ReID runtime, and trackers explicitly.

Detector

Public detector wrapper with overrideable stage hooks and source streaming.

Source code in boxmot/detectors/detector.py
class Detector:
    """Public detector wrapper with overrideable stage hooks and source streaming."""

    def __init__(
        self,
        path: str | Path,
        device: str = "cpu",
        imgsz=None,
        conf: Optional[float] = None,
        iou: float = 0.7,
        classes=None,
        agnostic_nms: bool = False,
        batch: int = 1,
        vid_stride: int = 1,
        callbacks: Optional[dict[str, list[Callable[["Detector"], None]]]] = None,
    ) -> None:
        self.path = Path(path)
        self.device = device
        self.imgsz = default_imgsz(path) if imgsz is None else imgsz
        self.conf = default_conf(path) if conf is None else float(conf)
        self.iou = float(iou)
        self.classes = classes
        self.agnostic_nms = bool(agnostic_nms)
        self.batch_size = max(int(batch), 1)
        self.vid_stride = max(int(vid_stride), 1)
        self.backend = self._get_backend_class(path)(model=path, device=device, imgsz=self.imgsz)
        self.model = getattr(self.backend, "model", getattr(self.backend, "_yolo", self.backend))
        self.done_warmup = False
        self.dataset = None
        self.results = None
        self.raw_results = None
        self.batch = None
        self.seen = 0
        self.stream = False
        self.callbacks = callbacks or {
            "on_predict_start": [],
            "on_predict_batch_start": [],
            "on_predict_postprocess_end": [],
            "on_predict_end": [],
        }
        self._lock = threading.Lock()
        self._last_orig_imgs: list[np.ndarray] | None = None

    @classmethod
    def _get_backend_class(cls, path: str | Path):
        return get_detector_class(path)

    @staticmethod
    def _as_result_list(results):
        return results if isinstance(results, list) else [results]

    @staticmethod
    def _batch_input(frames: list[np.ndarray]):
        return frames[0] if len(frames) == 1 else frames

    def setup_source(self, source, batch: Optional[int] = None, vid_stride: Optional[int] = None):
        """Prepare a batched source iterator for predictor-style inference."""
        self.dataset = _iter_batches(
            source,
            batch_size=max(int(self.batch_size if batch is None else batch), 1),
            vid_stride=max(int(self.vid_stride if vid_stride is None else vid_stride), 1),
        )
        return self.dataset

    def run_callbacks(self, event: str) -> None:
        """Run registered callbacks for a predictor lifecycle event."""
        for callback in self.callbacks.get(event, []):
            callback(self)

    def add_callback(self, event: str, func: Callable[["Detector"], None]) -> None:
        """Register a callback for a predictor lifecycle event."""
        self.callbacks.setdefault(event, []).append(func)

    def warmup(self) -> None:
        """Warm up the detector backend with a dummy frame once."""
        if self.done_warmup:
            return

        if isinstance(self.imgsz, (list, tuple)):
            height, width = int(self.imgsz[0]), int(self.imgsz[1])
        else:
            height = width = int(self.imgsz)

        dummy = np.zeros((height, width, 3), dtype=np.uint8)
        try:
            self.backend(
                [dummy],
                conf=self.conf,
                iou=self.iou,
                classes=self.classes,
                agnostic_nms=self.agnostic_nms,
            )
        except Exception as exc:  # noqa: BLE001
            LOGGER.warning(f"Detector warmup failed: {exc}")
        finally:
            self.done_warmup = True

    def preprocess(self, image: np.ndarray, **kwargs):
        images = image if isinstance(image, list) else [image]
        self._last_orig_imgs = images
        backend_pre = getattr(self.backend, "preprocess", None)
        if not callable(backend_pre):
            return image
        try:
            return backend_pre(images)
        except (TypeError, NotImplementedError):
            # Backend without a real preprocess stage (legacy contract):
            # fall back to the no-op pass-through so the composite path
            # ``self.backend(...)`` continues to work in ``process``.
            return image

    def process(self, frame, **kwargs):
        backend_proc = getattr(self.backend, "process", None)
        # Composite path: callers passing inference overrides
        # (conf/iou/classes/agnostic_nms) get the legacy "do everything"
        # semantics, which is what the warmup and standalone
        # ``Detector.process(images, conf=..., ...)`` callers rely on.
        composite_keys = {"conf", "iou", "classes", "agnostic_nms"}
        if any(key in kwargs for key in composite_keys):
            images = frame if isinstance(frame, list) else [frame]
            results = self.backend(
                images,
                conf=float(kwargs.get("conf", self.conf)),
                iou=float(kwargs.get("iou", self.iou)),
                classes=kwargs.get("classes", self.classes),
                agnostic_nms=bool(kwargs.get("agnostic_nms", self.agnostic_nms)),
            )
            if isinstance(results, list) and len(results) == 1:
                return results[0]
            return results
        # Stage path: ``frame`` is the output of ``self.preprocess`` and we
        # want only the model forward so timing reports inference separately
        # from preprocess/postprocess.
        if callable(backend_proc):
            try:
                return backend_proc(frame)
            except (TypeError, NotImplementedError):
                pass
        # Backend has no standalone process stage: fall back to composite.
        images = self._last_orig_imgs or (frame if isinstance(frame, list) else [frame])
        results = self.backend(
            images,
            conf=self.conf,
            iou=self.iou,
            classes=self.classes,
            agnostic_nms=self.agnostic_nms,
        )
        if isinstance(results, list) and len(results) == 1:
            return results[0]
        return results

    def postprocess(self, results, as_detections: bool = False, **kwargs):
        backend_post = getattr(self.backend, "postprocess", None)
        # If the backend has a real postprocess stage, route the raw model
        # output through it so NMS/scale-back work shows up in the dedicated
        # timing bucket. Backends without a real stage fall through to the
        # legacy unwrap-only behaviour.
        if callable(backend_post) and not isinstance(results, (Detections,)):
            already_detections = (
                isinstance(results, list)
                and len(results) > 0
                and all(isinstance(r, Detections) for r in results)
            )
            if not already_detections:
                try:
                    results = backend_post(
                        results,
                        conf=float(kwargs.get("conf", self.conf)),
                        iou=float(kwargs.get("iou", self.iou)),
                        classes=kwargs.get("classes", self.classes),
                        agnostic_nms=bool(kwargs.get("agnostic_nms", self.agnostic_nms)),
                    )
                except (TypeError, NotImplementedError):
                    pass
        if as_detections:
            return results
        if isinstance(results, Detections):
            if results.masks is not None:
                return results
            return results.dets
        if hasattr(results, "dets"):
            if getattr(results, "masks", None) is not None:
                return results
            return results.dets
        if isinstance(results, list) and all(isinstance(result, Detections) for result in results):
            if len(results) == 1:
                if results[0].masks is not None:
                    return results[0]
                return results[0].dets
            return results
        if isinstance(results, list) and all(hasattr(result, "dets") for result in results):
            if len(results) == 1:
                if getattr(results[0], "masks", None) is not None:
                    return results[0]
                return results[0].dets
            return [result.dets for result in results]
        return results

    def _predict_single(self, source, **kwargs):
        path = str(source) if isinstance(source, (str, Path)) else ""
        image = resolve_image(source)

        with self._lock:
            self.stream = False
            self.batch = ([path], [image])
            self.seen = 0
            self.run_callbacks("on_predict_start")
            self.run_callbacks("on_predict_batch_start")
            preprocessed = self.preprocess(image, path=path, **kwargs)
            raw_results = self.process(preprocessed, path=path, **kwargs)
            self.raw_results = self._as_result_list(raw_results)
            processed = self.postprocess(raw_results, image=image, path=path, **kwargs)
            self.results = self._as_result_list(processed)
            self.seen = len(self.results)
            self.run_callbacks("on_predict_postprocess_end")
            self.run_callbacks("on_predict_end")
            return processed

    def stream_inference(self, source, **kwargs):
        """Stream detector outputs over any supported BoxMOT source."""
        batch_size = max(int(kwargs.pop("batch", self.batch_size)), 1)
        vid_stride = max(int(kwargs.pop("vid_stride", self.vid_stride)), 1)

        with self._lock:
            self.stream = True
            self.seen = 0
            self.setup_source(source, batch=batch_size, vid_stride=vid_stride)
            self.run_callbacks("on_predict_start")
            try:
                for paths, frames in self.dataset:
                    self.batch = (paths, frames)
                    self.run_callbacks("on_predict_batch_start")
                    preprocessed = self.preprocess(self._batch_input(frames), paths=paths, **kwargs)
                    raw_results = self.process(preprocessed, paths=paths, **kwargs)
                    self.raw_results = self._as_result_list(raw_results)
                    processed = self.postprocess(raw_results, frames=frames, paths=paths, **kwargs)
                    self.results = self._as_result_list(processed)
                    self.run_callbacks("on_predict_postprocess_end")
                    for result in self.results:
                        self.seen += 1
                        yield result
            finally:
                self.run_callbacks("on_predict_end")

    def predict_cli(self, source, **kwargs) -> None:
        """Consume streaming inference without accumulating outputs in memory."""
        for _ in self.stream_inference(source, **kwargs):
            pass

    def __call__(self, source, stream: bool = False, **kwargs):
        if stream:
            return self.stream_inference(source, **kwargs)
        if _is_single_inference_source(source):
            return self._predict_single(source, **kwargs)
        return list(self.stream_inference(source, **kwargs))

add_callback(event, func)

Register a callback for a predictor lifecycle event.

Source code in boxmot/detectors/detector.py
def add_callback(self, event: str, func: Callable[["Detector"], None]) -> None:
    """Append ``func`` to the callbacks of ``event``, creating the list if needed."""
    handlers = self.callbacks.setdefault(event, [])
    handlers.append(func)

predict_cli(source, **kwargs)

Consume streaming inference without accumulating outputs in memory.

Source code in boxmot/detectors/detector.py
def predict_cli(self, source, **kwargs) -> None:
    """Drain ``stream_inference`` for ``source``, discarding every result."""
    results = self.stream_inference(source, **kwargs)
    for _discarded in results:
        continue

run_callbacks(event)

Run registered callbacks for a predictor lifecycle event.

Source code in boxmot/detectors/detector.py
def run_callbacks(self, event: str) -> None:
    """Invoke, in registration order, every callback stored for ``event``.

    Each callback receives this object as its only argument; an event with
    no registered callbacks is a no-op.
    """
    registered = self.callbacks.get(event, [])
    for hook in registered:
        hook(self)

setup_source(source, batch=None, vid_stride=None)

Prepare a batched source iterator for predictor-style inference.

Source code in boxmot/detectors/detector.py
def setup_source(self, source, batch: Optional[int] = None, vid_stride: Optional[int] = None):
    """Build the batched iterator for ``source`` and remember it on ``self.dataset``.

    ``None`` for ``batch`` or ``vid_stride`` selects the corresponding
    instance default; both values are clamped to at least 1.
    """
    requested_batch = self.batch_size if batch is None else batch
    requested_stride = self.vid_stride if vid_stride is None else vid_stride
    batches = _iter_batches(
        source,
        batch_size=max(int(requested_batch), 1),
        vid_stride=max(int(requested_stride), 1),
    )
    self.dataset = batches
    return self.dataset

stream_inference(source, **kwargs)

Stream detector outputs over any supported BoxMOT source.

Source code in boxmot/detectors/detector.py
def stream_inference(self, source, **kwargs):
    """Stream detector outputs over any supported BoxMOT source.

    ``batch`` and ``vid_stride`` may be passed as keyword arguments;
    omitting them or passing ``None`` selects the instance defaults
    (``self.batch_size`` / ``self.vid_stride``).  Remaining keyword
    arguments are forwarded to ``preprocess``, ``process`` and
    ``postprocess``.

    This is a generator: the instance lock is acquired on the first
    ``next()`` and held until the generator finishes or is closed,
    including while it is suspended at ``yield``.  ``on_predict_end`` always
    fires once ``on_predict_start`` has run.
    """
    # ``None`` means "use the default", consistent with ``setup_source``;
    # ``int(None)`` would otherwise raise ``TypeError``.
    batch = kwargs.pop("batch", None)
    stride = kwargs.pop("vid_stride", None)
    batch_size = max(int(self.batch_size if batch is None else batch), 1)
    vid_stride = max(int(self.vid_stride if stride is None else stride), 1)

    with self._lock:
        self.stream = True
        self.seen = 0
        self.setup_source(source, batch=batch_size, vid_stride=vid_stride)
        self.run_callbacks("on_predict_start")
        try:
            for paths, frames in self.dataset:
                # Expose the current batch to callbacks before running them.
                self.batch = (paths, frames)
                self.run_callbacks("on_predict_batch_start")
                preprocessed = self.preprocess(self._batch_input(frames), paths=paths, **kwargs)
                raw_results = self.process(preprocessed, paths=paths, **kwargs)
                self.raw_results = self._as_result_list(raw_results)
                processed = self.postprocess(raw_results, frames=frames, paths=paths, **kwargs)
                self.results = self._as_result_list(processed)
                self.run_callbacks("on_predict_postprocess_end")
                for result in self.results:
                    self.seen += 1
                    yield result
        finally:
            self.run_callbacks("on_predict_end")

warmup()

Warm up the detector backend with a dummy frame once.

Source code in boxmot/detectors/detector.py
def warmup(self) -> None:
    """Run the backend once on an all-zero frame; later calls are no-ops.

    The frame is ``imgsz`` high/wide (a scalar ``imgsz`` gives a square
    frame) with three ``uint8`` channels.  A backend failure is logged as a
    warning, and ``done_warmup`` is set whether or not the call succeeded.
    """
    if self.done_warmup:
        return

    size = self.imgsz
    if isinstance(size, (list, tuple)):
        frame_shape = (int(size[0]), int(size[1]), 3)
    else:
        side = int(size)
        frame_shape = (side, side, 3)

    blank_frame = np.zeros(frame_shape, dtype=np.uint8)
    try:
        self.backend(
            [blank_frame],
            conf=self.conf,
            iou=self.iou,
            classes=self.classes,
            agnostic_nms=self.agnostic_nms,
        )
    except Exception as exc:  # noqa: BLE001
        LOGGER.warning(f"Detector warmup failed: {exc}")
    finally:
        # Mark as done even on failure so the warmup is never retried.
        self.done_warmup = True

ReID

Unified ReID runtime that also exposes overrideable public stage hooks.

Source code in boxmot/reid/core/reid.py
class ReID:
    """Unified ReID runtime that also exposes overrideable public stage hooks."""

    def __init__(
        self,
        path: str | Path | list[str | Path] | tuple[str | Path, ...] | None = None,
        *,
        weights: str | Path | list[str | Path] | tuple[str | Path, ...] | None = None,
        device: str | torch.device = "cpu",
        half: bool = False,
        preprocess_name: str | None = None,
    ) -> None:
        model_ref = path if path is not None else weights
        if model_ref is None:
            model_ref = WEIGHTS / "osnet_x0_25_msmt17.pt"

        primary_weight = model_ref[0] if isinstance(model_ref, (list, tuple)) else model_ref
        self.path = Path(primary_weight)
        self.weights = model_ref
        self.device = device if isinstance(device, torch.device) else select_device(device)
        self.half = bool(half)
        # Honour the caller-provided preprocessing choice. ReID models in the
        # zoo (OSNet, LMBN, etc.) are trained with plain ``cv2.resize`` to the
        # input shape, so we fall back to the registry default (``"resize"``)
        # rather than letterbox-padding the crop. Hardcoding ``"resize_pad"``
        # here was a regression that silently changed embedding distributions
        # and degraded IDF1 versus the v17 baseline.
        self.preprocess_name = preprocess_name or DEFAULT_PREPROCESS
        (
            self.pt,
            self.jit,
            self.onnx,
            self.xml,
            self.engine,
            self.tflite,
        ) = self.model_type(self.path)
        self.backend = self
        self.model = self.get_backend()

    @classmethod
    def from_backend(cls, backend: Any) -> "ReID":
        """Build a ``ReID`` runtime around an already-instantiated backend.

        Useful when a tracker has already loaded a ReID backend and we want to
        reuse it (rather than reloading the weights) while still exposing the
        public ``preprocess`` / ``process`` / ``postprocess`` stage hooks.
        """
        instance = cls.__new__(cls)
        instance.path = Path(getattr(backend, "weights", "") or "")
        instance.weights = instance.path
        instance.device = getattr(backend, "device", torch.device("cpu"))
        instance.half = bool(getattr(backend, "half", False))
        instance.preprocess_name = DEFAULT_PREPROCESS
        instance.pt = instance.jit = instance.onnx = False
        instance.xml = instance.engine = instance.tflite = False
        instance.backend = instance
        instance.model = backend
        return instance

    def get_backend(self):
        if hasattr(self, "_backend_model"):
            return self._backend_model

        backend_map = (
            (self.pt, PyTorchBackend),
            (self.jit, TorchscriptBackend),
            (self.onnx, ONNXBackend),
            (self.engine, TensorRTBackend),
            (self.xml, OpenVinoBackend),
            (self.tflite, TFLiteBackend),
        )

        for enabled, backend_class in backend_map:
            if enabled:
                self._backend_model = backend_class(
                    self.weights, self.device, self.half, preprocess=self.preprocess_name
                )
                return self._backend_model

        LOGGER.error("This model framework is not supported yet!")
        raise SystemExit(1)

    def check_suffix(
        self,
        file: Path | str = "osnet_x0_25_msmt17.pt",
        suffix: str | Tuple[str, ...] = (".pt",),
        msg: str = "",
    ) -> None:
        suffixes = [suffix] if isinstance(suffix, str) else list(suffix)
        files = [file] if isinstance(file, (str, Path)) else list(file)

        for candidate in files:
            file_suffix = Path(candidate).suffix.lower()
            if file_suffix and file_suffix not in suffixes:
                LOGGER.error(
                    f"File {candidate} does not have an acceptable suffix. Expected: {suffixes}{msg}"
                )

    def model_type(self, path: Path) -> Tuple[bool, ...]:
        suffixes = list(export_formats().Suffix)
        self.check_suffix(path, suffixes)
        types = [suffix in Path(path).name for suffix in suffixes]

        if Path(path).suffix in {".xml", ".bin"}:
            try:
                openvino_index = suffixes.index("_openvino_model")
                types[openvino_index] = True
            except ValueError:
                pass

        return tuple(types)

    @staticmethod
    def _coerce_boxes(boxes: Any) -> np.ndarray:
        arr = np.asarray(boxes, dtype=np.float32)
        if arr.size == 0:
            cols = arr.shape[1] if arr.ndim == 2 else 4
            return np.empty((0, cols), dtype=np.float32)
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)
        return arr.astype(np.float32, copy=False)

    @staticmethod
    def _coerce_crops(crops: Any) -> list[np.ndarray]:
        if isinstance(crops, (str, Path)):
            return [resolve_image(crops)]

        if isinstance(crops, np.ndarray):
            if crops.ndim == 4:
                return [np.asarray(crop) for crop in crops]
            if crops.ndim == 3:
                return [crops]
            raise ValueError(f"Unsupported crop tensor shape: {crops.shape}")

        if isinstance(crops, (list, tuple)):
            return [
                resolve_image(crop) if isinstance(crop, (str, Path)) else np.asarray(crop)
                for crop in crops
            ]

        raise ValueError(f"Unsupported ReID input type: {type(crops)}")

    def _prepare_crop_batch(self, crops: list[np.ndarray]) -> torch.Tensor:
        if not crops:
            return torch.empty(
                (0, 3, *self.model.input_shape),
                dtype=torch.float32,
                device=self.model.device,
            )

        from boxmot.reid.core.preprocessing import get_preprocess_fn
        preprocess_fn = get_preprocess_fn(self.preprocess_name)

        batch = torch.empty(
            (len(crops), 3, *self.model.input_shape),
            dtype=torch.float16 if self.model.half else torch.float32,
            device=self.model.device,
        )

        for index, crop in enumerate(crops):
            if crop.size == 0:
                crop = np.zeros((*self.model.input_shape, 3), dtype=np.uint8)

            resized = preprocess_fn(crop, self.model.input_shape)
            resized = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
            tensor = torch.from_numpy(resized).to(batch.device, dtype=batch.dtype)
            batch[index] = tensor.permute(2, 0, 1)

        batch = batch / 255.0
        batch = (batch - self.model.mean_array) / self.model.std_array
        return batch

    def preprocess(self, inputs, boxes=None, **kwargs):
        """Build the model-ready input batch (cropping + standardization)."""
        if boxes is not None:
            image = resolve_image(inputs)
            coerced = self._coerce_boxes(boxes)
            if not hasattr(self.model, "get_crops"):
                return {"mode": "image_boxes", "image": image, "boxes": coerced, "fallback": True}
            if coerced.size == 0:
                empty = torch.empty(
                    (0, 3, *self.model.input_shape),
                    dtype=torch.float16 if self.model.half else torch.float32,
                    device=self.model.device,
                )
                batch = self.model.inference_preprocess(empty)
                return {"mode": "image_boxes", "batch": batch, "empty": True}
            batch = self.model.get_crops(coerced, image)
            batch = self.model.inference_preprocess(batch)
            return {"mode": "image_boxes", "batch": batch, "empty": False}

        crops = self._coerce_crops(inputs)
        if not crops:
            empty = torch.empty(
                (0, 3, *self.model.input_shape),
                dtype=torch.float16 if self.model.half else torch.float32,
                device=self.model.device,
            )
            batch = self.model.inference_preprocess(empty)
            return {"mode": "crops", "batch": batch, "empty": True}

        batch = self._prepare_crop_batch(crops)
        batch = self.model.inference_preprocess(batch)
        return {"mode": "crops", "batch": batch, "empty": False}

    def process(self, payload, **kwargs):
        """Run the ReID model forward pass."""
        if payload.get("fallback", False):
            return {"_features": self.model.get_features(payload["boxes"], payload["image"])}
        if payload.get("empty", False):
            return None
        with torch.no_grad():
            return self.model.forward(payload["batch"])

    def postprocess(self, features, **kwargs) -> np.ndarray:
        """Move features to numpy and L2-normalize them."""
        if features is None:
            return np.empty((0, 0), dtype=np.float32)
        if isinstance(features, dict) and "_features" in features:
            return np.asarray(features["_features"], dtype=np.float32)
        if not hasattr(self.model, "inference_postprocess"):
            return np.asarray(features, dtype=np.float32)
        features = np.asarray(self.model.inference_postprocess(features), dtype=np.float32)
        if features.size == 0:
            return np.empty((0, 0), dtype=np.float32)
        norms = np.linalg.norm(features, axis=-1, keepdims=True)
        norms[norms == 0] = 1.0
        return features / norms

    def __call__(self, inputs, boxes=None, **kwargs) -> np.ndarray:
        payload = self.preprocess(inputs, boxes=boxes, **kwargs)
        features = self.process(payload, boxes=boxes, **kwargs)
        return self.postprocess(features, boxes=boxes, **kwargs)

from_backend(backend) classmethod

Build a ReID runtime around an already-instantiated backend.

Useful when a tracker has already loaded a ReID backend and we want to reuse it (rather than reloading the weights) while still exposing the public preprocess / process / postprocess stage hooks.

Source code in boxmot/reid/core/reid.py
@classmethod
def from_backend(cls, backend: Any) -> "ReID":
    """Wrap an existing backend object without running ``__init__``.

    Lets a tracker that already loaded a ReID backend reuse it (no second
    weight load) while still getting the public ``preprocess`` /
    ``process`` / ``postprocess`` hooks.  ``weights``, ``device`` and
    ``half`` are read from the backend when present; all format flags are
    set to ``False``.
    """
    wrapper = cls.__new__(cls)
    weights_ref = getattr(backend, "weights", "") or ""
    wrapper.path = Path(weights_ref)
    wrapper.weights = wrapper.path
    wrapper.device = getattr(backend, "device", torch.device("cpu"))
    wrapper.half = bool(getattr(backend, "half", False))
    wrapper.preprocess_name = DEFAULT_PREPROCESS
    for flag_name in ("pt", "jit", "onnx", "xml", "engine", "tflite"):
        setattr(wrapper, flag_name, False)
    wrapper.backend = wrapper
    wrapper.model = backend
    return wrapper

postprocess(features, **kwargs)

Move features to numpy and L2-normalize them.

Source code in boxmot/reid/core/reid.py
def postprocess(self, features, **kwargs) -> np.ndarray:
    """Convert raw features to a ``float32`` array, L2-normalizing model output.

    Pre-extracted ``"_features"`` and output of backends without
    ``inference_postprocess`` are only converted, not normalized.  ``None``
    or empty features give a ``(0, 0)`` array.
    """
    if features is None:
        return np.empty((0, 0), dtype=np.float32)

    precomputed = isinstance(features, dict) and "_features" in features
    if precomputed:
        return np.asarray(features["_features"], dtype=np.float32)

    if hasattr(self.model, "inference_postprocess"):
        embeddings = np.asarray(self.model.inference_postprocess(features), dtype=np.float32)
        if embeddings.size == 0:
            return np.empty((0, 0), dtype=np.float32)
        lengths = np.linalg.norm(embeddings, axis=-1, keepdims=True)
        # Avoid dividing all-zero rows by zero.
        zero_rows = lengths == 0
        lengths[zero_rows] = 1.0
        return embeddings / lengths

    return np.asarray(features, dtype=np.float32)

preprocess(inputs, boxes=None, **kwargs)

Build the model-ready input batch (cropping + standardization).

Source code in boxmot/reid/core/reid.py
def preprocess(self, inputs, boxes=None, **kwargs):
    """Build the payload consumed by ``process``.

    With ``boxes`` the input is a full image that is cropped by the backend
    (or handed over untouched, flagged ``"fallback"``, when the backend has
    no ``get_crops``).  Without ``boxes`` the input is treated as ready-made
    crops.  Empty input produces a zero-length batch flagged ``"empty"``.
    """

    def blank_batch():
        # Zero-length input in the model's working precision.
        return torch.empty(
            (0, 3, *self.model.input_shape),
            dtype=torch.float16 if self.model.half else torch.float32,
            device=self.model.device,
        )

    if boxes is None:
        crop_list = self._coerce_crops(inputs)
        if crop_list:
            prepared = self._prepare_crop_batch(crop_list)
            prepared = self.model.inference_preprocess(prepared)
            return {"mode": "crops", "batch": prepared, "empty": False}
        blank = blank_batch()
        prepared = self.model.inference_preprocess(blank)
        return {"mode": "crops", "batch": prepared, "empty": True}

    frame = resolve_image(inputs)
    box_array = self._coerce_boxes(boxes)
    if not hasattr(self.model, "get_crops"):
        return {"mode": "image_boxes", "image": frame, "boxes": box_array, "fallback": True}

    if box_array.size == 0:
        blank = blank_batch()
        prepared = self.model.inference_preprocess(blank)
        return {"mode": "image_boxes", "batch": prepared, "empty": True}

    prepared = self.model.get_crops(box_array, frame)
    prepared = self.model.inference_preprocess(prepared)
    return {"mode": "image_boxes", "batch": prepared, "empty": False}

process(payload, **kwargs)

Run the ReID model forward pass.

Source code in boxmot/reid/core/reid.py
def process(self, payload, **kwargs):
    """Produce raw features for a payload built by ``preprocess``.

    Fallback payloads are delegated to ``model.get_features`` and wrapped
    under ``"_features"``; empty payloads give ``None``; everything else
    goes through ``model.forward`` with gradients disabled.
    """
    is_fallback = payload.get("fallback", False)
    if is_fallback:
        extracted = self.model.get_features(payload["boxes"], payload["image"])
        return {"_features": extracted}

    is_empty = payload.get("empty", False)
    if is_empty:
        return None

    with torch.no_grad():
        raw = self.model.forward(payload["batch"])
    return raw

Tracker factory

Creates and returns an instance of the specified tracker type.

Parameters:
- tracker_type: The type of the tracker (e.g., 'strongsort', 'ocsort').
- tracker_config: Path to the tracker configuration file.
- reid_weights: Weights for ReID (re-identification). Used to build a ReID backend when reid_model is not supplied.
- device: Device to run the ReID backend on (only used when building from reid_weights).
- half: Whether to use half-precision for the ReID backend (only used when building from reid_weights).
- per_class: Boolean for class-specific tracking (optional).
- evolve_param_dict: A dictionary of parameters for evolving the tracker.
- reid_preprocess: Preprocessing method for the ReID backend (only used when building from reid_weights).
- reid_model: Pre-built ReID backend (e.g., ReID(...).model). Takes precedence over reid_weights and lets callers share a single backend across trackers.
- tracker_backend: Backend to use for the tracker. "python" (default) uses the pure-Python implementation under boxmot.trackers. "cpp" delegates to the registered native (C++) live backend obtained from boxmot.native.registry.get_native_live_backend(). The native backend is built on demand if it isn't already compiled.

Returns: - An instance of the selected tracker.

Raises: - ValueError: If tracker_type is not recognized or the requested tracker_backend is not available for that tracker.
Source code in boxmot/trackers/tracker_zoo.py
def _load_tracker_defaults(config_path):
    """Read a tracker YAML file and return its ``{param: default}`` mapping.

    The file is always decoded as UTF-8 so the result does not depend on the
    platform's default encoding, and an empty file yields an empty dict.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        yaml_config = yaml.safe_load(f) or {}
    return {param: details["default"] for param, details in yaml_config.items()}


def create_tracker(
    tracker_type,
    tracker_config=None,
    reid_weights=None,
    device=None,
    half=None,
    per_class=None,
    evolve_param_dict=None,
    reid_preprocess=None,
    reid_model=None,
    tracker_backend="python",
):
    """
    Creates and returns an instance of the specified tracker type.

    Parameters:
    - tracker_type: The type of the tracker (e.g., 'strongsort', 'ocsort').
    - tracker_config: Path to the tracker configuration file. For the Python
        backend the default config of ``tracker_type`` is used when omitted.
    - reid_weights: Weights for ReID (re-identification). Used to build a ReID backend
        when ``reid_model`` is not supplied.
    - device: Device to run the ReID backend on (only used when building from ``reid_weights``).
    - half: Whether to use half-precision for the ReID backend (only used when building from ``reid_weights``).
    - per_class: Boolean for class-specific tracking (optional).
    - evolve_param_dict: A dictionary of parameters for evolving the tracker.
        When given it replaces the configuration file; the dictionary itself
        is copied, not modified.
    - reid_preprocess: Preprocessing method for the ReID backend (only used when building from ``reid_weights``).
    - reid_model: Pre-built ReID backend (e.g., ``ReID(...).model``). Takes
        precedence over ``reid_weights`` and lets callers share a single backend across trackers.
    - tracker_backend: Backend to use for the tracker. ``"python"`` (default)
        uses the pure-Python implementation under ``boxmot.trackers``. ``"cpp"``
        delegates to the registered native (C++) live backend via
        :func:`boxmot.native.registry.get_native_live_backend`. The native
        backend is built on demand if it isn't already compiled.

    Returns:
    - An instance of the selected tracker.

    Raises:
    - ValueError: If `tracker_type` is not recognized or the requested
      ``tracker_backend`` is not available for that tracker.
    """

    backend = str(tracker_backend or "python").strip().lower()
    if backend not in ("python", "cpp"):
        raise ValueError(
            f"Unknown tracker_backend '{tracker_backend}'. Expected one of: python, cpp."
        )

    if backend == "cpp":
        # Lazy import to keep ``boxmot.native`` (which pulls in ctypes / CMake
        # plumbing) optional for pure-Python users.
        from boxmot.native.registry import get_native_live_backend

        native = get_native_live_backend(tracker_type)
        cfg_dict = None
        if evolve_param_dict is not None:
            cfg_dict = dict(evolve_param_dict)
        elif tracker_config is not None:
            cfg_dict = _load_tracker_defaults(tracker_config)
        # Native live constructors take ``(cfg_dict, reid_weights=...)``; only
        # ReID-aware trackers consume ``reid_weights``, so the ReID keyword
        # arguments are forwarded for those trackers only.
        kwargs = {}
        if tracker_type in REID_TRACKERS:
            kwargs["reid_weights"] = reid_weights
            if reid_preprocess is not None:
                kwargs["reid_preprocess"] = reid_preprocess
        return native.create_tracker(cfg_dict, **kwargs)

    if tracker_type not in TRACKER_MAPPING:
        available = ", ".join(TRACKER_MAPPING.keys())
        raise ValueError(f"Unknown tracker type: '{tracker_type}'. Available trackers are: {available}")

    # Load configuration from file or use provided dictionary
    if evolve_param_dict is None:
        if tracker_config is None:
            # Load default tracker config
            tracker_config = get_tracker_config(tracker_type)
        tracker_args = _load_tracker_defaults(tracker_config)
    else:
        tracker_args = evolve_param_dict.copy()

    # Prepare arguments
    tracker_args["per_class"] = per_class

    if tracker_type in REID_TRACKERS:
        if reid_model is None and reid_weights is not None:
            reid_model = ReID(
                weights=reid_weights,
                device=device,
                half=half,
                preprocess_name=reid_preprocess,
            ).model
        tracker_args["reid_model"] = reid_model

    # Tracker-specific adjustments
    if tracker_type == "strongsort":
        tracker_args.pop("per_class", None)

    # Dynamically import and instantiate the correct tracker class
    module_path, class_name = TRACKER_MAPPING[tracker_type].rsplit(".", 1)
    module = importlib.import_module(module_path)
    tracker_class = getattr(module, class_name)

    # Return the instantiated tracker class with arguments and warmed-up models
    tracker = tracker_class(**tracker_args)
    if hasattr(tracker, "model") and tracker.model is not None:
        tracker.model.warmup()
    return tracker

Returns the path to the tracker configuration file.

Source code in boxmot/trackers/tracker_zoo.py
def get_tracker_config(tracker_type):
    """Return ``TRACKER_CONFIGS / "<tracker_type>.yaml"`` for the given tracker."""
    config_filename = f"{tracker_type}.yaml"
    return TRACKER_CONFIGS / config_filename