main

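"""Main entry point of the data engine: orchestrates dataset workflows (AWS
download, data ingest, anomaly detection, embedding and ensemble selection,
auto-labeling, class mapping) and launches the Voxel51 app for visualization."""
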
import datetime
import logging
import os
import signal
import sys
import time
import warnings
from typing import Dict, List

import torch

IGNORE_FUTURE_WARNINGS = True
if IGNORE_FUTURE_WARNINGS:
    warnings.simplefilter("ignore", category=FutureWarning)

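# The imports below run after the FutureWarning filter above, presumably so
# that import-time warnings from heavy dependencies are silenced as well.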
import gc

import fiftyone as fo
import torch.multiprocessing as mp
from tqdm import tqdm

from config.config import (
    SELECTED_DATASET,
    SELECTED_WORKFLOW,
    V51_ADDRESS,
    V51_PORT,
    V51_REMOTE,
    WORKFLOWS,
)
from utils.anomaly_detection_data_preparation import AnomalyDetectionDataPreparation
from utils.data_loader import FiftyOneTorchDatasetCOCO, TorchToHFDatasetCOCO
from utils.dataset_loader import load_dataset
from utils.logging import configure_logging
from utils.mp_distribution import ZeroShotDistributer
from utils.sidebar_groups import arrange_fields_in_groups
from utils.wandb_helper import wandb_close, wandb_init
from workflows.anomaly_detection import Anodec
from workflows.auto_label_mask import AutoLabelMask
from workflows.auto_labeling import (
    CustomCoDETRObjectDetection,
    CustomRFDETRObjectDetection,
    HuggingFaceObjectDetection,
    UltralyticsObjectDetection,
    ZeroShotObjectDetection,
)
from workflows.aws_download import AwsDownloader
from workflows.class_mapping import ClassMapper
from workflows.data_ingest import run_data_ingest
from workflows.embedding_selection import EmbeddingSelection
from workflows.ensemble_selection import EnsembleSelection

wandb_run = None  # Init globally to make sure it is available


def signal_handler(sig, frame):
    """Handle Ctrl+C signal by cleaning up resources and exiting."""
    logging.error("You pressed Ctrl+C!")
    try:
        wandb_close(exit_code=1)
        cleanup_memory()
    except Exception:
        pass
    sys.exit(0)


def workflow_aws_download(parameters, wandb_activate=True):
    """Download and process data from an AWS S3 bucket."""
    dataset = None
    dataset_name = None
    wandb_exit_code = 0
    files_to_be_downloaded = 0
    try:
        # Config
        bucket = parameters["bucket"]
        prefix = parameters["prefix"]
        download_path = parameters["download_path"]
        test_run = parameters["test_run"]

        # Logging
        now = datetime.datetime.now()
        datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S")
        log_dir = f"logs/tensorboard/aws_{datetime_str}"

        dataset_name = f"annarbor_rolling_{datetime_str}"

        # Weights and Biases
        wandb_run = wandb_init(
            run_name=dataset_name,
            project_name="AWS Download",
            dataset_name=dataset_name,
            log_dir=log_dir,
            wandb_activate=wandb_activate,
        )

        # Workflow
        aws_downloader = AwsDownloader(
            bucket=bucket,
            prefix=prefix,
            download_path=download_path,
            test_run=test_run,
        )

        (
            sub_folder,
            files,
            files_to_be_downloaded,
            DOWNLOAD_NUMBER_SUCCESS,
            DOWNLOAD_SIZE_SUCCESS,
        ) = aws_downloader.download_files(log_dir=log_dir)

        dataset = aws_downloader.decode_data(
            sub_folder=sub_folder,
            files=files,
            log_dir=log_dir,
            dataset_name=dataset_name,
        )

    except Exception as e:
        logging.error(f"AWS Download and Extraction failed: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return dataset, dataset_name, files_to_be_downloaded


def workflow_anomaly_detection(
    dataset_normal,
    dataset_ano_dec,
    dataset_info,
    eval_metrics,
    run_config,
    wandb_activate=True,
):
    """Run the anomaly detection workflow using the specified models and configuration."""
    try:
        # Weights and Biases
        wandb_exit_code = 0
        wandb_run, log_dir = wandb_init(
            run_name=run_config["model_name"],
            project_name="Selection by Anomaly Detection",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        # Workflow
        SUPPORTED_MODES = ["train", "inference"]
        # Check if all selected modes are supported
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")
        if SUPPORTED_MODES[0] in run_config["mode"]:
            ano_dec = Anodec(
                dataset=dataset_ano_dec,
                eval_metrics=eval_metrics,
                dataset_info=dataset_info,
                config=run_config,
                tensorboard_output=log_dir,
            )
            ano_dec.train_and_export_model()
            ano_dec.run_inference(mode=SUPPORTED_MODES[0])
            ano_dec.eval_v51()
        if SUPPORTED_MODES[1] in run_config["mode"]:
            ano_dec = Anodec(
                dataset=dataset_normal,
                eval_metrics=eval_metrics,
                dataset_info=dataset_info,
                config=run_config,
                tensorboard_output=log_dir,
            )
            ano_dec.run_inference(mode=SUPPORTED_MODES[1])

    except Exception as e:
        logging.error(
            f"Error in Anomaly Detection for model {run_config['model_name']}: {e}"
        )
        wandb_exit_code = 1
    finally:
        wandb_close(exit_code=wandb_exit_code)

    return True


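# Expected shape of `config` for workflow_embedding_selection (a sketch inferred
# from the keys accessed below, not an authoritative schema):
#
#   config = {
#       "mode": ...,  # forwarded to compute_embeddings()
#       "parameters": {
#           "compute_representativeness": ...,
#           "compute_unique_images_greedy": ...,
#           "compute_unique_images_deterministic": ...,
#           "compute_similar_images": ...,
#           "neighbour_count": ...,
#       },
#   }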
def workflow_embedding_selection(
    dataset, dataset_info, MODEL_NAME, config, wandb_activate=True
):
    """Compute embeddings and find representative and rare images for dataset selection."""
    try:
        wandb_exit_code = 0
        wandb_run, log_dir = wandb_init(
            run_name=MODEL_NAME,
            project_name="Selection by Embedding",
            dataset_name=dataset_info["name"],
            config=config,
            wandb_activate=wandb_activate,
        )
        embedding_selector = EmbeddingSelection(
            dataset, dataset_info, MODEL_NAME, log_dir
        )

        if not embedding_selector.model_already_used:
            embedding_selector.compute_embeddings(config["mode"])
            embedding_selector.compute_similarity()

            # Find representative and unique samples as center points for further selections
            thresholds = config["parameters"]
            embedding_selector.compute_representativeness(
                thresholds["compute_representativeness"]
            )
            embedding_selector.compute_unique_images_greedy(
                thresholds["compute_unique_images_greedy"]
            )
            embedding_selector.compute_unique_images_deterministic(
                thresholds["compute_unique_images_deterministic"]
            )

            # Select samples similar to the center points to enlarge the dataset
            embedding_selector.compute_similar_images(
                thresholds["compute_similar_images"], thresholds["neighbour_count"]
            )
        else:
            logging.warning(
                f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection."
            )

    except Exception as e:
        logging.error(f"An error occurred with model {MODEL_NAME}: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True):
    """Auto-labeling workflow using Ultralytics models with optional training and inference."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["model_name"],
            project_name="Auto Labeling Ultralytics",
            dataset_name=run_config["v51_dataset_name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        detector = UltralyticsObjectDetection(dataset=dataset, config=run_config)

        # Check if all selected modes are supported
        SUPPORTED_MODES = ["train", "inference"]
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")

        if SUPPORTED_MODES[0] in run_config["mode"]:
            logging.info(f"Training model {run_config['model_name']}")
            detector.train()
        if SUPPORTED_MODES[1] in run_config["mode"]:
            logging.info(f"Running inference for model {run_config['model_name']}")
            detector.inference()

    except Exception as e:
        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True):
    """Auto-labeling with Hugging Face models, running training and/or inference based on the provided configuration."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["model_name"],
            project_name="Auto Labeling Hugging Face",
            dataset_name=run_config["v51_dataset_name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        detector = HuggingFaceObjectDetection(
            dataset=dataset,
            config=run_config,
        )
        SUPPORTED_MODES = ["train", "inference"]

        # Check if all selected modes are supported
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")
        if SUPPORTED_MODES[0] in run_config["mode"]:
            logging.info(f"Training model {run_config['model_name']}")
            detector.train(hf_dataset)
        if SUPPORTED_MODES[1] in run_config["mode"]:
            logging.info(f"Running inference for model {run_config['model_name']}")
            detector.inference(inference_settings=run_config["inference_settings"])

    except Exception as e:
        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


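# Keys expected in `run_config` for the Co-DETR workflow below (inferred from
# the accesses in the function body; authoritative values come from WORKFLOWS
# in config/config.py): "config", "mode", "epochs", "n_gpus", "container_tool",
# and "inference_settings".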
def workflow_auto_labeling_custom_codetr(
    dataset, dataset_info, run_config, wandb_activate=True
):
    """Auto-labeling workflow using the Co-DETR model, supporting training and inference modes."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["config"],
            project_name="Co-DETR Auto Labeling",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        mode = run_config["mode"]

        detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config)
        detector.convert_data()
        if "train" in mode:
            detector.update_config_file(
                dataset_name=dataset_info["name"],
                config_file=run_config["config"],
                max_epochs=run_config["epochs"],
            )
            detector.train(
                run_config["config"], run_config["n_gpus"], run_config["container_tool"]
            )
        if "inference" in mode:
            detector.run_inference(
                dataset,
                run_config["config"],
                run_config["n_gpus"],
                run_config["container_tool"],
                run_config["inference_settings"],
            )
    except Exception as e:
        logging.error(f"Error during Co-DETR workflow: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_zero_shot_object_detection(dataset, dataset_info, config):
    """Run zero-shot object detection using Hugging Face models, supporting both single- and multi-GPU inference."""
    # Set multiprocessing mode for CUDA multiprocessing
    try:
        mp.set_start_method("spawn", force=True)
        logging.debug("Successfully set multiprocessing start method to 'spawn'")
    except RuntimeError as e:
        # This is expected if the start method was already set
        logging.debug(f"Multiprocessing start method was already set: {e}")
    except Exception as e:
        # Handle any other unexpected errors
        logging.error(f"Failed to set multiprocessing start method: {e}")

    # Zero-shot object detector models from Hugging Face
    # Optimized for parallel multi-GPU inference, also supports single GPU
    dataset_torch = FiftyOneTorchDatasetCOCO(dataset)
    detector = ZeroShotObjectDetection(
        dataset_torch=dataset_torch, dataset_info=dataset_info, config=config
    )

    # Check if model detections are already stored in the V51 dataset or on disk
    models_splits_dict = detector.exclude_stored_predictions(
        dataset_v51=dataset, config=config
    )
    if len(models_splits_dict) > 0:
        config["hf_models_zeroshot_objectdetection"] = models_splits_dict
        distributor = ZeroShotDistributer(
            config=config,
            n_samples=len(dataset_torch),
            dataset_info=dataset_info,
            detector=detector,
        )
        distributor.distribute_and_run()
    else:
        logging.info(
            "All zero-shot models already have predictions stored in the dataset."
        )

    # Make new fields available to follow-up processes
    dataset.reload()
    dataset.save()

    return True


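# Expected shape of `config` for workflow_auto_label_mask, inferred from the
# key accesses below (a sketch, not an authoritative schema):
#
#   config = {
#       "depth_estimation": {"<model_name>": {...model-specific settings...}},
#       "semantic_segmentation": {"<model_name>": {...model-specific settings...}},
#   }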
def workflow_auto_label_mask(dataset, dataset_info, config):
    """Run mask auto-labeling (depth estimation and semantic segmentation) for all configured models."""
    try:
        depth_config = config["depth_estimation"]
        seg_config = config["semantic_segmentation"]

        for architecture_name, architecture_info in depth_config.items():
            auto_labeler = AutoLabelMask(
                dataset=dataset,
                dataset_info=dataset_info,
                model_name=architecture_name,
                task_type="depth_estimation",
                model_config=architecture_info,
            )
            auto_labeler.run_inference()

        for architecture_name, architecture_info in seg_config.items():
            auto_labeler = AutoLabelMask(
                dataset=dataset,
                dataset_info=dataset_info,
                model_name=architecture_name,
                task_type="semantic_segmentation",
                model_config=architecture_info,
            )
            auto_labeler.run_inference()

    except Exception as e:
        logging.error(f"Auto-labeling mask workflow failed: {e}")
        raise


def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True):
    """Run the ensemble selection workflow on the given dataset using the provided configuration."""
    try:
        wandb_exit_code = 0

        wandb_run = wandb_init(
            run_name="Selection by Ensemble",
            project_name="Ensemble Selection",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )
        ensemble_selector = EnsembleSelection(dataset, run_config)
        ensemble_selector.ensemble_selection()
    except Exception as e:
        logging.error(f"An error occurred during Ensemble Selection: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_class_mapping(
    dataset,
    dataset_info,
    run_config,
    wandb_activate=True,
    test_dataset_source=None,
    test_dataset_target=None,
):
    """Run the class mapping workflow to align labels between the source and target datasets."""
    try:
        wandb_exit_code = 0
        # Initialize a wandb run for class mapping
        wandb_run = wandb_init(
            run_name="class_mapping",
            project_name="Class Mapping",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )
        class_mapping_models = run_config["hf_models_zeroshot_classification"]

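        # Sketch of the stats dict returned by ClassMapper.run_mapping(),
        # inferred from the keys read below (not an authoritative schema):
        #   {"total_processed": int, "source_class_counts": {class: count},
        #    "changes_made": int, "tags_added_per_category": {class: count}}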
        for model_name in (
            pbar := tqdm(class_mapping_models, desc="Processing Class Mapping")
        ):
            pbar.set_description(f"Zero Shot Classification model {model_name}")
            mapper = ClassMapper(dataset, model_name, run_config)
            try:
                stats = mapper.run_mapping(test_dataset_source, test_dataset_target)
                # Display statistics only if mapping was successful
                total_processed = stats["total_processed"]
                logging.info("\nClassification Results for Source Dataset:")
                for source_class, count in stats["source_class_counts"].items():
                    percentage = (
                        (count / total_processed) * 100 if total_processed > 0 else 0
                    )
                    logging.info(
                        f"{source_class}: {count} samples processed ({percentage:.1f}%)"
                    )

                # Display statistics for tags added to the target dataset
                logging.info("\nTag Addition Results (Target Dataset Tags):")
                logging.info(f"Total new tags added: {stats['changes_made']}")
                for target_class, tag_count in stats["tags_added_per_category"].items():
                    logging.info(f"{target_class} tags added: {tag_count}")

            except Exception as e:
                logging.error(f"Error during mapping with model {model_name}: {e}")

    except Exception as e:
        logging.error(f"Error in class_mapping workflow: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def cleanup_memory(do_extensive_cleanup=False):
    """Clean up memory after workflow execution. 'do_extensive_cleanup' is recommended for multiple training sessions in a row."""
    logging.info("Starting memory cleanup")
    # Clear CUDA cache
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Force garbage collection
    gc.collect()

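    # Note: in the extensive pass below, `del obj` only removes the loop's local
    # reference; tensors are actually reclaimed once their last reference is gone
    # and the final gc.collect() runs, so this pass is best-effort.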
    if do_extensive_cleanup:
        # Clear any leftover tensors
        n_deleted_torch_objects = 0
        for obj in tqdm(
            gc.get_objects(), desc="Deleting objects from Python Garbage Collector"
        ):
            try:
                if torch.is_tensor(obj):
                    del obj
                    n_deleted_torch_objects += 1
            except Exception:
                pass

        logging.info(f"Deleted {n_deleted_torch_objects} torch objects")

        # Final garbage collection
        gc.collect()


class WorkflowExecutor:
    """Orchestrates the execution of multiple data processing workflows in sequence."""

    def __init__(
        self,
        workflows: List[str],
        selected_dataset: str,
        dataset: fo.Dataset,
        dataset_info: Dict,
    ):
        """Initialize with the specified workflows, dataset selection, and dataset metadata."""
        self.workflows = workflows
        self.selected_dataset = selected_dataset
        self.dataset = dataset
        self.dataset_info = dataset_info

    def execute(self) -> bool:
        """Execute all configured workflows in sequence and handle errors."""
        if len(self.workflows) == 0:
            logging.error("No workflows selected.")
            return False

        logging.info(f"Selected workflows: {self.workflows}")
        for workflow in self.workflows:
            logging.info(
                f"Running workflow {workflow} for dataset {self.selected_dataset}"
            )
            try:
                if workflow == "aws_download":
                    parameter_group = "mcity"
                    parameters = WORKFLOWS["aws_download"].get(parameter_group, None)
                    if parameter_group == "mcity":
                        dataset, dataset_name, _ = workflow_aws_download(parameters)
                    else:
                        logging.error(
                            f"The parameter group {parameter_group} is not supported. As AWS setups are highly specific, please provide a separate set of parameters and a workflow."
                        )

                    # Select the downloaded dataset for further workflows if configured
                    if dataset is not None:
                        if parameters["selected_dataset_overwrite"]:
                            dataset_info = {
                                "name": dataset_name,
                                "v51_type": "FiftyOneDataset",
                                "splits": [],
                            }

                            self.dataset = dataset
                            self.dataset_info = dataset_info
                            logging.warning(
                                f"Overwriting selected dataset {self.selected_dataset} with {dataset_name}"
                            )
                            self.selected_dataset = dataset_name

                elif workflow == "embedding_selection":
                    embedding_models = WORKFLOWS["embedding_selection"][
                        "embedding_models"
                    ]

                    for MODEL_NAME in (
                        pbar := tqdm(embedding_models, desc="Selection by Embeddings")
                    ):
                        # Status
                        pbar.set_description(
                            f"Selection by embeddings with model {MODEL_NAME}."
                        )

                        # Config
                        mode = WORKFLOWS["embedding_selection"]["mode"]
                        parameters = WORKFLOWS["embedding_selection"]["parameters"]
                        config = {"mode": mode, "parameters": parameters}

                        # Workflow
                        workflow_embedding_selection(
                            self.dataset,
                            self.dataset_info,
                            MODEL_NAME,
                            config,
                        )

                elif workflow == "anomaly_detection":
                    # Config
                    ano_dec_config = WORKFLOWS["anomaly_detection"]
                    anomalib_image_models = ano_dec_config["anomalib_image_models"]
                    eval_metrics = ano_dec_config["anomalib_eval_metrics"]

                    dataset_ano_dec = None
                    data_root = None
                    if "train" in ano_dec_config["mode"]:
                        try:
                            data_preparer = AnomalyDetectionDataPreparation(
                                self.dataset, self.selected_dataset
                            )
                            dataset_ano_dec = data_preparer.dataset_ano_dec
                            data_root = data_preparer.export_root
                        except Exception as e:
                            logging.error(
                                f"Error during data preparation for Anomaly Detection: {e}"
                            )

                    for MODEL_NAME in (
                        pbar := tqdm(anomalib_image_models, desc="Anomalib")
                    ):
                        # Status
                        pbar.set_description(f"Anomalib model {MODEL_NAME}")

                        # Config
                        run_config = {
                            "model_name": MODEL_NAME,
                            "image_size": anomalib_image_models[MODEL_NAME].get(
                                "image_size", None
                            ),
                            "batch_size": anomalib_image_models[MODEL_NAME].get(
                                "batch_size", 1
                            ),
                            "epochs": ano_dec_config["epochs"],
                            "early_stop_patience": ano_dec_config[
                                "early_stop_patience"
                            ],
                            "data_root": data_root,
                            "mode": ano_dec_config["mode"],
                        }

                        # Workflow
                        workflow_anomaly_detection(
                            self.dataset,
                            dataset_ano_dec,
                            self.dataset_info,
                            eval_metrics,
                            run_config,
                        )

                elif workflow == "auto_labeling":
                    # Config
                    SUPPORTED_MODEL_SOURCES = [
                        "hf_models_objectdetection",
                        "ultralytics",
                        "custom_codetr",
                        "roboflow",
                    ]

                    # Common parameters between models
                    config_autolabel = WORKFLOWS["auto_labeling"]
                    mode = config_autolabel["mode"]
                    epochs = config_autolabel["epochs"]
                    selected_model_source = config_autolabel["model_source"]

                    # Check if all selected model sources are supported
                    for model_source in selected_model_source:
                        if model_source not in SUPPORTED_MODEL_SOURCES:
                            logging.error(
                                f"Selected model source {model_source} is not supported."
                            )

                    if SUPPORTED_MODEL_SOURCES[0] in selected_model_source:
                        # Hugging Face Models
                        # Single GPU mode (https://github.com/huggingface/transformers/issues/28740)
                        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
                        hf_models = config_autolabel["hf_models_objectdetection"]

                        # Dataset Conversion
                        try:
                            logging.info("Converting dataset into Hugging Face format.")
                            pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset)
                            pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset)
                            hf_dataset = pt_to_hf_converter.convert()
                        except Exception as e:
                            logging.error(f"Error during dataset conversion: {e}")
                            raise  # hf_dataset is required below; abort this workflow

                        for MODEL_NAME in (
                            pbar := tqdm(hf_models, desc="Auto Labeling Models")
                        ):
                            # Status Update
                            pbar.set_description(
                                f"Processing Hugging Face model {MODEL_NAME}"
                            )

                            # Config
                            config_model = config_autolabel[
                                "hf_models_objectdetection"
                            ][MODEL_NAME]

                            run_config = {
                                "mode": mode,
                                "model_name": MODEL_NAME,
                                "v51_dataset_name": self.selected_dataset,
                                "epochs": epochs,
                                "early_stop_threshold": config_autolabel[
                                    "early_stop_threshold"
                                ],
                                "early_stop_patience": config_autolabel[
                                    "early_stop_patience"
                                ],
                                "learning_rate": config_autolabel["learning_rate"],
                                "weight_decay": config_autolabel["weight_decay"],
                                "max_grad_norm": config_autolabel["max_grad_norm"],
                                "batch_size": config_model.get("batch_size", 1),
                                "image_size": config_model.get("image_size", None),
                                "n_worker_dataloader": config_autolabel[
                                    "n_worker_dataloader"
                                ],
                                "inference_settings": config_autolabel[
                                    "inference_settings"
                                ],
                            }

                            # Workflow
                            workflow_auto_labeling_hf(
                                self.dataset,
                                hf_dataset,
                                run_config,
                            )

                    if SUPPORTED_MODEL_SOURCES[1] in selected_model_source:
                        # Ultralytics Models
                        config_ultralytics = config_autolabel["ultralytics"]
                        models_ultralytics = config_ultralytics["models"]
                        export_dataset_root = config_ultralytics["export_dataset_root"]

                        # Export data into the necessary format
                        if "train" in mode:
                            try:
                                UltralyticsObjectDetection.export_data(
                                    self.dataset,
                                    self.dataset_info,
                                    export_dataset_root,
                                )
                            except Exception as e:
                                logging.error(
                                    f"Error during Ultralytics dataset export: {e}"
                                )

                        for model_name in (
                            pbar := tqdm(
                                models_ultralytics, desc="Ultralytics training"
                            )
                        ):
                            pbar.set_description(f"Ultralytics model {model_name}")
                            run_config = {
                                "mode": mode,
                                "model_name": model_name,
                                "v51_dataset_name": self.dataset_info["name"],
                                "epochs": epochs,
                                "patience": config_autolabel["early_stop_patience"],
                                "batch_size": models_ultralytics[model_name][
                                    "batch_size"
                                ],
                                "img_size": models_ultralytics[model_name]["img_size"],
                                "export_dataset_root": export_dataset_root,
                                "inference_settings": config_autolabel[
                                    "inference_settings"
                                ],
                                "multi_scale": config_ultralytics["multi_scale"],
                                "cos_lr": config_ultralytics["cos_lr"],
                            }

                            workflow_auto_labeling_ultralytics(self.dataset, run_config)

                    if SUPPORTED_MODEL_SOURCES[2] in selected_model_source:
                        # Custom Co-DETR
                        config_codetr = config_autolabel["custom_codetr"]
                        run_config = {
                            "export_dataset_root": config_codetr["export_dataset_root"],
                            "container_tool": config_codetr["container_tool"],
                            "n_gpus": config_codetr["n_gpus"],
                            "mode": config_autolabel["mode"],
                            "epochs": config_autolabel["epochs"],
                            "inference_settings": config_autolabel[
                                "inference_settings"
                            ],
                            "config": None,
                        }
                        codetr_configs = config_codetr["configs"]

                        for config in (
                            pbar := tqdm(
                                codetr_configs, desc="Processing Co-DETR configurations"
                            )
                        ):
                            pbar.set_description(f"Co-DETR model {config}")
                            run_config["config"] = config
                            workflow_auto_labeling_custom_codetr(
                                self.dataset, self.dataset_info, run_config
                            )

                    if SUPPORTED_MODEL_SOURCES[3] in selected_model_source:
                        # RF-DETR (Roboflow)
                        config_rfdetr = config_autolabel["roboflow"]

                        # Shared config parameters
                        shared_config = {
                            "epochs": config_autolabel["epochs"],
                            "learning_rate": config_autolabel["learning_rate"],
                            "weight_decay": config_autolabel["weight_decay"],
                            "early_stop_patience": config_autolabel["early_stop_patience"],
                            "early_stop_threshold": config_autolabel["early_stop_threshold"],
                        }

                        run_config = {
                            "export_dataset_root": config_rfdetr["export_dataset_root"],
                            "mode": config_autolabel["mode"],
                            "inference_settings": config_autolabel["inference_settings"],
                            "config": None,
                            # RF-DETR specific parameters
                            "batch_size": config_rfdetr["batch_size"],
                            "grad_accum_steps": config_rfdetr["grad_accum_steps"],
                            "lr_encoder": config_rfdetr["lr_encoder"],
                            "resolution": config_rfdetr["resolution"],
                            "use_ema": config_rfdetr["use_ema"],
                            "gradient_checkpointing": config_rfdetr["gradient_checkpointing"],
                            "early_stopping_min_delta": config_rfdetr["early_stopping_min_delta"],
                            "early_stopping_use_ema": config_rfdetr["early_stopping_use_ema"],
                        }

                        rfdetr_configs = config_rfdetr["configs"]

                        for config in (
                            pbar := tqdm(rfdetr_configs, desc="Processing RF-DETR configurations")
                        ):
                            pbar.set_description(f"RF-DETR model {config}")
                            run_config["config"] = config

                            try:
                                wandb_exit_code = 0
                                wandb_run = wandb_init(
                                    run_name=config,
                                    project_name="RF-DETR Auto Labeling",
                                    dataset_name=self.dataset_info["name"],
                                    config=run_config,
                                    wandb_activate=True,
                                )

                                detector = CustomRFDETRObjectDetection(
                                    self.dataset, self.dataset_info, run_config
                                )

                                # Convert data to RF-DETR format
                                detector.convert_data()

                                # Training
                                if "train" in mode:
                                    logging.info(f"Training RF-DETR model: {config}")
                                    detector.train(run_config, shared_config)

                                # Inference
                                if "inference" in mode:
                                    logging.info(f"Running inference for RF-DETR model: {config}")
                                    detector.inference(
                                        inference_settings=config_autolabel["inference_settings"]
                                    )

                            except Exception as e:
                                logging.error(f"Error during RF-DETR workflow with {config}: {e}")
                                wandb_exit_code = 1
                            finally:
                                wandb_close(wandb_exit_code)

                elif workflow == "auto_labeling_zero_shot":
                    config = WORKFLOWS["auto_labeling_zero_shot"]
                    workflow_zero_shot_object_detection(
                        self.dataset, self.dataset_info, config
                    )

                elif workflow == "ensemble_selection":
                    # Config
                    run_config = WORKFLOWS["ensemble_selection"]

                    # Workflow
                    workflow_ensemble_selection(
                        self.dataset, self.dataset_info, run_config
                    )

                elif workflow == "auto_label_mask":
                    config = WORKFLOWS["auto_label_mask"]
                    workflow_auto_label_mask(self.dataset, self.dataset_info, config)

                elif workflow == "class_mapping":
                    # Config
                    run_config = WORKFLOWS["class_mapping"]

                    # Workflow
                    workflow_class_mapping(
                        self.dataset,
                        self.dataset_info,
                        run_config,
                        test_dataset_source=None,
                        test_dataset_target=None,
                    )

                elif workflow == "data_ingest":
                    dataset = run_data_ingest()

                    dataset_info = {
                        "name": "custom_dataset",
                        "v51_type": "FiftyOneDataset",
                        "splits": ["train", "val", "test"],
                    }

                    self.dataset = dataset
                    self.dataset_info = dataset_info
                    self.selected_dataset = "custom_dataset"

                    logging.info("Data ingestion completed successfully.")

                else:
                    logging.error(
                        f"Workflow {workflow} not found. Check available workflows in config.py."
                    )
                    return False

                cleanup_memory()  # Clean up after each workflow
                logging.info(f"Completed workflow {workflow} and cleaned up memory")

            except Exception as e:
                logging.error(f"Workflow {workflow}: An error occurred: {e}")
                wandb_close(exit_code=1)
                cleanup_memory()  # Clean up even after a failure

        return True

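# Example (sketch): running workflows programmatically with an already-loaded
# dataset. SELECTED_WORKFLOW and SELECTED_DATASET live in config/config.py,
# e.g. SELECTED_WORKFLOW = ["embedding_selection"] (illustrative value):
#
#   dataset, dataset_info = load_dataset(SELECTED_DATASET)
#   executor = WorkflowExecutor(
#       SELECTED_WORKFLOW, SELECTED_DATASET["name"], dataset, dataset_info
#   )
#   executor.execute()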

def main():
    """Execute the configured workflows, load the dataset, and launch the Voxel51 visualization interface."""
    time_start = time.time()
    configure_logging()

    # Signal handler for CTRL + C
    signal.signal(signal.SIGINT, signal_handler)

    # Execute workflows
    if "data_ingest" in SELECTED_WORKFLOW:
        executor = WorkflowExecutor(
            SELECTED_WORKFLOW,
            SELECTED_DATASET["name"],
            dataset=None,
            dataset_info=None,
        )
        executor.execute()

        # Pull the ingested dataset and its info back from the executor
        dataset = executor.dataset
        dataset_info = executor.dataset_info

    else:
        dataset, dataset_info = load_dataset(SELECTED_DATASET)

        executor = WorkflowExecutor(
            SELECTED_WORKFLOW,
            SELECTED_DATASET["name"],
            dataset,
            dataset_info,
        )
        executor.execute()

    if dataset is not None:
        dataset.reload()
        dataset.save()
        arrange_fields_in_groups(dataset)
        logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.")

        # Dataset stats
        logging.debug(dataset)
        logging.debug(dataset.stats(include_media=True))

        # V51 UI launch
        session = fo.launch_app(
            dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE
        )
    else:
        logging.info("Skipping Voxel51 session.")

    time_stop = time.time()
    logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")


if __name__ == "__main__":
    cleanup_memory()
    main()
IGNORE_FUTURE_WARNINGS = True
wandb_run = None
def signal_handler(sig, frame):
56def signal_handler(sig, frame):
57    """Handle Ctrl+C signal by cleaning up resources and exiting."""
58    logging.error("You pressed Ctrl+C!")
59    try:
60        wandb_close(exit_code=1)
61        cleanup_memory()
62    except:
63        pass
64    sys.exit(0)

Handle Ctrl+C signal by cleaning up resources and exiting.

def workflow_aws_download(parameters, wandb_activate=True):
 67def workflow_aws_download(parameters, wandb_activate=True):
 68    """Download and process data from AWS S3 bucket."""
 69    dataset = None
 70    dataset_name = None
 71    wandb_exit_code = 0
 72    files_to_be_downloaded = 0
 73    try:
 74        # Config
 75        bucket = parameters["bucket"]
 76        prefix = parameters["prefix"]
 77        download_path = parameters["download_path"]
 78        test_run = parameters["test_run"]
 79
 80        # Logging
 81        now = datetime.datetime.now()
 82        datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S")
 83        log_dir = f"logs/tensorboard/aws_{datetime_str}"
 84
 85        dataset_name = f"annarbor_rolling_{datetime_str}"
 86
 87        # Weights and Biases
 88        wandb_run = wandb_init(
 89            run_name=dataset_name,
 90            project_name="AWS Download",
 91            dataset_name=dataset_name,
 92            log_dir=log_dir,
 93            wandb_activate=wandb_activate,
 94        )
 95
 96        # Workflow
 97        aws_downloader = AwsDownloader(
 98            bucket=bucket,
 99            prefix=prefix,
100            download_path=download_path,
101            test_run=test_run,
102        )
103
104        (
105            sub_folder,
106            files,
107            files_to_be_downloaded,
108            DOWNLOAD_NUMBER_SUCCESS,
109            DOWNLOAD_SIZE_SUCCESS,
110        ) = aws_downloader.download_files(log_dir=log_dir)
111
112        dataset = aws_downloader.decode_data(
113            sub_folder=sub_folder,
114            files=files,
115            log_dir=log_dir,
116            dataset_name=dataset_name,
117        )
118
119    except Exception as e:
120        logging.error(f"AWS Download and Extraction failed: {e}")
121        wandb_exit_code = 1
122
123    finally:
124        wandb_close(wandb_exit_code)
125
126    return dataset, dataset_name, files_to_be_downloaded

Download and process data from AWS S3 bucket.

def workflow_anomaly_detection( dataset_normal, dataset_ano_dec, dataset_info, eval_metrics, run_config, wandb_activate=True):
129def workflow_anomaly_detection(
130    dataset_normal,
131    dataset_ano_dec,
132    dataset_info,
133    eval_metrics,
134    run_config,
135    wandb_activate=True,
136):
137    """Run anomaly detection workflow using specified models and configurations."""
138    try:
139        # Weights and Biases
140        wandb_exit_code = 0
141        wandb_run, log_dir = wandb_init(
142            run_name=run_config["model_name"],
143            project_name="Selection by Anomaly Detection",
144            dataset_name=dataset_info["name"],
145            config=run_config,
146            wandb_activate=wandb_activate,
147        )
148
149        # Workflow
150
151        SUPPORTED_MODES = ["train", "inference"]
152        # Check if all selected modes are supported
153        for mode in run_config["mode"]:
154            if mode not in SUPPORTED_MODES:
155                logging.error(f"Selected mode {mode} is not supported.")
156        if SUPPORTED_MODES[0] in run_config["mode"]:
157            ano_dec = Anodec(
158                dataset=dataset_ano_dec,
159                eval_metrics=eval_metrics,
160                dataset_info=dataset_info,
161                config=run_config,
162                tensorboard_output=log_dir,
163            )
164            ano_dec.train_and_export_model()
165            ano_dec.run_inference(mode=SUPPORTED_MODES[0])
166            ano_dec.eval_v51()
167        if SUPPORTED_MODES[1] in run_config["mode"]:
168            ano_dec = Anodec(
169                dataset=dataset_normal,
170                eval_metrics=eval_metrics,
171                dataset_info=dataset_info,
172                config=run_config,
173                tensorboard_output=log_dir,
174            )
175            ano_dec.run_inference(mode=SUPPORTED_MODES[1])
176
177    except Exception as e:
178        logging.error(
179            f"Error in Anomaly Detection for model {run_config['model_name']}: {e}"
180        )
181        wandb_exit_code = 1
182    finally:
183        wandb_close(exit_code=wandb_exit_code)
184
185    return True

Run anomaly detection workflow using specified models and configurations.

def workflow_embedding_selection(dataset, dataset_info, MODEL_NAME, config, wandb_activate=True):
188def workflow_embedding_selection(
189    dataset, dataset_info, MODEL_NAME, config, wandb_activate=True
190):
191    """Compute embeddings and find representative and rare images for dataset selection."""
192    try:
193        wandb_exit_code = 0
194        wandb_run, log_dir = wandb_init(
195            run_name=MODEL_NAME,
196            project_name="Selection by Embedding",
197            dataset_name=dataset_info["name"],
198            config=config,
199            wandb_activate=wandb_activate,
200        )
201        embedding_selector = EmbeddingSelection(
202            dataset, dataset_info, MODEL_NAME, log_dir
203        )
204
205        if embedding_selector.model_already_used == False:
206
207            embedding_selector.compute_embeddings(config["mode"])
208            embedding_selector.compute_similarity()
209
210            # Find representative and unique samples as center points for further selections
211            thresholds = config["parameters"]
212            embedding_selector.compute_representativeness(
213                thresholds["compute_representativeness"]
214            )
215            embedding_selector.compute_unique_images_greedy(
216                thresholds["compute_unique_images_greedy"]
217            )
218            embedding_selector.compute_unique_images_deterministic(
219                thresholds["compute_unique_images_deterministic"]
220            )
221
222            # Select samples similar to the center points to enlarge the dataset
223            embedding_selector.compute_similar_images(
224                thresholds["compute_similar_images"], thresholds["neighbour_count"]
225            )
226        else:
227            logging.warning(
228                f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection."
229            )
230
231    except Exception as e:
232        logging.error(f"An error occurred with model {MODEL_NAME}: {e}")
233        wandb_exit_code = 1
234    finally:
235        wandb_close(wandb_exit_code)
236
237    return True

Compute embeddings and find representative and rare images for dataset selection.

def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True):
240def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True):
241    """Auto-labeling workflow using Ultralytics models with optional training and inference."""
242    try:
243        wandb_exit_code = 0
244        wandb_run = wandb_init(
245            run_name=run_config["model_name"],
246            project_name="Auto Labeling Ultralytics",
247            dataset_name=run_config["v51_dataset_name"],
248            config=run_config,
249            wandb_activate=wandb_activate,
250        )
251
252        detector = UltralyticsObjectDetection(dataset=dataset, config=run_config)
253
254        # Check if all selected modes are supported
255        SUPPORTED_MODES = ["train", "inference"]
256        for mode in run_config["mode"]:
257            if mode not in SUPPORTED_MODES:
258                logging.error(f"Selected mode {mode} is not supported.")
259
260        if SUPPORTED_MODES[0] in run_config["mode"]:
261            logging.info(f"Training model {run_config['model_name']}")
262            detector.train()
263        if SUPPORTED_MODES[1] in run_config["mode"]:
264            logging.info(f"Running inference for model {run_config['model_name']}")
265            detector.inference()
266
267    except Exception as e:
268        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
269        wandb_exit_code = 1
270
271    finally:
272        wandb_close(wandb_exit_code)
273
274    return True

Auto-labeling workflow using Ultralytics models with optional training and inference.

def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True):
277def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True):
278    """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration."""
279    try:
280        wandb_exit_code = 0
281        wandb_run = wandb_init(
282            run_name=run_config["model_name"],
283            project_name="Auto Labeling Hugging Face",
284            dataset_name=run_config["v51_dataset_name"],
285            config=run_config,
286            wandb_activate=wandb_activate,
287        )
288
289        detector = HuggingFaceObjectDetection(
290            dataset=dataset,
291            config=run_config,
292        )
293        SUPPORTED_MODES = ["train", "inference"]
294
295        # Check if all selected modes are supported
296        for mode in run_config["mode"]:
297            if mode not in SUPPORTED_MODES:
298                logging.error(f"Selected mode {mode} is not supported.")
299        if SUPPORTED_MODES[0] in run_config["mode"]:
300            logging.info(f"Training model {run_config['model_name']}")
301            detector.train(hf_dataset)
302        if SUPPORTED_MODES[1] in run_config["mode"]:
303            logging.info(f"Running inference for model {run_config['model_name']}")
304            detector.inference(inference_settings=run_config["inference_settings"])
305
306    except Exception as e:
307        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
308        wandb_exit_code = 1
309
310    finally:
311        wandb_close(wandb_exit_code)
312
313    return True

Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.

def workflow_auto_labeling_custom_codetr(dataset, dataset_info, run_config, wandb_activate=True):
316def workflow_auto_labeling_custom_codetr(
317    dataset, dataset_info, run_config, wandb_activate=True
318):
319    """Auto labeling workflow using Co-DETR model supporting training and inference modes."""
320
321    try:
322        wandb_exit_code = 0
323        wandb_run = wandb_init(
324            run_name=run_config["config"],
325            project_name="Co-DETR Auto Labeling",
326            dataset_name=dataset_info["name"],
327            config=run_config,
328            wandb_activate=wandb_activate,
329        )
330
331        mode = run_config["mode"]
332
333        detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config)
334        detector.convert_data()
335        if "train" in mode:
336            detector.update_config_file(
337                dataset_name=dataset_info["name"],
338                config_file=run_config["config"],
339                max_epochs=run_config["epochs"],
340            )
341            detector.train(
342                run_config["config"], run_config["n_gpus"], run_config["container_tool"]
343            )
344        if "inference" in mode:
345            detector.run_inference(
346                dataset,
347                run_config["config"],
348                run_config["n_gpus"],
349                run_config["container_tool"],
350                run_config["inference_settings"],
351            )
352    except Exception as e:
353        logging.error(f"Error during CoDETR training: {e}")
354        wandb_exit_code = 1
355    finally:
356        wandb_close(wandb_exit_code)
357
358    return True

Auto labeling workflow using Co-DETR model supporting training and inference modes.

def workflow_zero_shot_object_detection(dataset, dataset_info, config):
361def workflow_zero_shot_object_detection(dataset, dataset_info, config):
362    """Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference."""
363    # Set multiprocessing mode for CUDA multiprocessing
364    try:
365        mp.set_start_method("spawn", force=True)
366        logging.debug("Successfully set multiprocessing start method to 'spawn'")
367    except RuntimeError as e:
368        # This is expected if the start method was already set
369        logging.debug(f"Multiprocessing start method was already set: {e}")
370    except Exception as e:
371        # Handle any other unexpected errors
372        logging.error(f"Failed to set multiprocessing start method: {e}")
373
374    # Zero-shot object detector models from Huggingface
375    # Optimized for parallel multi-GPU inference, also supports single GPU
376    dataset_torch = FiftyOneTorchDatasetCOCO(dataset)
377    detector = ZeroShotObjectDetection(
378        dataset_torch=dataset_torch, dataset_info=dataset_info, config=config
379    )
380
381    # Check if model detections are already stored in V51 dataset or on disk
382    models_splits_dict = detector.exclude_stored_predictions(
383        dataset_v51=dataset, config=config
384    )
385    if len(models_splits_dict) > 0:
386        config["hf_models_zeroshot_objectdetection"] = models_splits_dict
387        distributor = ZeroShotDistributer(
388            config=config,
389            n_samples=len(dataset_torch),
390            dataset_info=dataset_info,
391            detector=detector,
392        )
393        distributor.distribute_and_run()
394    else:
395        logging.info(
396            "All zero-shot models already have predictions stored in the dataset."
397        )
398
399    # To make new fields available to follow-up processes
400    dataset.reload()
401    dataset.save()
402
403    return True

Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.
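
As wired up in WorkflowExecutor.execute, the workflow takes its configuration straight from config.py:

    config = WORKFLOWS["auto_labeling_zero_shot"]
    workflow_zero_shot_object_detection(dataset, dataset_info, config)

Models that already have stored predictions are filtered out via exclude_stored_predictions(), so re-running the workflow only processes missing model/split combinations.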

def workflow_auto_label_mask(dataset, dataset_info, config):
406def workflow_auto_label_mask(dataset, dataset_info, config):
407    try:
408        depth_config = config["depth_estimation"]
409        seg_config = config["semantic_segmentation"]
410
411        for architecture_name, architecture_info in depth_config.items():
412            auto_labeler = AutoLabelMask(
413                dataset=dataset,
414                dataset_info=dataset_info,
415                model_name=architecture_name,
416                task_type="depth_estimation",
417                model_config=architecture_info,
418            )
419            auto_labeler.run_inference()
420
421        for architecture_name, architecture_info in seg_config.items():
422            auto_labeler = AutoLabelMask(
423                dataset=dataset,
424                dataset_info=dataset_info,
425                model_name=architecture_name,
426                task_type="semantic_segmentation",
427                model_config=architecture_info,
428            )
429            auto_labeler.run_inference()
430
431    except Exception as e:
432        logging.error(f"Auto-labeling mask workflow failed: {e}")
433        raise
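
Runs mask-based auto-labeling on the dataset, instantiating one AutoLabelMask model per configured architecture for depth estimation and semantic segmentation.

As used in WorkflowExecutor.execute, the configuration comes from config.py and must contain a "depth_estimation" and a "semantic_segmentation" dictionary, each mapping an architecture name to its model settings:

    config = WORKFLOWS["auto_label_mask"]
    workflow_auto_label_mask(dataset, dataset_info, config)
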
def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True):
436def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True):
437    """Runs ensemble selection workflow on given dataset using provided configuration."""
438    try:
439        wandb_exit_code = 0
440
441        wandb_run = wandb_init(
442            run_name="Selection by Ensemble",
443            project_name="Ensemble Selection",
444            dataset_name=dataset_info["name"],
445            config=run_config,
446            wandb_activate=wandb_activate,
447        )
448        ensemble_selector = EnsembleSelection(dataset, run_config)
449        ensemble_selector.ensemble_selection()
450    except Exception as e:
451        logging.error(f"An error occurred during Ensemble Selection: {e}")
452        wandb_exit_code = 1
453
454    finally:
455        wandb_close(wandb_exit_code)
456
457    return True

Runs ensemble selection workflow on given dataset using provided configuration.
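
As invoked by WorkflowExecutor.execute:

    run_config = WORKFLOWS["ensemble_selection"]
    workflow_ensemble_selection(dataset, dataset_info, run_config)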

def workflow_class_mapping(dataset, dataset_info, run_config, wandb_activate=True, test_dataset_source=None, test_dataset_target=None):
460def workflow_class_mapping(
461    dataset,
462    dataset_info,
463    run_config,
464    wandb_activate=True,
465    test_dataset_source=None,
466    test_dataset_target=None,
467):
468    """Runs class mapping workflow to align labels between the source dataset and target dataset."""
469    try:
470        wandb_exit_code = 0
471        # Initialize a wandb run for class mapping
472        wandb_run = wandb_init(
473            run_name="class_mapping",
474            project_name="Class Mapping",
475            dataset_name=dataset_info["name"],
476            config=run_config,
477            wandb_activate=wandb_activate,
478        )
479        class_mapping_models = run_config["hf_models_zeroshot_classification"]
480
481        for model_name in (
482            pbar := tqdm(class_mapping_models, desc="Processing Class Mapping")
483        ):
484            pbar.set_description(f"Zero Shot Classification model {model_name}")
485            mapper = ClassMapper(dataset, model_name, run_config)
486            try:
487                stats = mapper.run_mapping(test_dataset_source, test_dataset_target)
488                # Display statistics only if mapping was successful
489                source_class_counts = stats["total_processed"]
490                logging.info("\nClassification Results for Source Dataset:")
491                for source_class, count in stats["source_class_counts"].items():
492                    percentage = (
493                        (count / source_class_counts) * 100
494                        if source_class_counts > 0
495                        else 0
496                    )
497                    logging.info(
498                        f"{source_class}: {count} samples processed ({percentage:.1f}%)"
499                    )
500
501                # Display statistics for tags added to Target Dataset
502                logging.info("\nTag Addition Results (Target Dataset Tags):")
503                logging.info(f"Total new tags added: {stats['changes_made']}")
504                for target_class, tag_count in stats["tags_added_per_category"].items():
505                    logging.info(f"{target_class} tags added: {tag_count}")
506
507            except Exception as e:
508                logging.error(f"Error during mapping with model {model_name}: {e}")
509
510    except Exception as e:
511        logging.error(f"Error in class_mapping workflow: {e}")
512        wandb_exit_code = 1
513    finally:
514        wandb_close(wandb_exit_code)
515
516    return True

Runs class mapping workflow to align labels between the source dataset and target dataset.
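
As invoked by WorkflowExecutor.execute; run_config must provide the "hf_models_zeroshot_classification" list that the workflow iterates over:

    run_config = WORKFLOWS["class_mapping"]
    workflow_class_mapping(
        dataset,
        dataset_info,
        run_config,
        test_dataset_source=None,  # optional datasets for mapping tests
        test_dataset_target=None,
    )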

def cleanup_memory(do_extensive_cleanup=False):
519def cleanup_memory(do_extensive_cleanup=False):
520    """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row."""
521    logging.info("Starting memory cleanup")
522    # Clear CUDA cache
523    if torch.cuda.is_available():
524        torch.cuda.empty_cache()
525
526    # Force garbage collection
527    gc.collect()
528
529    if do_extensive_cleanup:
530
531        # Clear any leftover tensors
532        n_deleted_torch_objects = 0
533        for obj in tqdm(
534            gc.get_objects(), desc="Deleting objects from Python Garbage Collector"
535        ):
536            try:
537                if torch.is_tensor(obj):
538                    del obj
539                    n_deleted_torch_objects += 1
540            except Exception:
541                pass
542
543        logging.info(f"Deleted {n_deleted_torch_objects} torch objects")
544
545        # Final garbage collection
546        gc.collect()

Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.
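
For example, when running several training sessions back to back (run_configs is a hypothetical list of configurations):

    for run_config in run_configs:
        workflow_auto_labeling_hf(dataset, hf_dataset, run_config)
        cleanup_memory(do_extensive_cleanup=True)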

class WorkflowExecutor:
549class WorkflowExecutor:
550    """Orchestrates the execution of multiple data processing workflows in sequence."""
551
552    def __init__(
553        self,
554        workflows: List[str],
555        selected_dataset: str,
556        dataset: fo.Dataset,
557        dataset_info: Dict,
558    ):
559        """Initializes with specified workflows, dataset selection, and dataset metadata."""
560        self.workflows = workflows
561        self.selected_dataset = selected_dataset
562        self.dataset = dataset
563        self.dataset_info = dataset_info
564
565    def execute(self) -> bool:
566        """Execute all configured workflows in sequence and handle errors."""
567        if len(self.workflows) == 0:
568            logging.error("No workflows selected.")
569            return False
570
571        logging.info(f"Selected workflows: {self.workflows}")
572        for workflow in self.workflows:
573            logging.info(
574                f"Running workflow {workflow} for dataset {self.selected_dataset}"
575            )
576            try:
577                if workflow == "aws_download":
578                    parameter_group = "mcity"
579                    parameters = WORKFLOWS["aws_download"].get(parameter_group, None)
580                    if parameter_group == "mcity":
581                        dataset, dataset_name, _ = workflow_aws_download(parameters)
582                    else:
583                        logging.error(
584                            f"The parameter group {parameter_group} is not supported. As AWS setups are highly specific, please provide a separate set of parameters and a workflow."
585                        )
586
587                    # Select downloaded dataset for further workflows if configured
588                    if dataset is not None:
589                        if parameters["selected_dataset_overwrite"]:
590
591                            dataset_info = {
592                                "name": dataset_name,
593                                "v51_type": "FiftyOneDataset",
594                                "splits": [],
595                            }
596
597                            self.dataset = dataset
598                            self.dataset_info = dataset_info
599                            logging.warning(
600                                f"Overwriting selected dataset {self.selected_dataset} with {dataset_name}"
601                            )
602                            self.selected_dataset = dataset_name
603
604                elif workflow == "embedding_selection":
605                    embedding_models = WORKFLOWS["embedding_selection"][
606                        "embedding_models"
607                    ]
608
609                    for MODEL_NAME in (
610                        pbar := tqdm(embedding_models, desc="Selection by Embeddings")
611                    ):
612
613                        # Status
614                        pbar.set_description(
615                            f"Selection by embeddings with model {MODEL_NAME}."
616                        )
617
618                        # Config
619                        mode = WORKFLOWS["embedding_selection"]["mode"]
620                        parameters = WORKFLOWS["embedding_selection"]["parameters"]
621                        config = {"mode": mode, "parameters": parameters}
622
623                        # Workflow
624                        workflow_embedding_selection(
625                            self.dataset,
626                            self.dataset_info,
627                            MODEL_NAME,
628                            config,
629                        )
630
631                elif workflow == "anomaly_detection":
632
633                    # Config
634                    ano_dec_config = WORKFLOWS["anomaly_detection"]
635                    anomalib_image_models = ano_dec_config["anomalib_image_models"]
636                    eval_metrics = ano_dec_config["anomalib_eval_metrics"]
637
638                    dataset_ano_dec = None
639                    data_root = None
640                    if "train" in ano_dec_config["mode"]:
641                        try:
642                            data_preparer = AnomalyDetectionDataPreparation(
643                                self.dataset, self.selected_dataset
644                            )
645                            dataset_ano_dec = data_preparer.dataset_ano_dec
646                            data_root = data_preparer.export_root
647                        except Exception as e:
648                            logging.error(
649                                f"Error during data preparation for Anomaly Detection: {e}"
650                            )
651
652                    for MODEL_NAME in (
653                        pbar := tqdm(anomalib_image_models, desc="Anomalib")
654                    ):
655                        # Status
656                        pbar.set_description(f"Anomalib model {MODEL_NAME}")
657
658                        # Config
659                        run_config = {
660                            "model_name": MODEL_NAME,
661                            "image_size": anomalib_image_models[MODEL_NAME].get(
662                                "image_size", None
663                            ),
664                            "batch_size": anomalib_image_models[MODEL_NAME].get(
665                                "batch_size", 1
666                            ),
667                            "epochs": ano_dec_config["epochs"],
668                            "early_stop_patience": ano_dec_config[
669                                "early_stop_patience"
670                            ],
671                            "data_root": data_root,
672                            "mode": ano_dec_config["mode"],
673                        }
674
675                        # Workflow
676                        workflow_anomaly_detection(
677                            self.dataset,
678                            dataset_ano_dec,
679                            self.dataset_info,
680                            eval_metrics,
681                            run_config,
682                        )
683
684                elif workflow == "auto_labeling":
685
686                    # Config
687                    SUPPORTED_MODEL_SOURCES = [
688                        "hf_models_objectdetection",
689                        "ultralytics",
690                        "custom_codetr",
691                        "roboflow",
692                    ]
693
694                    # Common parameters between models
695                    config_autolabel = WORKFLOWS["auto_labeling"]
696                    mode = config_autolabel["mode"]
697                    epochs = config_autolabel["epochs"]
698                    selected_model_source = config_autolabel["model_source"]
699
700                    # Check if all selected model sources are supported
701                    for model_source in selected_model_source:
702                        if model_source not in SUPPORTED_MODEL_SOURCES:
703                            logging.error(
704                                f"Selected model source {model_source} is not supported."
705                            )
706
707                    if SUPPORTED_MODEL_SOURCES[0] in selected_model_source:
708                        # Hugging Face Models
709                        # Single GPU mode (https://github.com/huggingface/transformers/issues/28740)
710                        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
711                        hf_models = config_autolabel["hf_models_objectdetection"]
712
713                        # Dataset Conversion
714                        try:
715                            logging.info("Converting dataset into Hugging Face format.")
716                            pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset)
717                            pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset)
718                            hf_dataset = pt_to_hf_converter.convert()
719                        except Exception as e:
720                            logging.error(f"Error during dataset conversion: {e}")
721
722                        for MODEL_NAME in (
723                            pbar := tqdm(hf_models, desc="Auto Labeling Models")
724                        ):
725                            # Status Update
726                            pbar.set_description(
727                                f"Processing Hugging Face model {MODEL_NAME}"
728                            )
729
730                            # Config
731                            config_model = config_autolabel[
732                                "hf_models_objectdetection"
733                            ][MODEL_NAME]
734
735                            run_config = {
736                                "mode": mode,
737                                "model_name": MODEL_NAME,
738                                "v51_dataset_name": self.selected_dataset,
739                                "epochs": epochs,
740                                "early_stop_threshold": config_autolabel[
741                                    "early_stop_threshold"
742                                ],
743                                "early_stop_patience": config_autolabel[
744                                    "early_stop_patience"
745                                ],
746                                "learning_rate": config_autolabel["learning_rate"],
747                                "weight_decay": config_autolabel["weight_decay"],
748                                "max_grad_norm": config_autolabel["max_grad_norm"],
749                                "batch_size": config_model.get("batch_size", 1),
750                                "image_size": config_model.get("image_size", None),
751                                "n_worker_dataloader": config_autolabel[
752                                    "n_worker_dataloader"
753                                ],
754                                "inference_settings": config_autolabel[
755                                    "inference_settings"
756                                ],
757                            }
758
759                            # Workflow
760                            workflow_auto_labeling_hf(
761                                self.dataset,
762                                hf_dataset,
763                                run_config,
764                            )
765
766                    if SUPPORTED_MODEL_SOURCES[1] in selected_model_source:
767                        # Ultralytics Models
768                        config_ultralytics = config_autolabel["ultralytics"]
769                        models_ultralytics = config_ultralytics["models"]
770                        export_dataset_root = config_ultralytics["export_dataset_root"]
771
772                        # Export data into necessary format
773                        if "train" in mode:
774                            try:
775                                UltralyticsObjectDetection.export_data(
776                                    self.dataset,
777                                    self.dataset_info,
778                                    export_dataset_root,
779                                )
780                            except Exception as e:
781                                logging.error(
782                                    f"Error during Ultralytics dataset export: {e}"
783                                )
784
785                        for model_name in (
786                            pbar := tqdm(
787                                models_ultralytics, desc="Ultralytics training"
788                            )
789                        ):
790                            pbar.set_description(f"Ultralytics model {model_name}")
791                            run_config = {
792                                "mode": mode,
793                                "model_name": model_name,
794                                "v51_dataset_name": self.dataset_info["name"],
795                                "epochs": epochs,
796                                "patience": config_autolabel["early_stop_patience"],
797                                "batch_size": models_ultralytics[model_name][
798                                    "batch_size"
799                                ],
800                                "img_size": models_ultralytics[model_name]["img_size"],
801                                "export_dataset_root": export_dataset_root,
802                                "inference_settings": config_autolabel[
803                                    "inference_settings"
804                                ],
805                                "multi_scale": config_ultralytics["multi_scale"],
806                                "cos_lr": config_ultralytics["cos_lr"],
807                            }
808
809                            workflow_auto_labeling_ultralytics(self.dataset, run_config)
810
811                    if SUPPORTED_MODEL_SOURCES[2] in selected_model_source:
812                        # Custom Co-DETR
813                        config_codetr = config_autolabel["custom_codetr"]
814                        run_config = {
815                            "export_dataset_root": config_codetr["export_dataset_root"],
816                            "container_tool": config_codetr["container_tool"],
817                            "n_gpus": config_codetr["n_gpus"],
818                            "mode": config_autolabel["mode"],
819                            "epochs": config_autolabel["epochs"],
820                            "inference_settings": config_autolabel[
821                                "inference_settings"
822                            ],
823                            "config": None,
824                        }
825                        codetr_configs = config_codetr["configs"]
826
827                        for config in (
828                            pbar := tqdm(
829                                codetr_configs, desc="Processing Co-DETR configurations"
830                            )
831                        ):
832                            pbar.set_description(f"Co-DETR model {config}")
833                            run_config["config"] = config
834                            workflow_auto_labeling_custom_codetr(
835                                self.dataset, self.dataset_info, run_config
836                            )
837
838                    if SUPPORTED_MODEL_SOURCES[3] in selected_model_source:
839
840                        config_rfdetr = config_autolabel["roboflow"]
841
842                        # Shared config parameters
843                        shared_config = {
844                            "epochs": config_autolabel["epochs"],
845                            "learning_rate": config_autolabel["learning_rate"],
846                            "weight_decay": config_autolabel["weight_decay"],
847                            "early_stop_patience": config_autolabel["early_stop_patience"],
848                            "early_stop_threshold": config_autolabel["early_stop_threshold"],
849                        }
850
851                        run_config = {
852                            "export_dataset_root": config_rfdetr["export_dataset_root"],
853                            "mode": config_autolabel["mode"],
854                            "inference_settings": config_autolabel["inference_settings"],
855                            "config": None,
856                            # RF-DETR specific parameters
857                            "batch_size": config_rfdetr["batch_size"],
858                            "grad_accum_steps": config_rfdetr["grad_accum_steps"],
859                            "lr_encoder": config_rfdetr["lr_encoder"],
860                            "resolution": config_rfdetr["resolution"],
861                            "use_ema": config_rfdetr["use_ema"],
862                            "gradient_checkpointing": config_rfdetr["gradient_checkpointing"],
863                            "early_stopping_min_delta": config_rfdetr["early_stopping_min_delta"],
864                            "early_stopping_use_ema": config_rfdetr["early_stopping_use_ema"],
865                        }
866
867                        rfdetr_configs = config_rfdetr["configs"]
868
869                        for config in (
870                            pbar := tqdm(rfdetr_configs, desc="Processing RF-DETR configurations")
871                        ):
872                            pbar.set_description(f"RF-DETR model {config}")
873                            run_config["config"] = config
874
875                            try:
876                                wandb_exit_code = 0
877                                wandb_run = wandb_init(
878                                    run_name=config,
879                                    project_name="RF-DETR Auto Labeling",
880                                    dataset_name=self.dataset_info["name"],
881                                    config=run_config,
882                                    wandb_activate=True,
883                                )
884
885                                detector = CustomRFDETRObjectDetection(
886                                    self.dataset, self.dataset_info, run_config
887                                )
888
889                                # Convert data to RF-DETR format
890                                detector.convert_data()
891
892                                # Training
893                                if "train" in mode:
894                                    logging.info(f"Training RF-DETR model: {config}")
895                                    detector.train(run_config, shared_config)
896
897                                # Inference
898                                if "inference" in mode:
899                                    logging.info(f"Running inference for RF-DETR model: {config}")
900                                    detector.inference(
901                                        inference_settings=config_autolabel["inference_settings"]
902                                    )
903
904                            except Exception as e:
905                                logging.error(f"Error during RF-DETR workflow with {config}: {e}")
906                                wandb_exit_code = 1
907                            finally:
908                                wandb_close(wandb_exit_code)
909
910
911                elif workflow == "auto_labeling_zero_shot":
912                    config = WORKFLOWS["auto_labeling_zero_shot"]
913                    workflow_zero_shot_object_detection(
914                        self.dataset, self.dataset_info, config
915                    )
916
917                elif workflow == "ensemble_selection":
918                    # Config
919                    run_config = WORKFLOWS["ensemble_selection"]
920
921                    # Workflow
922                    workflow_ensemble_selection(
923                        self.dataset, self.dataset_info, run_config
924                    )
925
926                elif workflow == "auto_label_mask":
927                    config = WORKFLOWS["auto_label_mask"]
928                    workflow_auto_label_mask(self.dataset, self.dataset_info, config)
929
930                elif workflow == "class_mapping":
931                    # Config
932                    run_config = WORKFLOWS["class_mapping"]
933
934                    # Workflow
935                    workflow_class_mapping(
936                        self.dataset,
937                        self.dataset_info,
938                        run_config,
939                        test_dataset_source=None,
940                        test_dataset_target=None,
941                    )
942                elif workflow == "data_ingest":
943                    dataset = run_data_ingest()
944
945                    dataset_info = {
946                        "name": "custom_dataset",
947                        "v51_type": "FiftyOneDataset",
948                        "splits": ["train", "val", "test"],
949                    }
950
951                    self.dataset = dataset
952                    self.dataset_info = dataset_info
953                    self.selected_dataset = "custom_dataset"
954
955                    logging.info("Data ingestion completed successfully.")
956
957
958                else:
959                    logging.error(
960                        f"Workflow {workflow} not found. Check available workflows in config.py."
961                    )
962                    return False
963
964                cleanup_memory()  # Clean after each workflow
965                logging.info(f"Completed workflow {workflow} and cleaned up memory")
966
967            except Exception as e:
968                logging.error(f"Workflow {workflow}: An error occurred: {e}")
969                wandb_close(exit_code=1)
970                cleanup_memory()  # Clean up even after failure
971
972        return True

Orchestrates the execution of multiple data processing workflows in sequence.
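
Typical usage, mirroring main() below:

    dataset, dataset_info = load_dataset(SELECTED_DATASET)
    executor = WorkflowExecutor(
        SELECTED_WORKFLOW,
        SELECTED_DATASET["name"],
        dataset,
        dataset_info,
    )
    executor.execute()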

WorkflowExecutor(workflows: List[str], selected_dataset: str, dataset: fiftyone.core.dataset.Dataset, dataset_info: Dict)

Initializes with specified workflows, dataset selection, and dataset metadata.

workflows
selected_dataset
dataset
dataset_info
def execute(self) -> bool:

Execute all configured workflows in sequence and handle errors.

def main():
 975def main():
 976    """Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface."""
 977    time_start = time.time()
 978    configure_logging()
 979
 980    # Signal handler for CTRL + C
 981    signal.signal(signal.SIGINT, signal_handler)
 982
 983    # Execute workflows
 984    if "data_ingest" in SELECTED_WORKFLOW:
 985        executor = WorkflowExecutor(
 986            SELECTED_WORKFLOW,
 987            SELECTED_DATASET["name"],
 988            dataset=None,
 989            dataset_info=None,
 990        )
 991        executor.execute()
 992
993        # Pull the ingested dataset and metadata back from the executor
 994        dataset = executor.dataset
 995        dataset_info = executor.dataset_info
 996
 997    else:
 998        dataset, dataset_info = load_dataset(SELECTED_DATASET)
 999
1000        executor = WorkflowExecutor(
1001            SELECTED_WORKFLOW,
1002            SELECTED_DATASET["name"],
1003            dataset,
1004            dataset_info,
1005        )
1006        executor.execute()
1007
1008
1009    if dataset is not None:
1010        dataset.reload()
1011        dataset.save()
1012        arrange_fields_in_groups(dataset)
1013        logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.")
1014
1015        # Dataset stats
1016        logging.debug(dataset)
1017        logging.debug(dataset.stats(include_media=True))
1018
1019        # V51 UI launch
1020        session = fo.launch_app(
1021            dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE
1022        )
1023    else:
1024        logging.info("Skipping Voxel51 session.")
1025
1026
1027
1028    time_stop = time.time()
1029    logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")

Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface.
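
The entry point is driven entirely by config/config.py. A minimal sketch of the relevant settings; any SELECTED_DATASET fields beyond "name" depend on load_dataset and are assumptions here:

    SELECTED_DATASET = {"name": "my_dataset"}  # plus loader-specific fields
    SELECTED_WORKFLOW = ["embedding_selection", "ensemble_selection"]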