main
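Entry point of the pipeline: runs the configured data processing workflows on the selected dataset and launches the Voxel51 app to inspect the results.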

import datetime
import logging
import os
import signal
import sys
import time
import warnings
from typing import Dict, List

import torch

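# Optionally silence FutureWarnings (e.g., from third-party libraries) to keep logs readable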
IGNORE_FUTURE_WARNINGS = True
if IGNORE_FUTURE_WARNINGS:
    warnings.simplefilter("ignore", category=FutureWarning)

import gc

import fiftyone as fo
import torch.multiprocessing as mp
from tqdm import tqdm

from config.config import (
    SELECTED_DATASET,
    SELECTED_WORKFLOW,
    V51_ADDRESS,
    V51_PORT,
    V51_REMOTE,
    WORKFLOWS,
)
from utils.anomaly_detection_data_preparation import AnomalyDetectionDataPreparation
from utils.data_loader import FiftyOneTorchDatasetCOCO, TorchToHFDatasetCOCO
from utils.dataset_loader import load_dataset
from utils.logging import configure_logging
from utils.mp_distribution import ZeroShotDistributer
from utils.sidebar_groups import arrange_fields_in_groups
from utils.wandb_helper import wandb_close, wandb_init
from workflows.anomaly_detection import Anodec
from workflows.auto_label_mask import AutoLabelMask
from workflows.auto_labeling import (
    CustomCoDETRObjectDetection,
    HuggingFaceObjectDetection,
    UltralyticsObjectDetection,
    ZeroShotObjectDetection,
)
from workflows.aws_download import AwsDownloader
from workflows.class_mapping import ClassMapper
from workflows.embedding_selection import EmbeddingSelection
from workflows.ensemble_selection import EnsembleSelection

wandb_run = None  # Init globally to make sure it is available


def signal_handler(sig, frame):
    """Handle Ctrl+C signal by cleaning up resources and exiting."""
    logging.error("You pressed Ctrl+C!")
    try:
        wandb_close(exit_code=1)
        cleanup_memory()
    except Exception:
        pass
    sys.exit(0)


def workflow_aws_download(parameters, wandb_activate=True):
    """Download and process data from AWS S3 bucket."""
    dataset = None
    dataset_name = None
    wandb_exit_code = 0
    files_to_be_downloaded = 0
    try:
        # Config
        bucket = parameters["bucket"]
        prefix = parameters["prefix"]
        download_path = parameters["download_path"]
        test_run = parameters["test_run"]

        # Logging
        now = datetime.datetime.now()
        datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S")
        log_dir = f"logs/tensorboard/aws_{datetime_str}"

        dataset_name = f"annarbor_rolling_{datetime_str}"

        # Weights and Biases
        wandb_run = wandb_init(
            run_name=dataset_name,
            project_name="AWS Download",
            dataset_name=dataset_name,
            log_dir=log_dir,
            wandb_activate=wandb_activate,
        )

        # Workflow
        aws_downloader = AwsDownloader(
            bucket=bucket,
            prefix=prefix,
            download_path=download_path,
            test_run=test_run,
        )

        (
            sub_folder,
            files,
            files_to_be_downloaded,
            DOWNLOAD_NUMBER_SUCCESS,
            DOWNLOAD_SIZE_SUCCESS,
        ) = aws_downloader.download_files(log_dir=log_dir)

        dataset = aws_downloader.decode_data(
            sub_folder=sub_folder,
            files=files,
            log_dir=log_dir,
            dataset_name=dataset_name,
        )

    except Exception as e:
        logging.error(f"AWS Download and Extraction failed: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return dataset, dataset_name, files_to_be_downloaded


def workflow_anomaly_detection(
    dataset_normal,
    dataset_ano_dec,
    dataset_info,
    eval_metrics,
    run_config,
    wandb_activate=True,
):
    """Run anomaly detection workflow using specified models and configurations."""
    try:
        # Weights and Biases
        wandb_exit_code = 0
        wandb_run, log_dir = wandb_init(
            run_name=run_config["model_name"],
            project_name="Selection by Anomaly Detection",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        # Workflow
        SUPPORTED_MODES = ["train", "inference"]
        # Check if all selected modes are supported
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")
        if SUPPORTED_MODES[0] in run_config["mode"]:
            ano_dec = Anodec(
                dataset=dataset_ano_dec,
                eval_metrics=eval_metrics,
                dataset_info=dataset_info,
                config=run_config,
                tensorboard_output=log_dir,
            )
            ano_dec.train_and_export_model()
            ano_dec.run_inference(mode=SUPPORTED_MODES[0])
            ano_dec.eval_v51()
        if SUPPORTED_MODES[1] in run_config["mode"]:
            ano_dec = Anodec(
                dataset=dataset_normal,
                eval_metrics=eval_metrics,
                dataset_info=dataset_info,
                config=run_config,
                tensorboard_output=log_dir,
            )
            ano_dec.run_inference(mode=SUPPORTED_MODES[1])

    except Exception as e:
        logging.error(
            f"Error in Anomaly Detection for model {run_config['model_name']}: {e}"
        )
        wandb_exit_code = 1
    finally:
        wandb_close(exit_code=wandb_exit_code)

    return True


def workflow_embedding_selection(
    dataset, dataset_info, MODEL_NAME, config, wandb_activate=True
):
    """Compute embeddings and find representative and rare images for dataset selection."""
    try:
        wandb_exit_code = 0
        wandb_run, log_dir = wandb_init(
            run_name=MODEL_NAME,
            project_name="Selection by Embedding",
            dataset_name=dataset_info["name"],
            config=config,
            wandb_activate=wandb_activate,
        )
        embedding_selector = EmbeddingSelection(
            dataset, dataset_info, MODEL_NAME, log_dir
        )

        if not embedding_selector.model_already_used:
            embedding_selector.compute_embeddings(config["mode"])
            embedding_selector.compute_similarity()

            # Find representative and unique samples as center points for further selections
            thresholds = config["parameters"]
            embedding_selector.compute_representativeness(
                thresholds["compute_representativeness"]
            )
            embedding_selector.compute_unique_images_greedy(
                thresholds["compute_unique_images_greedy"]
            )
            embedding_selector.compute_unique_images_deterministic(
                thresholds["compute_unique_images_deterministic"]
            )

            # Select samples similar to the center points to enlarge the dataset
            embedding_selector.compute_similar_images(
                thresholds["compute_similar_images"], thresholds["neighbour_count"]
            )
        else:
            logging.warning(
                f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection."
            )

    except Exception as e:
        logging.error(f"An error occurred with model {MODEL_NAME}: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True):
    """Auto-labeling workflow using Ultralytics models with optional training and inference."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["model_name"],
            project_name="Auto Labeling Ultralytics",
            dataset_name=run_config["v51_dataset_name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        detector = UltralyticsObjectDetection(dataset=dataset, config=run_config)

        # Check if all selected modes are supported
        SUPPORTED_MODES = ["train", "inference"]
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")

        if SUPPORTED_MODES[0] in run_config["mode"]:
            logging.info(f"Training model {run_config['model_name']}")
            detector.train()
        if SUPPORTED_MODES[1] in run_config["mode"]:
            logging.info(f"Running inference for model {run_config['model_name']}")
            detector.inference()

    except Exception as e:
        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True):
    """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["model_name"],
            project_name="Auto Labeling Hugging Face",
            dataset_name=run_config["v51_dataset_name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        detector = HuggingFaceObjectDetection(
            dataset=dataset,
            config=run_config,
        )
        SUPPORTED_MODES = ["train", "inference"]

        # Check if all selected modes are supported
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")
        if SUPPORTED_MODES[0] in run_config["mode"]:
            logging.info(f"Training model {run_config['model_name']}")
            detector.train(hf_dataset)
        if SUPPORTED_MODES[1] in run_config["mode"]:
            logging.info(f"Running inference for model {run_config['model_name']}")
            detector.inference(inference_settings=run_config["inference_settings"])

    except Exception as e:
        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_custom_codetr(
    dataset, dataset_info, run_config, wandb_activate=True
):
    """Auto labeling workflow using Co-DETR model supporting training and inference modes."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["config"],
            project_name="Co-DETR Auto Labeling",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        mode = run_config["mode"]

        detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config)
        detector.convert_data()
        if "train" in mode:
            detector.update_config_file(
                dataset_name=dataset_info["name"],
                config_file=run_config["config"],
                max_epochs=run_config["epochs"],
            )
            detector.train(
                run_config["config"], run_config["n_gpus"], run_config["container_tool"]
            )
        if "inference" in mode:
            detector.run_inference(
                dataset,
                run_config["config"],
                run_config["n_gpus"],
                run_config["container_tool"],
                run_config["inference_settings"],
            )
    except Exception as e:
        logging.error(f"Error during Co-DETR training or inference: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_zero_shot_object_detection(dataset, dataset_info, config):
    """Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference."""
    # Set multiprocessing mode for CUDA multiprocessing
    try:
        mp.set_start_method("spawn", force=True)
        logging.debug("Successfully set multiprocessing start method to 'spawn'")
    except RuntimeError as e:
        # This is expected if the start method was already set
        logging.debug(f"Multiprocessing start method was already set: {e}")
    except Exception as e:
        # Handle any other unexpected errors
        logging.error(f"Failed to set multiprocessing start method: {e}")

    # Zero-shot object detector models from Huggingface
    # Optimized for parallel multi-GPU inference, also supports single GPU
    dataset_torch = FiftyOneTorchDatasetCOCO(dataset)
    detector = ZeroShotObjectDetection(
        dataset_torch=dataset_torch, dataset_info=dataset_info, config=config
    )

    # Check if model detections are already stored in V51 dataset or on disk
    models_splits_dict = detector.exclude_stored_predictions(
        dataset_v51=dataset, config=config
    )
    if len(models_splits_dict) > 0:
        config["hf_models_zeroshot_objectdetection"] = models_splits_dict
        distributor = ZeroShotDistributer(
            config=config,
            n_samples=len(dataset_torch),
            dataset_info=dataset_info,
            detector=detector,
        )
        distributor.distribute_and_run()
    else:
        logging.info(
            "All zero shot models already have predictions stored in the dataset."
        )

    # To make new fields available to follow-up processes
    dataset.reload()
    dataset.save()

    return True


def workflow_auto_label_mask(dataset, dataset_info, config):
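    """Run mask-based auto labeling (depth estimation and semantic segmentation) for each configured model."""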
    try:
        depth_config = config["depth_estimation"]
        seg_config = config["semantic_segmentation"]

        for architecture_name, architecture_info in depth_config.items():
            auto_labeler = AutoLabelMask(
                dataset=dataset,
                dataset_info=dataset_info,
                model_name=architecture_name,
                task_type="depth_estimation",
                model_config=architecture_info,
            )
            auto_labeler.run_inference()

        for architecture_name, architecture_info in seg_config.items():
            auto_labeler = AutoLabelMask(
                dataset=dataset,
                dataset_info=dataset_info,
                model_name=architecture_name,
                task_type="semantic_segmentation",
                model_config=architecture_info,
            )
            auto_labeler.run_inference()

    except Exception as e:
        logging.error(f"Auto-labeling mask workflow failed: {e}")
        raise


def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True):
    """Runs ensemble selection workflow on given dataset using provided configuration."""
    try:
        wandb_exit_code = 0

        wandb_run = wandb_init(
            run_name="Selection by Ensemble",
            project_name="Ensemble Selection",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )
        ensemble_selector = EnsembleSelection(dataset, run_config)
        ensemble_selector.ensemble_selection()
    except Exception as e:
        logging.error(f"An error occurred during Ensemble Selection: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_class_mapping(
    dataset,
    dataset_info,
    run_config,
    wandb_activate=True,
    test_dataset_source=None,
    test_dataset_target=None,
):
    """Runs class mapping workflow to align labels between the source dataset and target dataset."""
    try:
        wandb_exit_code = 0
        # Initialize a wandb run for class mapping
        wandb_run = wandb_init(
            run_name="class_mapping",
            project_name="Class Mapping",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )
        class_mapping_models = run_config["hf_models_zeroshot_classification"]

        for model_name in (
            pbar := tqdm(class_mapping_models, desc="Processing Class Mapping")
        ):
            pbar.set_description(f"Zero Shot Classification model {model_name}")
            mapper = ClassMapper(dataset, model_name, run_config)
            try:
                stats = mapper.run_mapping(test_dataset_source, test_dataset_target)
                # Display statistics only if mapping was successful
                total_processed = stats["total_processed"]
                logging.info("\nClassification Results for Source Dataset:")
                for source_class, count in stats["source_class_counts"].items():
                    percentage = (
                        (count / total_processed) * 100 if total_processed > 0 else 0
                    )
                    logging.info(
                        f"{source_class}: {count} samples processed ({percentage:.1f}%)"
                    )

                # Display statistics for tags added to Target Dataset
                logging.info("\nTag Addition Results (Target Dataset Tags):")
                logging.info(f"Total new tags added: {stats['changes_made']}")
                for target_class, tag_count in stats["tags_added_per_category"].items():
                    logging.info(f"{target_class} tags added: {tag_count}")

            except Exception as e:
                logging.error(f"Error during mapping with model {model_name}: {e}")

    except Exception as e:
        logging.error(f"Error in class_mapping workflow: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def cleanup_memory(do_extensive_cleanup=False):
    """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row."""
    logging.info("Starting memory cleanup")
    # Clear CUDA cache
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Force garbage collection
    gc.collect()

    if do_extensive_cleanup:
        # Clear any leftover tensors. Note: 'del obj' only drops the loop's local
        # reference; the memory is actually reclaimed by the gc.collect() call below.
        n_deleted_torch_objects = 0
        for obj in tqdm(
            gc.get_objects(), desc="Deleting objects from Python Garbage Collector"
        ):
            try:
                if torch.is_tensor(obj):
                    del obj
                    n_deleted_torch_objects += 1
            except Exception:
                pass

        logging.info(f"Deleted {n_deleted_torch_objects} torch objects")

        # Final garbage collection
        gc.collect()


class WorkflowExecutor:
    """Orchestrates the execution of multiple data processing workflows in sequence."""

    def __init__(
        self,
        workflows: List[str],
        selected_dataset: str,
        dataset: fo.Dataset,
        dataset_info: Dict,
    ):
        """Initializes with specified workflows, dataset selection, and dataset metadata."""
        self.workflows = workflows
        self.selected_dataset = selected_dataset
        self.dataset = dataset
        self.dataset_info = dataset_info

    def execute(self) -> bool:
        """Execute all configured workflows in sequence and handle errors."""
        if len(self.workflows) == 0:
            logging.error("No workflows selected.")
            return False

        logging.info(f"Selected workflows: {self.workflows}")
        for workflow in self.workflows:
            logging.info(
                f"Running workflow {workflow} for dataset {self.selected_dataset}"
            )
            try:
                if workflow == "aws_download":
                    parameter_group = "mcity"
                    parameters = WORKFLOWS["aws_download"].get(parameter_group, None)
                    if parameter_group == "mcity":
                        dataset, dataset_name, _ = workflow_aws_download(parameters)
                    else:
                        logging.error(
                            f"The parameter group {parameter_group} is not supported. As AWS setups are highly specific, please provide a separate set of parameters and a workflow."
                        )

                    # Select downloaded dataset for further workflows if configured
                    if dataset is not None:
                        if parameters["selected_dataset_overwrite"]:
                            dataset_info = {
                                "name": dataset_name,
                                "v51_type": "FiftyOneDataset",
                                "splits": [],
                            }

                            self.dataset = dataset
                            self.dataset_info = dataset_info
                            logging.warning(
                                f"Overwriting selected dataset {self.selected_dataset} with {dataset_name}"
                            )
                            self.selected_dataset = dataset_name

                elif workflow == "embedding_selection":
                    embedding_models = WORKFLOWS["embedding_selection"][
                        "embedding_models"
                    ]

                    for MODEL_NAME in (
                        pbar := tqdm(embedding_models, desc="Selection by Embeddings")
                    ):
                        # Status
                        pbar.set_description(
                            f"Selection by embeddings with model {MODEL_NAME}."
                        )

                        # Config
                        mode = WORKFLOWS["embedding_selection"]["mode"]
                        parameters = WORKFLOWS["embedding_selection"]["parameters"]
                        config = {"mode": mode, "parameters": parameters}

                        # Workflow
                        workflow_embedding_selection(
                            self.dataset,
                            self.dataset_info,
                            MODEL_NAME,
                            config,
                        )

                elif workflow == "anomaly_detection":
                    # Config
                    ano_dec_config = WORKFLOWS["anomaly_detection"]
                    anomalib_image_models = ano_dec_config["anomalib_image_models"]
                    eval_metrics = ano_dec_config["anomalib_eval_metrics"]

                    dataset_ano_dec = None
                    data_root = None
                    if "train" in ano_dec_config["mode"]:
                        try:
                            data_preparer = AnomalyDetectionDataPreparation(
                                self.dataset, self.selected_dataset
                            )
                            dataset_ano_dec = data_preparer.dataset_ano_dec
                            data_root = data_preparer.export_root
                        except Exception as e:
                            logging.error(
                                f"Error during data preparation for Anomaly Detection: {e}"
                            )

                    for MODEL_NAME in (
                        pbar := tqdm(anomalib_image_models, desc="Anomalib")
                    ):
                        # Status
                        pbar.set_description(f"Anomalib model {MODEL_NAME}")

                        # Config
                        run_config = {
                            "model_name": MODEL_NAME,
                            "image_size": anomalib_image_models[MODEL_NAME].get(
                                "image_size", None
                            ),
                            "batch_size": anomalib_image_models[MODEL_NAME].get(
                                "batch_size", 1
                            ),
                            "epochs": ano_dec_config["epochs"],
                            "early_stop_patience": ano_dec_config[
                                "early_stop_patience"
                            ],
                            "data_root": data_root,
                            "mode": ano_dec_config["mode"],
                        }

                        # Workflow
                        workflow_anomaly_detection(
                            self.dataset,
                            dataset_ano_dec,
                            self.dataset_info,
                            eval_metrics,
                            run_config,
                        )

                elif workflow == "auto_labeling":
                    # Config
                    SUPPORTED_MODEL_SOURCES = [
                        "hf_models_objectdetection",
                        "ultralytics",
                        "custom_codetr",
                    ]

                    # Common parameters between models
                    config_autolabel = WORKFLOWS["auto_labeling"]
                    mode = config_autolabel["mode"]
                    epochs = config_autolabel["epochs"]
                    selected_model_source = config_autolabel["model_source"]

                    # Check if all selected modes are supported
                    for model_source in selected_model_source:
                        if model_source not in SUPPORTED_MODEL_SOURCES:
                            logging.error(
                                f"Selected model source {model_source} is not supported."
                            )

                    if SUPPORTED_MODEL_SOURCES[0] in selected_model_source:
                        # Hugging Face Models
                        # Single GPU mode (https://github.com/huggingface/transformers/issues/28740)
                        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
                        hf_models = config_autolabel["hf_models_objectdetection"]

                        # Dataset Conversion
                        try:
                            logging.info("Converting dataset into Hugging Face format.")
                            pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset)
                            pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset)
                            hf_dataset = pt_to_hf_converter.convert()
                        except Exception as e:
                            logging.error(f"Error during dataset conversion: {e}")

                        for MODEL_NAME in (
                            pbar := tqdm(hf_models, desc="Auto Labeling Models")
                        ):
                            # Status Update
                            pbar.set_description(
                                f"Processing Hugging Face model {MODEL_NAME}"
                            )

                            # Config
                            config_model = config_autolabel[
                                "hf_models_objectdetection"
                            ][MODEL_NAME]

                            run_config = {
                                "mode": mode,
                                "model_name": MODEL_NAME,
                                "v51_dataset_name": self.selected_dataset,
                                "epochs": epochs,
                                "early_stop_threshold": config_autolabel[
                                    "early_stop_threshold"
                                ],
                                "early_stop_patience": config_autolabel[
                                    "early_stop_patience"
                                ],
                                "learning_rate": config_autolabel["learning_rate"],
                                "weight_decay": config_autolabel["weight_decay"],
                                "max_grad_norm": config_autolabel["max_grad_norm"],
                                "batch_size": config_model.get("batch_size", 1),
                                "image_size": config_model.get("image_size", None),
                                "n_worker_dataloader": config_autolabel[
                                    "n_worker_dataloader"
                                ],
                                "inference_settings": config_autolabel[
                                    "inference_settings"
                                ],
                            }

                            # Workflow
                            workflow_auto_labeling_hf(
                                self.dataset,
                                hf_dataset,
                                run_config,
                            )

                    if SUPPORTED_MODEL_SOURCES[1] in selected_model_source:
                        # Ultralytics Models
                        config_ultralytics = config_autolabel["ultralytics"]
                        models_ultralytics = config_ultralytics["models"]
                        export_dataset_root = config_ultralytics["export_dataset_root"]

                        # Export data into necessary format
                        if "train" in mode:
                            try:
                                UltralyticsObjectDetection.export_data(
                                    self.dataset,
                                    self.dataset_info,
                                    export_dataset_root,
                                )
                            except Exception as e:
                                logging.error(
                                    f"Error during Ultralytics dataset export: {e}"
                                )

                        for model_name in (
                            pbar := tqdm(
                                models_ultralytics, desc="Ultralytics training"
                            )
                        ):
                            pbar.set_description(f"Ultralytics model {model_name}")
                            run_config = {
                                "mode": mode,
                                "model_name": model_name,
                                "v51_dataset_name": self.dataset_info["name"],
                                "epochs": epochs,
                                "patience": config_autolabel["early_stop_patience"],
                                "batch_size": models_ultralytics[model_name][
                                    "batch_size"
                                ],
                                "img_size": models_ultralytics[model_name]["img_size"],
                                "export_dataset_root": export_dataset_root,
                                "inference_settings": config_autolabel[
                                    "inference_settings"
                                ],
                                "multi_scale": config_ultralytics["multi_scale"],
                                "cos_lr": config_ultralytics["cos_lr"],
                            }

                            workflow_auto_labeling_ultralytics(self.dataset, run_config)

                    if SUPPORTED_MODEL_SOURCES[2] in selected_model_source:
                        # Custom Co-DETR
                        config_codetr = config_autolabel["custom_codetr"]
                        run_config = {
                            "export_dataset_root": config_codetr["export_dataset_root"],
                            "container_tool": config_codetr["container_tool"],
                            "n_gpus": config_codetr["n_gpus"],
                            "mode": config_autolabel["mode"],
                            "epochs": config_autolabel["epochs"],
                            "inference_settings": config_autolabel[
                                "inference_settings"
                            ],
                            "config": None,
                        }
                        codetr_configs = config_codetr["configs"]

                        for config in (
                            pbar := tqdm(
                                codetr_configs, desc="Processing Co-DETR configurations"
                            )
                        ):
                            pbar.set_description(f"Co-DETR model {config}")
                            run_config["config"] = config
                            workflow_auto_labeling_custom_codetr(
                                self.dataset, self.dataset_info, run_config
                            )

                elif workflow == "auto_labeling_zero_shot":
                    config = WORKFLOWS["auto_labeling_zero_shot"]
                    workflow_zero_shot_object_detection(
                        self.dataset, self.dataset_info, config
                    )

                elif workflow == "ensemble_selection":
                    # Config
                    run_config = WORKFLOWS["ensemble_selection"]

                    # Workflow
                    workflow_ensemble_selection(
                        self.dataset, self.dataset_info, run_config
                    )

                elif workflow == "auto_label_mask":
                    config = WORKFLOWS["auto_label_mask"]
                    workflow_auto_label_mask(self.dataset, self.dataset_info, config)

                elif workflow == "class_mapping":
                    # Config
                    run_config = WORKFLOWS["class_mapping"]

                    # Workflow
                    workflow_class_mapping(
                        self.dataset,
                        self.dataset_info,
                        run_config,
                        test_dataset_source=None,
                        test_dataset_target=None,
                    )

                else:
                    logging.error(
                        f"Workflow {workflow} not found. Check available workflows in config.py."
                    )
                    return False

                cleanup_memory()  # Clean after each workflow
                logging.info(f"Completed workflow {workflow} and cleaned up memory")

            except Exception as e:
                logging.error(f"Workflow {workflow}: An error occurred: {e}")
                wandb_close(exit_code=1)
                cleanup_memory()  # Clean up even after failure

        return True


def main():
    """Executes the data processing workflow, loads the dataset, and launches the Voxel51 visualization interface."""
    time_start = time.time()
    configure_logging()

    # Signal handler for CTRL + C
    signal.signal(signal.SIGINT, signal_handler)

    # Execute workflows
    dataset, dataset_info = load_dataset(SELECTED_DATASET)

    executor = WorkflowExecutor(
        SELECTED_WORKFLOW, SELECTED_DATASET["name"], dataset, dataset_info
    )
    executor.execute()

    # Launch V51 session
    dataset.reload()
    dataset.save()
    arrange_fields_in_groups(dataset)
    logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.")

    # Dataset stats
    logging.debug(dataset)
    logging.debug(dataset.stats(include_media=True))

    # V51 UI launch
    session = fo.launch_app(
        dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE
    )

    time_stop = time.time()
    logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")


if __name__ == "__main__":
    cleanup_memory()
    main()
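
# Running this module first frees leftover (GPU) memory, then executes the configured
# workflows and launches the Voxel51 app, e.g. (assuming the file is named main.py):
#   python main.py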
IGNORE_FUTURE_WARNINGS = True
wandb_run = None
def signal_handler(sig, frame):
54def signal_handler(sig, frame):
55    """Handle Ctrl+C signal by cleaning up resources and exiting."""
56    logging.error("You pressed Ctrl+C!")
57    try:
58        wandb_close(exit_code=1)
59        cleanup_memory()
60    except:
61        pass
62    sys.exit(0)

Handle Ctrl+C signal by cleaning up resources and exiting.

def workflow_aws_download(parameters, wandb_activate=True):
 65def workflow_aws_download(parameters, wandb_activate=True):
 66    """Download and process data from AWS S3 bucket."""
 67    dataset = None
 68    dataset_name = None
 69    wandb_exit_code = 0
 70    files_to_be_downloaded = 0
 71    try:
 72        # Config
 73        bucket = parameters["bucket"]
 74        prefix = parameters["prefix"]
 75        download_path = parameters["download_path"]
 76        test_run = parameters["test_run"]
 77
 78        # Logging
 79        now = datetime.datetime.now()
 80        datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S")
 81        log_dir = f"logs/tensorboard/aws_{datetime_str}"
 82
 83        dataset_name = f"annarbor_rolling_{datetime_str}"
 84
 85        # Weights and Biases
 86        wandb_run = wandb_init(
 87            run_name=dataset_name,
 88            project_name="AWS Download",
 89            dataset_name=dataset_name,
 90            log_dir=log_dir,
 91            wandb_activate=wandb_activate,
 92        )
 93
 94        # Workflow
 95        aws_downloader = AwsDownloader(
 96            bucket=bucket,
 97            prefix=prefix,
 98            download_path=download_path,
 99            test_run=test_run,
100        )
101
102        (
103            sub_folder,
104            files,
105            files_to_be_downloaded,
106            DOWNLOAD_NUMBER_SUCCESS,
107            DOWNLOAD_SIZE_SUCCESS,
108        ) = aws_downloader.download_files(log_dir=log_dir)
109
110        dataset = aws_downloader.decode_data(
111            sub_folder=sub_folder,
112            files=files,
113            log_dir=log_dir,
114            dataset_name=dataset_name,
115        )
116
117    except Exception as e:
118        logging.error(f"AWS Download and Extraction failed: {e}")
119        wandb_exit_code = 1
120
121    finally:
122        wandb_close(wandb_exit_code)
123
124    return dataset, dataset_name, files_to_be_downloaded

Download and process data from AWS S3 bucket.

def workflow_anomaly_detection( dataset_normal, dataset_ano_dec, dataset_info, eval_metrics, run_config, wandb_activate=True):
127def workflow_anomaly_detection(
128    dataset_normal,
129    dataset_ano_dec,
130    dataset_info,
131    eval_metrics,
132    run_config,
133    wandb_activate=True,
134):
135    """Run anomaly detection workflow using specified models and configurations."""
136    try:
137        # Weights and Biases
138        wandb_exit_code = 0
139        wandb_run, log_dir = wandb_init(
140            run_name=run_config["model_name"],
141            project_name="Selection by Anomaly Detection",
142            dataset_name=dataset_info["name"],
143            config=run_config,
144            wandb_activate=wandb_activate,
145        )
146
147        # Workflow
148
149        SUPPORTED_MODES = ["train", "inference"]
150        # Check if all selected modes are supported
151        for mode in run_config["mode"]:
152            if mode not in SUPPORTED_MODES:
153                logging.error(f"Selected mode {mode} is not supported.")
154        if SUPPORTED_MODES[0] in run_config["mode"]:
155            ano_dec = Anodec(
156                dataset=dataset_ano_dec,
157                eval_metrics=eval_metrics,
158                dataset_info=dataset_info,
159                config=run_config,
160                tensorboard_output=log_dir,
161            )
162            ano_dec.train_and_export_model()
163            ano_dec.run_inference(mode=SUPPORTED_MODES[0])
164            ano_dec.eval_v51()
165        if SUPPORTED_MODES[1] in run_config["mode"]:
166            ano_dec = Anodec(
167                dataset=dataset_normal,
168                eval_metrics=eval_metrics,
169                dataset_info=dataset_info,
170                config=run_config,
171                tensorboard_output=log_dir,
172            )
173            ano_dec.run_inference(mode=SUPPORTED_MODES[1])
174
175    except Exception as e:
176        logging.error(
177            f"Error in Anomaly Detection for model {run_config['model_name']}: {e}"
178        )
179        wandb_exit_code = 1
180    finally:
181        wandb_close(exit_code=wandb_exit_code)
182
183    return True

Run anomaly detection workflow using specified models and configurations.

def workflow_embedding_selection(dataset, dataset_info, MODEL_NAME, config, wandb_activate=True):
186def workflow_embedding_selection(
187    dataset, dataset_info, MODEL_NAME, config, wandb_activate=True
188):
189    """Compute embeddings and find representative and rare images for dataset selection."""
190    try:
191        wandb_exit_code = 0
192        wandb_run, log_dir = wandb_init(
193            run_name=MODEL_NAME,
194            project_name="Selection by Embedding",
195            dataset_name=dataset_info["name"],
196            config=config,
197            wandb_activate=wandb_activate,
198        )
199        embedding_selector = EmbeddingSelection(
200            dataset, dataset_info, MODEL_NAME, log_dir
201        )
202
203        if embedding_selector.model_already_used == False:
204
205            embedding_selector.compute_embeddings(config["mode"])
206            embedding_selector.compute_similarity()
207
208            # Find representative and unique samples as center points for further selections
209            thresholds = config["parameters"]
210            embedding_selector.compute_representativeness(
211                thresholds["compute_representativeness"]
212            )
213            embedding_selector.compute_unique_images_greedy(
214                thresholds["compute_unique_images_greedy"]
215            )
216            embedding_selector.compute_unique_images_deterministic(
217                thresholds["compute_unique_images_deterministic"]
218            )
219
220            # Select samples similar to the center points to enlarge the dataset
221            embedding_selector.compute_similar_images(
222                thresholds["compute_similar_images"], thresholds["neighbour_count"]
223            )
224        else:
225            logging.warning(
226                f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection."
227            )
228
229    except Exception as e:
230        logging.error(f"An error occurred with model {MODEL_NAME}: {e}")
231        wandb_exit_code = 1
232    finally:
233        wandb_close(wandb_exit_code)
234
235    return True

Compute embeddings and find representative and rare images for dataset selection.

def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True):
238def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True):
239    """Auto-labeling workflow using Ultralytics models with optional training and inference."""
240    try:
241        wandb_exit_code = 0
242        wandb_run = wandb_init(
243            run_name=run_config["model_name"],
244            project_name="Auto Labeling Ultralytics",
245            dataset_name=run_config["v51_dataset_name"],
246            config=run_config,
247            wandb_activate=wandb_activate,
248        )
249
250        detector = UltralyticsObjectDetection(dataset=dataset, config=run_config)
251
252        # Check if all selected modes are supported
253        SUPPORTED_MODES = ["train", "inference"]
254        for mode in run_config["mode"]:
255            if mode not in SUPPORTED_MODES:
256                logging.error(f"Selected mode {mode} is not supported.")
257
258        if SUPPORTED_MODES[0] in run_config["mode"]:
259            logging.info(f"Training model {run_config['model_name']}")
260            detector.train()
261        if SUPPORTED_MODES[1] in run_config["mode"]:
262            logging.info(f"Running inference for model {run_config['model_name']}")
263            detector.inference()
264
265    except Exception as e:
266        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
267        wandb_exit_code = 1
268
269    finally:
270        wandb_close(wandb_exit_code)
271
272    return True

Auto-labeling workflow using Ultralytics models with optional training and inference.

def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True):
275def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True):
276    """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration."""
277    try:
278        wandb_exit_code = 0
279        wandb_run = wandb_init(
280            run_name=run_config["model_name"],
281            project_name="Auto Labeling Hugging Face",
282            dataset_name=run_config["v51_dataset_name"],
283            config=run_config,
284            wandb_activate=wandb_activate,
285        )
286
287        detector = HuggingFaceObjectDetection(
288            dataset=dataset,
289            config=run_config,
290        )
291        SUPPORTED_MODES = ["train", "inference"]
292
293        # Check if all selected modes are supported
294        for mode in run_config["mode"]:
295            if mode not in SUPPORTED_MODES:
296                logging.error(f"Selected mode {mode} is not supported.")
297        if SUPPORTED_MODES[0] in run_config["mode"]:
298            logging.info(f"Training model {run_config['model_name']}")
299            detector.train(hf_dataset)
300        if SUPPORTED_MODES[1] in run_config["mode"]:
301            logging.info(f"Running inference for model {run_config['model_name']}")
302            detector.inference(inference_settings=run_config["inference_settings"])
303
304    except Exception as e:
305        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
306        wandb_exit_code = 1
307
308    finally:
309        wandb_close(wandb_exit_code)
310
311    return True

Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.

def workflow_auto_labeling_custom_codetr(dataset, dataset_info, run_config, wandb_activate=True):
314def workflow_auto_labeling_custom_codetr(
315    dataset, dataset_info, run_config, wandb_activate=True
316):
317    """Auto labeling workflow using Co-DETR model supporting training and inference modes."""
318
319    try:
320        wandb_exit_code = 0
321        wandb_run = wandb_init(
322            run_name=run_config["config"],
323            project_name="Co-DETR Auto Labeling",
324            dataset_name=dataset_info["name"],
325            config=run_config,
326            wandb_activate=wandb_activate,
327        )
328
329        mode = run_config["mode"]
330
331        detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config)
332        detector.convert_data()
333        if "train" in mode:
334            detector.update_config_file(
335                dataset_name=dataset_info["name"],
336                config_file=run_config["config"],
337                max_epochs=run_config["epochs"],
338            )
339            detector.train(
340                run_config["config"], run_config["n_gpus"], run_config["container_tool"]
341            )
342        if "inference" in mode:
343            detector.run_inference(
344                dataset,
345                run_config["config"],
346                run_config["n_gpus"],
347                run_config["container_tool"],
348                run_config["inference_settings"],
349            )
350    except Exception as e:
351        logging.error(f"Error during CoDETR training: {e}")
352        wandb_exit_code = 1
353    finally:
354        wandb_close(wandb_exit_code)
355
356    return True

Auto labeling workflow using Co-DETR model supporting training and inference modes.

def workflow_zero_shot_object_detection(dataset, dataset_info, config):
359def workflow_zero_shot_object_detection(dataset, dataset_info, config):
360    """Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference."""
361    # Set multiprocessing mode for CUDA multiprocessing
362    try:
363        mp.set_start_method("spawn", force=True)
364        logging.debug("Successfully set multiprocessing start method to 'spawn'")
365    except RuntimeError as e:
366        # This is expected if the start method was already set
367        logging.debug(f"Multiprocessing start method was already set: {e}")
368    except Exception as e:
369        # Handle any other unexpected errors
370        logging.error(f"Failed to set multiprocessing start method: {e}")
371
372    # Zero-shot object detector models from Huggingface
373    # Optimized for parallel multi-GPU inference, also supports single GPU
374    dataset_torch = FiftyOneTorchDatasetCOCO(dataset)
375    detector = ZeroShotObjectDetection(
376        dataset_torch=dataset_torch, dataset_info=dataset_info, config=config
377    )
378
379    # Check if model detections are already stored in V51 dataset or on disk
380    models_splits_dict = detector.exclude_stored_predictions(
381        dataset_v51=dataset, config=config
382    )
383    if len(models_splits_dict) > 0:
384        config["hf_models_zeroshot_objectdetection"] = models_splits_dict
385        distributor = ZeroShotDistributer(
386            config=config,
387            n_samples=len(dataset_torch),
388            dataset_info=dataset_info,
389            detector=detector,
390        )
391        distributor.distribute_and_run()
392    else:
393        logging.info(
394            "All zero shot models already have predictions stored in the dataset."
395        )
396
397    # To make new fields available to follow-up processes
398    dataset.reload()
399    dataset.save()
400
401    return True

Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.

def workflow_auto_label_mask(dataset, dataset_info, config):
404def workflow_auto_label_mask(dataset, dataset_info, config):
405    try:
406        depth_config = config["depth_estimation"]
407        seg_config = config["semantic_segmentation"]
408
409        for architecture_name, architecture_info in depth_config.items():
410            auto_labeler = AutoLabelMask(
411                dataset=dataset,
412                dataset_info=dataset_info,
413                model_name=architecture_name,
414                task_type="depth_estimation",
415                model_config=architecture_info,
416            )
417            auto_labeler.run_inference()
418
419        for architecture_name, architecture_info in seg_config.items():
420            auto_labeler = AutoLabelMask(
421                dataset=dataset,
422                dataset_info=dataset_info,
423                model_name=architecture_name,
424                task_type="semantic_segmentation",
425                model_config=architecture_info,
426            )
427            auto_labeler.run_inference()
428
429    except Exception as e:
430        logging.error(f"Auto-labeling mask workflow failed: {e}")
431        raise
def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True):
434def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True):
435    """Runs ensemble selection workflow on given dataset using provided configuration."""
436    try:
437        wandb_exit_code = 0
438
439        wandb_run = wandb_init(
440            run_name="Selection by Ensemble",
441            project_name="Ensemble Selection",
442            dataset_name=dataset_info["name"],
443            config=run_config,
444            wandb_activate=wandb_activate,
445        )
446        ensemble_selecter = EnsembleSelection(dataset, run_config)
447        ensemble_selecter.ensemble_selection()
448    except Exception as e:
449        logging.error(f"An error occured during Ensemble Selection: {e}")
450        wandb_exit_code = 1
451
452    finally:
453        wandb_close(wandb_exit_code)
454
455    return True

Runs ensemble selection workflow on given dataset using provided configuration.

def workflow_class_mapping( dataset, dataset_info, run_config, wandb_activate=True, test_dataset_source=None, test_dataset_target=None):
458def workflow_class_mapping(
459    dataset,
460    dataset_info,
461    run_config,
462    wandb_activate=True,
463    test_dataset_source=None,
464    test_dataset_target=None,
465):
466    """Runs class mapping workflow to align labels between the source dataset and target dataset."""
467    try:
468        wandb_exit_code = 0
469        # Initialize a wandb run for class mapping
470        wandb_run = wandb_init(
471            run_name="class_mapping",
472            project_name="Class Mapping",
473            dataset_name=dataset_info["name"],
474            config=run_config,
475            wandb_activate=wandb_activate,
476        )
477        class_mapping_models = run_config["hf_models_zeroshot_classification"]
478
479        for model_name in (
480            pbar := tqdm(class_mapping_models, desc="Processing Class Mapping")
481        ):
482            pbar.set_description(f"Zero Shot Classification model {model_name}")
483            mapper = ClassMapper(dataset, model_name, run_config)
484            try:
485                stats = mapper.run_mapping(test_dataset_source, test_dataset_target)
486                # Display statistics only if mapping was successful
487                source_class_counts = stats["total_processed"]
488                logging.info("\nClassification Results for Source Dataset:")
489                for source_class, count in stats["source_class_counts"].items():
490                    percentage = (
491                        (count / source_class_counts) * 100
492                        if source_class_counts > 0
493                        else 0
494                    )
495                    logging.info(
496                        f"{source_class}: {count} samples processed ({percentage:.1f}%)"
497                    )
498
499                # Display statistics for tags added to Target Dataset
500                logging.info("\nTag Addition Results (Target Dataset Tags):")
501                logging.info(f"Total new tags added: {stats['changes_made']}")
502                for target_class, tag_count in stats["tags_added_per_category"].items():
503                    logging.info(f"{target_class} tags added: {tag_count}")
504
505            except Exception as e:
506                logging.error(f"Error during mapping with model {model_name}: {e}")
507
508    except Exception as e:
509        logging.error(f"Error in class_mapping workflow: {e}")
510        wandb_exit_code = 1
511    finally:
512        wandb_close(wandb_exit_code)
513
514    return True

Runs class mapping workflow to align labels between the source dataset and target dataset.
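
A standalone run follows the same pattern; the hf_models_zeroshot_classification list inside WORKFLOWS["class_mapping"] determines which zero-shot models are iterated, and passing None for the test datasets matches how WorkflowExecutor calls this workflow:

    dataset, dataset_info = load_dataset(SELECTED_DATASET)
    run_config = WORKFLOWS["class_mapping"]
    workflow_class_mapping(
        dataset,
        dataset_info,
        run_config,
        wandb_activate=False,
        test_dataset_source=None,  # None, as in WorkflowExecutor.execute()
        test_dataset_target=None,
    )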

def cleanup_memory(do_extensive_cleanup=False):
517def cleanup_memory(do_extensive_cleanup=False):
518    """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row."""
519    logging.info("Starting memory cleanup")
520    # Clear CUDA cache
521    if torch.cuda.is_available():
522        torch.cuda.empty_cache()
523
524    # Force garbage collection
525    gc.collect()
526
527    if do_extensive_cleanup:
528
529        # Clear any leftover tensors
530        n_deleted_torch_objects = 0
531        for obj in tqdm(
532            gc.get_objects(), desc="Deleting objects from Python Garbage Collector"
533        ):
534            try:
535                if torch.is_tensor(obj):
536                    del obj
537                    n_deleted_torch_objects += 1
538            except Exception:
539                pass
540
541        logging.info(f"Deleted {n_deleted_torch_objects} torch objects")
542
543        # Final garbage collection
544        gc.collect()

Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.
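
A short usage sketch: the lightweight call mirrors what WorkflowExecutor does after every workflow, while the extensive variant is intended for consecutive training sessions:

    cleanup_memory()                           # empty CUDA cache + garbage collection
    cleanup_memory(do_extensive_cleanup=True)  # additionally scans gc-tracked objects for leftover tensors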

class WorkflowExecutor:
547class WorkflowExecutor:
548    """Orchestrates the execution of multiple data processing workflows in sequence."""
549
550    def __init__(
551        self,
552        workflows: List[str],
553        selected_dataset: str,
554        dataset: fo.Dataset,
555        dataset_info: Dict,
556    ):
557        """Initializes with specified workflows, dataset selection, and dataset metadata."""
558        self.workflows = workflows
559        self.selected_dataset = selected_dataset
560        self.dataset = dataset
561        self.dataset_info = dataset_info
562
563    def execute(self) -> bool:
564        """Execute all configured workflows in sequence and handle errors."""
565        if len(self.workflows) == 0:
566            logging.error("No workflows selected.")
567            return False
568
569        logging.info(f"Selected workflows: {self.workflows}")
570        for workflow in self.workflows:
571            logging.info(
572                f"Running workflow {workflow} for dataset {self.selected_dataset}"
573            )
574            try:
575                if workflow == "aws_download":
576                    parameter_group = "mcity"
577                    parameters = WORKFLOWS["aws_download"].get(parameter_group, None)
578                    if parameter_group == "mcity":
579                        dataset, dataset_name, _ = workflow_aws_download(parameters)
580                    else:
581                        logging.error(
582                            f"The parameter group {parameter_group} is not supported. As AWS setups are highly specific, please provide a separate set of parameters and a workflow."
583                        )
584
585                    # Select downloaded dataset for further workflows if configured
586                    if dataset is not None:
587                        if parameters["selected_dataset_overwrite"] == True:
588
589                            dataset_info = {
590                                "name": dataset_name,
591                                "v51_type": "FiftyOneDataset",
592                                "splits": [],
593                            }
594
595                            self.dataset = dataset
596                            self.dataset_info = dataset_info
597                            logging.warning(
598                                f"Overwriting selected dataset {self.selected_dataset} with {dataset_name}"
599                            )
600                            self.selected_dataset = dataset_name
601
602                elif workflow == "embedding_selection":
603                    embedding_models = WORKFLOWS["embedding_selection"][
604                        "embedding_models"
605                    ]
606
607                    for MODEL_NAME in (
608                        pbar := tqdm(embedding_models, desc="Selection by Embeddings")
609                    ):
610
611                        # Status
612                        pbar.set_description(
613                            f"Selection by embeddings with model {MODEL_NAME}."
614                        )
615
616                        # Config
617                        mode = WORKFLOWS["embedding_selection"]["mode"]
618                        parameters = WORKFLOWS["embedding_selection"]["parameters"]
619                        config = {"mode": mode, "parameters": parameters}
620
621                        # Workflow
622                        workflow_embedding_selection(
623                            self.dataset,
624                            self.dataset_info,
625                            MODEL_NAME,
626                            config,
627                        )
628
629                elif workflow == "anomaly_detection":
630
631                    # Config
632                    ano_dec_config = WORKFLOWS["anomaly_detection"]
633                    anomalib_image_models = ano_dec_config["anomalib_image_models"]
634                    eval_metrics = ano_dec_config["anomalib_eval_metrics"]
635
636                    dataset_ano_dec = None
637                    data_root = None
638                    if "train" in ano_dec_config["mode"]:
639                        try:
640                            data_preparer = AnomalyDetectionDataPreparation(
641                                self.dataset, self.selected_dataset
642                            )
643                            dataset_ano_dec = data_preparer.dataset_ano_dec
644                            data_root = data_preparer.export_root
645                        except Exception as e:
646                            logging.error(
647                                f"Error during data preparation for Anomaly Detection: {e}"
648                            )
649
650                    for MODEL_NAME in (
651                        pbar := tqdm(anomalib_image_models, desc="Anomalib")
652                    ):
653                        # Status
654                        pbar.set_description(f"Anomalib model {MODEL_NAME}")
655
656                        # Config
657                        run_config = {
658                            "model_name": MODEL_NAME,
659                            "image_size": anomalib_image_models[MODEL_NAME].get(
660                                "image_size", None
661                            ),
662                            "batch_size": anomalib_image_models[MODEL_NAME].get(
663                                "batch_size", 1
664                            ),
665                            "epochs": ano_dec_config["epochs"],
666                            "early_stop_patience": ano_dec_config[
667                                "early_stop_patience"
668                            ],
669                            "data_root": data_root,
670                            "mode": ano_dec_config["mode"],
671                        }
672
673                        # Workflow
674                        workflow_anomaly_detection(
675                            self.dataset,
676                            dataset_ano_dec,
677                            self.dataset_info,
678                            eval_metrics,
679                            run_config,
680                        )
681
682                elif workflow == "auto_labeling":
683
684                    # Config
685                    SUPPORTED_MODEL_SOURCES = [
686                        "hf_models_objectdetection",
687                        "ultralytics",
688                        "custom_codetr",
689                    ]
690
691                    # Common parameters between models
692                    config_autolabel = WORKFLOWS["auto_labeling"]
693                    mode = config_autolabel["mode"]
694                    epochs = config_autolabel["epochs"]
695                    selected_model_source = config_autolabel["model_source"]
696
697                    # Check if all selected modes are supported
698                    for model_source in selected_model_source:
699                        if model_source not in SUPPORTED_MODEL_SOURCES:
700                            logging.error(
701                                f"Selected model source {model_source} is not supported."
702                            )
703
704                    if SUPPORTED_MODEL_SOURCES[0] in selected_model_source:
705                        # Hugging Face Models
706                        # Single GPU mode (https://github.com/huggingface/transformers/issues/28740)
707                        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
708                        hf_models = config_autolabel["hf_models_objectdetection"]
709
710                        # Dataset Conversion
711                        try:
712                            logging.info("Converting dataset into Hugging Face format.")
713                            pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset)
714                            pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset)
715                            hf_dataset = pt_to_hf_converter.convert()
716                        except Exception as e:
717                            logging.error(f"Error during dataset conversion: {e}")
718
719                        for MODEL_NAME in (
720                            pbar := tqdm(hf_models, desc="Auto Labeling Models")
721                        ):
722                            # Status Update
723                            pbar.set_description(
724                                f"Processing Hugging Face model {MODEL_NAME}"
725                            )
726
727                            # Config
728                            config_model = config_autolabel[
729                                "hf_models_objectdetection"
730                            ][MODEL_NAME]
731
732                            run_config = {
733                                "mode": mode,
734                                "model_name": MODEL_NAME,
735                                "v51_dataset_name": self.selected_dataset,
736                                "epochs": epochs,
737                                "early_stop_threshold": config_autolabel[
738                                    "early_stop_threshold"
739                                ],
740                                "early_stop_patience": config_autolabel[
741                                    "early_stop_patience"
742                                ],
743                                "learning_rate": config_autolabel["learning_rate"],
744                                "weight_decay": config_autolabel["weight_decay"],
745                                "max_grad_norm": config_autolabel["max_grad_norm"],
746                                "batch_size": config_model.get("batch_size", 1),
747                                "image_size": config_model.get("image_size", None),
748                                "n_worker_dataloader": config_autolabel[
749                                    "n_worker_dataloader"
750                                ],
751                                "inference_settings": config_autolabel[
752                                    "inference_settings"
753                                ],
754                            }
755
756                            # Workflow
757                            workflow_auto_labeling_hf(
758                                self.dataset,
759                                hf_dataset,
760                                run_config,
761                            )
762
763                    if SUPPORTED_MODEL_SOURCES[1] in selected_model_source:
764                        # Ultralytics Models
765                        config_ultralytics = config_autolabel["ultralytics"]
766                        models_ultralytics = config_ultralytics["models"]
767                        export_dataset_root = config_ultralytics["export_dataset_root"]
768
769                        # Export data into necessary format
770                        if "train" in mode:
771                            try:
772                                UltralyticsObjectDetection.export_data(
773                                    self.dataset,
774                                    self.dataset_info,
775                                    export_dataset_root,
776                                )
777                            except Exception as e:
778                                logging.error(
779                                    f"Error during Ultralytics dataset export: {e}"
780                                )
781
782                        for model_name in (
783                            pbar := tqdm(
784                                models_ultralytics, desc="Ultralytics training"
785                            )
786                        ):
787                            pbar.set_description(f"Ultralytics model {model_name}")
788                            run_config = {
789                                "mode": mode,
790                                "model_name": model_name,
791                                "v51_dataset_name": self.dataset_info["name"],
792                                "epochs": epochs,
793                                "patience": config_autolabel["early_stop_patience"],
794                                "batch_size": models_ultralytics[model_name][
795                                    "batch_size"
796                                ],
797                                "img_size": models_ultralytics[model_name]["img_size"],
798                                "export_dataset_root": export_dataset_root,
799                                "inference_settings": config_autolabel[
800                                    "inference_settings"
801                                ],
802                                "multi_scale": config_ultralytics["multi_scale"],
803                                "cos_lr": config_ultralytics["cos_lr"],
804                            }
805
806                            workflow_auto_labeling_ultralytics(self.dataset, run_config)
807
808                    if SUPPORTED_MODEL_SOURCES[2] in selected_model_source:
809                        # Custom Co-DETR
810                        config_codetr = config_autolabel["custom_codetr"]
811                        run_config = {
812                            "export_dataset_root": config_codetr["export_dataset_root"],
813                            "container_tool": config_codetr["container_tool"],
814                            "n_gpus": config_codetr["n_gpus"],
815                            "mode": config_autolabel["mode"],
816                            "epochs": config_autolabel["epochs"],
817                            "inference_settings": config_autolabel[
818                                "inference_settings"
819                            ],
820                            "config": None,
821                        }
822                        codetr_configs = config_codetr["configs"]
823
824                        for config in (
825                            pbar := tqdm(
826                                codetr_configs, desc="Processing Co-DETR configurations"
827                            )
828                        ):
829                            pbar.set_description(f"Co-DETR model {config}")
830                            run_config["config"] = config
831                            workflow_auto_labeling_custom_codetr(
832                                self.dataset, self.dataset_info, run_config
833                            )
834
835                elif workflow == "auto_labeling_zero_shot":
836                    config = WORKFLOWS["auto_labeling_zero_shot"]
837                    workflow_zero_shot_object_detection(
838                        self.dataset, self.dataset_info, config
839                    )
840
841                elif workflow == "ensemble_selection":
842                    # Config
843                    run_config = WORKFLOWS["ensemble_selection"]
844
845                    # Workflow
846                    workflow_ensemble_selection(
847                        self.dataset, self.dataset_info, run_config
848                    )
849
850                elif workflow == "auto_label_mask":
851                    config = WORKFLOWS["auto_label_mask"]
852                    workflow_auto_label_mask(self.dataset, self.dataset_info, config)
853
854                elif workflow == "class_mapping":
855                    # Config
856                    run_config = WORKFLOWS["class_mapping"]
857
858                    # Workflow
859                    workflow_class_mapping(
860                        self.dataset,
861                        self.dataset_info,
862                        run_config,
863                        test_dataset_source=None,
864                        test_dataset_target=None,
865                    )
866
867                else:
868                    logging.error(
869                        f"Workflow {workflow} not found. Check available workflows in config.py."
870                    )
871                    return False
872
873                cleanup_memory()  # Clean after each workflow
874                logging.info(f"Completed workflow {workflow} and cleaned up memory")
875
876            except Exception as e:
877                logging.error(f"Workflow {workflow}: An error occurred: {e}")
878                wandb_close(exit_code=1)
879                cleanup_memory()  # Clean up even after failure
880
881        return True

Orchestrates the execution of multiple data processing workflows in sequence.
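
A usage sketch mirroring main() below; SELECTED_WORKFLOW is assumed to be a list of workflow names defined in config.py:

    dataset, dataset_info = load_dataset(SELECTED_DATASET)
    executor = WorkflowExecutor(
        SELECTED_WORKFLOW,          # e.g. ["embedding_selection", "ensemble_selection"]
        SELECTED_DATASET["name"],
        dataset,
        dataset_info,
    )
    executor.execute()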

WorkflowExecutor( workflows: List[str], selected_dataset: str, dataset: fiftyone.core.dataset.Dataset, dataset_info: Dict)

Initializes with specified workflows, dataset selection, and dataset metadata.

Instance attributes: workflows, selected_dataset, dataset, dataset_info

def execute(self) -> bool:

Execute all configured workflows in sequence and handle errors.
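
Note on the return contract, as implemented above: an empty workflow list or an unrecognized workflow name returns False, whereas an exception inside a known workflow is logged, the wandb run is closed with exit code 1, memory is cleaned up, and the loop continues, so execute() still returns True. A small illustration, assuming dataset and dataset_info are loaded as in main():

    WorkflowExecutor([], "my_dataset", dataset, dataset_info).execute()                      # False: nothing selected
    WorkflowExecutor(["unknown"], "my_dataset", dataset, dataset_info).execute()             # False: not a known workflow
    WorkflowExecutor(["ensemble_selection"], "my_dataset", dataset, dataset_info).execute()  # True, even if the workflow errors internally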

def main():
884def main():
885    """Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface."""
886    time_start = time.time()
887    configure_logging()
888
889    # Signal handler for CTRL + C
890    signal.signal(signal.SIGINT, signal_handler)
891
892    # Execute workflows
893    dataset, dataset_info = load_dataset(SELECTED_DATASET)
894
895    executor = WorkflowExecutor(
896        SELECTED_WORKFLOW, SELECTED_DATASET["name"], dataset, dataset_info
897    )
898    executor.execute()
899
900    # Launch V51 session
901    dataset.reload()
902    dataset.save()
903    arrange_fields_in_groups(dataset)
904    logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.")
905
906    # Dataset stats
907    logging.debug(dataset)
908    logging.debug(dataset.stats(include_media=True))
909
910    # V51 UI launch
911    session = fo.launch_app(
912        dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE
913    )
914
915    time_stop = time.time()
916    logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")

Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface.
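
The entry-point guard itself is not part of this excerpt; a conventional invocation would look like the following (an assumption, not shown in the source above):

    if __name__ == "__main__":
        main()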