workflows.auto_labeling

import datetime
import json
import logging
import os
import queue
import random
import re
import shutil
import signal
import subprocess
import sys
import time
from difflib import get_close_matches
from functools import partial
from pathlib import Path
from typing import Union

import albumentations as A
import fiftyone as fo
import numpy as np
import psutil
import torch
import torch.multiprocessing as mp
from accelerate.test_utils.testing import get_backend
from datasets import Split
from fiftyone import ViewField as F
from huggingface_hub import HfApi, hf_hub_download
from PIL import Image
from torch.utils.data import DataLoader, Subset
from torch.utils.tensorboard import SummaryWriter
from torchvision.transforms.functional import to_pil_image
from tqdm import tqdm
from transformers import (
    AutoConfig,
    AutoModelForObjectDetection,
    AutoModelForZeroShotObjectDetection,
    AutoProcessor,
    EarlyStoppingCallback,
    Trainer,
    TrainingArguments,
)
from ultralytics import YOLO

import wandb
from config.config import (
    ACCEPTED_SPLITS,
    GLOBAL_SEED,
    HF_DO_UPLOAD,
    HF_ROOT,
    NUM_WORKERS,
    WANDB_ACTIVE,
    WORKFLOWS,
)
from utils.dataset_loader import get_supported_datasets
from utils.logging import configure_logging
from utils.sample_field_operations import add_sample_field


def get_dataset_and_model_from_hf_id(hf_id: str):
    """Extract dataset and model name from HuggingFace ID by matching against supported datasets."""
    # HF ID follows structure organization/dataset_model
    # Both dataset and model can contain "_" as well

    # Remove organization (everything before the first "/")
    hf_id = hf_id.split("/", 1)[-1]

    # Find all dataset names that appear in hf_id
    supported_datasets = get_supported_datasets()
    matches = [
        dataset_name for dataset_name in supported_datasets if dataset_name in hf_id
    ]

    if not matches:
        logging.warning(
            f"Dataset name could not be extracted from Hugging Face ID {hf_id}"
        )
        dataset_name = "no_dataset_name"
    else:
        # Return the longest match (most specific)
        dataset_name = max(matches, key=len)

    # Get model name by removing dataset name from hf_id
    model_name = hf_id.replace(dataset_name, "").strip("_")
    if not model_name:
        logging.warning(
            f"Model name could not be extracted from Hugging Face ID {hf_id}"
        )
        model_name = "no_model_name"

    return dataset_name, model_name
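
# Example (hypothetical IDs): with "fisheye8k" among the supported datasets,
#   >>> get_dataset_and_model_from_hf_id("mcity/fisheye8k_facebook_detr-resnet-50")
#   ('fisheye8k', 'facebook_detr-resnet-50')
# The longest dataset match wins, so a dataset "kitti" cannot shadow a more
# specific "kitti_tracking" entry.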


# Handling timeouts
class TimeoutException(Exception):
    """Custom exception for handling dataloader timeouts."""

    pass


def timeout_handler(signum, frame):
    raise TimeoutException("Dataloader creation timed out")
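
# Usage pattern for the SIGALRM-based timeout (a sketch; SIGALRM is POSIX-only):
#   signal.signal(signal.SIGALRM, timeout_handler)
#   signal.alarm(60)        # raise TimeoutException after 60 s
#   try:
#       ...                 # blocking call, e.g. fetching the next batch
#   except TimeoutException:
#       ...                 # skip the stuck batch
#   finally:
#       signal.alarm(0)     # always cancel the pending alarm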


class ZeroShotInferenceCollateFn:
    """Collate function for zero-shot inference that prepares batches for model input."""

    def __init__(
        self,
        hf_model_config_name,
        hf_processor,
        batch_size,
        object_classes,
        batch_classes,
    ):
        """Initialize the collate function with the Hugging Face model config name, processor, batch size, object classes, and batch classes."""
        try:
            self.hf_model_config_name = hf_model_config_name
            self.processor = hf_processor
            self.batch_size = batch_size
            self.object_classes = object_classes
            self.batch_classes = batch_classes
        except Exception as e:
            logging.error(f"Error in collate init of DataLoader: {e}")

    def __call__(self, batch):
        """Processes a batch of data by preparing images and labels for model input."""
        try:
            images, labels = zip(*batch)
            target_sizes = [tuple(img.shape[1:]) for img in images]

            # Adjustments for final batch
            n_images = len(images)
            if n_images < self.batch_size:
                self.batch_classes = [self.object_classes] * n_images

            # Apply PIL transformation for specific models
            if self.hf_model_config_name == "OmDetTurboConfig":
                images = [to_pil_image(image) for image in images]

            inputs = self.processor(
                text=self.batch_classes,
                images=images,
                return_tensors="pt",
                padding=True,  # Allow for differently sized images
            )

            return inputs, labels, target_sizes, self.batch_classes
        except Exception as e:
            logging.error(f"Error in collate function of DataLoader: {e}")
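
# Minimal wiring sketch (hypothetical model and classes; mirrors the usage in
# ZeroShotObjectDetection.model_inference below):
#   processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-tiny", use_fast=True)
#   collate = ZeroShotInferenceCollateFn(
#       hf_model_config_name="GroundingDinoConfig",
#       hf_processor=processor,
#       batch_size=4,
#       object_classes=["car", "pedestrian"],
#       batch_classes=[["car", "pedestrian"]] * 4,
#   )
#   loader = DataLoader(dataset_torch, batch_size=4, collate_fn=collate)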


class ZeroShotObjectDetection:
    """Zero-shot object detection using various HuggingFace models with multi-GPU support."""

    def __init__(
        self,
        dataset_torch: torch.utils.data.Dataset,
        dataset_info,
        config,
        detections_path="./output/detections/",
        log_root="./logs/",
    ):
        """Initialize the zero-shot object detection labeler with dataset, configuration, and path settings."""
        self.dataset_torch = dataset_torch
        self.dataset_info = dataset_info
        self.dataset_name = dataset_info["name"]
        self.object_classes = config["object_classes"]
        self.detection_threshold = config["detection_threshold"]
        self.detections_root = os.path.join(detections_path, self.dataset_name)
        self.tensorboard_root = os.path.join(
            log_root, "tensorboard/zeroshot_object_detection"
        )

        logging.info(f"Zero-shot models will look for {self.object_classes}")

    def exclude_stored_predictions(
        self, dataset_v51: fo.Dataset, config, do_exclude=False
    ):
        """Checks for existing predictions and loads them from disk if available."""
        dataset_schema = dataset_v51.get_field_schema()
        models_splits_dict = {}
        for model_name, value in config["hf_models_zeroshot_objectdetection"].items():
            model_name_key = re.sub(r"[\W-]+", "_", model_name)
            pred_key = re.sub(
                r"[\W-]+", "_", "pred_zsod_" + model_name
            )  # zsod: Zero-Shot Object Detection
            # Check if data already stored in V51 dataset
            if pred_key in dataset_schema and do_exclude is True:
                logging.warning(
                    f"Skipping model {model_name}. Predictions already stored in Voxel51 dataset."
                )
            # Check if data already stored on disk
            elif (
                os.path.isdir(os.path.join(self.detections_root, model_name_key))
                and do_exclude is True
            ):
                try:
                    logging.info(f"Loading {model_name} predictions from disk.")
                    temp_dataset = fo.Dataset.from_dir(
                        dataset_dir=os.path.join(self.detections_root, model_name_key),
                        dataset_type=fo.types.COCODetectionDataset,
                        name="temp_dataset",
                        data_path="data.json",
                    )

                    # Copy all detections from stored dataset into our dataset
                    detections = temp_dataset.values("detections.detections")
                    add_sample_field(
                        dataset_v51,
                        pred_key,
                        fo.EmbeddedDocumentField,
                        embedded_doc_type=fo.Detections,
                    )
                    dataset_v51.set_values(f"{pred_key}.detections", detections)
                except Exception as e:
                    logging.error(
                        f"Data in {os.path.join(self.detections_root, model_name_key)} could not be loaded. Error: {e}"
                    )
                finally:
                    fo.delete_dataset("temp_dataset")
            # Assign model to be run
            else:
                models_splits_dict[model_name] = value

        logging.info(f"Models to be run: {models_splits_dict}")
        return models_splits_dict
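
    # Field-name sanitization example: re.sub(r"[\W-]+", "_", ...) turns
    # "pred_zsod_IDEA-Research/grounding-dino-tiny" into
    # "pred_zsod_IDEA_Research_grounding_dino_tiny", a valid FiftyOne field name.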

    # Worker functions
    def update_queue_sizes_worker(
        self, queues, queue_sizes, largest_queue_index, max_queue_size
    ):
        """Monitor result queue sizes and pick the next queue to drain, weighted by fill level."""
        experiment_name = f"queue_size_monitor_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
        log_directory = os.path.join(
            self.tensorboard_root, self.dataset_name, experiment_name
        )
        wandb.tensorboard.patch(root_logdir=log_directory)
        if WANDB_ACTIVE:
            wandb.init(
                name=f"queue_size_monitor_{os.getpid()}",
                job_type="inference",
                project="Zero Shot Object Detection",
            )
        writer = SummaryWriter(log_dir=log_directory)

        step = 0

        while True:
            for i, result_queue in enumerate(queues):
                queue_sizes[i] = result_queue.qsize()
                writer.add_scalar(f"queue_size/items/{i}", queue_sizes[i], step)

            step += 1

            # Find the index of the largest queue
            max_size = max(queue_sizes)
            max_index = queue_sizes.index(max_size)

            # Calculate the total size of all queues
            total_size = sum(queue_sizes)

            # If any queue holds items, sample a queue with probability
            # proportional to its current fill level
            if total_size > 0:
                # Normalize the queue sizes by the max_queue_size
                normalized_sizes = [size / max_queue_size for size in queue_sizes]

                # Calculate probabilities based on normalized sizes
                probabilities = [
                    size / sum(normalized_sizes) for size in normalized_sizes
                ]

                # Use random.choices with weights (probabilities)
                chosen_queue_index = random.choices(
                    range(len(queues)), weights=probabilities, k=1
                )[0]

                largest_queue_index.value = chosen_queue_index
            else:
                largest_queue_index.value = max_index

            time.sleep(0.1)
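
    # Illustrative draw: with queue sizes [2, 6, 2], the weights are proportional
    # to the sizes (the max_queue_size normalization cancels out), so
    #     random.choices(range(3), weights=[0.2, 0.6, 0.2], k=1)
    # most often returns [1]; fuller queues are drained more frequently.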

    def process_outputs_worker(
        self,
        result_queues,
        largest_queue_index,
        inference_finished,
        max_queue_size,
        wandb_activate=False,
    ):
        """Process model outputs from result queues and save them to the dataset."""
        configure_logging()
        logging.info(f"Process ID: {os.getpid()}. Results processing process started")
        dataset_v51 = fo.load_dataset(self.dataset_name)
        processing_successful = None

        # Logging
        experiment_name = f"post_process_{os.getpid()}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
        log_directory = os.path.join(
            self.tensorboard_root, self.dataset_name, experiment_name
        )
        wandb.tensorboard.patch(root_logdir=log_directory)
        if WANDB_ACTIVE and wandb_activate:
            wandb.init(
                name=f"post_process_{os.getpid()}",
                job_type="inference",
                project="Zero Shot Object Detection",
            )
        writer = SummaryWriter(log_dir=log_directory)
        n_processed_images = 0

        logging.info(f"Post-Processor {os.getpid()} starting loop.")

        while True:
            results_queue = result_queues[largest_queue_index.value]
            writer.add_scalar(
                "post_processing/selected_queue",
                largest_queue_index.value,
                n_processed_images,
            )

            if results_queue.qsize() == max_queue_size:
                logging.warning(
                    f"Queue full: {results_queue.qsize()}. Consider increasing the number of post-processing workers."
                )

            # Exit only when inference is finished and the queue is empty
            if inference_finished.value and results_queue.empty():
                dataset_v51.save()
                logging.info(
                    f"Post-processing worker {os.getpid()} has finished all outputs."
                )
                break

            # Process results from the queue if available
            if not results_queue.empty():
                try:
                    time_start = time.time()

                    result = results_queue.get_nowait()

                    processing_successful = self.process_outputs(
                        dataset_v51,
                        result,
                        self.object_classes,
                        self.detection_threshold,
                    )

                    # Performance logging
                    n_images = len(result["labels"])
                    time_end = time.time()
                    duration = time_end - time_start
                    batches_per_second = 1 / duration
                    frames_per_second = batches_per_second * n_images
                    n_processed_images += n_images
                    writer.add_scalar(
                        "post_processing/frames_per_second",
                        frames_per_second,
                        n_processed_images,
                    )

                    del result  # Explicit removal from device

                except queue.Empty:
                    continue  # Another worker drained the queue first
                except Exception as e:
                    logging.error(f"Error in post-processing worker: {e}")
                    continue

            else:
                continue

        writer.close()
        wandb.finish(exit_code=0)
        return processing_successful  # Return last processing status

    def gpu_worker(
        self,
        gpu_id,
        cpu_cores,
        task_queue,
        results_queue,
        done_event,
        post_processing_finished,
        set_cpu_affinity=False,
    ):
        """Run model inference on a specified GPU with dedicated CPU cores."""
        dataset_v51 = fo.load_dataset(
            self.dataset_name
        )  # NOTE Only for the case of sequential processing
        configure_logging()
        # Set CPU
        if set_cpu_affinity:
            # Allow only certain CPU cores
            psutil.Process().cpu_affinity(cpu_cores)
        logging.info(f"Available CPU cores: {psutil.Process().cpu_affinity()}")
        max_n_cpus = len(cpu_cores)
        torch.set_num_threads(max_n_cpus)

        # Set GPU
        logging.info(f"GPU {gpu_id}: {torch.cuda.get_device_name(gpu_id)}")
        device = torch.device(f"cuda:{gpu_id}")

        run_successful = None
        with torch.cuda.device(gpu_id):
            while True:
                if post_processing_finished.value and task_queue.empty():
                    # Keep alive until post-processing is done
                    break

                if task_queue.empty():
                    done_event.set()

                if not task_queue.empty():
                    try:
                        task_metadata = task_queue.get(
                            timeout=5
                        )  # Timeout to prevent indefinite blocking
                    except Exception:
                        break  # Exit if no more tasks
                    run_successful = self.model_inference(
                        task_metadata,
                        device,
                        self.dataset_torch,
                        dataset_v51,
                        self.object_classes,
                        results_queue,
                        self.tensorboard_root,
                    )
                    logging.info(
                        f"Worker for GPU {gpu_id} finished run (successful: {run_successful})"
                    )
                else:
                    continue
        return run_successful  # Return last processing status
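
    # Orchestration sketch (hypothetical; the actual launcher lives in the
    # calling workflow): one gpu_worker process per GPU, fed by a shared task queue.
    #   ctx = mp.get_context("spawn")
    #   task_queue, results_queue = ctx.Queue(), ctx.Queue()
    #   done_event = ctx.Event()
    #   post_processing_finished = ctx.Value("b", False)
    #   worker = ctx.Process(
    #       target=labeler.gpu_worker,
    #       args=(0, [0, 1, 2, 3], task_queue, results_queue, done_event,
    #             post_processing_finished),
    #   )
    #   worker.start()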

    def eval_and_export_worker(self, models_ready_queue, n_models):
        """Evaluate model performance and export results for completed models."""
        configure_logging()
        logging.info(f"Process ID: {os.getpid()}. Eval-and-export process started")

        dataset = fo.load_dataset(self.dataset_name)
        run_successful = None
        models_done = 0

        while True:
            if not models_ready_queue.empty():
                try:
                    model_info = models_ready_queue.get(
                        timeout=5
                    )  # Timeout to prevent indefinite blocking
                    model_name = model_info["model_name"]
                    pred_key = re.sub(r"[\W-]+", "_", "pred_zsod_" + model_name)
                    eval_key = re.sub(r"[\W-]+", "_", "eval_zsod_" + model_name)
                    dataset.reload()
                    run_successful = self.eval_and_export(
                        dataset, model_name, pred_key, eval_key
                    )
                    models_done += 1
                    logging.info(
                        f"Evaluation and export of {models_done}/{n_models} models done."
                    )
                except Exception as e:
                    logging.error(f"Error in eval-and-export worker: {e}")
                    continue

            if models_done == n_models:
                break

        return run_successful

    # Functionality functions
    def model_inference(
        self,
        metadata: dict,
        device: str,
        dataset: torch.utils.data.Dataset,
        dataset_v51: fo.Dataset,
        object_classes: list,
        results_queue: Union[queue.Queue, mp.Queue],
        root_log_dir: str,
        persistent_workers: bool = False,
    ):
        """Run zero-shot object detection on the provided dataset and device, returning the success status."""
        writer = None
        run_successful = True
        processor, model, inputs, outputs, result, dataloader = (
            None,
            None,
            None,
            None,
            None,
            None,
        )  # For finally block

        # Timeout handler
        dataloader_timeout = 60
        signal.signal(signal.SIGALRM, timeout_handler)

        try:
            # Metadata
            run_id = metadata["run_id"]
            model_name = metadata["model_name"]
            dataset_name = metadata["dataset_name"]
            is_subset = metadata["is_subset"]
            batch_size = metadata["batch_size"]

            logging.info(
                f"Process ID: {os.getpid()}, Run ID: {run_id}, Device: {device}, Model: {model_name}"
            )

            # Load the model
            logging.info(f"Loading model {model_name}")
            processor = AutoProcessor.from_pretrained(model_name, use_fast=True)
            model = AutoModelForZeroShotObjectDetection.from_pretrained(model_name)
            model = model.to(device, non_blocking=True)
            model.eval()
            hf_model_config = AutoConfig.from_pretrained(model_name)
            hf_model_config_name = type(hf_model_config).__name__
            batch_classes = [object_classes] * batch_size
            logging.info(f"Loaded model type {hf_model_config_name}")

            # Dataloader
            logging.info("Generating dataloader")
            if is_subset:
                chunk_index_start = metadata["chunk_index_start"]
                chunk_index_end = metadata["chunk_index_end"]
                logging.info(f"Length of dataset: {len(dataset)}")
                logging.info(f"Subset start index: {chunk_index_start}")
                logging.info(f"Subset stop index: {chunk_index_end}")
                dataset = Subset(dataset, range(chunk_index_start, chunk_index_end))

            zero_shot_inference_preprocessing = ZeroShotInferenceCollateFn(
                hf_model_config_name=hf_model_config_name,
                hf_processor=processor,
                object_classes=object_classes,
                batch_size=batch_size,
                batch_classes=batch_classes,
            )
            num_workers = WORKFLOWS["auto_labeling_zero_shot"]["n_worker_dataloader"]
            prefetch_factor = WORKFLOWS["auto_labeling_zero_shot"][
                "prefetch_factor_dataloader"
            ]
            dataloader = DataLoader(
                dataset,
                batch_size=batch_size,
                shuffle=False,
                num_workers=num_workers,
                persistent_workers=persistent_workers,
                pin_memory=True,
                prefetch_factor=prefetch_factor,
                collate_fn=zero_shot_inference_preprocessing,
            )

            dataloader_length = len(dataloader)
            if dataloader_length < 1:
                logging.error(
                    f"Dataloader has insufficient data: {dataloader_length} entries. Please check your dataset and DataLoader configuration."
                )

            # Logging
            experiment_name = f"{model_name}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{device}"
            log_directory = os.path.join(root_log_dir, dataset_name, experiment_name)
            wandb.tensorboard.patch(root_logdir=log_directory)
            if WANDB_ACTIVE:
                wandb.init(
                    name=f"{model_name}_{device}",
                    job_type="inference",
                    project="Zero Shot Object Detection",
                    config=metadata,
                )
            writer = SummaryWriter(log_dir=log_directory)

            # Inference Loop
            logging.info(f"{os.getpid()}: Starting inference loop")
            n_processed_images = 0
            for inputs, labels, target_sizes, batch_classes in tqdm(
                dataloader, desc="Inference Loop"
            ):
                signal.alarm(dataloader_timeout)
                try:
                    time_start = time.time()
                    n_images = len(labels)
                    inputs = inputs.to(device, non_blocking=True)

                    with torch.amp.autocast("cuda"), torch.inference_mode():
                        outputs = model(**inputs)

                    result = {
                        "inputs": inputs,
                        "outputs": outputs,
                        "processor": processor,
                        "target_sizes": target_sizes,
                        "labels": labels,
                        "model_name": model_name,
                        "hf_model_config_name": hf_model_config_name,
                        "batch_classes": batch_classes,
                    }

                    logging.debug(f"{os.getpid()}: Putting result into queue")

                    results_queue.put(
                        result, timeout=60
                    )  # Give up on the batch if the queue stays full for 60 seconds

                    # Logging
                    time_end = time.time()
                    duration = time_end - time_start
                    batches_per_second = 1 / duration
                    frames_per_second = batches_per_second * n_images
                    n_processed_images += n_images
                    logging.debug(
                        f"{os.getpid()}: Number of processed images: {n_processed_images}"
                    )
                    writer.add_scalar(
                        "inference/frames_per_second",
                        frames_per_second,
                        n_processed_images,
                    )

                except TimeoutException:
                    logging.warning(
                        "Dataloader loop got stuck. Continuing with next batch."
                    )
                    continue

                finally:
                    signal.alarm(0)  # Cancel the alarm

            # Flawless execution
            wandb_exit_code = 0

        except Exception as e:
            wandb_exit_code = 1
            run_successful = False
            logging.error(f"Error in Process {os.getpid()}: {e}")
        finally:
            try:
                wandb.finish(exit_code=wandb_exit_code)
            except Exception:
                pass

            # Explicit removal from device
            del (
                processor,
                model,
                inputs,
                outputs,
                result,
                dataloader,
            )

            torch.cuda.empty_cache()
            wandb.tensorboard.unpatch()
            if writer:
                writer.close()
            return run_successful

    def process_outputs(self, dataset_v51, result, object_classes, detection_threshold):
        """Process outputs from object detection models, extracting bounding boxes and labels to save to the dataset."""
        processing_successful = True
        try:
            inputs = result["inputs"]
            outputs = result["outputs"]
            target_sizes = result["target_sizes"]
            labels = result["labels"]
            model_name = result["model_name"]
            hf_model_config_name = result["hf_model_config_name"]
            batch_classes = result["batch_classes"]
            processor = result["processor"]

            # Processing output
            if hf_model_config_name == "GroundingDinoConfig":
                results = processor.post_process_grounded_object_detection(
                    outputs,
                    inputs.input_ids,
                    box_threshold=detection_threshold,
                    text_threshold=detection_threshold,
                )
            elif hf_model_config_name in ["Owlv2Config", "OwlViTConfig"]:
                results = processor.post_process_grounded_object_detection(
                    outputs=outputs,
                    threshold=detection_threshold,
                    target_sizes=target_sizes,
                    text_labels=batch_classes,
                )
            elif hf_model_config_name == "OmDetTurboConfig":
                results = processor.post_process_grounded_object_detection(
                    outputs,
                    text_labels=batch_classes,
                    threshold=detection_threshold,
                    nms_threshold=detection_threshold,
                    target_sizes=target_sizes,
                )
            else:
                raise ValueError(f"Unsupported model config: {hf_model_config_name}")

            if not len(results) == len(target_sizes) == len(labels):
                logging.error(
                    f"Lengths of results, target_sizes, and labels do not match: {len(results)}, {len(target_sizes)}, {len(labels)}"
                )
            for model_result, size, target in zip(results, target_sizes, labels):
                boxes, scores, output_labels = (
                    model_result["boxes"],
                    model_result["scores"],
                    model_result["text_labels"],
                )

                img_height = size[0]
                img_width = size[1]

                detections = []
                for box, score, label in zip(boxes, scores, output_labels):
                    processing_successful = True
                    if hf_model_config_name == "GroundingDinoConfig":
                        # Outputs do not comply with given labels
                        # Grounding DINO outputs multiple pairs of object boxes and noun phrases for a given (Image, Text) pair
                        # There can be either multiple labels per output ("bike van"), incomplete ones ("motorcyc"), or broken ones ("##cic")
                        processed_label = label.split()[
                            0
                        ]  # Assume first output is the best output
                        if processed_label in object_classes:
                            label = processed_label
                            top_left_x = box[0].item()
                            top_left_y = box[1].item()
                            box_width = (box[2] - box[0]).item()
                            box_height = (box[3] - box[1]).item()
                        else:
                            matches = get_close_matches(
                                processed_label, object_classes, n=1, cutoff=0.6
                            )
                            selected_label = matches[0] if matches else None
                            if selected_label:
                                logging.debug(
                                    f"Mapped output '{processed_label}' to class '{selected_label}'"
                                )
                                label = selected_label
                                top_left_x = box[0].item()
                                top_left_y = box[1].item()
                                box_width = (box[2] - box[0]).item()
                                box_height = (box[3] - box[1]).item()
                            else:
                                logging.debug(
                                    f"Skipped detection with {hf_model_config_name} due to unclear output: {label}"
                                )
                                processing_successful = False

                    elif hf_model_config_name in [
                        "Owlv2Config",
                        "OwlViTConfig",
                        "OmDetTurboConfig",
                    ]:
                        top_left_x = box[0].item() / img_width
                        top_left_y = box[1].item() / img_height
                        box_width = (box[2].item() - box[0].item()) / img_width
                        box_height = (box[3].item() - box[1].item()) / img_height

                    if (
                        processing_successful
                    ):  # Skip GroundingDinoConfig labels that could not be processed
                        detection = fo.Detection(
                            label=label,
                            bounding_box=[
                                top_left_x,
                                top_left_y,
                                box_width,
                                box_height,
                            ],
                            confidence=score.item(),
                        )
                        detection["bbox_area"] = (
                            detection["bounding_box"][2] * detection["bounding_box"][3]
                        )
                        detections.append(detection)

                # Attach label to V51 dataset
                pred_key = re.sub(
                    r"[\W-]+", "_", "pred_zsod_" + model_name
                )  # zsod: Zero-Shot Object Detection
                sample = dataset_v51[target["image_id"]]
                sample[pred_key] = fo.Detections(detections=detections)
                sample.save()

        except Exception as e:
            logging.error(f"Error in processing outputs: {e}")
            processing_successful = False
        finally:
            return processing_successful
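
    # Worked example of the absolute -> relative conversion above: a box
    # (x1=200, y1=100, x2=300, y2=150) in a 1000x500 px image becomes
    # bounding_box = [0.2, 0.2, 0.1, 0.1], i.e. (top-left x, top-left y,
    # width, height), all in [0, 1] as required by FiftyOne.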

    def eval_and_export(self, dataset_v51, model_name, pred_key, eval_key):
        """Populate dataset with evaluation results (if ground_truth is available)."""
        try:
            dataset_v51.evaluate_detections(
                pred_key,
                gt_field="ground_truth",
                eval_key=eval_key,
                compute_mAP=True,
            )
        except Exception as e:
            logging.warning(f"Evaluation not possible: {e}")

        # Store labels https://docs.voxel51.com/api/fiftyone.core.collections.html#fiftyone.core.collections.SampleCollection.export
        model_name_key = re.sub(r"[\W-]+", "_", model_name)
        dataset_v51.export(
            export_dir=os.path.join(self.detections_root, model_name_key),
            dataset_type=fo.types.COCODetectionDataset,
            data_path="data.json",
            export_media=None,  # "manifest",
            label_field=pred_key,
            progress=True,
        )
        return True


class UltralyticsObjectDetection:
    """Object detection using Ultralytics YOLO models with training and inference support."""

    def __init__(self, dataset, config):
        """Initialize with dataset and config, and set up paths for the model and data."""
        self.dataset = dataset
        self.config = config
        self.ultralytics_data_path = os.path.join(
            config["export_dataset_root"], config["v51_dataset_name"]
        )

        self.hf_hub_model_id = (
            f"{HF_ROOT}/"
            + f"{config['v51_dataset_name']}_{config['model_name']}".replace("/", "_")
        )

        self.export_root = "output/models/ultralytics/"
        self.export_folder = os.path.join(
            self.export_root, self.config["v51_dataset_name"]
        )

        self.model_path = os.path.join(
            self.export_folder, self.config["model_name"], "weights", "best.pt"
        )

    @staticmethod
    def export_data(
        dataset, dataset_info, export_dataset_root, label_field="ground_truth"
    ):
        """Export dataset to YOLO format for Ultralytics training."""
        ultralytics_data_path = os.path.join(export_dataset_root, dataset_info["name"])
        # Delete export directory if it already exists
        if os.path.exists(ultralytics_data_path):
            shutil.rmtree(ultralytics_data_path)

        logging.info("Exporting data for training with Ultralytics")
        classes = dataset.distinct(f"{label_field}.detections.label")

        # Make directory
        os.makedirs(ultralytics_data_path, exist_ok=False)

        for split in ACCEPTED_SPLITS:
            split_view = dataset.match_tags(split)

            if split in ("train", "val"):  # YOLO expects train and val splits
                split_view.export(
                    export_dir=ultralytics_data_path,
                    dataset_type=fo.types.YOLOv5Dataset,
                    label_field=label_field,
                    classes=classes,
                    split=split,
                )
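
    # Resulting export layout (sketch, per fo.types.YOLOv5Dataset):
    #   <export_dataset_root>/<dataset_name>/
    #       dataset.yaml            # class names + split paths, consumed by train()
    #       images/{train,val}/...
    #       labels/{train,val}/...  # one YOLO-format .txt per image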

    def train(self):
        """Train the YOLO model for object detection using Ultralytics and optionally upload it to Hugging Face."""
        model = YOLO(self.config["model_name"], task="detect")
        # https://docs.ultralytics.com/modes/train/#train-settings

        # Use all available GPUs
        device = "0"  # Default to GPU 0
        if torch.cuda.device_count() > 1:
            device = ",".join(map(str, range(torch.cuda.device_count())))

        results = model.train(
            data=f"{self.ultralytics_data_path}/dataset.yaml",
            epochs=self.config["epochs"],
            project=self.export_folder,
            name=self.config["model_name"],
            patience=self.config["patience"],
            batch=self.config["batch_size"],
            imgsz=self.config["img_size"],
            multi_scale=self.config["multi_scale"],
            cos_lr=self.config["cos_lr"],
            seed=GLOBAL_SEED,
            optimizer="AdamW",  # "auto" as default
            pretrained=True,
            exist_ok=True,
            amp=True,
            device=device,
        )
        metrics = model.val()
        logging.info(f"Model Performance: {metrics}")

        # Upload model to Hugging Face
        if HF_DO_UPLOAD:
            logging.info(f"Uploading model {self.model_path} to Hugging Face.")
            api = HfApi()
            api.create_repo(
                self.hf_hub_model_id, private=True, repo_type="model", exist_ok=True
            )
            api.upload_file(
                path_or_fileobj=self.model_path,
                path_in_repo="best.pt",
                repo_id=self.hf_hub_model_id,
                repo_type="model",
            )

    def inference(self, gt_field="ground_truth"):
        """Performs inference using the YOLO model on a dataset, with options to evaluate results."""
        logging.info(f"Running inference on dataset {self.config['v51_dataset_name']}")
        inference_settings = self.config["inference_settings"]

        dataset_name = None
        model_name = self.config["model_name"]

        model_hf = inference_settings["model_hf"]
        if model_hf is not None:
            # Use model manually defined in config.
            # This way, models trained on a different dataset can be used for inference.
            dataset_name, _ = get_dataset_and_model_from_hf_id(model_hf)

            # Set up directories
            download_dir = os.path.join(
                self.export_root, dataset_name, model_name, "weights"
            )
            os.makedirs(download_dir, exist_ok=True)

            self.model_path = os.path.join(download_dir, "best.pt")

            file_path = hf_hub_download(
                repo_id=model_hf,
                filename="best.pt",
                local_dir=download_dir,
            )
        else:
            # Automatically determine model based on dataset
            dataset_name = self.config["v51_dataset_name"]

            try:
                if os.path.exists(self.model_path):
                    file_path = self.model_path
                    logging.info(f"Loading model {model_name} from disk: {file_path}")
                else:
                    download_dir = self.model_path.replace("best.pt", "")
                    os.makedirs(download_dir, exist_ok=True)
                    logging.info(
                        f"Downloading model {self.hf_hub_model_id} from Hugging Face to {download_dir}"
                    )
                    file_path = hf_hub_download(
                        repo_id=self.hf_hub_model_id,
                        filename="best.pt",
                        local_dir=download_dir,
                    )
            except Exception as e:
                logging.error(f"Failed to load or download model: {str(e)}.")
                return False

        pred_key = f"pred_od_{model_name}-{dataset_name}"
        logging.info(f"Using model {self.model_path} for inference.")
        model = YOLO(self.model_path)

        detection_threshold = inference_settings["detection_threshold"]
        if inference_settings["inference_on_test"] is True:
            dataset_eval_view = self.dataset.match_tags("test")
            if len(dataset_eval_view) == 0:
                logging.error("Dataset is missing split 'test'")
            dataset_eval_view.apply_model(
                model, label_field=pred_key, confidence_thresh=detection_threshold
            )
        else:
            self.dataset.apply_model(
                model, label_field=pred_key, confidence_thresh=detection_threshold
            )

        if inference_settings["do_eval"]:
            eval_key = f"eval_{self.config['model_name']}_{dataset_name}"

            if inference_settings["inference_on_test"] is True:
                dataset_view = self.dataset.match_tags(["test"])
            else:
                dataset_view = self.dataset

            dataset_view.evaluate_detections(
                pred_key,
                gt_field=gt_field,
                eval_key=eval_key,
                compute_mAP=True,
            )
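
    # inference_settings sketch (hypothetical values; the real ones come from the
    # workflow config):
    #   {
    #       "inference_on_test": True,   # restrict inference to the "test" split
    #       "model_hf": None,            # optional HF repo id overriding the model
    #       "detection_threshold": 0.2,  # confidence threshold for kept boxes
    #       "do_eval": True,             # run evaluate_detections afterwards
    #   }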


def transform_batch_standalone(
    batch,
    image_processor,
    do_convert_annotations=True,
    return_pixel_mask=False,
):
    """Format annotations in COCO format for the object detection task. Lives outside the class so it can be pickled."""
    images = []
    annotations = []

    for image_path, annotation in zip(batch["image_path"], batch["objects"]):
        image = Image.open(image_path).convert("RGB")
        image_np = np.array(image)
        images.append(image_np)

        coco_annotations = []
        for i, bbox in enumerate(annotation["bbox"]):

            # Conversion from HF dataset bounding boxes to DETR:
            # Input: HF dataset bbox is COCO (top_left_x, top_left_y, width, height) in absolute coordinates
            # Output:
            # DETR expects COCO (top_left_x, top_left_y, width, height) in absolute coordinates if 'do_convert_annotations == True'
            # DETR expects YOLO (center_x, center_y, width, height) in relative coordinates between [0,1] if 'do_convert_annotations == False'

            if not do_convert_annotations:
                x, y, w, h = bbox
                img_height, img_width = image_np.shape[:2]
                center_x = (x + w / 2) / img_width
                center_y = (y + h / 2) / img_height
                width = w / img_width
                height = h / img_height
                bbox = [center_x, center_y, width, height]

                # Ensure bbox values are within the expected range
                assert all(0 <= coord <= 1 for coord in bbox), f"Invalid bbox: {bbox}"

                logging.debug(
                    f"Converted {[x, y, w, h]} to {[center_x, center_y, width, height]} with 'do_convert_annotations' = {do_convert_annotations}"
                )

            coco_annotation = {
                "image_id": annotation["image_id"],
                "bbox": bbox,
                "category_id": annotation["category_id"][i],
                "area": annotation["area"][i],
                "iscrowd": 0,
            }
            coco_annotations.append(coco_annotation)
        detr_annotation = {
            "image_id": annotation["image_id"],
            "annotations": coco_annotations,
        }
        annotations.append(detr_annotation)

    # Apply the image processor transformations (resizing, rescaling,
    # normalization) once, after all images and annotations are collected
    result = image_processor(
        images=images, annotations=annotations, return_tensors="pt"
    )

    if not return_pixel_mask:
        result.pop("pixel_mask", None)

    return result
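
# Worked example of the COCO -> YOLO conversion above: a bbox
# (x=100, y=50, w=200, h=100) in a 640x480 image becomes
#   center_x = (100 + 200 / 2) / 640 = 0.3125
#   center_y = (50 + 100 / 2) / 480  ≈ 0.2083
#   width    = 200 / 640             = 0.3125
#   height   = 100 / 480             ≈ 0.2083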


class HuggingFaceObjectDetection:
    """Object detection using HuggingFace models with support for training and inference."""

    def __init__(
        self,
        dataset,
        config,
        output_model_path="./output/models/object_detection_hf",
        output_detections_path="./output/detections/",
        gt_field="ground_truth",
    ):
        """Initialize with dataset, config, and optional output paths."""
        self.dataset = dataset
        self.config = config
        self.model_name = config["model_name"]
        self.model_name_key = re.sub(r"[\W-]+", "_", self.model_name)
        self.dataset_name = config["v51_dataset_name"]
        self.do_convert_annotations = True  # If True, the HF image processor converts COCO (top_left_x, top_left_y, width, height) in absolute coordinates to (center_x, center_y, width, height) in relative coordinates https://github.com/huggingface/transformers/blob/v4.48.2/src/transformers/models/conditional_detr/image_processing_conditional_detr.py#L1497

        self.detections_root = os.path.join(
            output_detections_path, self.dataset_name, self.model_name_key
        )

        self.model_root = os.path.join(
            output_model_path, self.dataset_name, self.model_name_key
        )

        self.hf_hub_model_id = (
            f"{HF_ROOT}/" + f"{self.dataset_name}_{self.model_name}".replace("/", "_")
        )

        self.categories = dataset.distinct(f"{gt_field}.detections.label")
        self.id2label = {index: x for index, x in enumerate(self.categories, start=0)}
        self.label2id = {v: k for k, v in self.id2label.items()}

    def collate_fn(self, batch):
        """Collate function for batching data during training and inference."""
        data = {}
        data["pixel_values"] = torch.stack([x["pixel_values"] for x in batch])
        data["labels"] = [x["labels"] for x in batch]
        if "pixel_mask" in batch[0]:
            data["pixel_mask"] = torch.stack([x["pixel_mask"] for x in batch])
        return data
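
    # Collated batch sketch (illustrative shapes): for batch size B and padded
    # images of size (3, H, W), collate_fn returns
    #   data["pixel_values"]: torch.Size([B, 3, H, W])
    #   data["labels"]: list of B dicts (e.g. "class_labels", "boxes") as
    #   produced by the image processor in transform_batch_standalone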

    def train(self, hf_dataset, overwrite_output=True):
        """Train models for object detection tasks with support for custom image sizes and transformations."""
        torch.cuda.empty_cache()
        img_size_target = self.config.get("image_size", None)
        if img_size_target is None:
            image_processor = AutoProcessor.from_pretrained(
                self.model_name,
                do_resize=False,
                do_pad=True,
                use_fast=True,
                do_convert_annotations=self.do_convert_annotations,
            )
        else:
            logging.warning(f"Resizing images to target size {img_size_target}.")
            image_processor = AutoProcessor.from_pretrained(
                self.model_name,
                do_resize=True,
                size={
                    "max_height": img_size_target[1],
                    "max_width": img_size_target[0],
                },
                do_pad=True,
                pad_size={"height": img_size_target[1], "width": img_size_target[0]},
                use_fast=True,
                do_convert_annotations=self.do_convert_annotations,
            )

        train_transform_batch = partial(
            transform_batch_standalone,
            image_processor=image_processor,
            do_convert_annotations=self.do_convert_annotations,
        )
        val_test_transform_batch = partial(
            transform_batch_standalone,
            image_processor=image_processor,
            do_convert_annotations=self.do_convert_annotations,
        )

        hf_dataset[Split.TRAIN] = hf_dataset[Split.TRAIN].with_transform(
            train_transform_batch
        )
        hf_dataset[Split.VALIDATION] = hf_dataset[Split.VALIDATION].with_transform(
            val_test_transform_batch
        )
        hf_dataset[Split.TEST] = hf_dataset[Split.TEST].with_transform(
            val_test_transform_batch
        )

        hf_model_config = AutoConfig.from_pretrained(self.model_name)
        hf_model_config_name = type(hf_model_config).__name__

        if type(hf_model_config) in AutoModelForObjectDetection._model_mapping:
            model = AutoModelForObjectDetection.from_pretrained(
                self.model_name,
                id2label=self.id2label,
                label2id=self.label2id,
                ignore_mismatched_sizes=True,
            )
        else:
            model = None
            logging.error(
                "Hugging Face AutoModel does not support " + str(type(hf_model_config))
            )

        if (
            overwrite_output
            and os.path.exists(self.model_root)
            and os.listdir(self.model_root)
        ):
            logging.warning(
                f"Training will overwrite existing results in {self.model_root}"
            )

        training_args = TrainingArguments(
            run_name=self.model_name,
            output_dir=self.model_root,
            overwrite_output_dir=overwrite_output,
            num_train_epochs=self.config["epochs"],
            fp16=True,
            per_device_train_batch_size=self.config["batch_size"],
            auto_find_batch_size=True,
            dataloader_num_workers=min(self.config["n_worker_dataloader"], NUM_WORKERS),
            learning_rate=self.config["learning_rate"],
            lr_scheduler_type="cosine",
            weight_decay=self.config["weight_decay"],
            max_grad_norm=self.config["max_grad_norm"],
            metric_for_best_model="eval_loss",
            greater_is_better=False,
            load_best_model_at_end=True,
            eval_strategy="epoch",
            save_strategy="best",
            save_total_limit=1,
            remove_unused_columns=False,
            eval_do_concat_batches=False,
            save_safetensors=False,  # Does not work with all models
            hub_model_id=self.hf_hub_model_id,
            hub_private_repo=True,
            push_to_hub=HF_DO_UPLOAD,
            seed=GLOBAL_SEED,
            data_seed=GLOBAL_SEED,
        )

        early_stopping_callback = EarlyStoppingCallback(
            early_stopping_patience=self.config["early_stop_patience"],
            early_stopping_threshold=self.config["early_stop_threshold"],
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=hf_dataset[Split.TRAIN],
            eval_dataset=hf_dataset[Split.VALIDATION],
            tokenizer=image_processor,
            data_collator=self.collate_fn,
            callbacks=[early_stopping_callback],
            # compute_metrics=eval_compute_metrics_fn,
        )

        logging.info(f"Starting training of model {self.model_name}.")
        trainer.train()
        if HF_DO_UPLOAD:
            trainer.push_to_hub()

        metrics = trainer.evaluate(eval_dataset=hf_dataset[Split.TEST])
        logging.info(f"Model training completed. Evaluation results: {metrics}")
1227
1228    def inference(self, inference_settings, load_from_hf=True, gt_field="ground_truth"):
1229        """Performs model inference on a dataset, loading from Hugging Face or disk, and optionally evaluates detection results."""
1230
1231        model_hf = inference_settings["model_hf"]
1232        dataset_name = None
1233        if model_hf is not None:
1234            self.hf_hub_model_id = model_hf
1235            dataset_name, model_name = get_dataset_and_model_from_hf_id(model_hf)
1236        else:
1237            dataset_name = self.dataset_name
1238        torch.cuda.empty_cache()
1239        # Load trained model from Hugging Face
1240        load_from_hf_successful = None
1241        if load_from_hf:
1242            try:
1243                logging.info(f"Loading model from Hugging Face: {self.hf_hub_model_id}")
1244                image_processor = AutoProcessor.from_pretrained(self.hf_hub_model_id)
1245                model = AutoModelForObjectDetection.from_pretrained(
1246                    self.hf_hub_model_id
1247                )
1248                load_from_hf_successful = True
1249            except Exception as e:
1250                load_from_hf_successful = False
1251                logging.warning(
1252                    f"Model {self.model_name} could not be loaded from Hugging Face {self.hf_hub_model_id}. Attempting loading from disk."
1253                )
1254        if load_from_hf == False or load_from_hf_successful == False:
1255            try:
1256                # Select folder in self.model_root that include 'checkpoint-'
1257                checkpoint_dirs = [
1258                    d
1259                    for d in os.listdir(self.model_root)
1260                    if "checkpoint-" in d
1261                    and os.path.isdir(os.path.join(self.model_root, d))
1262                ]
1263
1264                if not checkpoint_dirs:
1265                    logging.error(
1266                        f"No checkpoint directory found in {self.model_root}!"
1267                    )
1268                    model_path = None
1269                else:
1270                    # Sort by modification time (latest first)
1271                    checkpoint_dirs.sort(
1272                        key=lambda d: os.path.getmtime(
1273                            os.path.join(self.model_root, d)
1274                        ),
1275                        reverse=True,
1276                    )
1277
1278                    if len(checkpoint_dirs) > 1:
1279                        logging.warning(
1280                            f"Multiple checkpoint directories found: {checkpoint_dirs}. Selecting the latest one: {checkpoint_dirs[0]}."
1281                        )
1282
1283                    selected_checkpoint = checkpoint_dirs[0]
1284                    logging.info(
1285                        f"Loading model from disk: {self.model_root}/{selected_checkpoint}"
1286                    )
1287                    model_path = os.path.join(self.model_root, selected_checkpoint)
1288
1289                image_processor = AutoProcessor.from_pretrained(model_path)
1290                model = AutoModelForObjectDetection.from_pretrained(model_path)
1291            except Exception as e:
1292                logging.error(
1293                    f"Model {self.model_name} could not be loaded from folder {self.model_root}/{selected_checkpoint}. Inference not possible."
1294                )
1295
1296        device, _, _ = get_backend()
1297        logging.info(f"Using device {device} for inference.")
1298        model = model.to(device)
1299        model.eval()
1300
1301        pred_key = f"pred_od_{self.model_name_key}-{dataset_name}"
1302
1303        if inference_settings["inference_on_test"] is True:
1304            INFERENCE_SPLITS = ["test"]
1305            dataset_eval_view = self.dataset.match_tags(INFERENCE_SPLITS)
1306        else:
1307            dataset_eval_view = self.dataset
1308
1309        detection_threshold = inference_settings["detection_threshold"]
1310
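            # Mixed precision plus inference_mode avoids autograd overhead in the
            # sample-by-sample loop below (assumes the CUDA backend selected above)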
1311        with torch.amp.autocast("cuda"), torch.inference_mode():
1312            for sample in dataset_eval_view.iter_samples(progress=True, autosave=True):
1313                image_width = sample.metadata.width
1314                image_height = sample.metadata.height
1315                img_filepath = sample.filepath
1316
1317                image = Image.open(img_filepath)
1318                inputs = image_processor(images=[image], return_tensors="pt")
1319                outputs = model(**inputs.to(device))
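                    # PIL's image.size is (width, height); post-processing expects (height, width)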
1320                target_sizes = torch.tensor([[image.size[1], image.size[0]]])
1321
1322                results = image_processor.post_process_object_detection(
1323                    outputs, threshold=detection_threshold, target_sizes=target_sizes
1324                )[0]
1325
1326                detections = []
1327                for score, label, box in zip(
1328                    results["scores"], results["labels"], results["boxes"]
1329                ):
1330                    # Box is in absolute corner coordinates (x1, y1, x2, y2)
1331                    box = box.tolist()
1332                    text_label = model.config.id2label[label.item()]
1333
1334                    # Voxel51 requires relative coordinates between 0 and 1
1335                    top_left_x = box[0] / image_width
1336                    top_left_y = box[1] / image_height
1337                    box_width = (box[2] - box[0]) / image_width
1338                    box_height = (box[3] - box[1]) / image_height
1339                    detection = fo.Detection(
1340                        label=text_label,
1341                        bounding_box=[
1342                            top_left_x,
1343                            top_left_y,
1344                            box_width,
1345                            box_height,
1346                        ],
1347                        confidence=score.item(),
1348                    )
1349                    detections.append(detection)
1350
1351                sample[pred_key] = fo.Detections(detections=detections)
1352
1353        if inference_settings["do_eval"] is True:
1354            eval_key = re.sub(
1355                r"[\W-]+", "_", "eval_" + self.model_name + "_" + self.dataset_name
1356            )
1357
1358            if inference_settings["inference_on_test"] is True:
1359                dataset_view = self.dataset.match_tags(["test"])
1360            else:
1361                dataset_view = self.dataset
1362
1363            dataset_view.evaluate_detections(
1364                pred_key,
1365                gt_field=gt_field,
1366                eval_key=eval_key,
1367                compute_mAP=True,
1368            )
1369
1370
1371class CustomCoDETRObjectDetection:
1372    """Interface for running Co-DETR object detection model training and inference in containers"""
1373
1374    def __init__(self, dataset, dataset_info, run_config):
1375        """Initialize Co-DETR interface with dataset and configuration"""
1376        self.root_codetr = "./custom_models/CoDETR/Co-DETR"
1377        self.root_codetr_models = "output/models/codetr"
1378        self.dataset = dataset
1379        self.dataset_name = dataset_info["name"]
1380        self.export_dir_root = run_config["export_dataset_root"]
1381        self.config_key = os.path.splitext(os.path.basename(run_config["config"]))[0]
1382        self.hf_repo_name = f"{HF_ROOT}/{self.dataset_name}_{self.config_key}"
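            # Repo ID follows "<org>/<dataset>_<config>", which is what
            # get_dataset_and_model_from_hf_id() parses back apart at inference time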
1383
1384    def convert_data(self):
1385        """Convert dataset to COCO format required by Co-DETR"""
1386
1387        export_dir = os.path.join(self.export_dir_root, self.dataset_name, "coco")
1388
1389        # Check if folder already exists
1390        if not os.path.exists(export_dir):
1391            # Make directory
1392            os.makedirs(export_dir, exist_ok=True)
1393            logging.info(f"Exporting data to {export_dir}")
1394            splits = [
1395                "train",
1396                "val",
1397                "test",
1398            ]  # Co-DETR expects COCO-style 'train2017', 'val2017', and 'test2017' folders
1399            for split in splits:
1400                split_view = self.dataset.match_tags(split)
1401                split_view.export(
1402                    dataset_type=fo.types.COCODetectionDataset,
1403                    data_path=os.path.join(export_dir, f"{split}2017"),
1404                    labels_path=os.path.join(
1405                        export_dir, "annotations", f"instances_{split}2017.json"
1406                    ),
1407                    label_field="ground_truth",
1408                )
1409        else:
1410            logging.warning(
1411                f"Folder {export_dir} already exists, skipping data export."
1412            )
1413
1414    def update_config_file(self, dataset_name, config_file, max_epochs):
1415        """Update Co-DETR config file with dataset-specific parameters"""
1416
1417        config_path = os.path.join(self.root_codetr, config_file)
1418
1419        # Get classes from exported data
1420        annotations_json = os.path.join(
1421            self.export_dir_root,
1422            dataset_name,
1423            "coco/annotations/instances_train2017.json",
1424        )
1425        # Read the JSON file
1426        with open(annotations_json, "r") as file:
1427            data = json.load(file)
1428
1429        # Extract the value associated with the key "categories"
1430        categories = data.get("categories")
1431        class_names = tuple(category["name"] for category in categories)
1432        num_classes = len(class_names)
1433
1434        # Update configuration file
1435        # This assumes a tuple like classes = ('a', 'b', ...) is already defined and will be overwritten.
1436        with open(config_path, "r") as file:
1437            content = file.read()
1438
1439        # Update the classes tuple
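            # NOTE: without re.DOTALL, this assumes the classes tuple is defined on a single line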
1440        content = re.sub(r"classes\s*=\s*\(.*?\)", f"classes = {class_names}", content)
1441
1442        # Update all instances of num_classes
1443        content = re.sub(r"num_classes=\d+", f"num_classes={num_classes}", content)
1444
1445        # Update all instances of max_epochs
1446        content = re.sub(r"max_epochs=\d+", f"max_epochs={max_epochs}", content)
1447
1448        with open(config_path, "w") as file:
1449            file.write(content)
1450
1451        logging.warning(
1452            f"Updated {config_path} with classes={class_names}, num_classes={num_classes}, and max_epochs={max_epochs}"
1453        )
1454
1455    def train(self, param_config, param_n_gpus, container_tool, param_function="train"):
1456        """Train Co-DETR model using containerized environment"""
1457
1458        # Check if model already exists
1459        output_folder_codetr = os.path.join(self.root_codetr, "output")
1460        os.makedirs(output_folder_codetr, exist_ok=True)
1461        param_config_name = os.path.splitext(os.path.basename(param_config))[0]
1462        best_models_dir = os.path.join(output_folder_codetr, "best")
1463        os.makedirs(best_models_dir, exist_ok=True)
1464        # Best model files follow the naming scheme "config_dataset.pth";
1465        # best_models_dir was created above, so it can be listed directly
1466        pth_model_files = [f for f in os.listdir(best_models_dir) if f.endswith(".pth")]

1472        matching_files = [
1473            f
1474            for f in pth_model_files
1475            if f.startswith(param_config_name)
1476            and self.dataset_name in f
1477            and f.endswith(".pth")
1478        ]
1479        if matching_files:
1480            logging.warning(
1481                f"Model {param_config_name} already trained on dataset {self.dataset_name}. Skipping training."
1482            )
1483            if len(matching_files) > 1:
1484                logging.warning(f"Multiple weights found: {matching_files}")
1485        else:
1486            logging.info(
1487                f"Launching training for Co-DETR config {param_config} and dataset {self.dataset_name}."
1488            )
1489            volume_data = os.path.join(self.export_dir_root, self.dataset_name)
1490
1491            # Train model, store checkpoints in 'output_folder_codetr'
1492            train_result = self._run_container(
1493                volume_data=volume_data,
1494                param_function=param_function,
1495                param_config=param_config,
1496                param_n_gpus=param_n_gpus,
1497                container_tool=container_tool,
1498            )
1499
1500            # Find the best_bbox checkpoint file
1501            checkpoint_files = [
1502                f
1503                for f in os.listdir(output_folder_codetr)
1504                if "best_bbox" in f and f.endswith(".pth")
1505            ]
1506            if not checkpoint_files:
1507                logging.error(
1508                    "Co-DETR training did not produce a model file: no checkpoint with 'best_bbox' found."
1509                )
1510            else:
1511                if len(checkpoint_files) > 1:
1512                    logging.warning(
1513                        f"Found {len(checkpoint_files)} checkpoint files. Selecting {checkpoint_files[0]}."
1514                    )
1515                checkpoint = checkpoint_files[0]
1516                checkpoint_path = os.path.join(output_folder_codetr, checkpoint)
1517                logging.info("Co-DETR was trained successfully.")
1518
1519                # Upload best model to Hugging Face
1520                if HF_DO_UPLOAD:
1521                    logging.info("Uploading Co-DETR model to Hugging Face.")
1522                    api = HfApi()
1523                    api.create_repo(
1524                        self.hf_repo_name,
1525                        private=True,
1526                        repo_type="model",
1527                        exist_ok=True,
1528                    )
1529                    api.upload_file(
1530                        path_or_fileobj=checkpoint_path,
1531                        path_in_repo="model.pth",
1532                        repo_id=self.hf_repo_name,
1533                        repo_type="model",
1534                    )
1535
1536                # Move best model file and clear output folder
1537                self._run_container(
1538                    volume_data=volume_data,
1539                    param_function="clear-output",
1540                    param_config=param_config,
1541                    param_dataset_name=self.dataset_name,
1542                    container_tool=container_tool,
1543                )
1544
1545    @staticmethod
1546    def _find_file_iteratively(start_path, filename):
1547        """Look for a file directly in start_path, then search upward (and in sibling directories) for it."""
1548        # Convert start_path to a Path object
1549        start_path = Path(start_path)
1550
1551        # Check if the file exists in the start_path directly (very fast)
1552        file_path = start_path / filename
1553        if file_path.exists():
1554            return str(file_path)
1555
1556        # Start from the given directory and walk up toward the root iteratively
1557        current_dir = start_path
1558        checked_dirs = set()
1559
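            # Each parent and its sibling directories are probed exactly once,
            # so the search cost grows with directory depth, not tree size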
1560        while current_dir != current_dir.parent:  # the filesystem root is its own parent
1561            # Check if the file is in the current directory
1562            file_path = current_dir / filename
1563            if file_path.exists():
1564                return str(file_path)
1565
1566            # If we haven't checked the sibling directories, check them as well
1567            parent_dir = current_dir.parent
1568            if parent_dir not in checked_dirs:
1569                # Check sibling directories
1570                for sibling in parent_dir.iterdir():
1571                    if sibling != current_dir and sibling.is_dir():
1572                        sibling_file_path = sibling / filename
1573                        if sibling_file_path.exists():
1574                            return str(sibling_file_path)
1575                checked_dirs.add(parent_dir)
1576
1577            # Otherwise, go one level up
1578            current_dir = current_dir.parent
1579
1580        # If file is not found after traversing all levels, return None
1581        logging.error(f"File {filename} could not be found.")
1582        return None
1583
1584    def run_inference(
1585        self,
1586        dataset,
1587        param_config,
1588        param_n_gpus,
1589        container_tool,
1590        inference_settings,
1591        param_function="inference",
1592        inference_output_folder="custom_models/CoDETR/Co-DETR/output/inference/",
1593        gt_field="ground_truth",
1594    ):
1595        """Run inference using trained Co-DETR model and convert results to FiftyOne format"""
1596
1597        logging.info(f"Launching inference for Co-DETR config {param_config}.")
1598        volume_data = os.path.join(self.export_dir_root, self.dataset_name)
1599
1600        if inference_settings["inference_on_test"] is True:
1601            folder_inference = os.path.join("coco", "test2017")
1602        else:
1603            folder_inference = "coco"
1604
1605        # Get model from Hugging Face
1606        dataset_name = None
1607        config_key = None
1608        try:
1609            if inference_settings["model_hf"] is None:
1610                hf_path = self.hf_repo_name
1611            else:
1612                hf_path = inference_settings["model_hf"]
1613
1614            dataset_name, config_key = get_dataset_and_model_from_hf_id(hf_path)
1615
1616            download_folder = os.path.join(
1617                self.root_codetr_models, dataset_name, config_key
1618            )
1619
1620            logging.info(
1621                f"Downloading model {hf_path} from Hugging Face into {download_folder}"
1622            )
1623            os.makedirs(download_folder, exist_ok=True)
1624
1625            file_path = hf_hub_download(
1626                repo_id=hf_path,
1627                filename="model.pth",
1628                local_dir=download_folder,
1629            )
1630        except Exception as e:
1631            logging.error(f"An error occurred during model download: {e}")
                return
1632
1633        model_path = os.path.join(dataset_name, config_key, "model.pth")
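            # Relative path: resolved inside the container against the hf_models
            # volume mounted by _run_container()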
1634        logging.info(f"Starting inference for model {model_path}")
1635
1636        inference_result = self._run_container(
1637            volume_data=volume_data,
1638            param_function=param_function,
1639            param_config=param_config,
1640            param_n_gpus=param_n_gpus,
1641            container_tool=container_tool,
1642            param_inference_dataset_folder=folder_inference,
1643            param_inference_model_checkpoint=model_path,
1644        )
1645
1646        # Convert results from JSON output into V51 dataset
1647        # Files follow format inference_results_{timestamp}.json (run_inference.py)
1648        os.makedirs(inference_output_folder, exist_ok=True)
1649        output_files = [
1650            f
1651            for f in os.listdir(inference_output_folder)
1652            if f.startswith("inference_results_") and f.endswith(".json")
1653        ]
1654        logging.debug(f"Found files with inference content: {output_files}")
1655
1656        if not output_files:
1657            logging.error(
1658                f"No inference result files found in {inference_output_folder}"
1659            )
                return
1660
1661        # Get full path for each file
1662        file_paths = [os.path.join(inference_output_folder, f) for f in output_files]
1663
1664        # Extract timestamp from the filename and sort based on the timestamp
1665        file_paths_sorted = sorted(
1666            file_paths,
1667            key=lambda f: datetime.datetime.strptime(
1668                f.split("_")[-2] + "_" + f.split("_")[-1].replace(".json", ""),
1669                "%Y%m%d_%H%M%S",
1670            ),
1671            reverse=True,
1672        )
1673
1674        # Use the most recent file based on timestamp
1675        latest_file = file_paths_sorted[0]
1676        logging.info(f"Using inference results from: {latest_file}")
1677        with open(latest_file, "r") as file:
1678            data = json.load(file)
1679
1680        # Get conversion for annotated classes
1681        annotations_path = os.path.join(
1682            volume_data, "coco", "annotations", "instances_train2017.json"
1683        )
1684
1685        with open(annotations_path, "r") as file:
1686            data_annotations = json.load(file)
1687
1688        class_ids_and_names = [
1689            (category["id"], category["name"])
1690            for category in data_annotations["categories"]
1691        ]
1692
1693        # Match sample filepaths (from exported Co-DETR COCO format) to V51 filepaths
1694        sample = dataset.first()
1695        root_dir_samples = os.path.dirname(sample.filepath)
1696
1697        # Convert results into V51 file format
1698        detection_threshold = inference_settings["detection_threshold"]
1699        pred_key = f"pred_od_{config_key}-{dataset_name}"
1700        for key, class_results in tqdm(data.items(), desc="Processing Co-DETR detections"):
1701            try:
1702                # Get filename
1703                filepath = CustomCoDETRObjectDetection._find_file_iteratively(
1704                    root_dir_samples, os.path.basename(key)
1705                )
1706                sample = dataset[filepath]
1707
1708                img_width = sample.metadata.width
1709                img_height = sample.metadata.height
1710
1711                detections_v51 = []
1712                for class_id, class_detections in enumerate(class_results):  # class_id starts at 0
1713                    if len(class_detections) > 0:
1714                        objects_class = class_ids_and_names[class_id]
1715                        for detection in class_detections:
1716                            confidence = detection[4]
1717                            detection_v51 = fo.Detection(
1718                                label=objects_class[1],
1719                                bounding_box=[
1720                                    detection[0] / img_width,
1721                                    detection[1] / img_height,
1722                                    (detection[2] - detection[0]) / img_width,
1723                                    (detection[3] - detection[1]) / img_height,
1724                                ],
1725                                confidence=confidence,
1726                            )
1727                            if confidence >= detection_threshold:
1728                                detections_v51.append(detection_v51)
1729
1730                sample[pred_key] = fo.Detections(detections=detections_v51)
1731                sample.save()
1732            except Exception as e:
1733                logging.error(
1734                    f"An error occurred while converting Co-DETR inference results to the V51 dataset: {e}"
1735                )
1736
1737        # Run V51 evaluation
1738        if inference_settings["do_eval"] is True:
1739            eval_key = pred_key.replace("pred_", "eval_").replace("-", "_")
1740
1741            if inference_settings["inference_on_test"] is True:
1742                dataset_view = dataset.match_tags(["test"])
1743            else:
1744                dataset_view = dataset
1745
1746            logging.info(
1747                f"Starting evaluation for {pred_key} in evaluation key {eval_key}."
1748            )
1749            dataset_view.evaluate_detections(
1750                pred_key,
1751                gt_field=gt_field,
1752                eval_key=eval_key,
1753                compute_mAP=True,
1754            )
1755
1756    def _run_container(
1757        self,
1758        volume_data,
1759        param_function,
1760        param_config="",
1761        param_n_gpus="1",
1762        param_dataset_name="",
1763        param_inference_dataset_folder="",
1764        param_inference_model_checkpoint="",
1765        image="dbogdollresearch/codetr",
1766        workdir="/launch",
1767        container_tool="docker",
1768    ):
1769        """Execute Co-DETR container with specified parameters using Docker or Singularity"""
1770
1771        try:
1772            # Convert relative paths to absolute paths (necessary under WSL2)
1773            root_codetr_abs = os.path.abspath(self.root_codetr)
1774            volume_data_abs = os.path.abspath(volume_data)
1775            root_codetr_models_abs = os.path.abspath(self.root_codetr_models)
1776
1777            # Check if using Docker or Singularity and define the appropriate command
1778            if container_tool == "docker":
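                    # Mount the Co-DETR repo at the workdir, the dataset read-only at ./data,
                    # and downloaded checkpoints read-only at ./hf_models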
1779                command = [
1780                    "docker",
1781                    "run",
1782                    "--gpus",
1783                    "all",
1784                    "--workdir",
1785                    workdir,
1786                    "--volume",
1787                    f"{root_codetr_abs}:{workdir}",
1788                    "--volume",
1789                    f"{volume_data_abs}:{workdir}/data:ro",
1790                    "--volume",
1791                    f"{root_codetr_models_abs}:{workdir}/hf_models:ro",
1792                    "--shm-size=8g",
1793                    image,
1794                    param_function,
1795                    param_config,
1796                    param_n_gpus,
1797                    param_dataset_name,
1798                    param_inference_dataset_folder,
1799                    param_inference_model_checkpoint,
1800                ]
1801            elif container_tool == "singularity":
1802                command = [
1803                    "singularity",
1804                    "run",
1805                    "--nv",
1806                    "--pwd",
1807                    workdir,
1808                    "--bind",
1809                    f"{root_codetr_abs}:{workdir}",
1810                    "--bind",
1811                    f"{volume_data_abs}:{workdir}/data:ro",
1812                    "--bind",
1813                    f"{root_codetr_models_abs}:{workdir}/hf_models:ro",
1814                    f"docker://{image}",
1815                    param_function,
1816                    param_config,
1817                    param_n_gpus,
1818                    param_dataset_name,
1819                    param_inference_dataset_folder,
1820                    param_inference_model_checkpoint,
1821                ]
1822            else:
1823                raise ValueError(
1824                    f"Invalid container tool specified: {container_tool}. Choose 'docker' or 'singularity'."
1825                )
1826
1827            # Start the process and stream outputs to the console
1828            logging.info(f"Launching container command: {command}")
1829            with subprocess.Popen(
1830                command, stdout=sys.stdout, stderr=sys.stderr, text=True
1831            ) as proc:
1832                proc.wait()  # Wait for the process to complete
                if proc.returncode != 0:
                    logging.error(f"Container exited with code {proc.returncode}")
                    return False
1833            return True
1834        except Exception as e:
1835            logging.error(f"Error during Co-DETR container run: {e}")
1836            return False
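
    # A minimal usage sketch (illustrative, not part of the module): assumes `dataset`
    # is a loaded FiftyOne dataset; the dataset name and the Co-DETR config path below
    # are hypothetical placeholders, not values shipped with this module.
    #
    #     run_config = {
    #         "export_dataset_root": "./output/export",
    #         "config": "projects/configs/co_dino/my_codetr_config.py",  # hypothetical
    #     }
    #     codetr = CustomCoDETRObjectDetection(dataset, {"name": "my_dataset"}, run_config)
    #     codetr.convert_data()  # export the dataset to the COCO layout Co-DETR expects
    #     codetr.update_config_file("my_dataset", run_config["config"], max_epochs=12)
    #     codetr.train(run_config["config"], param_n_gpus="1", container_tool="docker")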
class ZeroShotObjectDetection:
153class ZeroShotObjectDetection:
154    """Zero-shot object detection using various HuggingFace models with multi-GPU support."""
155
156    def __init__(
157        self,
158        dataset_torch: torch.utils.data.Dataset,
159        dataset_info,
160        config,
161        detections_path="./output/detections/",
162        log_root="./logs/",
163    ):
164        """Initialize the zero-shot object detection labeler with dataset, configuration, and path settings."""
165        self.dataset_torch = dataset_torch
166        self.dataset_info = dataset_info
167        self.dataset_name = dataset_info["name"]
168        self.object_classes = config["object_classes"]
169        self.detection_threshold = config["detection_threshold"]
170        self.detections_root = os.path.join(detections_path, self.dataset_name)
171        self.tensorboard_root = os.path.join(
172            log_root, "tensorboard/zeroshot_object_detection"
173        )
174
175        logging.info(f"Zero-shot models will look for {self.object_classes}")
176
177    def exclude_stored_predictions(
178        self, dataset_v51: fo.Dataset, config, do_exclude=False
179    ):
180        """Checks for existing predictions and loads them from disk if available."""
181        dataset_schema = dataset_v51.get_field_schema()
182        models_splits_dict = {}
183        for model_name, value in config["hf_models_zeroshot_objectdetection"].items():
184            model_name_key = re.sub(r"[\W-]+", "_", model_name)
185            pred_key = re.sub(
186                r"[\W-]+", "_", "pred_zsod_" + model_name
187            )  # od for Object Detection
188            # Check if data already stored in V51 dataset
189            if pred_key in dataset_schema and do_exclude is True:
190                logging.warning(
191                    f"Skipping model {model_name}. Predictions already stored in Voxel51 dataset."
192                )
193            # Check if data already stored on disk
194            elif (
195                os.path.isdir(os.path.join(self.detections_root, model_name_key))
196                and do_exclude is True
197            ):
198                try:
199                    logging.info(f"Loading {model_name} predictions from disk.")
200                    temp_dataset = fo.Dataset.from_dir(
201                        dataset_dir=os.path.join(self.detections_root, model_name_key),
202                        dataset_type=fo.types.COCODetectionDataset,
203                        name="temp_dataset",
204                        data_path="data.json",
205                    )
206
207                    # Copy all detections from stored dataset into our dataset
208                    detections = temp_dataset.values("detections.detections")
209                    add_sample_field(
210                        dataset_v51,
211                        pred_key,
212                        fo.EmbeddedDocumentField,
213                        embedded_doc_type=fo.Detections,
214                    )
215                    dataset_v51.set_values(f"{pred_key}.detections", detections)
216                except Exception as e:
217                    logging.error(
218                        f"Data in {os.path.join(self.detections_root, model_name_key)} could not be loaded. Error: {e}"
219                    )
220                finally:
221                    fo.delete_dataset("temp_dataset")
222            # Assign model to be run
223            else:
224                models_splits_dict[model_name] = value
225
226        logging.info(f"Models to be run: {models_splits_dict}")
227        return models_splits_dict
228
229    # Worker functions
230    def update_queue_sizes_worker(
231        self, queues, queue_sizes, largest_queue_index, max_queue_size
232    ):
233        """Monitor and manage multiple result queues for balanced processing."""
234        experiment_name = f"queue_size_monitor_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
235        log_directory = os.path.join(
236            self.tensorboard_root, self.dataset_name, experiment_name
237        )
238        wandb.tensorboard.patch(root_logdir=log_directory)
239        if WANDB_ACTIVE:
240            wandb.init(
241                name=f"queue_size_monitor_{os.getpid()}",
242                job_type="inference",
243                project="Zero Shot Object Detection",
244            )
245        writer = SummaryWriter(log_dir=log_directory)
246
247        step = 0
248
249        while True:
250            for i, queue in enumerate(queues):
251                queue_sizes[i] = queue.qsize()
252                writer.add_scalar(f"queue_size/items/{i}", queue_sizes[i], step)
253
254            step += 1
255
256            # Find the index of the largest queue
257            max_size = max(queue_sizes)
258            max_index = queue_sizes.index(max_size)
259
260            # Calculate the total size of all queues
261            total_size = sum(queue_sizes)
262
263            # If total_size is greater than 0, calculate the probabilities
264            if total_size > 0:
265                # Normalize the queue sizes by the max_queue_size
266                normalized_sizes = [size / max_queue_size for size in queue_sizes]
267
268                # Calculate probabilities based on normalized sizes
269                probabilities = [
270                    size / sum(normalized_sizes) for size in normalized_sizes
271                ]
272
273                # Use random.choices with weights (probabilities)
274                chosen_queue_index = random.choices(
275                    range(len(queues)), weights=probabilities, k=1
276                )[0]
277
278                largest_queue_index.value = chosen_queue_index
279            else:
280                largest_queue_index.value = max_index
281
282            time.sleep(0.1)
283
284    def process_outputs_worker(
285        self,
286        result_queues,
287        largest_queue_index,
288        inference_finished,
289        max_queue_size,
290        wandb_activate=False,
291    ):
292        """Process model outputs from result queues and save to dataset."""
293        configure_logging()
294        logging.info(f"Process ID: {os.getpid()}. Results processing process started")
295        dataset_v51 = fo.load_dataset(self.dataset_name)
296        processing_successful = None
297
298        # Logging
299        experiment_name = f"post_process_{os.getpid()}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
300        log_directory = os.path.join(
301            self.tensorboard_root, self.dataset_name, experiment_name
302        )
303        wandb.tensorboard.patch(root_logdir=log_directory)
304        if WANDB_ACTIVE and wandb_activate:
305            wandb.init(
306                name=f"post_process_{os.getpid()}",
307                job_type="inference",
308                project="Zero Shot Object Detection",
309            )
310        writer = SummaryWriter(log_dir=log_directory)
311        n_processed_images = 0
312
313        logging.info(f"Post-Processor {os.getpid()} starting loop.")
314
315        while True:
316            results_queue = result_queues[largest_queue_index.value]
317            writer.add_scalar(
318                f"post_processing/selected_queue",
319                largest_queue_index.value,
320                n_processed_images,
321            )
322
323            if results_queue.qsize() == max_queue_size:
324                logging.warning(
325                    f"Queue full: {results_queue.qsize()}. Consider increasing number of post-processing workers."
326                )
327
328            # Exit only when inference is finished and the queue is empty
329            if inference_finished.value and results_queue.empty():
330                dataset_v51.save()
331                logging.info(
332                    f"Post-processing worker {os.getpid()} has finished all outputs."
333                )
334                break
335
336            # Process results from the queue if available
337            if not results_queue.empty():
338                try:
339                    time_start = time.time()
340
341                    result = results_queue.get_nowait()
342
343                    processing_successful = self.process_outputs(
344                        dataset_v51,
345                        result,
346                        self.object_classes,
347                        self.detection_threshold,
348                    )
349
350                    # Performance logging
351                    n_images = len(result["labels"])
352                    time_end = time.time()
353                    duration = time_end - time_start
354                    batches_per_second = 1 / duration
355                    frames_per_second = batches_per_second * n_images
356                    n_processed_images += n_images
357                    writer.add_scalar(
358                        f"post_processing/frames_per_second",
359                        frames_per_second,
360                        n_processed_images,
361                    )
362
363                    del result  # Explicit removal from device
364
365                except Exception as e:
366                    continue
367
368            else:
369                continue
370
371        writer.close()
372        wandb.finish(exit_code=0)
373        return processing_successful  # Return last processing status
374
375    def gpu_worker(
376        self,
377        gpu_id,
378        cpu_cores,
379        task_queue,
380        results_queue,
381        done_event,
382        post_processing_finished,
383        set_cpu_affinity=False,
384    ):
385        """Run model inference on specified GPU with dedicated CPU cores."""
386        dataset_v51 = fo.load_dataset(
387            self.dataset_name
388        )  # NOTE Only for the case of sequential processing
389        configure_logging()
390        # Set CPU
391        if set_cpu_affinity:
392            # Allow only certain CPU cores
393            psutil.Process().cpu_affinity(cpu_cores)
394        logging.info(f"Available CPU cores: {psutil.Process().cpu_affinity()}")
395        max_n_cpus = len(cpu_cores)
396        torch.set_num_threads(max_n_cpus)
397
398        # Set GPU
399        logging.info(f"GPU {gpu_id}: {torch.cuda.get_device_name(gpu_id)}")
400        device = torch.device(f"cuda:{gpu_id}")
401
402        run_successful = None
403        with torch.cuda.device(gpu_id):
404            while True:
405                if post_processing_finished.value and task_queue.empty():
406                    # Keep alive until post-processing is done
407                    break
408
409                if task_queue.empty():
410                    done_event.set()
411
412                if not task_queue.empty():
413                    try:
414                        task_metadata = task_queue.get(
415                            timeout=5
416                        )  # Timeout to prevent indefinite blocking
417                    except Exception as e:
418                        break  # Exit if no more tasks
419                    run_successful = self.model_inference(
420                        task_metadata,
421                        device,
422                        self.dataset_torch,
423                        dataset_v51,
424                        self.object_classes,
425                        results_queue,
426                        self.tensorboard_root,
427                    )
428                    logging.info(
429                        f"Worker for GPU {gpu_id} finished run successful: {run_successful}"
430                    )
431                else:
432                    continue
433        return run_successful  # Return last processing status
434
435    def eval_and_export_worker(self, models_ready_queue, n_models):
436        """Evaluate model performance and export results for completed models."""
437        configure_logging()
438        logging.info(f"Process ID: {os.getpid()}. Eval-and-export process started")
439
440        dataset = fo.load_dataset(self.dataset_name)
441        run_successful = None
442        models_done = 0
443
444        while True:
445            if not models_ready_queue.empty():
446                try:
447                    dict = models_ready_queue.get(
448                        timeout=5
449                    )  # Timeout to prevent indefinite blocking
450                    model_name = dict["model_name"]
451                    pred_key = re.sub(r"[\W-]+", "_", "pred_zsod_" + model_name)
452                    eval_key = re.sub(r"[\W-]+", "_", "eval_zsod_" + model_name)
453                    dataset.reload()
454                    run_successful = self.eval_and_export(
455                        dataset, model_name, pred_key, eval_key
456                    )
457                    models_done += 1
458                    logging.info(
459                        f"Evaluation and export of {models_done}/{n_models} models done."
460                    )
461                except Exception as e:
462                    logging.error(f"Error in eval-and-export worker: {e}")
463                    continue
464
465            if models_done == n_models:
466                break
467
468        return run_successful
469
470    # Functionality functions
471    def model_inference(
472        self,
473        metadata: dict,
474        device: str,
475        dataset: torch.utils.data.Dataset,
476        dataset_v51: fo.Dataset,
477        object_classes: list,
478        results_queue: Union[queue.Queue, mp.Queue],
479        root_log_dir: str,
480        persistent_workers: bool = False,
481    ):
482        """Model inference method running zero-shot object detection on provided dataset and device, returning success status."""
483        writer = None
484        run_successful = True
485        processor, model, inputs, outputs, result, dataloader = (
486            None,
487            None,
488            None,
489            None,
490            None,
491            None,
492        )  # For finally block
493
494        # Timeout handler
495        dataloader_timeout = 60
496        signal.signal(signal.SIGALRM, timeout_handler)
497
498        try:
499            # Metadata
500            run_id = metadata["run_id"]
501            model_name = metadata["model_name"]
502            dataset_name = metadata["dataset_name"]
503            is_subset = metadata["is_subset"]
504            batch_size = metadata["batch_size"]
505
506            logging.info(
507                f"Process ID: {os.getpid()}, Run ID: {run_id}, Device: {device}, Model: {model_name}"
508            )
509
510            # Load the model
511            logging.info(f"Loading model {model_name}")
512            processor = AutoProcessor.from_pretrained(model_name, use_fast=True)
513            model = AutoModelForZeroShotObjectDetection.from_pretrained(model_name)
514            model = model.to(device, non_blocking=True)
515            model.eval()
516            hf_model_config = AutoConfig.from_pretrained(model_name)
517            hf_model_config_name = type(hf_model_config).__name__
518            batch_classes = [object_classes] * batch_size
519            logging.info(f"Loaded model type {hf_model_config_name}")
520
521            # Dataloader
522            logging.info("Generating dataloader")
523            if is_subset:
524                chunk_index_start = metadata["chunk_index_start"]
525                chunk_index_end = metadata["chunk_index_end"]
526                logging.info(f"Length of dataset: {len(dataset)}")
527                logging.info(f"Subset start index: {chunk_index_start}")
528                logging.info(f"Subset stop index: {chunk_index_end}")
529                dataset = Subset(dataset, range(chunk_index_start, chunk_index_end))
530
531            zero_shot_inference_preprocessing = ZeroShotInferenceCollateFn(
532                hf_model_config_name=hf_model_config_name,
533                hf_processor=processor,
534                object_classes=object_classes,
535                batch_size=batch_size,
536                batch_classes=batch_classes,
537            )
538            num_workers = WORKFLOWS["auto_labeling_zero_shot"]["n_worker_dataloader"]
539            prefetch_factor = WORKFLOWS["auto_labeling_zero_shot"][
540                "prefetch_factor_dataloader"
541            ]
542            dataloader = DataLoader(
543                dataset,
544                batch_size=batch_size,
545                shuffle=False,
546                num_workers=num_workers,
547                persistent_workers=persistent_workers,
548                pin_memory=True,
549                prefetch_factor=prefetch_factor,
550                collate_fn=zero_shot_inference_preprocessing,
551            )
552
553            dataloader_length = len(dataloader)
554            if dataloader_length < 1:
555                logging.error(
556                    f"Dataloader has insufficient data: {dataloader_length} entries. Please check your dataset and DataLoader configuration."
557                )
558
559            # Logging
560            experiment_name = f"{model_name}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{device}"
561            log_directory = os.path.join(root_log_dir, dataset_name, experiment_name)
562            wandb.tensorboard.patch(root_logdir=log_directory)
563            if WANDB_ACTIVE:
564                wandb.init(
565                    name=f"{model_name}_{device}",
566                    job_type="inference",
567                    project="Zero Shot Object Detection",
568                    config=metadata,
569                )
570            writer = SummaryWriter(log_dir=log_directory)
571
572            # Inference Loop
573            logging.info(f"{os.getpid()}: Starting inference loop5")
574            n_processed_images = 0
575            for inputs, labels, target_sizes, batch_classes in tqdm(
576                dataloader, desc="Inference Loop"
577            ):
578                signal.alarm(dataloader_timeout)
579                try:
580                    time_start = time.time()
581                    n_images = len(labels)
582                    inputs = inputs.to(device, non_blocking=True)
583
584                    with torch.amp.autocast("cuda"), torch.inference_mode():
585                        outputs = model(**inputs)
586
587                    result = {
588                        "inputs": inputs,
589                        "outputs": outputs,
590                        "processor": processor,
591                        "target_sizes": target_sizes,
592                        "labels": labels,
593                        "model_name": model_name,
594                        "hf_model_config_name": hf_model_config_name,
595                        "batch_classes": batch_classes,
596                    }
597
598                    logging.debug(f"{os.getpid()}: Putting result into queue")
599
600                    results_queue.put(
601                        result, timeout=60
602                    )  # Ditch data only after 60 seconds
603
604                    # Logging
605                    time_end = time.time()
606                    duration = time_end - time_start
607                    batches_per_second = 1 / duration
608                    frames_per_second = batches_per_second * n_images
609                    n_processed_images += n_images
610                    logging.debug(
611                        f"{os.getpid()}: Number of processes images: {n_processed_images}"
612                    )
613                    writer.add_scalar(
614                        f"inference/frames_per_second",
615                        frames_per_second,
616                        n_processed_images,
617                    )
618
619                except TimeoutException:
620                    logging.warning(
621                        f"Dataloader loop got stuck. Continuing with next batch."
622                    )
623                    continue
624
625                finally:
626                    signal.alarm(0)  # Cancel the alarm
627
628            # Flawless execution
629            wandb_exit_code = 0
630
631        except Exception as e:
632            wandb_exit_code = 1
633            run_successful = False
634            logging.error(f"Error in Process {os.getpid()}: {e}")
635        finally:
636            try:
637                wandb.finish(exit_code=wandb_exit_code)
638            except:
639                pass
640
641            # Explicit removal from device
642            del (
643                processor,
644                model,
645                inputs,
646                outputs,
647                result,
648                dataloader,
649            )
650
651            torch.cuda.empty_cache()
652            wandb.tensorboard.unpatch()
653            if writer:
654                writer.close()
655            return run_successful
656
657    def process_outputs(self, dataset_v51, result, object_classes, detection_threshold):
658        """Process outputs from object detection models, extracting bounding boxes and labels to save to the dataset."""
659        try:
660            inputs = result["inputs"]
661            outputs = result["outputs"]
662            target_sizes = result["target_sizes"]
663            labels = result["labels"]
664            model_name = result["model_name"]
665            hf_model_config_name = result["hf_model_config_name"]
666            batch_classes = result["batch_classes"]
667            processor = result["processor"]
668
669            # Processing output
670            if hf_model_config_name == "GroundingDinoConfig":
671                results = processor.post_process_grounded_object_detection(
672                    outputs,
673                    inputs.input_ids,
674                    box_threshold=detection_threshold,
675                    text_threshold=detection_threshold,
676                )
677            elif hf_model_config_name in ["Owlv2Config", "OwlViTConfig"]:
678                results = processor.post_process_grounded_object_detection(
679                    outputs=outputs,
680                    threshold=detection_threshold,
681                    target_sizes=target_sizes,
682                    text_labels=batch_classes,
683                )
684            elif hf_model_config_name == "OmDetTurboConfig":
685                results = processor.post_process_grounded_object_detection(
686                    outputs,
687                    text_labels=batch_classes,
688                    threshold=detection_threshold,
689                    nms_threshold=detection_threshold,
690                    target_sizes=target_sizes,
691                )
692            else:
693                logging.error(f"Invalid model name: {hf_model_config_name}")
694
695            if not len(results) == len(target_sizes) == len(labels):
696                logging.error(
697                    f"Lengths of results, target_sizes, and labels do not match: {len(results)}, {len(target_sizes)}, {len(labels)}"
698                )
699            for result, size, target in zip(results, target_sizes, labels):
700                boxes, scores, labels = (
701                    result["boxes"],
702                    result["scores"],
703                    result["text_labels"],
704                )
705
706                img_height = size[0]
707                img_width = size[1]
708
709                detections = []
710                for box, score, label in zip(boxes, scores, labels):
711                    processing_successful = True
712                    if hf_model_config_name == "GroundingDinoConfig":
713                        # Outputs do not comply with given labels
714                        # Grounding DINO outputs multiple pairs of object boxes and noun phrases for a given (Image, Text) pair
715                        # There can be either multiple labels per output ("bike van"), incomplete ones ("motorcyc"), or broken ones ("##cic")
716                        processed_label = label.split()[
717                            0
718                        ]  # Assume first output is the best output
719                        if processed_label in object_classes:
720                            label = processed_label
721                            top_left_x = box[0].item()
722                            top_left_y = box[1].item()
723                            box_width = (box[2] - box[0]).item()
724                            box_height = (box[3] - box[1]).item()
725                        else:
726                            matches = get_close_matches(
727                                processed_label, object_classes, n=1, cutoff=0.6
728                            )
729                            selected_label = matches[0] if matches else None
730                            if selected_label:
731                                logging.debug(
732                                    f"Mapped output '{processed_label}' to class '{selected_label}'"
733                                )
734                                label = selected_label
735                                top_left_x = box[0].item()
736                                top_left_y = box[1].item()
737                                box_width = (box[2] - box[0]).item()
738                                box_height = (box[3] - box[1]).item()
739                            else:
740                                logging.debug(
741                                    f"Skipped detection with {hf_model_config_name} due to unclear output: {label}"
742                                )
743                                processing_successful = False
744
745                    elif hf_model_config_name in [
746                        "Owlv2Config",
747                        "OwlViTConfig",
748                        "OmDetTurboConfig",
749                    ]:
750                        top_left_x = box[0].item() / img_width
751                        top_left_y = box[1].item() / img_height
752                        box_width = (box[2].item() - box[0].item()) / img_width
753                        box_height = (box[3].item() - box[1].item()) / img_height
754
755                    if (
756                        processing_successful
757                    ):  # Skip GroundingDinoConfig labels that could not be processed
758                        detection = fo.Detection(
759                            label=label,
760                            bounding_box=[
761                                top_left_x,
762                                top_left_y,
763                                box_width,
764                                box_height,
765                            ],
766                            confidence=score.item(),
767                        )
768                        detection["bbox_area"] = (
769                            detection["bounding_box"][2] * detection["bounding_box"][3]
770                        )
771                        detections.append(detection)
772
773                # Attach label to V51 dataset
774                pred_key = re.sub(
775                    r"[\W-]+", "_", "pred_zsod_" + model_name
776                )  # zsod Zero-Shot Object Deection
777                sample = dataset_v51[target["image_id"]]
778                sample[pred_key] = fo.Detections(detections=detections)
779                sample.save()
780
781        except Exception as e:
782            logging.error(f"Error in processing outputs: {e}")
783            processing_successful = False
784        finally:
785            return processing_successful
786
787    def eval_and_export(self, dataset_v51, model_name, pred_key, eval_key):
788        """Populate dataset with evaluation results (if ground_truth available)"""
789        try:
790            dataset_v51.evaluate_detections(
791                pred_key,
792                gt_field="ground_truth",
793                eval_key=eval_key,
794                compute_mAP=True,
795            )
796        except Exception as e:
797            logging.warning(f"Evaluation not possible: {e}")
798
799        # Store labels https://docs.voxel51.com/api/fiftyone.core.collections.html#fiftyone.core.collections.SampleCollection.export
800        model_name_key = re.sub(r"[\W-]+", "_", model_name)
801        dataset_v51.export(
802            export_dir=os.path.join(self.detections_root, model_name_key),
803            dataset_type=fo.types.COCODetectionDataset,
804            data_path="data.json",
805            export_media=None,  # "manifest",
806            label_field=pred_key,
807            progress=True,
808        )
809        return True

Zero-shot object detection using various HuggingFace models with multi-GPU support.

ZeroShotObjectDetection( dataset_torch: torch.utils.data.dataset.Dataset, dataset_info, config, detections_path='./output/detections/', log_root='./logs/')
156    def __init__(
157        self,
158        dataset_torch: torch.utils.data.Dataset,
159        dataset_info,
160        config,
161        detections_path="./output/detections/",
162        log_root="./logs/",
163    ):
164        """Initialize the zero-shot object detection labeler with dataset, configuration, and path settings."""
165        self.dataset_torch = dataset_torch
166        self.dataset_info = dataset_info
167        self.dataset_name = dataset_info["name"]
168        self.object_classes = config["object_classes"]
169        self.detection_threshold = config["detection_threshold"]
170        self.detections_root = os.path.join(detections_path, self.dataset_name)
171        self.tensorboard_root = os.path.join(
172            log_root, "tensorboard/zeroshot_object_detection"
173        )
174
175        logging.info(f"Zero-shot models will detect the classes {self.object_classes}")

Initialize the zero-shot object detection labeler with dataset, configuration, and path settings.

dataset_torch
dataset_info
dataset_name
object_classes
detection_threshold
detections_root
tensorboard_root
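
A minimal construction sketch for orientation (the two config keys shown are the ones __init__ reads; the dataset objects are assumed to come from the surrounding pipeline, and all values are illustrative):

    config = {
        "object_classes": ["car", "truck", "bicycle"],  # illustrative classes
        "detection_threshold": 0.2,
    }
    labeler = ZeroShotObjectDetection(
        dataset_torch=torch_dataset,          # any torch.utils.data.Dataset
        dataset_info={"name": "my_dataset"},  # illustrative
        config=config,
    )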
def exclude_stored_predictions( self, dataset_v51: fiftyone.core.dataset.Dataset, config, do_exclude=False):
177    def exclude_stored_predictions(
178        self, dataset_v51: fo.Dataset, config, do_exclude=False
179    ):
180        """Checks for existing predictions and loads them from disk if available."""
181        dataset_schema = dataset_v51.get_field_schema()
182        models_splits_dict = {}
183        for model_name, value in config["hf_models_zeroshot_objectdetection"].items():
184            model_name_key = re.sub(r"[\W-]+", "_", model_name)
185            pred_key = re.sub(
186                r"[\W-]+", "_", "pred_zsod_" + model_name
187            )  # zsod: Zero-Shot Object Detection
188            # Check if data already stored in V51 dataset
189            if pred_key in dataset_schema and do_exclude is True:
190                logging.warning(
191                    f"Skipping model {model_name}. Predictions already stored in Voxel51 dataset."
192                )
193            # Check if data already stored on disk
194            elif (
195                os.path.isdir(os.path.join(self.detections_root, model_name_key))
196                and do_exclude is True
197            ):
198                try:
199                    logging.info(f"Loading {model_name} predictions from disk.")
200                    temp_dataset = fo.Dataset.from_dir(
201                        dataset_dir=os.path.join(self.detections_root, model_name_key),
202                        dataset_type=fo.types.COCODetectionDataset,
203                        name="temp_dataset",
204                        data_path="data.json",
205                    )
206
207                    # Copy all detections from stored dataset into our dataset
208                    detections = temp_dataset.values("detections.detections")
209                    add_sample_field(
210                        dataset_v51,
211                        pred_key,
212                        fo.EmbeddedDocumentField,
213                        embedded_doc_type=fo.Detections,
214                    )
215                    dataset_v51.set_values(f"{pred_key}.detections", detections)
216                except Exception as e:
217                    logging.error(
218                        f"Data in {os.path.join(self.detections_root, model_name_key)} could not be loaded. Error: {e}"
219                    )
220                finally:
221                    fo.delete_dataset("temp_dataset")
222            # Assign model to be run
223            else:
224                models_splits_dict[model_name] = value
225
226        logging.info(f"Models to be run: {models_splits_dict}")
227        return models_splits_dict

Checks for existing predictions and loads them from disk if available.
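
Prediction field keys are derived from model names by collapsing every non-word character (and hyphen) to an underscore, so a Hugging Face model ID becomes a valid Voxel51 field name. A standalone example:

    import re

    model_name = "omlab/omdet-turbo-swin-tiny-hf"  # illustrative HF model ID
    pred_key = re.sub(r"[\W-]+", "_", "pred_zsod_" + model_name)
    print(pred_key)  # pred_zsod_omlab_omdet_turbo_swin_tiny_hf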

def update_queue_sizes_worker(self, queues, queue_sizes, largest_queue_index, max_queue_size):
230    def update_queue_sizes_worker(
231        self, queues, queue_sizes, largest_queue_index, max_queue_size
232    ):
233        """Monitor and manage multiple result queues for balanced processing."""
234        experiment_name = f"queue_size_monitor_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
235        log_directory = os.path.join(
236            self.tensorboard_root, self.dataset_name, experiment_name
237        )
238        wandb.tensorboard.patch(root_logdir=log_directory)
239        if WANDB_ACTIVE:
240            wandb.init(
241                name=f"queue_size_monitor_{os.getpid()}",
242                job_type="inference",
243                project="Zero Shot Object Detection",
244            )
245        writer = SummaryWriter(log_dir=log_directory)
246
247        step = 0
248
249        while True:
250            for i, q in enumerate(queues):  # "q" avoids shadowing the queue module
251                queue_sizes[i] = q.qsize()
252                writer.add_scalar(f"queue_size/items/{i}", queue_sizes[i], step)
253
254            step += 1
255
256            # Find the index of the largest queue
257            max_size = max(queue_sizes)
258            max_index = queue_sizes.index(max_size)
259
260            # Calculate the total size of all queues
261            total_size = sum(queue_sizes)
262
263            # If total_size is greater than 0, calculate the probabilities
264            if total_size > 0:
265                # Normalize the queue sizes by the max_queue_size
266                normalized_sizes = [size / max_queue_size for size in queue_sizes]
267
268                # Calculate probabilities based on normalized sizes
269                probabilities = [
270                    size / sum(normalized_sizes) for size in normalized_sizes
271                ]
272
273                # Use random.choices with weights (probabilities)
274                chosen_queue_index = random.choices(
275                    range(len(queues)), weights=probabilities, k=1
276                )[0]
277
278                largest_queue_index.value = chosen_queue_index
279            else:
280                largest_queue_index.value = max_index
281
282            time.sleep(0.1)

Monitor and manage multiple result queues for balanced processing.
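
The balancing step turns queue backlogs into sampling weights. Note that dividing by max_queue_size cancels out in the subsequent normalization, so the selection probability reduces to each queue's share of the total backlog. A standalone sketch of the weighting:

    import random

    queue_sizes = [12, 3, 7]  # illustrative backlog per result queue
    max_queue_size = 16

    normalized = [size / max_queue_size for size in queue_sizes]
    weights = [n / sum(normalized) for n in normalized]
    chosen = random.choices(range(len(queue_sizes)), weights=weights, k=1)[0]
    print(chosen)  # index of the queue a post-processing worker should drain next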

def process_outputs_worker( self, result_queues, largest_queue_index, inference_finished, max_queue_size, wandb_activate=False):
284    def process_outputs_worker(
285        self,
286        result_queues,
287        largest_queue_index,
288        inference_finished,
289        max_queue_size,
290        wandb_activate=False,
291    ):
292        """Process model outputs from result queues and save to dataset."""
293        configure_logging()
294        logging.info(f"Process ID: {os.getpid()}. Results processing process started")
295        dataset_v51 = fo.load_dataset(self.dataset_name)
296        processing_successful = None
297
298        # Logging
299        experiment_name = f"post_process_{os.getpid()}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
300        log_directory = os.path.join(
301            self.tensorboard_root, self.dataset_name, experiment_name
302        )
303        wandb.tensorboard.patch(root_logdir=log_directory)
304        if WANDB_ACTIVE and wandb_activate:
305            wandb.init(
306                name=f"post_process_{os.getpid()}",
307                job_type="inference",
308                project="Zero Shot Object Detection",
309            )
310        writer = SummaryWriter(log_dir=log_directory)
311        n_processed_images = 0
312
313        logging.info(f"Post-Processor {os.getpid()} starting loop.")
314
315        while True:
316            results_queue = result_queues[largest_queue_index.value]
317            writer.add_scalar(
318                f"post_processing/selected_queue",
319                largest_queue_index.value,
320                n_processed_images,
321            )
322
323            if results_queue.qsize() == max_queue_size:
324                logging.warning(
325                    f"Queue full: {results_queue.qsize()}. Consider increasing number of post-processing workers."
326                )
327
328            # Exit only when inference is finished and the queue is empty
329            if inference_finished.value and results_queue.empty():
330                dataset_v51.save()
331                logging.info(
332                    f"Post-processing worker {os.getpid()} has finished all outputs."
333                )
334                break
335
336            # Process results from the queue if available
337            if not results_queue.empty():
338                try:
339                    time_start = time.time()
340
341                    result = results_queue.get_nowait()
342
343                    processing_successful = self.process_outputs(
344                        dataset_v51,
345                        result,
346                        self.object_classes,
347                        self.detection_threshold,
348                    )
349
350                    # Performance logging
351                    n_images = len(result["labels"])
352                    time_end = time.time()
353                    duration = time_end - time_start
354                    batches_per_second = 1 / duration
355                    frames_per_second = batches_per_second * n_images
356                    n_processed_images += n_images
357                    writer.add_scalar(
358                        f"post_processing/frames_per_second",
359                        frames_per_second,
360                        n_processed_images,
361                    )
362
363                    del result  # Explicit removal from device
364
365                except Exception as e:
366                    logging.error(f"Error in post-processing worker: {e}")
367
368            else:
369                continue
370
371        writer.close()
372        wandb.finish(exit_code=0)
373        return processing_successful  # Return last processing status

Process model outputs from result queues and save to dataset.
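
The shutdown handshake is the important detail here: a worker exits only once inference has signalled completion and its queue is fully drained, so no result batch is dropped. A minimal sketch of the pattern, assuming a shared flag with a .value attribute as used above:

    import queue as queue_module

    def drain_until_done(results_queue, inference_finished):
        while True:
            # Exit only when the producer is done AND nothing is left to process
            if inference_finished.value and results_queue.empty():
                break
            try:
                result = results_queue.get(timeout=0.1)
            except queue_module.Empty:
                continue
            ...  # process the result batch here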

def gpu_worker( self, gpu_id, cpu_cores, task_queue, results_queue, done_event, post_processing_finished, set_cpu_affinity=False):
375    def gpu_worker(
376        self,
377        gpu_id,
378        cpu_cores,
379        task_queue,
380        results_queue,
381        done_event,
382        post_processing_finished,
383        set_cpu_affinity=False,
384    ):
385        """Run model inference on specified GPU with dedicated CPU cores."""
386        dataset_v51 = fo.load_dataset(
387            self.dataset_name
388        )  # NOTE Only for the case of sequential processing
389        configure_logging()
390        # Set CPU
391        if set_cpu_affinity:
392            # Allow only certain CPU cores
393            psutil.Process().cpu_affinity(cpu_cores)
394        logging.info(f"Available CPU cores: {psutil.Process().cpu_affinity()}")
395        max_n_cpus = len(cpu_cores)
396        torch.set_num_threads(max_n_cpus)
397
398        # Set GPU
399        logging.info(f"GPU {gpu_id}: {torch.cuda.get_device_name(gpu_id)}")
400        device = torch.device(f"cuda:{gpu_id}")
401
402        run_successful = None
403        with torch.cuda.device(gpu_id):
404            while True:
405                if post_processing_finished.value and task_queue.empty():
406                    # Keep alive until post-processing is done
407                    break
408
409                if task_queue.empty():
410                    done_event.set()
411
412                if not task_queue.empty():
413                    try:
414                        task_metadata = task_queue.get(
415                            timeout=5
416                        )  # Timeout to prevent indefinite blocking
417                    except Exception as e:
418                        break  # Exit if no more tasks
419                    run_successful = self.model_inference(
420                        task_metadata,
421                        device,
422                        self.dataset_torch,
423                        dataset_v51,
424                        self.object_classes,
425                        results_queue,
426                        self.tensorboard_root,
427                    )
428                    logging.info(
429                        f"Worker for GPU {gpu_id} finished run successful: {run_successful}"
430                    )
431                else:
432                    continue
433        return run_successful  # Return last processing status

Run model inference on specified GPU with dedicated CPU cores.
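
Pinning each GPU worker to a disjoint set of CPU cores keeps dataloader and post-processing processes from competing for the same cores. A minimal sketch (core IDs are illustrative; cpu_affinity requires a platform where psutil supports it, e.g. Linux):

    import psutil
    import torch

    cpu_cores = [0, 1, 2, 3]  # illustrative: cores reserved for this worker
    psutil.Process().cpu_affinity(cpu_cores)  # restrict the process to these cores
    torch.set_num_threads(len(cpu_cores))     # keep intra-op threads within the pin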

def eval_and_export_worker(self, models_ready_queue, n_models):
435    def eval_and_export_worker(self, models_ready_queue, n_models):
436        """Evaluate model performance and export results for completed models."""
437        configure_logging()
438        logging.info(f"Process ID: {os.getpid()}. Eval-and-export process started")
439
440        dataset = fo.load_dataset(self.dataset_name)
441        run_successful = None
442        models_done = 0
443
444        while True:
445            if not models_ready_queue.empty():
446                try:
447                    model_info = models_ready_queue.get(
448                        timeout=5
449                    )  # Timeout to prevent indefinite blocking; avoid shadowing builtin dict
450                    model_name = model_info["model_name"]
451                    pred_key = re.sub(r"[\W-]+", "_", "pred_zsod_" + model_name)
452                    eval_key = re.sub(r"[\W-]+", "_", "eval_zsod_" + model_name)
453                    dataset.reload()
454                    run_successful = self.eval_and_export(
455                        dataset, model_name, pred_key, eval_key
456                    )
457                    models_done += 1
458                    logging.info(
459                        f"Evaluation and export of {models_done}/{n_models} models done."
460                    )
461                except Exception as e:
462                    logging.error(f"Error in eval-and-export worker: {e}")
463                    continue
464
465            if models_done == n_models:
466                break
467
468        return run_successful

Evaluate model performance and export results for completed models.

def model_inference( self, metadata: dict, device: str, dataset: torch.utils.data.dataset.Dataset, dataset_v51: fiftyone.core.dataset.Dataset, object_classes: list, results_queue: Union[queue.Queue, mp.Queue], root_log_dir: str, persistent_workers: bool = False):
471    def model_inference(
472        self,
473        metadata: dict,
474        device: str,
475        dataset: torch.utils.data.Dataset,
476        dataset_v51: fo.Dataset,
477        object_classes: list,
478        results_queue: Union[queue.Queue, mp.Queue],
479        root_log_dir: str,
480        persistent_workers: bool = False,
481    ):
482        """Model inference method running zero-shot object detection on provided dataset and device, returning success status."""
483        writer = None
484        run_successful = True
485        processor, model, inputs, outputs, result, dataloader = (
486            None,
487            None,
488            None,
489            None,
490            None,
491            None,
492        )  # For finally block
493
494        # Timeout handler
495        dataloader_timeout = 60
496        signal.signal(signal.SIGALRM, timeout_handler)
497
498        try:
499            # Metadata
500            run_id = metadata["run_id"]
501            model_name = metadata["model_name"]
502            dataset_name = metadata["dataset_name"]
503            is_subset = metadata["is_subset"]
504            batch_size = metadata["batch_size"]
505
506            logging.info(
507                f"Process ID: {os.getpid()}, Run ID: {run_id}, Device: {device}, Model: {model_name}"
508            )
509
510            # Load the model
511            logging.info(f"Loading model {model_name}")
512            processor = AutoProcessor.from_pretrained(model_name, use_fast=True)
513            model = AutoModelForZeroShotObjectDetection.from_pretrained(model_name)
514            model = model.to(device, non_blocking=True)
515            model.eval()
516            hf_model_config = AutoConfig.from_pretrained(model_name)
517            hf_model_config_name = type(hf_model_config).__name__
518            batch_classes = [object_classes] * batch_size
519            logging.info(f"Loaded model type {hf_model_config_name}")
520
521            # Dataloader
522            logging.info("Generating dataloader")
523            if is_subset:
524                chunk_index_start = metadata["chunk_index_start"]
525                chunk_index_end = metadata["chunk_index_end"]
526                logging.info(f"Length of dataset: {len(dataset)}")
527                logging.info(f"Subset start index: {chunk_index_start}")
528                logging.info(f"Subset stop index: {chunk_index_end}")
529                dataset = Subset(dataset, range(chunk_index_start, chunk_index_end))
530
531            zero_shot_inference_preprocessing = ZeroShotInferenceCollateFn(
532                hf_model_config_name=hf_model_config_name,
533                hf_processor=processor,
534                object_classes=object_classes,
535                batch_size=batch_size,
536                batch_classes=batch_classes,
537            )
538            num_workers = WORKFLOWS["auto_labeling_zero_shot"]["n_worker_dataloader"]
539            prefetch_factor = WORKFLOWS["auto_labeling_zero_shot"][
540                "prefetch_factor_dataloader"
541            ]
542            dataloader = DataLoader(
543                dataset,
544                batch_size=batch_size,
545                shuffle=False,
546                num_workers=num_workers,
547                persistent_workers=persistent_workers,
548                pin_memory=True,
549                prefetch_factor=prefetch_factor,
550                collate_fn=zero_shot_inference_preprocessing,
551            )
552
553            dataloader_length = len(dataloader)
554            if dataloader_length < 1:
555                logging.error(
556                    f"Dataloader has insufficient data: {dataloader_length} entries. Please check your dataset and DataLoader configuration."
557                )
558
559            # Logging
560            experiment_name = f"{model_name}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{device}"
561            log_directory = os.path.join(root_log_dir, dataset_name, experiment_name)
562            wandb.tensorboard.patch(root_logdir=log_directory)
563            if WANDB_ACTIVE:
564                wandb.init(
565                    name=f"{model_name}_{device}",
566                    job_type="inference",
567                    project="Zero Shot Object Detection",
568                    config=metadata,
569                )
570            writer = SummaryWriter(log_dir=log_directory)
571
572            # Inference Loop
573            logging.info(f"{os.getpid()}: Starting inference loop")
574            n_processed_images = 0
575            for inputs, labels, target_sizes, batch_classes in tqdm(
576                dataloader, desc="Inference Loop"
577            ):
578                signal.alarm(dataloader_timeout)
579                try:
580                    time_start = time.time()
581                    n_images = len(labels)
582                    inputs = inputs.to(device, non_blocking=True)
583
584                    with torch.amp.autocast("cuda"), torch.inference_mode():
585                        outputs = model(**inputs)
586
587                    result = {
588                        "inputs": inputs,
589                        "outputs": outputs,
590                        "processor": processor,
591                        "target_sizes": target_sizes,
592                        "labels": labels,
593                        "model_name": model_name,
594                        "hf_model_config_name": hf_model_config_name,
595                        "batch_classes": batch_classes,
596                    }
597
598                    logging.debug(f"{os.getpid()}: Putting result into queue")
599
600                    results_queue.put(
601                        result, timeout=60
602                    )  # Ditch data only after 60 seconds
603
604                    # Logging
605                    time_end = time.time()
606                    duration = time_end - time_start
607                    batches_per_second = 1 / duration
608                    frames_per_second = batches_per_second * n_images
609                    n_processed_images += n_images
610                    logging.debug(
611                        f"{os.getpid()}: Number of processed images: {n_processed_images}"
612                    )
613                    writer.add_scalar(
614                        f"inference/frames_per_second",
615                        frames_per_second,
616                        n_processed_images,
617                    )
618
619                except TimeoutException:
620                    logging.warning(
621                        f"Dataloader loop got stuck. Continuing with next batch."
622                    )
623                    continue
624
625                finally:
626                    signal.alarm(0)  # Cancel the alarm
627
628            # Flawless execution
629            wandb_exit_code = 0
630
631        except Exception as e:
632            wandb_exit_code = 1
633            run_successful = False
634            logging.error(f"Error in Process {os.getpid()}: {e}")
635        finally:
636            try:
637                wandb.finish(exit_code=wandb_exit_code)
638            except Exception:
639                pass
640
641            # Explicit removal from device
642            del (
643                processor,
644                model,
645                inputs,
646                outputs,
647                result,
648                dataloader,
649            )
650
651            torch.cuda.empty_cache()
652            wandb.tensorboard.unpatch()
653            if writer:
654                writer.close()
655            return run_successful

Model inference method running zero-shot object detection on provided dataset and device, returning success status.
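
The per-batch timeout relies on POSIX SIGALRM: an alarm is armed before each batch and cancelled in a finally block, so a hung dataloader raises instead of blocking forever. A self-contained sketch of the pattern (Unix only; the handler below is a stand-in for the module-level timeout_handler):

    import signal

    class BatchTimeout(Exception):
        pass

    def _on_alarm(signum, frame):
        raise BatchTimeout()

    signal.signal(signal.SIGALRM, _on_alarm)
    for batch in range(3):    # stand-in for iterating a DataLoader
        signal.alarm(60)      # raise if this batch takes longer than 60 s
        try:
            pass              # model forward pass and queueing would go here
        except BatchTimeout:
            continue          # skip the stuck batch
        finally:
            signal.alarm(0)   # always cancel the pending alarm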

def process_outputs(self, dataset_v51, result, object_classes, detection_threshold):
657    def process_outputs(self, dataset_v51, result, object_classes, detection_threshold):
658        """Process outputs from object detection models, extracting bounding boxes and labels to save to the dataset."""
659        try:
660            inputs = result["inputs"]
661            outputs = result["outputs"]
662            target_sizes = result["target_sizes"]
663            labels = result["labels"]
664            model_name = result["model_name"]
665            hf_model_config_name = result["hf_model_config_name"]
666            batch_classes = result["batch_classes"]
667            processor = result["processor"]
668
669            # Processing output
670            if hf_model_config_name == "GroundingDinoConfig":
671                results = processor.post_process_grounded_object_detection(
672                    outputs,
673                    inputs.input_ids,
674                    box_threshold=detection_threshold,
675                    text_threshold=detection_threshold,
676                )
677            elif hf_model_config_name in ["Owlv2Config", "OwlViTConfig"]:
678                results = processor.post_process_grounded_object_detection(
679                    outputs=outputs,
680                    threshold=detection_threshold,
681                    target_sizes=target_sizes,
682                    text_labels=batch_classes,
683                )
684            elif hf_model_config_name == "OmDetTurboConfig":
685                results = processor.post_process_grounded_object_detection(
686                    outputs,
687                    text_labels=batch_classes,
688                    threshold=detection_threshold,
689                    nms_threshold=detection_threshold,
690                    target_sizes=target_sizes,
691                )
692            else:
693                logging.error(f"Unsupported model config: {hf_model_config_name}")
694
695            if not len(results) == len(target_sizes) == len(labels):
696                logging.error(
697                    f"Lengths of results, target_sizes, and labels do not match: {len(results)}, {len(target_sizes)}, {len(labels)}"
698                )
699            for result, size, target in zip(results, target_sizes, labels):
700                boxes, scores, labels = (
701                    result["boxes"],
702                    result["scores"],
703                    result["text_labels"],
704                )
705
706                img_height = size[0]
707                img_width = size[1]
708
709                detections = []
710                for box, score, label in zip(boxes, scores, labels):
711                    processing_successful = True
712                    if hf_model_config_name == "GroundingDinoConfig":
713                        # Outputs do not comply with given labels
714                        # Grounding DINO outputs multiple pairs of object boxes and noun phrases for a given (Image, Text) pair
715                        # There can be either multiple labels per output ("bike van"), incomplete ones ("motorcyc"), or broken ones ("##cic")
716                        processed_label = label.split()[
717                            0
718                        ]  # Assume first output is the best output
719                        if processed_label in object_classes:
720                            label = processed_label
721                            top_left_x = box[0].item()
722                            top_left_y = box[1].item()
723                            box_width = (box[2] - box[0]).item()
724                            box_height = (box[3] - box[1]).item()
725                        else:
726                            matches = get_close_matches(
727                                processed_label, object_classes, n=1, cutoff=0.6
728                            )
729                            selected_label = matches[0] if matches else None
730                            if selected_label:
731                                logging.debug(
732                                    f"Mapped output '{processed_label}' to class '{selected_label}'"
733                                )
734                                label = selected_label
735                                top_left_x = box[0].item()
736                                top_left_y = box[1].item()
737                                box_width = (box[2] - box[0]).item()
738                                box_height = (box[3] - box[1]).item()
739                            else:
740                                logging.debug(
741                                    f"Skipped detection with {hf_model_config_name} due to unclear output: {label}"
742                                )
743                                processing_successful = False
744
745                    elif hf_model_config_name in [
746                        "Owlv2Config",
747                        "OwlViTConfig",
748                        "OmDetTurboConfig",
749                    ]:
750                        top_left_x = box[0].item() / img_width
751                        top_left_y = box[1].item() / img_height
752                        box_width = (box[2].item() - box[0].item()) / img_width
753                        box_height = (box[3].item() - box[1].item()) / img_height
754
755                    if (
756                        processing_successful
757                    ):  # Skip GroundingDinoConfig labels that could not be processed
758                        detection = fo.Detection(
759                            label=label,
760                            bounding_box=[
761                                top_left_x,
762                                top_left_y,
763                                box_width,
764                                box_height,
765                            ],
766                            confidence=score.item(),
767                        )
768                        detection["bbox_area"] = (
769                            detection["bounding_box"][2] * detection["bounding_box"][3]
770                        )
771                        detections.append(detection)
772
773                # Attach label to V51 dataset
774                pred_key = re.sub(
775                    r"[\W-]+", "_", "pred_zsod_" + model_name
776                )  # zsod: Zero-Shot Object Detection
777                sample = dataset_v51[target["image_id"]]
778                sample[pred_key] = fo.Detections(detections=detections)
779                sample.save()
780
781        except Exception as e:
782            logging.error(f"Error in processing outputs: {e}")
783            processing_successful = False
784        finally:
785            return processing_successful

Process outputs from object detection models, extracting bounding boxes and labels to save to the dataset.
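
For the OWLv2/OWL-ViT/OmDet branch, post-processing returns absolute (x1, y1, x2, y2) corner boxes, while Voxel51 expects [top-left-x, top-left-y, width, height] in relative coordinates. The conversion as a standalone helper:

    def to_v51_bbox(box_xyxy, img_width, img_height):
        # Absolute corner coordinates -> relative [x, y, w, h] for Voxel51
        x1, y1, x2, y2 = box_xyxy
        return [
            x1 / img_width,
            y1 / img_height,
            (x2 - x1) / img_width,
            (y2 - y1) / img_height,
        ]

    print(to_v51_bbox((100, 50, 300, 250), 640, 480))
    # [0.15625, 0.10416666666666667, 0.3125, 0.4166666666666667]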

def eval_and_export(self, dataset_v51, model_name, pred_key, eval_key):
787    def eval_and_export(self, dataset_v51, model_name, pred_key, eval_key):
788        """Populate dataset with evaluation results (if ground_truth available)"""
789        try:
790            dataset_v51.evaluate_detections(
791                pred_key,
792                gt_field="ground_truth",
793                eval_key=eval_key,
794                compute_mAP=True,
795            )
796        except Exception as e:
797            logging.warning(f"Evaluation not possible: {e}")
798
799        # Store labels https://docs.voxel51.com/api/fiftyone.core.collections.html#fiftyone.core.collections.SampleCollection.export
800        model_name_key = re.sub(r"[\W-]+", "_", model_name)
801        dataset_v51.export(
802            export_dir=os.path.join(self.detections_root, model_name_key),
803            dataset_type=fo.types.COCODetectionDataset,
804            data_path="data.json",
805            export_media=None,  # "manifest",
806            label_field=pred_key,
807            progress=True,
808        )
809        return True

Populate dataset with evaluation results (if ground_truth available)

class UltralyticsObjectDetection:
812class UltralyticsObjectDetection:
813    """Object detection using Ultralytics YOLO models with training and inference support."""
814
815    def __init__(self, dataset, config):
816        """Initialize with dataset, config, and setup paths for model and data."""
817        self.dataset = dataset
818        self.config = config
819        self.ultralytics_data_path = os.path.join(
820            config["export_dataset_root"], config["v51_dataset_name"]
821        )
822
823        self.hf_hub_model_id = (
824            f"{HF_ROOT}/"
825            + f"{config['v51_dataset_name']}_{config['model_name']}".replace("/", "_")
826        )
827
828        self.export_root = "output/models/ultralytics/"
829        self.export_folder = os.path.join(
830            self.export_root, self.config["v51_dataset_name"]
831        )
832
833        self.model_path = os.path.join(
834            self.export_folder, self.config["model_name"], "weights", "best.pt"
835        )
836
837    @staticmethod
838    def export_data(
839        dataset, dataset_info, export_dataset_root, label_field="ground_truth"
840    ):
841        """Export dataset to YOLO format for Ultralytics training."""
842        ultralytics_data_path = os.path.join(export_dataset_root, dataset_info["name"])
843        # Delete export directory if it already exists
844        if os.path.exists(ultralytics_data_path):
845            shutil.rmtree(ultralytics_data_path)
846
847        logging.info("Exporting data for training with Ultralytics")
848        classes = dataset.distinct(f"{label_field}.detections.label")
849
850        # Make directory
851        os.makedirs(ultralytics_data_path, exist_ok=False)
852
853        for split in ACCEPTED_SPLITS:
854            split_view = dataset.match_tags(split)
855
856            if split in ("train", "val"):  # YOLO expects train and val splits
857                split_view.export(
858                    export_dir=ultralytics_data_path,
859                    dataset_type=fo.types.YOLOv5Dataset,
860                    label_field=label_field,
861                    classes=classes,
862                    split=split,
863                )
864
865    def train(self):
866        """Train the YOLO model for object detection using Ultralytics and optionally upload to Hugging Face."""
867        model = YOLO(self.config["model_name"], task="detect")
868        # https://docs.ultralytics.com/modes/train/#train-settings
869
870        # Use all available GPUs
871        device = "0"  # Default to GPU 0
872        if torch.cuda.device_count() > 1:
873            device = ",".join(map(str, range(torch.cuda.device_count())))
874
875        results = model.train(
876            data=f"{self.ultralytics_data_path}/dataset.yaml",
877            epochs=self.config["epochs"],
878            project=self.export_folder,
879            name=self.config["model_name"],
880            patience=self.config["patience"],
881            batch=self.config["batch_size"],
882            imgsz=self.config["img_size"],
883            multi_scale=self.config["multi_scale"],
884            cos_lr=self.config["cos_lr"],
885            seed=GLOBAL_SEED,
886            optimizer="AdamW",  # "auto" as default
887            pretrained=True,
888            exist_ok=True,
889            amp=True,
890            device=device
891        )
892        metrics = model.val()
893        logging.info(f"Model Performance: {metrics}")
894
895        # Upload model to Hugging Face
896        if HF_DO_UPLOAD:
897            logging.info(f"Uploading model {self.model_path} to Hugging Face.")
898            api = HfApi()
899            api.create_repo(
900                self.hf_hub_model_id, private=True, repo_type="model", exist_ok=True
901            )
902            api.upload_file(
903                path_or_fileobj=self.model_path,
904                path_in_repo="best.pt",
905                repo_id=self.hf_hub_model_id,
906                repo_type="model",
907            )
908
909    def inference(self, gt_field="ground_truth"):
910        """Performs inference using YOLO model on a dataset, with options to evaluate results."""
911        logging.info(f"Running inference on dataset {self.config['v51_dataset_name']}")
912        inference_settings = self.config["inference_settings"]
913
914        dataset_name = None
915        model_name = self.config["model_name"]
916
917        model_hf = inference_settings["model_hf"]
918        if model_hf is not None:
919            # Use model manually defined in config.
920            # This way, models trained on a different dataset can be used for inference
921            dataset_name, _ = get_dataset_and_model_from_hf_id(model_hf)
922
923            # Set up directories
924            download_dir = os.path.join(
925                self.export_root, dataset_name, model_name, "weights"
926            )
927            os.makedirs(download_dir, exist_ok=True)
928
929            self.model_path = os.path.join(download_dir, "best.pt")
930
931            # Download the checkpoint from Hugging Face Hub
932
933            file_path = hf_hub_download(
934                repo_id=model_hf,
935                filename="best.pt",
936                local_dir=download_dir,
937            )
938        else:
939            # Automatically determine model based on dataset
940            dataset_name = self.config["v51_dataset_name"]
941
942            try:
943                if os.path.exists(self.model_path):
944                    file_path = self.model_path
945                    logging.info(f"Loading model {model_name} from disk: {file_path}")
946                else:
947                    download_dir = self.model_path.replace("best.pt", "")
948                    os.makedirs(download_dir, exist_ok=True)
949                    logging.info(
950                        f"Downloading model {self.hf_hub_model_id} from Hugging Face to {download_dir}"
951                    )
952                    file_path = hf_hub_download(
953                        repo_id=self.hf_hub_model_id,
954                        filename="best.pt",
955                        local_dir=download_dir,
956                    )
957            except Exception as e:
958                logging.error(f"Failed to load or download model: {str(e)}.")
959                return False
960
961        pred_key = f"pred_od_{model_name}-{dataset_name}"
962        logging.info(f"Using model {self.model_path} for inference.")
963        model = YOLO(self.model_path)
964
965        detection_threshold = inference_settings["detection_threshold"]
966        if inference_settings["inference_on_test"] is True:
967            dataset_eval_view = self.dataset.match_tags("test")
968            if len(dataset_eval_view) == 0:
969                logging.error("Dataset is missing split 'test'")
970            dataset_eval_view.apply_model(
971                model, label_field=pred_key, confidence_thresh=detection_threshold
972            )
973        else:
974            self.dataset.apply_model(
975                model, label_field=pred_key, confidence_thresh=detection_threshold
976            )
977
978        if inference_settings["do_eval"]:
979            eval_key = f"eval_{self.config['model_name']}_{dataset_name}"
980
981            if inference_settings["inference_on_test"] is True:
982                dataset_view = self.dataset.match_tags(["test"])
983            else:
984                dataset_view = self.dataset
985
986            dataset_view.evaluate_detections(
987                pred_key,
988                gt_field=gt_field,
989                eval_key=eval_key,
990                compute_mAP=True,
991            )

Object detection using Ultralytics YOLO models with training and inference support.

UltralyticsObjectDetection(dataset, config)
815    def __init__(self, dataset, config):
816        """Initialize with dataset, config, and setup paths for model and data."""
817        self.dataset = dataset
818        self.config = config
819        self.ultralytics_data_path = os.path.join(
820            config["export_dataset_root"], config["v51_dataset_name"]
821        )
822
823        self.hf_hub_model_id = (
824            f"{HF_ROOT}/"
825            + f"{config['v51_dataset_name']}_{config['model_name']}".replace("/", "_")
826        )
827
828        self.export_root = "output/models/ultralytics/"
829        self.export_folder = os.path.join(
830            self.export_root, self.config["v51_dataset_name"]
831        )
832
833        self.model_path = os.path.join(
834            self.export_folder, self.config["model_name"], "weights", "best.pt"
835        )

Initialize with dataset, config, and setup paths for model and data.

dataset
config
ultralytics_data_path
hf_hub_model_id
export_root
export_folder
model_path
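
Across the constructor and the methods below, the following config keys are read; a hedged sketch with illustrative values, not an authoritative schema:

    config = {
        "v51_dataset_name": "my_dataset",   # illustrative
        "model_name": "yolo11n.pt",         # illustrative Ultralytics checkpoint
        "export_dataset_root": "output/datasets/",
        # read by train():
        "epochs": 100, "patience": 20, "batch_size": 16, "img_size": 640,
        "multi_scale": False, "cos_lr": True,
        # read by inference():
        "inference_settings": {
            "model_hf": None, "detection_threshold": 0.25,
            "inference_on_test": True, "do_eval": True,
        },
    }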
@staticmethod
def export_data( dataset, dataset_info, export_dataset_root, label_field='ground_truth'):
837    @staticmethod
838    def export_data(
839        dataset, dataset_info, export_dataset_root, label_field="ground_truth"
840    ):
841        """Export dataset to YOLO format for Ultralytics training."""
842        ultralytics_data_path = os.path.join(export_dataset_root, dataset_info["name"])
843        # Delete export directory if it already exists
844        if os.path.exists(ultralytics_data_path):
845            shutil.rmtree(ultralytics_data_path)
846
847        logging.info("Exporting data for training with Ultralytics")
848        classes = dataset.distinct(f"{label_field}.detections.label")
849
850        # Make directory
851        os.makedirs(ultralytics_data_path, exist_ok=False)
852
853        for split in ACCEPTED_SPLITS:
854            split_view = dataset.match_tags(split)
855
856            if split in ("train", "val"):  # YOLO expects train and val splits
857                split_view.export(
858                    export_dir=ultralytics_data_path,
859                    dataset_type=fo.types.YOLOv5Dataset,
860                    label_field=label_field,
861                    classes=classes,
862                    split=split,
863                )

Export dataset to YOLO format for Ultralytics training.
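
The export relies on Voxel51's YOLOv5Dataset exporter, which writes the directory layout Ultralytics consumes. A hypothetical invocation and the rough resulting layout (paths illustrative):

    # `dataset` is assumed to be a FiftyOne dataset with train/val split tags
    UltralyticsObjectDetection.export_data(
        dataset=dataset,
        dataset_info={"name": "my_dataset"},
        export_dataset_root="output/datasets/",
    )
    # output/datasets/my_dataset/dataset.yaml  <- later passed to model.train(data=...)
    # output/datasets/my_dataset/images/{train,val}/
    # output/datasets/my_dataset/labels/{train,val}/  one .txt per image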

def train(self):
865    def train(self):
866        """Train the YOLO model for object detection using Ultralytics and optionally upload to Hugging Face."""
867        model = YOLO(self.config["model_name"], task="detect")
868        # https://docs.ultralytics.com/modes/train/#train-settings
869
870        # Use all available GPUs
871        device = "0"  # Default to GPU 0
872        if torch.cuda.device_count() > 1:
873            device = ",".join(map(str, range(torch.cuda.device_count())))
874
875        results = model.train(
876            data=f"{self.ultralytics_data_path}/dataset.yaml",
877            epochs=self.config["epochs"],
878            project=self.export_folder,
879            name=self.config["model_name"],
880            patience=self.config["patience"],
881            batch=self.config["batch_size"],
882            imgsz=self.config["img_size"],
883            multi_scale=self.config["multi_scale"],
884            cos_lr=self.config["cos_lr"],
885            seed=GLOBAL_SEED,
886            optimizer="AdamW",  # "auto" as default
887            pretrained=True,
888            exist_ok=True,
889            amp=True,
890            device=device
891        )
892        metrics = model.val()
893        logging.info(f"Model Performance: {metrics}")
894
895        # Upload model to Hugging Face
896        if HF_DO_UPLOAD:
897            logging.info(f"Uploading model {self.model_path} to Hugging Face.")
898            api = HfApi()
899            api.create_repo(
900                self.hf_hub_model_id, private=True, repo_type="model", exist_ok=True
901            )
902            api.upload_file(
903                path_or_fileobj=self.model_path,
904                path_in_repo="best.pt",
905                repo_id=self.hf_hub_model_id,
906                repo_type="model",
907            )

Train the YOLO model for object detection using Ultralytics and optionally upload to Hugging Face.
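
The Hub repo ID follows the <HF_ROOT>/<dataset>_<model> convention that get_dataset_and_model_from_hf_id later reverses, with any "/" in the model name flattened to "_". A standalone sketch (the HF_ROOT value is illustrative):

    HF_ROOT = "my-org"  # illustrative; the real value comes from config.config
    dataset_name, model_name = "my_dataset", "yolo11n.pt"
    hf_hub_model_id = f"{HF_ROOT}/" + f"{dataset_name}_{model_name}".replace("/", "_")
    print(hf_hub_model_id)  # my-org/my_dataset_yolo11n.pt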

def inference(self, gt_field='ground_truth'):
909    def inference(self, gt_field="ground_truth"):
910        """Performs inference using YOLO model on a dataset, with options to evaluate results."""
911        logging.info(f"Running inference on dataset {self.config['v51_dataset_name']}")
912        inference_settings = self.config["inference_settings"]
913
914        dataset_name = None
915        model_name = self.config["model_name"]
916
917        model_hf = inference_settings["model_hf"]
918        if model_hf is not None:
919            # Use model manually defined in config.
920            # This way, models trained on a different dataset can be used for inference
921            dataset_name, _ = get_dataset_and_model_from_hf_id(model_hf)
922
923            # Set up directories
924            download_dir = os.path.join(
925                self.export_root, dataset_name, model_name, "weights"
926            )
927            os.makedirs(download_dir, exist_ok=True)
928
929            self.model_path = os.path.join(download_dir, "best.pt")
930
931            # Download the checkpoint from Hugging Face Hub
932
933            file_path = hf_hub_download(
934                repo_id=model_hf,
935                filename="best.pt",
936                local_dir=download_dir,
937            )
938        else:
939            # Automatically determine model based on dataset
940            dataset_name = self.config["v51_dataset_name"]
941
942            try:
943                if os.path.exists(self.model_path):
944                    file_path = self.model_path
945                    logging.info(f"Loading model {model_name} from disk: {file_path}")
946                else:
947                    download_dir = self.model_path.replace("best.pt", "")
948                    os.makedirs(download_dir, exist_ok=True)
949                    logging.info(
950                        f"Downloading model {self.hf_hub_model_id} from Hugging Face to {download_dir}"
951                    )
952                    file_path = hf_hub_download(
953                        repo_id=self.hf_hub_model_id,
954                        filename="best.pt",
955                        local_dir=download_dir,
956                    )
957            except Exception as e:
958                logging.error(f"Failed to load or download model: {str(e)}.")
959                return False
960
961        pred_key = f"pred_od_{model_name}-{dataset_name}"
962        logging.info(f"Using model {self.model_path} for inference.")
963        model = YOLO(self.model_path)
964
965        detection_threshold = inference_settings["detection_threshold"]
966        if inference_settings["inference_on_test"] is True:
967            dataset_eval_view = self.dataset.match_tags("test")
968            if len(dataset_eval_view) == 0:
969                logging.error("Dataset is missing split 'test'")
970            dataset_eval_view.apply_model(
971                model, label_field=pred_key, confidence_thresh=detection_threshold
972            )
973        else:
974            self.dataset.apply_model(
975                model, label_field=pred_key, confidence_thresh=detection_threshold
976            )
977
978        if inference_settings["do_eval"]:
979            eval_key = f"eval_{self.config['model_name']}_{dataset_name}"
980
981            if inference_settings["inference_on_test"] is True:
982                dataset_view = self.dataset.match_tags(["test"])
983            else:
984                dataset_view = self.dataset
985
986            dataset_view.evaluate_detections(
987                pred_key,
988                gt_field=gt_field,
989                eval_key=eval_key,
990                compute_mAP=True,
991            )

Performs inference using YOLO model on a dataset, with options to evaluate results.
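
Setting inference_settings["model_hf"] lets a model trained on one dataset label another; the dataset part of the repo ID is recovered with get_dataset_and_model_from_hf_id. An illustrative call (the repo ID is hypothetical):

    dataset_name, model_name = get_dataset_and_model_from_hf_id(
        "my-org/other_dataset_yolo11n.pt"  # hypothetical repo ID
    )
    # dataset_name -> "other_dataset" (if it is a registered supported dataset)
    # model_name   -> "yolo11n.pt"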

def transform_batch_standalone( batch, image_processor, do_convert_annotations=True, return_pixel_mask=False):
 994def transform_batch_standalone(
 995    batch,
 996    image_processor,
 997    do_convert_annotations=True,
 998    return_pixel_mask=False,
 999):
1000    """Format annotations in COCO style for the object detection task. Defined outside the class so it can be pickled."""
1001    images = []
1002    annotations = []
1003
1004    for image_path, annotation in zip(batch["image_path"], batch["objects"]):
1005        image = Image.open(image_path).convert("RGB")
1006        image_np = np.array(image)
1007        images.append(image_np)
1008
1009        coco_annotations = []
1010        for i, bbox in enumerate(annotation["bbox"]):
1011
1012            # Conversion from HF dataset bounding boxes to DETR:
1013            # Input: HF dataset bbox is COCO (top_left_x, top_left_y, width, height) in absolute coordinates
1014            # Output:
1015            # DETR expects COCO (top_left_x, top_left_y, width, height) in absolute coordinates if 'do_convert_annotations == True'
1016            # DETR expects YOLO (center_x, center_y, width, height) in relative coordinates between [0,1] if 'do_convert_annotations == False'
1017
1018            if not do_convert_annotations:
1019                x, y, w, h = bbox
1020                img_height, img_width = image_np.shape[:2]
1021                center_x = (x + w / 2) / img_width
1022                center_y = (y + h / 2) / img_height
1023                width = w / img_width
1024                height = h / img_height
1025                bbox = [center_x, center_y, width, height]
1026
1027                # Ensure bbox values are within the expected range
1028                assert all(0 <= coord <= 1 for coord in bbox), f"Invalid bbox: {bbox}"
1029
1030                logging.debug(
1031                    f"Converted {[x, y, w, h]} to {[center_x, center_y, width, height]} with 'do_convert_annotations' = {do_convert_annotations}"
1032                )
1033
1034            coco_annotation = {
1035                "image_id": annotation["image_id"],
1036                "bbox": bbox,
1037                "category_id": annotation["category_id"][i],
1038                "area": annotation["area"][i],
1039                "iscrowd": 0,
1040            }
1041            coco_annotations.append(coco_annotation)
1042        detr_annotation = {
1043            "image_id": annotation["image_id"],
1044            "annotations": coco_annotations,
1045        }
1046        annotations.append(detr_annotation)
1047
1048    # Apply the image processor transformations once per batch: resizing, rescaling, normalization
1049    result = image_processor(
1050        images=images, annotations=annotations, return_tensors="pt"
1051    )
1052
1053    if not return_pixel_mask:
1054        result.pop("pixel_mask", None)
1055
1056    return result

Format annotations in COCO style for the object detection task. Defined outside the class so it can be pickled.
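
When do_convert_annotations is False, the branch above converts COCO boxes to YOLO format: absolute (x, y, w, h) becomes relative (center_x, center_y, w, h). As a standalone helper:

    def coco_to_yolo(bbox, img_width, img_height):
        # COCO (top-left x, top-left y, w, h), absolute -> YOLO (cx, cy, w, h), relative
        x, y, w, h = bbox
        return [
            (x + w / 2) / img_width,
            (y + h / 2) / img_height,
            w / img_width,
            h / img_height,
        ]

    print(coco_to_yolo([100, 50, 200, 100], 640, 480))
    # [0.3125, 0.20833333333333334, 0.3125, 0.20833333333333334]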

class HuggingFaceObjectDetection:
1059class HuggingFaceObjectDetection:
1060    """Object detection using HuggingFace models with support for training and inference."""
1061
1062    def __init__(
1063        self,
1064        dataset,
1065        config,
1066        output_model_path="./output/models/object_detection_hf",
1067        output_detections_path="./output/detections/",
1068        gt_field="ground_truth",
1069    ):
1070        """Initialize with dataset, config, and optional output paths."""
1071        self.dataset = dataset
1072        self.config = config
1073        self.model_name = config["model_name"]
1074        self.model_name_key = re.sub(r"[\W-]+", "_", self.model_name)
1075        self.dataset_name = config["v51_dataset_name"]
1076        self.do_convert_annotations = True  # HF can convert (top_left_x, top_left_y, bottom_right_x, bottom_right_y) in abs. coordinates to (x_min, y_min, width, height) in rel. coordinates https://github.com/huggingface/transformers/blob/v4.48.2/src/transformers/models/conditional_detr/image_processing_conditional_detr.py#L1497
1077
1078        self.detections_root = os.path.join(
1079            output_detections_path, self.dataset_name, self.model_name_key
1080        )
1081
1082        self.model_root = os.path.join(
1083            output_model_path, self.dataset_name, self.model_name_key
1084        )
1085
1086        self.hf_hub_model_id = (
1087            f"{HF_ROOT}/" + f"{self.dataset_name}_{self.model_name}".replace("/", "_")
1088        )
1089
1090        self.categories = dataset.distinct(f"{gt_field}.detections.label")
1091        self.id2label = {index: x for index, x in enumerate(self.categories, start=0)}
1092        self.label2id = {v: k for k, v in self.id2label.items()}
1093
1094    def collate_fn(self, batch):
1095        """Collate function for batching data during training and inference."""
1096        data = {}
1097        data["pixel_values"] = torch.stack([x["pixel_values"] for x in batch])
1098        data["labels"] = [x["labels"] for x in batch]
1099        if "pixel_mask" in batch[0]:
1100            data["pixel_mask"] = torch.stack([x["pixel_mask"] for x in batch])
1101        return data
1102
1103    def train(self, hf_dataset, overwrite_output=True):
1104        """Train models for object detection tasks with support for custom image sizes and transformations."""
1105        torch.cuda.empty_cache()
1106        img_size_target = self.config.get("image_size", None)
1107        if img_size_target is None:
1108            image_processor = AutoProcessor.from_pretrained(
1109                self.model_name,
1110                do_resize=False,
1111                do_pad=True,
1112                use_fast=True,
1113                do_convert_annotations=self.do_convert_annotations,
1114            )
1115        else:
1116            logging.warning(f"Resizing images to target size {img_size_target}.")
1117            image_processor = AutoProcessor.from_pretrained(
1118                self.model_name,
1119                do_resize=True,
1120                size={
1121                    "max_height": img_size_target[1],
1122                    "max_width": img_size_target[0],
1123                },
1124                do_pad=True,
1125                pad_size={"height": img_size_target[1], "width": img_size_target[0]},
1126                use_fast=True,
1127                do_convert_annotations=self.do_convert_annotations,
1128            )
1129
1130        train_transform_batch = partial(
1131            transform_batch_standalone,
1132            image_processor=image_processor,
1133            do_convert_annotations=self.do_convert_annotations,
1134        )
1135        val_test_transform_batch = partial(
1136            transform_batch_standalone,
1137            image_processor=image_processor,
1138            do_convert_annotations=self.do_convert_annotations,
1139        )
1140
1141        hf_dataset[Split.TRAIN] = hf_dataset[Split.TRAIN].with_transform(
1142            train_transform_batch
1143        )
1144        hf_dataset[Split.VALIDATION] = hf_dataset[Split.VALIDATION].with_transform(
1145            val_test_transform_batch
1146        )
1147        hf_dataset[Split.TEST] = hf_dataset[Split.TEST].with_transform(
1148            val_test_transform_batch
1149        )
1150
1151        hf_model_config = AutoConfig.from_pretrained(self.model_name)
1152        hf_model_config_name = type(hf_model_config).__name__
1153
1154        if type(hf_model_config) in AutoModelForObjectDetection._model_mapping:
1155            model = AutoModelForObjectDetection.from_pretrained(
1156                self.model_name,
1157                id2label=self.id2label,
1158                label2id=self.label2id,
1159                ignore_mismatched_sizes=True,
1160            )
1161        else:
1162            model = None
1163            logging.error(
1164                "Hugging Face AutoModel does not support " + str(type(hf_model_config))
1165            )
1166
1167        if (
1168            overwrite_output
1169            and os.path.exists(self.model_root)
1170            and os.listdir(self.model_root)
1171        ):
1172            logging.warning(
1173                f"Training will overwrite existing results in {self.model_root}"
1174            )
1175
1176        training_args = TrainingArguments(
1177            run_name=self.model_name,
1178            output_dir=self.model_root,
1179            overwrite_output_dir=overwrite_output,
1180            num_train_epochs=self.config["epochs"],
1181            fp16=True,
1182            per_device_train_batch_size=self.config["batch_size"],
1183            auto_find_batch_size=True,
1184            dataloader_num_workers=min(self.config["n_worker_dataloader"], NUM_WORKERS),
1185            learning_rate=self.config["learning_rate"],
1186            lr_scheduler_type="cosine",
1187            weight_decay=self.config["weight_decay"],
1188            max_grad_norm=self.config["max_grad_norm"],
1189            metric_for_best_model="eval_loss",
1190            greater_is_better=False,
1191            load_best_model_at_end=True,
1192            eval_strategy="epoch",
1193            save_strategy="best",
1194            save_total_limit=1,
1195            remove_unused_columns=False,
1196            eval_do_concat_batches=False,
1197            save_safetensors=False,  # Does not work with all models
1198            hub_model_id=self.hf_hub_model_id,
1199            hub_private_repo=True,
1200            push_to_hub=HF_DO_UPLOAD,
1201            seed=GLOBAL_SEED,
1202            data_seed=GLOBAL_SEED,
1203        )
1204
1205        early_stopping_callback = EarlyStoppingCallback(
1206            early_stopping_patience=self.config["early_stop_patience"],
1207            early_stopping_threshold=self.config["early_stop_threshold"],
1208        )
1209
1210        trainer = Trainer(
1211            model=model,
1212            args=training_args,
1213            train_dataset=hf_dataset[Split.TRAIN],
1214            eval_dataset=hf_dataset[Split.VALIDATION],
1215            tokenizer=image_processor,
1216            data_collator=self.collate_fn,
1217            callbacks=[early_stopping_callback],
1218            # compute_metrics=eval_compute_metrics_fn,
1219        )
1220
1221        logging.info(f"Starting training of model {self.model_name}.")
1222        trainer.train()
1223        if HF_DO_UPLOAD:
1224            trainer.push_to_hub()
1225
1226        metrics = trainer.evaluate(eval_dataset=hf_dataset[Split.TEST])
1227        logging.info(f"Model training completed. Evaluation results: {metrics}")
1228
1229    def inference(self, inference_settings, load_from_hf=True, gt_field="ground_truth"):
1230        """Performs model inference on a dataset, loading from Hugging Face or disk, and optionally evaluates detection results."""
1231
1232        model_hf = inference_settings["model_hf"]
1233        dataset_name = None
1234        if model_hf is not None:
1235            self.hf_hub_model_id = model_hf
1236            dataset_name, model_name = get_dataset_and_model_from_hf_id(model_hf)
1237        else:
1238            dataset_name = self.dataset_name
1239        torch.cuda.empty_cache()
1240        # Load trained model from Hugging Face
1241        load_from_hf_successful = None
1242        if load_from_hf:
1243            try:
1244                logging.info(f"Loading model from Hugging Face: {self.hf_hub_model_id}")
1245                image_processor = AutoProcessor.from_pretrained(self.hf_hub_model_id)
1246                model = AutoModelForObjectDetection.from_pretrained(
1247                    self.hf_hub_model_id
1248                )
1249                load_from_hf_successful = True
1250            except Exception as e:
1251                load_from_hf_successful = False
1252                logging.warning(
1253                    f"Model {self.model_name} could not be loaded from Hugging Face {self.hf_hub_model_id}. Attempting loading from disk."
1254                )
1255        if load_from_hf == False or load_from_hf_successful == False:
1256            try:
1257                # Select folder in self.model_root that include 'checkpoint-'
1258                checkpoint_dirs = [
1259                    d
1260                    for d in os.listdir(self.model_root)
1261                    if "checkpoint-" in d
1262                    and os.path.isdir(os.path.join(self.model_root, d))
1263                ]
1264
1265                if not checkpoint_dirs:
1266                    logging.error(
1267                        f"No checkpoint directory found in {self.model_root}!"
1268                    )
1269                    model_path = None
1270                else:
1271                    # Sort by modification time (latest first)
1272                    checkpoint_dirs.sort(
1273                        key=lambda d: os.path.getmtime(
1274                            os.path.join(self.model_root, d)
1275                        ),
1276                        reverse=True,
1277                    )
1278
1279                    if len(checkpoint_dirs) > 1:
1280                        logging.warning(
1281                            f"Multiple checkpoint directories found: {checkpoint_dirs}. Selecting the latest one: {checkpoint_dirs[0]}."
1282                        )
1283
1284                    selected_checkpoint = checkpoint_dirs[0]
1285                    logging.info(
1286                        f"Loading model from disk: {self.model_root}/{selected_checkpoint}"
1287                    )
1288                    model_path = os.path.join(self.model_root, selected_checkpoint)
1289
1290                image_processor = AutoProcessor.from_pretrained(model_path)
1291                model = AutoModelForObjectDetection.from_pretrained(model_path)
1292            except Exception as e:
1293                logging.error(
1294                    f"Model {self.model_name} could not be loaded from folder {self.model_root}/{selected_checkpoint}. Inference not possible."
1295                )
1296
1297        device, _, _ = get_backend()
1298        logging.info(f"Using device {device} for inference.")
1299        model = model.to(device)
1300        model.eval()
1301
1302        pred_key = f"pred_od_{self.model_name_key}-{dataset_name}"
1303
1304        if inference_settings["inference_on_test"] is True:
1305            INFERENCE_SPLITS = ["test"]
1306            dataset_eval_view = self.dataset.match_tags(INFERENCE_SPLITS)
1307        else:
1308            dataset_eval_view = self.dataset
1309
1310        detection_threshold = inference_settings["detection_threshold"]
1311
1312        with torch.amp.autocast("cuda"), torch.inference_mode():
1313            for sample in dataset_eval_view.iter_samples(progress=True, autosave=True):
1314                image_width = sample.metadata.width
1315                image_height = sample.metadata.height
1316                img_filepath = sample.filepath
1317
1318                image = Image.open(img_filepath)
1319                inputs = image_processor(images=[image], return_tensors="pt")
1320                outputs = model(**inputs.to(device))
1321                target_sizes = torch.tensor([[image.size[1], image.size[0]]])
1322
1323                results = image_processor.post_process_object_detection(
1324                    outputs, threshold=detection_threshold, target_sizes=target_sizes
1325                )[0]
1326
1327                detections = []
1328                for score, label, box in zip(
1329                    results["scores"], results["labels"], results["boxes"]
1330                ):
1331                    # Bbox is in absolute coordinates x, y, x2, y2
1332                    box = box.tolist()
1333                    text_label = model.config.id2label[label.item()]
1334
1335                    # Voxel51 requires relative coordinates between 0 and 1
1336                    top_left_x = box[0] / image_width
1337                    top_left_y = box[1] / image_height
1338                    box_width = (box[2] - box[0]) / image_width
1339                    box_height = (box[3] - box[1]) / image_height
1340                    detection = fo.Detection(
1341                        label=text_label,
1342                        bounding_box=[
1343                            top_left_x,
1344                            top_left_y,
1345                            box_width,
1346                            box_height,
1347                        ],
1348                        confidence=score.item(),
1349                    )
1350                    detections.append(detection)
1351
1352                sample[pred_key] = fo.Detections(detections=detections)
1353
1354        if inference_settings["do_eval"] is True:
1355            eval_key = re.sub(
1356                r"[\W-]+", "_", "eval_" + self.model_name + "_" + self.dataset_name
1357            )
1358
1359            if inference_settings["inference_on_test"] is True:
1360                dataset_view = self.dataset.match_tags(["test"])
1361            else:
1362                dataset_view = self.dataset
1363
1364            dataset_view.evaluate_detections(
1365                pred_key,
1366                gt_field=gt_field,
1367                eval_key=eval_key,
1368                compute_mAP=True,
1369            )

Object detection using HuggingFace models with support for training and inference.

HuggingFaceObjectDetection(dataset, config, output_model_path='./output/models/object_detection_hf', output_detections_path='./output/detections/', gt_field='ground_truth')

Initialize with dataset, config, and optional output paths.

dataset
config
model_name
model_name_key
dataset_name
do_convert_annotations
detections_root
model_root
hf_hub_model_id
categories
id2label
label2id
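
A minimal construction sketch. This is hedged: the config keys shown are the two the constructor reads, the model and dataset names are placeholders, and the FiftyOne dataset is assumed to exist with a populated 'ground_truth' detections field (the constructor derives the label maps from it):

    import fiftyone as fo

    dataset = fo.load_dataset("my_dataset")  # assumed existing FiftyOne dataset
    config = {
        "model_name": "microsoft/conditional-detr-resnet-50",  # placeholder HF checkpoint
        "v51_dataset_name": "my_dataset",
        # training keys (epochs, batch_size, ...) are listed under train() below
    }
    detector = HuggingFaceObjectDetection(dataset=dataset, config=config)
    # hf_hub_model_id becomes f"{HF_ROOT}/my_dataset_microsoft_conditional-detr-resnet-50"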
def collate_fn(self, batch):

Collate function for batching data during training and inference.
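
A small sketch of the expected behavior, assuming processor-style items; since the method does not use `self`, it is called unbound here purely for illustration:

    import torch

    batch = [
        {
            "pixel_values": torch.rand(3, 480, 640),
            "labels": {"class_labels": torch.tensor([0]), "boxes": torch.rand(1, 4)},
        },
        {
            "pixel_values": torch.rand(3, 480, 640),
            "labels": {"class_labels": torch.tensor([2]), "boxes": torch.rand(1, 4)},
        },
    ]
    data = HuggingFaceObjectDetection.collate_fn(None, batch)
    assert data["pixel_values"].shape == (2, 3, 480, 640)  # stacked into one tensor
    assert len(data["labels"]) == 2  # labels stay a list, one entry per sample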

def train(self, hf_dataset, overwrite_output=True):

Train models for object detection tasks with support for custom image sizes and transformations.
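
For reference, a hedged sketch of the configuration keys that train() and its TrainingArguments actually read (the values are illustrative placeholders, not recommended defaults):

    config = {
        "model_name": "microsoft/conditional-detr-resnet-50",  # placeholder checkpoint
        "v51_dataset_name": "my_dataset",
        "epochs": 50,
        "batch_size": 8,
        "n_worker_dataloader": 4,  # capped by NUM_WORKERS
        "learning_rate": 5e-5,
        "weight_decay": 1e-4,
        "max_grad_norm": 0.1,
        "early_stop_patience": 5,
        "early_stop_threshold": 0.0,
        # "image_size": (640, 640),  # optional; enables the resize/pad branch
    }
    # detector.train(hf_dataset) expects a datasets dict with
    # Split.TRAIN, Split.VALIDATION, and Split.TEST entries.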

def inference(self, inference_settings, load_from_hf=True, gt_field='ground_truth'):

Performs model inference on a dataset, loading from Hugging Face or disk, and optionally evaluates detection results.
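
The inference_settings keys read by this method, as a sketch with illustrative values:

    inference_settings = {
        "model_hf": None,           # or an HF ID such as "org/dataset_model"
        "inference_on_test": True,  # restrict inference to samples tagged 'test'
        "detection_threshold": 0.2, # confidence cutoff for post-processing
        "do_eval": True,            # run evaluate_detections(compute_mAP=True) afterwards
    }
    detector.inference(inference_settings, load_from_hf=True)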

class CustomCoDETRObjectDetection:
class CustomCoDETRObjectDetection:
    """Interface for running Co-DETR object detection model training and inference in containers"""

    def __init__(self, dataset, dataset_info, run_config):
        """Initialize Co-DETR interface with dataset and configuration"""
        self.root_codetr = "./custom_models/CoDETR/Co-DETR"
        self.root_codetr_models = "output/models/codetr"
        self.dataset = dataset
        self.dataset_name = dataset_info["name"]
        self.export_dir_root = run_config["export_dataset_root"]
        self.config_key = os.path.splitext(os.path.basename(run_config["config"]))[0]
        self.hf_repo_name = f"{HF_ROOT}/{self.dataset_name}_{self.config_key}"

    def convert_data(self):
        """Convert dataset to COCO format required by Co-DETR"""

        export_dir = os.path.join(self.export_dir_root, self.dataset_name, "coco")

        # Check if folder already exists
        if not os.path.exists(export_dir):
            # Make directory
            os.makedirs(export_dir, exist_ok=True)
            logging.info(f"Exporting data to {export_dir}")
            splits = [
                "train",
                "val",
                "test",
            ]  # Co-DETR expects data in 'train' and 'val' folders
            for split in splits:
                split_view = self.dataset.match_tags(split)
                split_view.export(
                    dataset_type=fo.types.COCODetectionDataset,
                    data_path=os.path.join(export_dir, f"{split}2017"),
                    labels_path=os.path.join(
                        export_dir, "annotations", f"instances_{split}2017.json"
                    ),
                    label_field="ground_truth",
                )
        else:
            logging.warning(
                f"Folder {export_dir} already exists, skipping data export."
            )

    def update_config_file(self, dataset_name, config_file, max_epochs):
        """Update Co-DETR config file with dataset-specific parameters"""

        config_path = os.path.join(self.root_codetr, config_file)

        # Get classes from exported data
        annotations_json = os.path.join(
            self.export_dir_root,
            dataset_name,
            "coco/annotations/instances_train2017.json",
        )
        # Read the JSON file
        with open(annotations_json, "r") as file:
            data = json.load(file)

        # Extract the value associated with the key "categories"
        categories = data.get("categories")
        class_names = tuple(category["name"] for category in categories)
        num_classes = len(class_names)

        # Update configuration file
        # This assumes that classes = ('a', 'b', ...) is already defined and will be overwritten.
        with open(config_path, "r") as file:
            content = file.read()

        # Update the classes tuple
        content = re.sub(r"classes\s*=\s*\(.*?\)", f"classes = {class_names}", content)

        # Update all instances of num_classes
        content = re.sub(r"num_classes=\d+", f"num_classes={num_classes}", content)

        # Update all instances of max_epochs
        content = re.sub(r"max_epochs=\d+", f"max_epochs={max_epochs}", content)

        with open(config_path, "w") as file:
            file.write(content)

        logging.warning(
            f"Updated {config_path} with classes={class_names} and num_classes={num_classes} and max_epochs={max_epochs}"
        )

    def train(self, param_config, param_n_gpus, container_tool, param_function="train"):
        """Train Co-DETR model using containerized environment"""

        # Check if model already exists
        output_folder_codetr = os.path.join(self.root_codetr, "output")
        os.makedirs(output_folder_codetr, exist_ok=True)
        param_config_name = os.path.splitext(os.path.basename(param_config))[0]
        best_models_dir = os.path.join(output_folder_codetr, "best")
        os.makedirs(best_models_dir, exist_ok=True)
        # Best model files follow the naming scheme "config_dataset.pth"
        pth_model_files = (
            [f for f in os.listdir(best_models_dir) if f.endswith(".pth")]
            if os.path.exists(best_models_dir) and os.path.isdir(best_models_dir)
            else []
        )

        matching_files = [
            f
            for f in pth_model_files
            if f.startswith(param_config_name)
            and self.dataset_name in f
            and f.endswith(".pth")
        ]
        if len(matching_files) > 0:
            logging.warning(
                f"Model {param_config_name} already trained on dataset {self.dataset_name}. Skipping training."
            )
            if len(matching_files) > 1:
                logging.warning(f"Multiple weights found: {matching_files}")
        else:
            logging.info(
                f"Launching training for Co-DETR config {param_config} and dataset {self.dataset_name}."
            )
            volume_data = os.path.join(self.export_dir_root, self.dataset_name)

            # Train model, store checkpoints in 'output_folder_codetr'
            train_result = self._run_container(
                volume_data=volume_data,
                param_function=param_function,
                param_config=param_config,
                param_n_gpus=param_n_gpus,
                container_tool=container_tool,
            )

            # Find the best_bbox checkpoint file
            checkpoint_files = [
                f
                for f in os.listdir(output_folder_codetr)
                if "best_bbox" in f and f.endswith(".pth")
            ]
            if not checkpoint_files:
                logging.error(
                    "Co-DETR was not trained, model pth file missing. No checkpoint file with 'best_bbox' found."
                )
            else:
                if len(checkpoint_files) > 1:
                    logging.warning(
                        f"Found {len(checkpoint_files)} checkpoint files. Selecting {checkpoint_files[0]}."
                    )
                checkpoint = checkpoint_files[0]
                checkpoint_path = os.path.join(output_folder_codetr, checkpoint)
                logging.info("Co-DETR was trained successfully.")

                # Upload best model to Hugging Face
                if HF_DO_UPLOAD:
                    logging.info("Uploading Co-DETR model to Hugging Face.")
                    api = HfApi()
                    api.create_repo(
                        self.hf_repo_name,
                        private=True,
                        repo_type="model",
                        exist_ok=True,
                    )
                    api.upload_file(
                        path_or_fileobj=checkpoint_path,
                        path_in_repo="model.pth",
                        repo_id=self.hf_repo_name,
                        repo_type="model",
                    )

                # Move best model file and clear output folder
                self._run_container(
                    volume_data=volume_data,
                    param_function="clear-output",
                    param_config=param_config,
                    param_dataset_name=self.dataset_name,
                    container_tool=container_tool,
                )

    @staticmethod
    def _find_file_iteratively(start_path, filename):
        """Check for the file directly, then search iteratively up the directory tree and through sibling directories."""
        # Convert start_path to a Path object
        start_path = Path(start_path)

        # Check if the file exists in the start_path directly (very fast)
        file_path = start_path / filename
        if file_path.exists():
            return str(file_path)

        # Start with the highest directory and go up iteratively
        current_dir = start_path
        checked_dirs = set()

        # Stop at the filesystem root, where a directory is its own parent
        while current_dir != current_dir.parent:
            # Check if the file is in the current directory
            file_path = current_dir / filename
            if file_path.exists():
                return str(file_path)

            # If we haven't checked the sibling directories, check them as well
            parent_dir = current_dir.parent
            if parent_dir not in checked_dirs:
                # Check sibling directories
                for sibling in parent_dir.iterdir():
                    if sibling != current_dir and sibling.is_dir():
                        sibling_file_path = sibling / filename
                        if sibling_file_path.exists():
                            return str(sibling_file_path)
                checked_dirs.add(parent_dir)

            # Otherwise, go one level up
            current_dir = current_dir.parent

        # If file is not found after traversing all levels, return None
        logging.error(f"File {filename} could not be found.")
        return None

    def run_inference(
        self,
        dataset,
        param_config,
        param_n_gpus,
        container_tool,
        inference_settings,
        param_function="inference",
        inference_output_folder="custom_models/CoDETR/Co-DETR/output/inference/",
        gt_field="ground_truth",
    ):
        """Run inference using trained Co-DETR model and convert results to FiftyOne format"""

        logging.info(f"Launching inference for Co-DETR config {param_config}.")
        volume_data = os.path.join(self.export_dir_root, self.dataset_name)

        if inference_settings["inference_on_test"] is True:
            folder_inference = os.path.join("coco", "test2017")
        else:
            folder_inference = os.path.join("coco")

        # Get model from Hugging Face
        dataset_name = None
        config_key = None
        try:
            if inference_settings["model_hf"] is None:
                hf_path = self.hf_repo_name
            else:
                hf_path = inference_settings["model_hf"]

            dataset_name, config_key = get_dataset_and_model_from_hf_id(hf_path)

            download_folder = os.path.join(
                self.root_codetr_models, dataset_name, config_key
            )

            logging.info(
                f"Downloading model {hf_path} from Hugging Face into {download_folder}"
            )
            os.makedirs(download_folder, exist_ok=True)

            file_path = hf_hub_download(
                repo_id=hf_path,
                filename="model.pth",
                local_dir=download_folder,
            )
        except Exception as e:
            # Without a model checkpoint, inference cannot continue
            logging.error(f"An error occurred during model download: {e}")
            return

        model_path = os.path.join(dataset_name, config_key, "model.pth")
        logging.info(f"Starting inference for model {model_path}")

        inference_result = self._run_container(
            volume_data=volume_data,
            param_function=param_function,
            param_config=param_config,
            param_n_gpus=param_n_gpus,
            container_tool=container_tool,
            param_inference_dataset_folder=folder_inference,
            param_inference_model_checkpoint=model_path,
        )

        # Convert results from JSON output into V51 dataset
        # Files follow format inference_results_{timestamp}.json (run_inference.py)
        os.makedirs(inference_output_folder, exist_ok=True)
        output_files = [
            f
            for f in os.listdir(inference_output_folder)
            if f.startswith("inference_results_") and f.endswith(".json")
        ]
        logging.debug(f"Found files with inference content: {output_files}")

        if not output_files:
            # Nothing to convert; indexing the sorted list below would fail
            logging.error(
                f"No inference result files found in {inference_output_folder}"
            )
            return

        # Get full path for each file
        file_paths = [os.path.join(inference_output_folder, f) for f in output_files]

        # Extract timestamp from the filename and sort based on the timestamp
        file_paths_sorted = sorted(
            file_paths,
            key=lambda f: datetime.datetime.strptime(
                f.split("_")[-2] + "_" + f.split("_")[-1].replace(".json", ""),
                "%Y%m%d_%H%M%S",
            ),
            reverse=True,
        )

        # Use the most recent file based on timestamp
        latest_file = file_paths_sorted[0]
        logging.info(f"Using inference results from: {latest_file}")
        with open(latest_file, "r") as file:
            data = json.load(file)

        # Get conversion for annotated classes
        annotations_path = os.path.join(
            volume_data, "coco", "annotations", "instances_train2017.json"
        )

        with open(annotations_path, "r") as file:
            data_annotations = json.load(file)

        class_ids_and_names = [
            (category["id"], category["name"])
            for category in data_annotations["categories"]
        ]

        # Match sample filepaths (from exported Co-DETR COCO format) to V51 filepaths
        sample = dataset.first()
        root_dir_samples = sample.filepath

        # Convert results into V51 file format
        detection_threshold = inference_settings["detection_threshold"]
        pred_key = f"pred_od_{config_key}-{dataset_name}"
        for key, value in tqdm(data.items(), desc="Processing Co-DETR detection"):
            try:
                # Get filename
                filepath = CustomCoDETRObjectDetection._find_file_iteratively(
                    root_dir_samples, os.path.basename(key)
                )
                sample = dataset[filepath]

                img_width = sample.metadata.width
                img_height = sample.metadata.height

                detections_v51 = []
                for class_id, class_detections in enumerate(data[key]):  # Starts with 0
                    if len(class_detections) > 0:
                        objects_class = class_ids_and_names[class_id]
                        for detection in class_detections:
                            confidence = detection[4]
                            detection_v51 = fo.Detection(
                                label=objects_class[1],
                                bounding_box=[
                                    detection[0] / img_width,
                                    detection[1] / img_height,
                                    (detection[2] - detection[0]) / img_width,
                                    (detection[3] - detection[1]) / img_height,
                                ],
                                confidence=confidence,
                            )
                            if confidence >= detection_threshold:
                                detections_v51.append(detection_v51)

                sample[pred_key] = fo.Detections(detections=detections_v51)
                sample.save()
            except Exception as e:
                logging.error(
                    f"An error occurred during the conversion of Co-DETR inference results to the V51 dataset: {e}"
                )

        # Run V51 evaluation
        if inference_settings["do_eval"] is True:
            eval_key = pred_key.replace("pred_", "eval_").replace("-", "_")

            if inference_settings["inference_on_test"] is True:
                dataset_view = dataset.match_tags(["test"])
            else:
                dataset_view = dataset

            logging.info(
                f"Starting evaluation for {pred_key} in evaluation key {eval_key}."
            )
            dataset_view.evaluate_detections(
                pred_key,
                gt_field=gt_field,
                eval_key=eval_key,
                compute_mAP=True,
            )

    def _run_container(
        self,
        volume_data,
        param_function,
        param_config="",
        param_n_gpus="1",
        param_dataset_name="",
        param_inference_dataset_folder="",
        param_inference_model_checkpoint="",
        image="dbogdollresearch/codetr",
        workdir="/launch",
        container_tool="docker",
    ):
        """Execute Co-DETR container with specified parameters using Docker or Singularity"""

        try:
            # Convert relative paths to absolute paths (necessary under WSL2)
            root_codetr_abs = os.path.abspath(self.root_codetr)
            volume_data_abs = os.path.abspath(volume_data)
            root_codetr_models_abs = os.path.abspath(self.root_codetr_models)

            # Check if using Docker or Singularity and define the appropriate command
            if container_tool == "docker":
                command = [
                    "docker",
                    "run",
                    "--gpus",
                    "all",
                    "--workdir",
                    workdir,
                    "--volume",
                    f"{root_codetr_abs}:{workdir}",
                    "--volume",
                    f"{volume_data_abs}:{workdir}/data:ro",
                    "--volume",
                    f"{root_codetr_models_abs}:{workdir}/hf_models:ro",
                    "--shm-size=8g",
                    image,
                    param_function,
                    param_config,
                    param_n_gpus,
                    param_dataset_name,
                    param_inference_dataset_folder,
                    param_inference_model_checkpoint,
                ]
            elif container_tool == "singularity":
                command = [
                    "singularity",
                    "run",
                    "--nv",
                    "--pwd",
                    workdir,
                    "--bind",
                    f"{self.root_codetr}:{workdir}",
                    "--bind",
                    f"{volume_data}:{workdir}/data:ro",
                    "--bind",
                    f"{self.root_codetr_models}:{workdir}/hf_models:ro",
                    f"docker://{image}",
                    param_function,
                    param_config,
                    param_n_gpus,
                    param_dataset_name,
                    param_inference_dataset_folder,
                    param_inference_model_checkpoint,
                ]
            else:
                raise ValueError(
                    f"Invalid container tool specified: {container_tool}. Choose 'docker' or 'singularity'."
                )

            # Start the process and stream outputs to the console
            logging.info(f"Launching terminal command {command}")
            with subprocess.Popen(
                command, stdout=sys.stdout, stderr=sys.stderr, text=True
            ) as proc:
                proc.wait()  # Wait for the process to complete
            return True
        except Exception as e:
            logging.error(f"Error during Co-DETR container run: {e}")
            return False

Interface for running Co-DETR object detection model training and inference in containers

CustomCoDETRObjectDetection(dataset, dataset_info, run_config)

Initialize Co-DETR interface with dataset and configuration

root_codetr
root_codetr_models
dataset
dataset_name
export_dir_root
config_key
hf_repo_name
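
A hedged end-to-end sketch of how the methods compose; the dataset_info and run_config shapes are inferred from the constructor, the config filename is a placeholder, and inference_settings follows the same keys sketched earlier for HuggingFaceObjectDetection:

    dataset_info = {"name": "my_dataset"}
    run_config = {
        "export_dataset_root": "./output/export/",
        "config": "projects/configs/co_dino/my_codetr_config.py",  # hypothetical path
    }
    codetr = CustomCoDETRObjectDetection(dataset, dataset_info, run_config)
    codetr.convert_data()
    codetr.update_config_file(codetr.dataset_name, run_config["config"], max_epochs=12)
    codetr.train(run_config["config"], param_n_gpus="2", container_tool="docker")
    codetr.run_inference(
        dataset, run_config["config"], "2", "docker", inference_settings
    )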
def convert_data(self):

Convert dataset to COCO format required by Co-DETR
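
The export calls produce the following layout under export_dataset_root/<dataset_name>:

    coco/
        train2017/    # images tagged 'train'
        val2017/      # images tagged 'val'
        test2017/     # images tagged 'test'
        annotations/
            instances_train2017.json
            instances_val2017.json
            instances_test2017.json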

def update_config_file(self, dataset_name, config_file, max_epochs):

Update Co-DETR config file with dataset-specific parameters
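
The three regex substitutions target plain assignments, so a config containing lines like the following (hypothetical example) would be rewritten in place; note that num_classes and max_epochs only match when written without spaces around '=':

    # before update_config_file(...)
    classes = ('car', 'person')
    num_classes=2
    max_epochs=16

    # after update_config_file(..., max_epochs=12) on a three-class dataset
    classes = ('car', 'person', 'bicycle')
    num_classes=3
    max_epochs=12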

def train(self, param_config, param_n_gpus, container_tool, param_function='train'):

Train Co-DETR model using containerized environment
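
For the docker case, _run_container assembles a command equivalent to the following shell invocation (paths abbreviated; the three trailing empty arguments are the dataset-name and inference parameters, unused during training):

    docker run --gpus all --workdir /launch \
        --volume /abs/.../Co-DETR:/launch \
        --volume /abs/.../<dataset_name>:/launch/data:ro \
        --volume /abs/.../codetr:/launch/hf_models:ro \
        --shm-size=8g dbogdollresearch/codetr \
        train <param_config> <param_n_gpus> "" "" ""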

def run_inference(self, dataset, param_config, param_n_gpus, container_tool, inference_settings, param_function='inference', inference_output_folder='custom_models/CoDETR/Co-DETR/output/inference/', gt_field='ground_truth'):
1585    def run_inference(
1586        self,
1587        dataset,
1588        param_config,
1589        param_n_gpus,
1590        container_tool,
1591        inference_settings,
1592        param_function="inference",
1593        inference_output_folder="custom_models/CoDETR/Co-DETR/output/inference/",
1594        gt_field="ground_truth",
1595    ):
1596        """Run inference using trained Co-DETR model and convert results to FiftyOne format"""
1597
1598        logging.info(f"Launching inference for Co-DETR config {param_config}.")
1599        volume_data = os.path.join(self.export_dir_root, self.dataset_name)
1600
1601        if inference_settings["inference_on_test"] is True:
1602            folder_inference = os.path.join("coco", "test2017")
1603        else:
1604            folder_inference = os.path.join("coco")
1605
1606        # Get model from Hugging Face
1607        dataset_name = None
1608        config_key = None
1609        try:
1610            if inference_settings["model_hf"] is None:
1611                hf_path = self.hf_repo_name
1612            else:
1613                hf_path = inference_settings["model_hf"]
1614
1615            dataset_name, config_key = get_dataset_and_model_from_hf_id(hf_path)
1616
1617            download_folder = os.path.join(
1618                self.root_codetr_models, dataset_name, config_key
1619            )
1620
1621            logging.info(
1622                f"Downloading model {hf_path} from Hugging Face into {download_folder}"
1623            )
1624            os.makedirs(download_folder, exist_ok=True)
1625
1626            file_path = hf_hub_download(
1627                repo_id=hf_path,
1628                filename="model.pth",
1629                local_dir=download_folder,
1630            )
1631        except Exception as e:
1632            logging.error(f"An error occured during model download: {e}")
1633
1634        model_path = os.path.join(dataset_name, config_key, "model.pth")
1635        logging.info(f"Starting inference for model {model_path}")
1636
1637        inference_result = self._run_container(
1638            volume_data=volume_data,
1639            param_function=param_function,
1640            param_config=param_config,
1641            param_n_gpus=param_n_gpus,
1642            container_tool=container_tool,
1643            param_inference_dataset_folder=folder_inference,
1644            param_inference_model_checkpoint=model_path,
1645        )
1646
1647        # Convert results from JSON output into V51 dataset
1648        # Files follow format inference_results_{timestamp}.json (run_inference.py)
1649        os.makedirs(inference_output_folder, exist_ok=True)
1650        output_files = [
1651            f
1652            for f in os.listdir(inference_output_folder)
1653            if f.startswith("inference_results_") and f.endswith(".json")
1654        ]
1655        logging.debug(f"Found files with inference content: {output_files}")
1656
1657        if not output_files:
1658            logging.error(
1659                f"No inference result files found in {inference_output_folder}"
1660            )
1661
1662        # Get full path for each file
1663        file_paths = [os.path.join(inference_output_folder, f) for f in output_files]
1664
1665        # Extract timestamp from the filename and sort based on the timestamp
1666        file_paths_sorted = sorted(
1667            file_paths,
1668            key=lambda f: datetime.datetime.strptime(
1669                f.split("_")[-2] + "_" + f.split("_")[-1].replace(".json", ""),
1670                "%Y%m%d_%H%M%S",
1671            ),
1672            reverse=True,
1673        )
1674
1675        # Use the most recent file based on timestamp
1676        latest_file = file_paths_sorted[0]
1677        logging.info(f"Using inference results from: {latest_file}")
1678        with open(latest_file, "r") as file:
1679            data = json.load(file)
1680
        # Build the class ID -> class name mapping from the exported COCO annotations
        annotations_path = os.path.join(
            volume_data, "coco", "annotations", "instances_train2017.json"
        )

        with open(annotations_path, "r") as file:
            data_annotations = json.load(file)

        class_ids_and_names = [
            (category["id"], category["name"])
            for category in data_annotations["categories"]
        ]
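        # e.g. [{"id": 0, "name": "car"}, {"id": 1, "name": "person"}] becomes
        # [(0, "car"), (1, "person")] (hypothetical categories); the list order is
        # assumed to match the per-class order of the Co-DETR results below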

        # Match sample filepaths (from the exported Co-DETR COCO format) to V51 filepaths;
        # the first sample's filepath serves as the anchor for the iterative file search
        sample = dataset.first()
        root_dir_samples = sample.filepath

        # Convert results into V51 file format
        detection_threshold = inference_settings["detection_threshold"]
        pred_key = f"pred_od_{config_key}-{dataset_name}"
        for key, class_results in tqdm(data.items(), desc="Processing Co-DETR detection"):
            try:
                # Map the exported COCO filename back to the V51 sample filepath
                filepath = CustomCoDETRObjectDetection._find_file_iteratively(
                    root_dir_samples, os.path.basename(key)
                )
                sample = dataset[filepath]

                img_width = sample.metadata.width
                img_height = sample.metadata.height

                detections_v51 = []
                for class_id, class_detections in enumerate(class_results):  # Class IDs start at 0
                    if len(class_detections) > 0:
                        objects_class = class_ids_and_names[class_id]
                        for detection in class_detections:
                            # Co-DETR boxes are absolute [x1, y1, x2, y2, score];
                            # FiftyOne expects relative [top-left-x, top-left-y, width, height]
                            confidence = detection[4]
                            if confidence < detection_threshold:
                                continue
                            detections_v51.append(
                                fo.Detection(
                                    label=objects_class[1],
                                    bounding_box=[
                                        detection[0] / img_width,
                                        detection[1] / img_height,
                                        (detection[2] - detection[0]) / img_width,
                                        (detection[3] - detection[1]) / img_height,
                                    ],
                                    confidence=confidence,
                                )
                            )

                sample[pred_key] = fo.Detections(detections=detections_v51)
                sample.save()
            except Exception as e:
                logging.error(
                    f"An error occurred during the conversion of Co-DETR inference results to the V51 dataset: {e}"
                )

        # Run V51 evaluation
        if inference_settings["do_eval"]:
            eval_key = pred_key.replace("pred_", "eval_").replace("-", "_")

            if inference_settings["inference_on_test"]:
                dataset_view = dataset.match_tags(["test"])
            else:
                dataset_view = dataset

            logging.info(
                f"Starting evaluation for {pred_key} in evaluation key {eval_key}."
            )
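            # compute_mAP=True runs COCO-style evaluation; per-detection TP/FP/FN
            # results are stored on the samples under the eval_key prefix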
            dataset_view.evaluate_detections(
                pred_key,
                gt_field=gt_field,
                eval_key=eval_key,
                compute_mAP=True,
            )
