main

import datetime
import logging
import os
import signal
import sys
import time
import warnings
from typing import Dict, List

import torch

IGNORE_FUTURE_WARNINGS = True
if IGNORE_FUTURE_WARNINGS:
    warnings.simplefilter("ignore", category=FutureWarning)

import gc

import fiftyone as fo
import torch.multiprocessing as mp
from tqdm import tqdm

from config.config import (
    SELECTED_DATASET,
    SELECTED_WORKFLOW,
    V51_ADDRESS,
    V51_PORT,
    V51_REMOTE,
    WORKFLOWS,
)
from utils.anomaly_detection_data_preparation import AnomalyDetectionDataPreparation
from utils.data_loader import FiftyOneTorchDatasetCOCO, TorchToHFDatasetCOCO
from utils.dataset_loader import load_dataset
from utils.logging import configure_logging
from utils.mp_distribution import ZeroShotDistributer
from utils.sidebar_groups import arrange_fields_in_groups
from utils.wandb_helper import wandb_close, wandb_init
from workflows.anomaly_detection import Anodec
from workflows.auto_labeling import (
    CustomCoDETRObjectDetection,
    HuggingFaceObjectDetection,
    UltralyticsObjectDetection,
    ZeroShotObjectDetection,
)
from workflows.aws_download import AwsDownloader
from workflows.class_mapping import ClassMapper
from workflows.embedding_selection import EmbeddingSelection
from workflows.ensemble_selection import EnsembleSelection
from workflows.auto_label_mask import AutoLabelMask
from workflows.data_ingest import run_data_ingest

wandb_run = None  # Init globally to make sure it is available

def signal_handler(sig, frame):
    """Handle Ctrl+C signal by cleaning up resources and exiting."""
    logging.error("You pressed Ctrl+C!")
    try:
        wandb_close(exit_code=1)
        cleanup_memory()
    except Exception:
        pass
    sys.exit(0)


def workflow_aws_download(parameters, wandb_activate=True):
    """Download and process data from AWS S3 bucket."""
    dataset = None
    dataset_name = None
    wandb_exit_code = 0
    files_to_be_downloaded = 0
    try:
        # Config
        bucket = parameters["bucket"]
        prefix = parameters["prefix"]
        download_path = parameters["download_path"]
        test_run = parameters["test_run"]

        # Logging
        now = datetime.datetime.now()
        datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S")
        log_dir = f"logs/tensorboard/aws_{datetime_str}"

        dataset_name = f"annarbor_rolling_{datetime_str}"

        # Weights and Biases
        wandb_run = wandb_init(
            run_name=dataset_name,
            project_name="AWS Download",
            dataset_name=dataset_name,
            log_dir=log_dir,
            wandb_activate=wandb_activate,
        )

        # Workflow
        aws_downloader = AwsDownloader(
            bucket=bucket,
            prefix=prefix,
            download_path=download_path,
            test_run=test_run,
        )

        (
            sub_folder,
            files,
            files_to_be_downloaded,
            DOWNLOAD_NUMBER_SUCCESS,
            DOWNLOAD_SIZE_SUCCESS,
        ) = aws_downloader.download_files(log_dir=log_dir)

        dataset = aws_downloader.decode_data(
            sub_folder=sub_folder,
            files=files,
            log_dir=log_dir,
            dataset_name=dataset_name,
        )

    except Exception as e:
        logging.error(f"AWS Download and Extraction failed: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return dataset, dataset_name, files_to_be_downloaded


def workflow_anomaly_detection(
    dataset_normal,
    dataset_ano_dec,
    dataset_info,
    eval_metrics,
    run_config,
    wandb_activate=True,
):
    """Run anomaly detection workflow using specified models and configurations."""
    try:
        # Weights and Biases
        wandb_exit_code = 0
        wandb_run, log_dir = wandb_init(
            run_name=run_config["model_name"],
            project_name="Selection by Anomaly Detection",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        # Workflow
        SUPPORTED_MODES = ["train", "inference"]
        # Check if all selected modes are supported
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")
        if SUPPORTED_MODES[0] in run_config["mode"]:
            ano_dec = Anodec(
                dataset=dataset_ano_dec,
                eval_metrics=eval_metrics,
                dataset_info=dataset_info,
                config=run_config,
                tensorboard_output=log_dir,
            )
            ano_dec.train_and_export_model()
            ano_dec.run_inference(mode=SUPPORTED_MODES[0])
            ano_dec.eval_v51()
        if SUPPORTED_MODES[1] in run_config["mode"]:
            ano_dec = Anodec(
                dataset=dataset_normal,
                eval_metrics=eval_metrics,
                dataset_info=dataset_info,
                config=run_config,
                tensorboard_output=log_dir,
            )
            ano_dec.run_inference(mode=SUPPORTED_MODES[1])

    except Exception as e:
        logging.error(
            f"Error in Anomaly Detection for model {run_config['model_name']}: {e}"
        )
        wandb_exit_code = 1
    finally:
        wandb_close(exit_code=wandb_exit_code)

    return True


def workflow_embedding_selection(
    dataset, dataset_info, MODEL_NAME, config, wandb_activate=True
):
    """Compute embeddings and find representative and rare images for dataset selection."""
    try:
        wandb_exit_code = 0
        wandb_run, log_dir = wandb_init(
            run_name=MODEL_NAME,
            project_name="Selection by Embedding",
            dataset_name=dataset_info["name"],
            config=config,
            wandb_activate=wandb_activate,
        )
        embedding_selector = EmbeddingSelection(
            dataset, dataset_info, MODEL_NAME, log_dir
        )

        if not embedding_selector.model_already_used:
            embedding_selector.compute_embeddings(config["mode"])
            embedding_selector.compute_similarity()

            # Find representative and unique samples as center points for further selections
            thresholds = config["parameters"]
            embedding_selector.compute_representativeness(
                thresholds["compute_representativeness"]
            )
            embedding_selector.compute_unique_images_greedy(
                thresholds["compute_unique_images_greedy"]
            )
            embedding_selector.compute_unique_images_deterministic(
                thresholds["compute_unique_images_deterministic"]
            )

            # Select samples similar to the center points to enlarge the dataset
            embedding_selector.compute_similar_images(
                thresholds["compute_similar_images"], thresholds["neighbour_count"]
            )
        else:
            logging.warning(
                f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection."
            )

    except Exception as e:
        logging.error(f"An error occurred with model {MODEL_NAME}: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True):
    """Auto-labeling workflow using Ultralytics models with optional training and inference."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["model_name"],
            project_name="Auto Labeling Ultralytics",
            dataset_name=run_config["v51_dataset_name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        detector = UltralyticsObjectDetection(dataset=dataset, config=run_config)

        # Check if all selected modes are supported
        SUPPORTED_MODES = ["train", "inference"]
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")

        if SUPPORTED_MODES[0] in run_config["mode"]:
            logging.info(f"Training model {run_config['model_name']}")
            detector.train()
        if SUPPORTED_MODES[1] in run_config["mode"]:
            logging.info(f"Running inference for model {run_config['model_name']}")
            detector.inference()

    except Exception as e:
        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True):
    """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["model_name"],
            project_name="Auto Labeling Hugging Face",
            dataset_name=run_config["v51_dataset_name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        detector = HuggingFaceObjectDetection(
            dataset=dataset,
            config=run_config,
        )
        SUPPORTED_MODES = ["train", "inference"]

        # Check if all selected modes are supported
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")
        if SUPPORTED_MODES[0] in run_config["mode"]:
            logging.info(f"Training model {run_config['model_name']}")
            detector.train(hf_dataset)
        if SUPPORTED_MODES[1] in run_config["mode"]:
            logging.info(f"Running inference for model {run_config['model_name']}")
            detector.inference(inference_settings=run_config["inference_settings"])

    except Exception as e:
        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_custom_codetr(
    dataset, dataset_info, run_config, wandb_activate=True
):
    """Auto labeling workflow using Co-DETR model supporting training and inference modes."""

    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["config"],
            project_name="Co-DETR Auto Labeling",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        mode = run_config["mode"]

        detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config)
        detector.convert_data()
        if "train" in mode:
            detector.update_config_file(
                dataset_name=dataset_info["name"],
                config_file=run_config["config"],
                max_epochs=run_config["epochs"],
            )
            detector.train(
                run_config["config"], run_config["n_gpus"], run_config["container_tool"]
            )
        if "inference" in mode:
            detector.run_inference(
                dataset,
                run_config["config"],
                run_config["n_gpus"],
                run_config["container_tool"],
                run_config["inference_settings"],
            )
    except Exception as e:
        logging.error(f"Error during Co-DETR auto labeling: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_zero_shot_object_detection(dataset, dataset_info, config):
    """Run zero-shot object detection on a dataset using models from Hugging Face, supporting both single and multi-GPU inference."""
    # Set multiprocessing mode for CUDA multiprocessing
    try:
        mp.set_start_method("spawn", force=True)
        logging.debug("Successfully set multiprocessing start method to 'spawn'")
    except RuntimeError as e:
        # This is expected if the start method was already set
        logging.debug(f"Multiprocessing start method was already set: {e}")
    except Exception as e:
        # Handle any other unexpected errors
        logging.error(f"Failed to set multiprocessing start method: {e}")

    # Zero-shot object detector models from Hugging Face
    # Optimized for parallel multi-GPU inference, also supports single GPU
    dataset_torch = FiftyOneTorchDatasetCOCO(dataset)
    detector = ZeroShotObjectDetection(
        dataset_torch=dataset_torch, dataset_info=dataset_info, config=config
    )

    # Check if model detections are already stored in V51 dataset or on disk
    models_splits_dict = detector.exclude_stored_predictions(
        dataset_v51=dataset, config=config
    )
    if len(models_splits_dict) > 0:
        config["hf_models_zeroshot_objectdetection"] = models_splits_dict
        distributor = ZeroShotDistributer(
            config=config,
            n_samples=len(dataset_torch),
            dataset_info=dataset_info,
            detector=detector,
        )
        distributor.distribute_and_run()
    else:
        logging.info(
            "All zero-shot models already have predictions stored in the dataset."
        )

    # To make new fields available to follow-up processes
    dataset.reload()
    dataset.save()

    return True


def workflow_auto_label_mask(dataset, dataset_info, config):
    """Run mask-based auto labeling (depth estimation and semantic segmentation) for each configured model."""
    try:
        depth_config = config["depth_estimation"]
        seg_config = config["semantic_segmentation"]

        for architecture_name, architecture_info in depth_config.items():
            auto_labeler = AutoLabelMask(
                dataset=dataset,
                dataset_info=dataset_info,
                model_name=architecture_name,
                task_type="depth_estimation",
                model_config=architecture_info,
            )
            auto_labeler.run_inference()

        for architecture_name, architecture_info in seg_config.items():
            auto_labeler = AutoLabelMask(
                dataset=dataset,
                dataset_info=dataset_info,
                model_name=architecture_name,
                task_type="semantic_segmentation",
                model_config=architecture_info,
            )
            auto_labeler.run_inference()

    except Exception as e:
        logging.error(f"Auto-labeling mask workflow failed: {e}")
        raise


def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True):
    """Runs ensemble selection workflow on given dataset using provided configuration."""
    try:
        wandb_exit_code = 0

        wandb_run = wandb_init(
            run_name="Selection by Ensemble",
            project_name="Ensemble Selection",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )
        ensemble_selector = EnsembleSelection(dataset, run_config)
        ensemble_selector.ensemble_selection()
    except Exception as e:
        logging.error(f"An error occurred during Ensemble Selection: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_class_mapping(
    dataset,
    dataset_info,
    run_config,
    wandb_activate=True,
    test_dataset_source=None,
    test_dataset_target=None,
):
    """Runs class mapping workflow to align labels between the source dataset and target dataset."""
    try:
        wandb_exit_code = 0
        # Initialize a wandb run for class mapping
        wandb_run = wandb_init(
            run_name="class_mapping",
            project_name="Class Mapping",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )
        class_mapping_models = run_config["hf_models_zeroshot_classification"]

        for model_name in (
            pbar := tqdm(class_mapping_models, desc="Processing Class Mapping")
        ):
            pbar.set_description(f"Zero Shot Classification model {model_name}")
            mapper = ClassMapper(dataset, model_name, run_config)
            try:
                stats = mapper.run_mapping(test_dataset_source, test_dataset_target)
                # Display statistics only if mapping was successful
                source_class_counts = stats["total_processed"]
                logging.info("\nClassification Results for Source Dataset:")
                for source_class, count in stats["source_class_counts"].items():
                    percentage = (
                        (count / source_class_counts) * 100
                        if source_class_counts > 0
                        else 0
                    )
                    logging.info(
                        f"{source_class}: {count} samples processed ({percentage:.1f}%)"
                    )

                # Display statistics for tags added to Target Dataset
                logging.info("\nTag Addition Results (Target Dataset Tags):")
                logging.info(f"Total new tags added: {stats['changes_made']}")
                for target_class, tag_count in stats["tags_added_per_category"].items():
                    logging.info(f"{target_class} tags added: {tag_count}")

            except Exception as e:
                logging.error(f"Error during mapping with model {model_name}: {e}")

    except Exception as e:
        logging.error(f"Error in class_mapping workflow: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def cleanup_memory(do_extensive_cleanup=False):
    """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row."""
    logging.info("Starting memory cleanup")
    # Clear CUDA cache
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Force garbage collection
    gc.collect()

    if do_extensive_cleanup:
        # Clear any leftover tensors
        n_deleted_torch_objects = 0
        for obj in tqdm(
            gc.get_objects(), desc="Deleting objects from Python Garbage Collector"
        ):
            try:
                if torch.is_tensor(obj):
                    del obj
                    n_deleted_torch_objects += 1
            except Exception:
                pass

        logging.info(f"Deleted {n_deleted_torch_objects} torch objects")

        # Final garbage collection
        gc.collect()


class WorkflowExecutor:
    """Orchestrates the execution of multiple data processing workflows in sequence."""

    def __init__(
        self,
        workflows: List[str],
        selected_dataset: str,
        dataset: fo.Dataset,
        dataset_info: Dict,
    ):
        """Initializes with specified workflows, dataset selection, and dataset metadata."""
        self.workflows = workflows
        self.selected_dataset = selected_dataset
        self.dataset = dataset
        self.dataset_info = dataset_info

    def execute(self) -> bool:
        """Execute all configured workflows in sequence and handle errors."""
        if len(self.workflows) == 0:
            logging.error("No workflows selected.")
            return False

        logging.info(f"Selected workflows: {self.workflows}")
        for workflow in self.workflows:
            logging.info(
                f"Running workflow {workflow} for dataset {self.selected_dataset}"
            )
            try:
                if workflow == "aws_download":
                    parameter_group = "mcity"
                    parameters = WORKFLOWS["aws_download"].get(parameter_group, None)
                    if parameter_group == "mcity":
                        dataset, dataset_name, _ = workflow_aws_download(parameters)
                    else:
                        logging.error(
                            f"The parameter group {parameter_group} is not supported. As AWS setups are highly specific, please provide a separate set of parameters and a workflow."
                        )

                    # Select downloaded dataset for further workflows if configured
                    if dataset is not None:
                        if parameters["selected_dataset_overwrite"]:
                            dataset_info = {
                                "name": dataset_name,
                                "v51_type": "FiftyOneDataset",
                                "splits": [],
                            }

                            self.dataset = dataset
                            self.dataset_info = dataset_info
                            logging.warning(
                                f"Overwriting selected dataset {self.selected_dataset} with {dataset_name}"
                            )
                            self.selected_dataset = dataset_name

                elif workflow == "embedding_selection":
                    embedding_models = WORKFLOWS["embedding_selection"][
                        "embedding_models"
                    ]

                    for MODEL_NAME in (
                        pbar := tqdm(embedding_models, desc="Selection by Embeddings")
                    ):
                        # Status
                        pbar.set_description(
                            f"Selection by embeddings with model {MODEL_NAME}."
                        )

                        # Config
                        mode = WORKFLOWS["embedding_selection"]["mode"]
                        parameters = WORKFLOWS["embedding_selection"]["parameters"]
                        config = {"mode": mode, "parameters": parameters}

                        # Workflow
                        workflow_embedding_selection(
                            self.dataset,
                            self.dataset_info,
                            MODEL_NAME,
                            config,
                        )

                elif workflow == "anomaly_detection":
                    # Config
                    ano_dec_config = WORKFLOWS["anomaly_detection"]
                    anomalib_image_models = ano_dec_config["anomalib_image_models"]
                    eval_metrics = ano_dec_config["anomalib_eval_metrics"]

                    dataset_ano_dec = None
                    data_root = None
                    if "train" in ano_dec_config["mode"]:
                        try:
                            data_preparer = AnomalyDetectionDataPreparation(
                                self.dataset, self.selected_dataset
                            )
                            dataset_ano_dec = data_preparer.dataset_ano_dec
                            data_root = data_preparer.export_root
                        except Exception as e:
                            logging.error(
                                f"Error during data preparation for Anomaly Detection: {e}"
                            )

                    for MODEL_NAME in (
                        pbar := tqdm(anomalib_image_models, desc="Anomalib")
                    ):
                        # Status
                        pbar.set_description(f"Anomalib model {MODEL_NAME}")

                        # Config
                        run_config = {
                            "model_name": MODEL_NAME,
                            "image_size": anomalib_image_models[MODEL_NAME].get(
                                "image_size", None
                            ),
                            "batch_size": anomalib_image_models[MODEL_NAME].get(
                                "batch_size", 1
                            ),
                            "epochs": ano_dec_config["epochs"],
                            "early_stop_patience": ano_dec_config[
                                "early_stop_patience"
                            ],
                            "data_root": data_root,
                            "mode": ano_dec_config["mode"],
                        }

                        # Workflow
                        workflow_anomaly_detection(
                            self.dataset,
                            dataset_ano_dec,
                            self.dataset_info,
                            eval_metrics,
                            run_config,
                        )

                elif workflow == "auto_labeling":
                    # Config
                    SUPPORTED_MODEL_SOURCES = [
                        "hf_models_objectdetection",
                        "ultralytics",
                        "custom_codetr",
                    ]

                    # Common parameters between models
                    config_autolabel = WORKFLOWS["auto_labeling"]
                    mode = config_autolabel["mode"]
                    epochs = config_autolabel["epochs"]
                    selected_model_source = config_autolabel["model_source"]

                    # Check if all selected model sources are supported
                    for model_source in selected_model_source:
                        if model_source not in SUPPORTED_MODEL_SOURCES:
                            logging.error(
                                f"Selected model source {model_source} is not supported."
                            )

                    if SUPPORTED_MODEL_SOURCES[0] in selected_model_source:
                        # Hugging Face Models
                        # Single GPU mode (https://github.com/huggingface/transformers/issues/28740)
                        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
                        hf_models = config_autolabel["hf_models_objectdetection"]

                        # Dataset Conversion
                        try:
                            logging.info("Converting dataset into Hugging Face format.")
                            pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset)
                            pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset)
                            hf_dataset = pt_to_hf_converter.convert()
                        except Exception as e:
                            logging.error(f"Error during dataset conversion: {e}")

                        for MODEL_NAME in (
                            pbar := tqdm(hf_models, desc="Auto Labeling Models")
                        ):
                            # Status Update
                            pbar.set_description(
                                f"Processing Hugging Face model {MODEL_NAME}"
                            )

                            # Config
                            config_model = config_autolabel[
                                "hf_models_objectdetection"
                            ][MODEL_NAME]

                            run_config = {
                                "mode": mode,
                                "model_name": MODEL_NAME,
                                "v51_dataset_name": self.selected_dataset,
                                "epochs": epochs,
                                "early_stop_threshold": config_autolabel[
                                    "early_stop_threshold"
                                ],
                                "early_stop_patience": config_autolabel[
                                    "early_stop_patience"
                                ],
                                "learning_rate": config_autolabel["learning_rate"],
                                "weight_decay": config_autolabel["weight_decay"],
                                "max_grad_norm": config_autolabel["max_grad_norm"],
                                "batch_size": config_model.get("batch_size", 1),
                                "image_size": config_model.get("image_size", None),
                                "n_worker_dataloader": config_autolabel[
                                    "n_worker_dataloader"
                                ],
                                "inference_settings": config_autolabel[
                                    "inference_settings"
                                ],
                            }

                            # Workflow
                            workflow_auto_labeling_hf(
                                self.dataset,
                                hf_dataset,
                                run_config,
                            )

                    if SUPPORTED_MODEL_SOURCES[1] in selected_model_source:
                        # Ultralytics Models
                        config_ultralytics = config_autolabel["ultralytics"]
                        models_ultralytics = config_ultralytics["models"]
                        export_dataset_root = config_ultralytics["export_dataset_root"]

                        # Export data into necessary format
                        if "train" in mode:
                            try:
                                UltralyticsObjectDetection.export_data(
                                    self.dataset,
                                    self.dataset_info,
                                    export_dataset_root,
                                )
                            except Exception as e:
                                logging.error(
                                    f"Error during Ultralytics dataset export: {e}"
                                )

                        for model_name in (
                            pbar := tqdm(
                                models_ultralytics, desc="Ultralytics training"
                            )
                        ):
                            pbar.set_description(f"Ultralytics model {model_name}")
                            run_config = {
                                "mode": mode,
                                "model_name": model_name,
                                "v51_dataset_name": self.dataset_info["name"],
                                "epochs": epochs,
                                "patience": config_autolabel["early_stop_patience"],
                                "batch_size": models_ultralytics[model_name][
                                    "batch_size"
                                ],
                                "img_size": models_ultralytics[model_name]["img_size"],
                                "export_dataset_root": export_dataset_root,
                                "inference_settings": config_autolabel[
                                    "inference_settings"
                                ],
                                "multi_scale": config_ultralytics["multi_scale"],
                                "cos_lr": config_ultralytics["cos_lr"],
                            }

                            workflow_auto_labeling_ultralytics(self.dataset, run_config)

                    if SUPPORTED_MODEL_SOURCES[2] in selected_model_source:
                        # Custom Co-DETR
                        config_codetr = config_autolabel["custom_codetr"]
                        run_config = {
                            "export_dataset_root": config_codetr["export_dataset_root"],
                            "container_tool": config_codetr["container_tool"],
                            "n_gpus": config_codetr["n_gpus"],
                            "mode": config_autolabel["mode"],
                            "epochs": config_autolabel["epochs"],
                            "inference_settings": config_autolabel[
                                "inference_settings"
                            ],
                            "config": None,
                        }
                        codetr_configs = config_codetr["configs"]

                        for config in (
                            pbar := tqdm(
                                codetr_configs, desc="Processing Co-DETR configurations"
                            )
                        ):
                            pbar.set_description(f"Co-DETR model {config}")
                            run_config["config"] = config
                            workflow_auto_labeling_custom_codetr(
                                self.dataset, self.dataset_info, run_config
                            )

                elif workflow == "auto_labeling_zero_shot":
                    config = WORKFLOWS["auto_labeling_zero_shot"]
                    workflow_zero_shot_object_detection(
                        self.dataset, self.dataset_info, config
                    )

                elif workflow == "ensemble_selection":
                    # Config
                    run_config = WORKFLOWS["ensemble_selection"]

                    # Workflow
                    workflow_ensemble_selection(
                        self.dataset, self.dataset_info, run_config
                    )

                elif workflow == "auto_label_mask":
                    config = WORKFLOWS["auto_label_mask"]
                    workflow_auto_label_mask(self.dataset, self.dataset_info, config)

                elif workflow == "class_mapping":
                    # Config
                    run_config = WORKFLOWS["class_mapping"]

                    # Workflow
                    workflow_class_mapping(
                        self.dataset,
                        self.dataset_info,
                        run_config,
                        test_dataset_source=None,
                        test_dataset_target=None,
                    )
                elif workflow == "data_ingest":
                    dataset = run_data_ingest()

                    dataset_info = {
                        "name": "custom_dataset",
                        "v51_type": "FiftyOneDataset",
                        "splits": ["train", "val", "test"],
                    }

                    self.dataset = dataset
                    self.dataset_info = dataset_info
                    self.selected_dataset = "custom_dataset"

                    logging.info("Data ingestion completed successfully.")

                else:
                    logging.error(
                        f"Workflow {workflow} not found. Check available workflows in config.py."
                    )
                    return False

                cleanup_memory()  # Clean after each workflow
                logging.info(f"Completed workflow {workflow} and cleaned up memory")

            except Exception as e:
                logging.error(f"Workflow {workflow}: An error occurred: {e}")
                wandb_close(exit_code=1)
                cleanup_memory()  # Clean up even after failure

        return True


def main():
    """Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface."""
    time_start = time.time()
    configure_logging()

    # Signal handler for CTRL + C
    signal.signal(signal.SIGINT, signal_handler)

    # Execute workflows
    if "data_ingest" in SELECTED_WORKFLOW:
        executor = WorkflowExecutor(
            SELECTED_WORKFLOW,
            SELECTED_DATASET["name"],
            dataset=None,
            dataset_info=None,
        )
        executor.execute()

        # Pull the ingested dataset and its info back from the executor
        dataset = executor.dataset
        dataset_info = executor.dataset_info

    else:
        dataset, dataset_info = load_dataset(SELECTED_DATASET)

        executor = WorkflowExecutor(
            SELECTED_WORKFLOW,
            SELECTED_DATASET["name"],
            dataset,
            dataset_info,
        )
        executor.execute()

    if dataset is not None:
        dataset.reload()
        dataset.save()
        arrange_fields_in_groups(dataset)
        logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.")

        # Dataset stats
        logging.debug(dataset)
        logging.debug(dataset.stats(include_media=True))

        # V51 UI launch
        session = fo.launch_app(
            dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE
        )
    else:
        logging.info("Skipping Voxel51 session.")

    time_stop = time.time()
    logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")


if __name__ == "__main__":
    cleanup_memory()
    main()
IGNORE_FUTURE_WARNINGS = True
wandb_run = None
def signal_handler(sig, frame):

Handle Ctrl+C signal by cleaning up resources and exiting.
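
A minimal sketch of how the handler is wired up; this mirrors the registration performed in main() below and only needs to run once per process:

import signal

# Register the Ctrl+C handler so wandb runs are closed and memory is
# cleaned up before the process exits.
signal.signal(signal.SIGINT, signal_handler)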

def workflow_aws_download(parameters, wandb_activate=True):

Download and process data from AWS S3 bucket.
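
A hedged usage sketch: the parameter keys are the ones this function reads, but the bucket, prefix, and path values below are placeholders; in the pipeline they normally come from WORKFLOWS["aws_download"] in config/config.py.

# Placeholder values for illustration only.
parameters = {
    "bucket": "my-example-bucket",           # hypothetical S3 bucket name
    "prefix": "rolling/2024/",               # hypothetical key prefix
    "download_path": "output/aws_download",  # local download target
    "test_run": True,                         # limit the download for a dry run
}

dataset, dataset_name, n_files = workflow_aws_download(parameters, wandb_activate=False)
if dataset is not None:
    print(f"Decoded {dataset_name} ({n_files} files queued for download)")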

def workflow_anomaly_detection(dataset_normal, dataset_ano_dec, dataset_info, eval_metrics, run_config, wandb_activate=True):

Run anomaly detection workflow using specified models and configurations.
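
A hedged run_config sketch. The keys mirror those assembled in WorkflowExecutor.execute(); the model key, metric names, and numbers are placeholders, and dataset/dataset_info are assumed to come from load_dataset(SELECTED_DATASET).

run_config = {
    "model_name": "Padim",      # hypothetical Anomalib model key
    "image_size": None,
    "batch_size": 1,
    "epochs": 12,
    "early_stop_patience": 5,
    "data_root": None,          # only needed for training (export root of the prepared data)
    "mode": ["inference"],      # any subset of ["train", "inference"]
}

workflow_anomaly_detection(
    dataset_normal=dataset,     # full dataset, used for inference
    dataset_ano_dec=None,       # prepared training subset; only required when "train" is in mode
    dataset_info=dataset_info,
    eval_metrics=["AUROC", "F1Score"],  # hypothetical Anomalib metric names
    run_config=run_config,
    wandb_activate=False,
)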

def workflow_embedding_selection(dataset, dataset_info, MODEL_NAME, config, wandb_activate=True):

Compute embeddings and find representative and rare images for dataset selection.
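
A hedged config sketch. The parameter names are exactly the thresholds this function reads; the numeric values and the embedding model name are placeholders, while mode is taken from the workflow configuration as in WorkflowExecutor.execute().

# Placeholder thresholds; real values live in WORKFLOWS["embedding_selection"]["parameters"].
config = {
    "mode": WORKFLOWS["embedding_selection"]["mode"],
    "parameters": {
        "compute_representativeness": 0.99,
        "compute_unique_images_greedy": 0.01,
        "compute_unique_images_deterministic": 0.99,
        "compute_similar_images": 0.03,
        "neighbour_count": 3,
    },
}

# "clip-vit-base32-torch" is a hypothetical embedding model name.
workflow_embedding_selection(dataset, dataset_info, "clip-vit-base32-torch", config, wandb_activate=False)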

def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True):

Auto-labeling workflow using Ultralytics models with optional training and inference.
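
A hedged run_config example mirroring the keys assembled in WorkflowExecutor.execute(); the model key and hyperparameter values are placeholders. Training additionally requires the dataset export shown in the comment.

run_config = {
    "mode": ["train", "inference"],
    "model_name": "yolo11n",                 # hypothetical Ultralytics model key
    "v51_dataset_name": dataset_info["name"],
    "epochs": 50,
    "patience": 10,
    "batch_size": 16,
    "img_size": 640,
    "export_dataset_root": "output/datasets/ultralytics/",
    "inference_settings": WORKFLOWS["auto_labeling"]["inference_settings"],
    "multi_scale": False,
    "cos_lr": True,
}

# For training, export the data first (as done in the executor):
# UltralyticsObjectDetection.export_data(dataset, dataset_info, run_config["export_dataset_root"])
workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=False)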

def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True):

Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.
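
A hedged sketch showing the dataset conversion used by the executor plus a run_config with the keys this workflow consumes; the model name and hyperparameters are placeholders.

# Convert the Voxel51 dataset into the Hugging Face format expected for training.
pytorch_dataset = FiftyOneTorchDatasetCOCO(dataset)
hf_dataset = TorchToHFDatasetCOCO(pytorch_dataset).convert()

run_config = {
    "mode": ["inference"],                    # or ["train", "inference"]
    "model_name": "facebook/detr-resnet-50",  # hypothetical HF object detection model
    "v51_dataset_name": dataset_info["name"],
    "epochs": 20,
    "early_stop_threshold": 0,
    "early_stop_patience": 5,
    "learning_rate": 5e-5,
    "weight_decay": 1e-4,
    "max_grad_norm": 0.1,
    "batch_size": 4,
    "image_size": None,
    "n_worker_dataloader": 4,
    "inference_settings": WORKFLOWS["auto_labeling"]["inference_settings"],
}

workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=False)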

def workflow_auto_labeling_custom_codetr(dataset, dataset_info, run_config, wandb_activate=True):

Auto labeling workflow using Co-DETR model supporting training and inference modes.
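
A hedged run_config sketch following the Co-DETR branch in WorkflowExecutor.execute(); the config file name, GPU count, and container tool are placeholders and depend on the local Co-DETR setup.

run_config = {
    "export_dataset_root": "output/datasets/codetr/",
    "container_tool": "docker",               # hypothetical container tool
    "n_gpus": 1,
    "mode": ["inference"],
    "epochs": 12,
    "inference_settings": WORKFLOWS["auto_labeling"]["inference_settings"],
    "config": "co_dino_5scale_r50_1x_coco.py",  # hypothetical Co-DETR config file name
}

workflow_auto_labeling_custom_codetr(dataset, dataset_info, run_config, wandb_activate=False)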

def workflow_zero_shot_object_detection(dataset, dataset_info, config):

Run zero-shot object detection on a dataset using models from Hugging Face, supporting both single and multi-GPU inference.
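
In the pipeline the whole configuration block is passed through unchanged; the block must provide the hf_models_zeroshot_objectdetection mapping, which this function prunes to the models that still need predictions.

config = WORKFLOWS["auto_labeling_zero_shot"]
workflow_zero_shot_object_detection(dataset, dataset_info, config)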

def workflow_auto_label_mask(dataset, dataset_info, config):
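
Runs mask-style auto labeling (depth estimation and semantic segmentation) for each configured model. A hedged config sketch follows; the model names are placeholders and the per-model settings depend on AutoLabelMask, while in the pipeline the block comes from WORKFLOWS["auto_label_mask"].

config = {
    "depth_estimation": {
        "Intel/dpt-large": {},  # hypothetical depth model and per-model settings
    },
    "semantic_segmentation": {
        "nvidia/segformer-b0-finetuned-ade-512-512": {},  # hypothetical segmentation model
    },
}

workflow_auto_label_mask(dataset, dataset_info, config)
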
def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True):

Runs ensemble selection workflow on given dataset using provided configuration.
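
The run_config is passed straight through to EnsembleSelection, so in practice it is simply the ensemble_selection block from config/config.py:

run_config = WORKFLOWS["ensemble_selection"]
workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=False)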

def workflow_class_mapping(dataset, dataset_info, run_config, wandb_activate=True, test_dataset_source=None, test_dataset_target=None):
459def workflow_class_mapping(
460    dataset,
461    dataset_info,
462    run_config,
463    wandb_activate=True,
464    test_dataset_source=None,
465    test_dataset_target=None,
466):
467    """Runs class mapping workflow to align labels between the source dataset and target dataset."""
468    try:
469        wandb_exit_code = 0
470        # Initialize a wandb run for class mapping
471        wandb_run = wandb_init(
472            run_name="class_mapping",
473            project_name="Class Mapping",
474            dataset_name=dataset_info["name"],
475            config=run_config,
476            wandb_activate=wandb_activate,
477        )
478        class_mapping_models = run_config["hf_models_zeroshot_classification"]
479
480        for model_name in (
481            pbar := tqdm(class_mapping_models, desc="Processing Class Mapping")
482        ):
483            pbar.set_description(f"Zero Shot Classification model {model_name}")
484            mapper = ClassMapper(dataset, model_name, run_config)
485            try:
486                stats = mapper.run_mapping(test_dataset_source, test_dataset_target)
487                # Display statistics only if mapping was successful
488                total_processed = stats["total_processed"]
489                logging.info("\nClassification Results for Source Dataset:")
490                for source_class, count in stats["source_class_counts"].items():
491                    percentage = (
492                        (count / total_processed) * 100
493                        if total_processed > 0
494                        else 0
495                    )
496                    logging.info(
497                        f"{source_class}: {count} samples processed ({percentage:.1f}%)"
498                    )
499
500                # Display statistics for tags added to Target Dataset
501                logging.info("\nTag Addition Results (Target Dataset Tags):")
502                logging.info(f"Total new tags added: {stats['changes_made']}")
503                for target_class, tag_count in stats["tags_added_per_category"].items():
504                    logging.info(f"{target_class} tags added: {tag_count}")
505
506            except Exception as e:
507                logging.error(f"Error during mapping with model {model_name}: {e}")
508
509    except Exception as e:
510        logging.error(f"Error in class_mapping workflow: {e}")
511        wandb_exit_code = 1
512    finally:
513        wandb_close(wandb_exit_code)
514
515    return True
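
# Shape of the statistics dict that the loop above expects ClassMapper.run_mapping()
# to return; the keys come from the code above, while the class names and counts
# are purely illustrative placeholders.
_EXAMPLE_CLASS_MAPPING_STATS = {
    "total_processed": 1200,
    "source_class_counts": {"car": 900, "truck": 300},
    "changes_made": 150,
    "tags_added_per_category": {"van": 90, "pickup": 60},
}
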

518def cleanup_memory(do_extensive_cleanup=False):
519    """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row."""
520    logging.info("Starting memory cleanup")
521    # Clear CUDA cache
522    if torch.cuda.is_available():
523        torch.cuda.empty_cache()
524
525    # Force garbage collection
526    gc.collect()
527
528    if do_extensive_cleanup:
529
530        # Drop local references to leftover tensors; the final gc.collect() reclaims unreferenced ones
531        n_released_torch_tensors = 0
532        for obj in tqdm(
533            gc.get_objects(), desc="Scanning Python garbage collector objects for tensors"
534        ):
535            try:
536                if torch.is_tensor(obj):
537                    del obj
538                    n_released_torch_tensors += 1
539            except Exception:
540                pass
541
542        logging.info(f"Released references to {n_released_torch_tensors} torch tensors")
543
544        # Final garbage collection
545        gc.collect()
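
# Usage sketch: the default call is cheap and runs after every workflow, while the
# extensive pass is mainly worth it between back-to-back training sessions, e.g.
#   cleanup_memory()                            # CUDA cache + gc.collect()
#   cleanup_memory(do_extensive_cleanup=True)   # additionally scans gc objects for tensors
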

548class WorkflowExecutor:
549    """Orchestrates the execution of multiple data processing workflows in sequence."""
550
551    def __init__(
552        self,
553        workflows: List[str],
554        selected_dataset: str,
555        dataset: fo.Dataset,
556        dataset_info: Dict,
557    ):
558        """Initializes with specified workflows, dataset selection, and dataset metadata."""
559        self.workflows = workflows
560        self.selected_dataset = selected_dataset
561        self.dataset = dataset
562        self.dataset_info = dataset_info
563
564    def execute(self) -> bool:
565        """Execute all configured workflows in sequence and handle errors."""
566        if len(self.workflows) == 0:
567            logging.error("No workflows selected.")
568            return False
569
570        logging.info(f"Selected workflows: {self.workflows}")
571        for workflow in self.workflows:
572            logging.info(
573                f"Running workflow {workflow} for dataset {self.selected_dataset}"
574            )
575            try:
576                if workflow == "aws_download":
577                    parameter_group = "mcity"
578                    parameters = WORKFLOWS["aws_download"].get(parameter_group, None)
579                    if parameter_group == "mcity":
580                        dataset, dataset_name, _ = workflow_aws_download(parameters)
581                    else:
582                        logging.error(
583                            f"The parameter group {parameter_group} is not supported. As AWS setups are highly specific, please provide a separate set of parameters and a dedicated workflow."
584                        )
585
586                    # Select downloaded dataset for further workflows if configured
587                    if dataset is not None:
588                        if parameters["selected_dataset_overwrite"]:
589
590                            dataset_info = {
591                                "name": dataset_name,
592                                "v51_type": "FiftyOneDataset",
593                                "splits": [],
594                            }
595
596                            self.dataset = dataset
597                            self.dataset_info = dataset_info
598                            logging.warning(
599                                f"Overwriting selected dataset {self.selected_dataset} with {dataset_name}"
600                            )
601                            self.selected_dataset = dataset_name
602
603                elif workflow == "embedding_selection":
604                    embedding_models = WORKFLOWS["embedding_selection"][
605                        "embedding_models"
606                    ]
607
608                    for MODEL_NAME in (
609                        pbar := tqdm(embedding_models, desc="Selection by Embeddings")
610                    ):
611
612                        # Status
613                        pbar.set_description(
614                            f"Selection by embeddings with model {MODEL_NAME}."
615                        )
616
617                        # Config
618                        mode = WORKFLOWS["embedding_selection"]["mode"]
619                        parameters = WORKFLOWS["embedding_selection"]["parameters"]
620                        config = {"mode": mode, "parameters": parameters}
621
622                        # Workflow
623                        workflow_embedding_selection(
624                            self.dataset,
625                            self.dataset_info,
626                            MODEL_NAME,
627                            config,
628                        )
629
630                elif workflow == "anomaly_detection":
631
632                    # Config
633                    ano_dec_config = WORKFLOWS["anomaly_detection"]
634                    anomalib_image_models = ano_dec_config["anomalib_image_models"]
635                    eval_metrics = ano_dec_config["anomalib_eval_metrics"]
636
637                    dataset_ano_dec = None
638                    data_root = None
639                    if "train" in ano_dec_config["mode"]:
640                        try:
641                            data_preparer = AnomalyDetectionDataPreparation(
642                                self.dataset, self.selected_dataset
643                            )
644                            dataset_ano_dec = data_preparer.dataset_ano_dec
645                            data_root = data_preparer.export_root
646                        except Exception as e:
647                            logging.error(
648                                f"Error during data preparation for Anomaly Detection: {e}"
649                            )
650
651                    for MODEL_NAME in (
652                        pbar := tqdm(anomalib_image_models, desc="Anomalib")
653                    ):
654                        # Status
655                        pbar.set_description(f"Anomalib model {MODEL_NAME}")
656
657                        # Config
658                        run_config = {
659                            "model_name": MODEL_NAME,
660                            "image_size": anomalib_image_models[MODEL_NAME].get(
661                                "image_size", None
662                            ),
663                            "batch_size": anomalib_image_models[MODEL_NAME].get(
664                                "batch_size", 1
665                            ),
666                            "epochs": ano_dec_config["epochs"],
667                            "early_stop_patience": ano_dec_config[
668                                "early_stop_patience"
669                            ],
670                            "data_root": data_root,
671                            "mode": ano_dec_config["mode"],
672                        }
673
674                        # Workflow
675                        workflow_anomaly_detection(
676                            self.dataset,
677                            dataset_ano_dec,
678                            self.dataset_info,
679                            eval_metrics,
680                            run_config,
681                        )
682
683                elif workflow == "auto_labeling":
684
685                    # Config
686                    SUPPORTED_MODEL_SOURCES = [
687                        "hf_models_objectdetection",
688                        "ultralytics",
689                        "custom_codetr",
690                    ]
691
692                    # Common parameters between models
693                    config_autolabel = WORKFLOWS["auto_labeling"]
694                    mode = config_autolabel["mode"]
695                    epochs = config_autolabel["epochs"]
696                    selected_model_source = config_autolabel["model_source"]
697
698                    # Check if all selected modes are supported
699                    for model_source in selected_model_source:
700                        if model_source not in SUPPORTED_MODEL_SOURCES:
701                            logging.error(
702                                f"Selected model source {model_source} is not supported."
703                            )
704
705                    if SUPPORTED_MODEL_SOURCES[0] in selected_model_source:
706                        # Hugging Face Models
707                        # Single GPU mode (https://github.com/huggingface/transformers/issues/28740)
708                        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
709                        hf_models = config_autolabel["hf_models_objectdetection"]
710
711                        # Dataset Conversion
712                        try:
713                            logging.info("Converting dataset into Hugging Face format.")
714                            pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset)
715                            pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset)
716                            hf_dataset = pt_to_hf_converter.convert()
717                        except Exception as e:
718                            logging.error(f"Error during dataset conversion: {e}")
719
720                        for MODEL_NAME in (
721                            pbar := tqdm(hf_models, desc="Auto Labeling Models")
722                        ):
723                            # Status Update
724                            pbar.set_description(
725                                f"Processing Hugging Face model {MODEL_NAME}"
726                            )
727
728                            # Config
729                            config_model = config_autolabel[
730                                "hf_models_objectdetection"
731                            ][MODEL_NAME]
732
733                            run_config = {
734                                "mode": mode,
735                                "model_name": MODEL_NAME,
736                                "v51_dataset_name": self.selected_dataset,
737                                "epochs": epochs,
738                                "early_stop_threshold": config_autolabel[
739                                    "early_stop_threshold"
740                                ],
741                                "early_stop_patience": config_autolabel[
742                                    "early_stop_patience"
743                                ],
744                                "learning_rate": config_autolabel["learning_rate"],
745                                "weight_decay": config_autolabel["weight_decay"],
746                                "max_grad_norm": config_autolabel["max_grad_norm"],
747                                "batch_size": config_model.get("batch_size", 1),
748                                "image_size": config_model.get("image_size", None),
749                                "n_worker_dataloader": config_autolabel[
750                                    "n_worker_dataloader"
751                                ],
752                                "inference_settings": config_autolabel[
753                                    "inference_settings"
754                                ],
755                            }
756
757                            # Workflow
758                            workflow_auto_labeling_hf(
759                                self.dataset,
760                                hf_dataset,
761                                run_config,
762                            )
763
764                    if SUPPORTED_MODEL_SOURCES[1] in selected_model_source:
765                        # Ultralytics Models
766                        config_ultralytics = config_autolabel["ultralytics"]
767                        models_ultralytics = config_ultralytics["models"]
768                        export_dataset_root = config_ultralytics["export_dataset_root"]
769
770                        # Export data into necessary format
771                        if "train" in mode:
772                            try:
773                                UltralyticsObjectDetection.export_data(
774                                    self.dataset,
775                                    self.dataset_info,
776                                    export_dataset_root,
777                                )
778                            except Exception as e:
779                                logging.error(
780                                    f"Error during Ultralytics dataset export: {e}"
781                                )
782
783                        for model_name in (
784                            pbar := tqdm(
785                                models_ultralytics, desc="Ultralytics training"
786                            )
787                        ):
788                            pbar.set_description(f"Ultralytics model {model_name}")
789                            run_config = {
790                                "mode": mode,
791                                "model_name": model_name,
792                                "v51_dataset_name": self.dataset_info["name"],
793                                "epochs": epochs,
794                                "patience": config_autolabel["early_stop_patience"],
795                                "batch_size": models_ultralytics[model_name][
796                                    "batch_size"
797                                ],
798                                "img_size": models_ultralytics[model_name]["img_size"],
799                                "export_dataset_root": export_dataset_root,
800                                "inference_settings": config_autolabel[
801                                    "inference_settings"
802                                ],
803                                "multi_scale": config_ultralytics["multi_scale"],
804                                "cos_lr": config_ultralytics["cos_lr"],
805                            }
806
807                            workflow_auto_labeling_ultralytics(self.dataset, run_config)
808
809                    if SUPPORTED_MODEL_SOURCES[2] in selected_model_source:
810                        # Custom Co-DETR
811                        config_codetr = config_autolabel["custom_codetr"]
812                        run_config = {
813                            "export_dataset_root": config_codetr["export_dataset_root"],
814                            "container_tool": config_codetr["container_tool"],
815                            "n_gpus": config_codetr["n_gpus"],
816                            "mode": config_autolabel["mode"],
817                            "epochs": config_autolabel["epochs"],
818                            "inference_settings": config_autolabel[
819                                "inference_settings"
820                            ],
821                            "config": None,
822                        }
823                        codetr_configs = config_codetr["configs"]
824
825                        for config in (
826                            pbar := tqdm(
827                                codetr_configs, desc="Processing Co-DETR configurations"
828                            )
829                        ):
830                            pbar.set_description(f"Co-DETR model {config}")
831                            run_config["config"] = config
832                            workflow_auto_labeling_custom_codetr(
833                                self.dataset, self.dataset_info, run_config
834                            )
835
836                elif workflow == "auto_labeling_zero_shot":
837                    config = WORKFLOWS["auto_labeling_zero_shot"]
838                    workflow_zero_shot_object_detection(
839                        self.dataset, self.dataset_info, config
840                    )
841
842                elif workflow == "ensemble_selection":
843                    # Config
844                    run_config = WORKFLOWS["ensemble_selection"]
845
846                    # Workflow
847                    workflow_ensemble_selection(
848                        self.dataset, self.dataset_info, run_config
849                    )
850
851                elif workflow == "auto_label_mask":
852                    config = WORKFLOWS["auto_label_mask"]
853                    workflow_auto_label_mask(self.dataset, self.dataset_info, config)
854
855                elif workflow == "class_mapping":
856                    # Config
857                    run_config = WORKFLOWS["class_mapping"]
858
859                    # Workflow
860                    workflow_class_mapping(
861                        self.dataset,
862                        self.dataset_info,
863                        run_config,
864                        test_dataset_source=None,
865                        test_dataset_target=None,
866                    )
867                elif workflow == "data_ingest":
868                    dataset = run_data_ingest()
869
870                    dataset_info = {
871                        "name": "custom_dataset",
872                        "v51_type": "FiftyOneDataset",
873                        "splits": ["train", "val", "test"],
874                    }
875
876                    self.dataset = dataset
877                    self.dataset_info = dataset_info
878                    self.selected_dataset = "custom_dataset"
879
880                    logging.info("Data ingestion completed successfully.")
881
882
883                else:
884                    logging.error(
885                        f"Workflow {workflow} not found. Check available workflows in config.py."
886                    )
887                    return False
888
889                cleanup_memory()  # Clean after each workflow
890                logging.info(f"Completed workflow {workflow} and cleaned up memory")
891
892            except Exception as e:
893                logging.error(f"Workflow {workflow}: An error occurred: {e}")
894                wandb_close(exit_code=1)
895                cleanup_memory()  # Clean up even after failure
896
897        return True
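
# Usage sketch (mirrors main() below); the workflow list is a placeholder, not a
# value from config.py:
#   dataset, dataset_info = load_dataset(SELECTED_DATASET)
#   executor = WorkflowExecutor(["embedding_selection"], SELECTED_DATASET["name"], dataset, dataset_info)
#   executor.execute()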

900def main():
901    """Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface."""
902    time_start = time.time()
903    configure_logging()
904
905    # Signal handler for CTRL + C
906    signal.signal(signal.SIGINT, signal_handler)
907
908    # Execute workflows
909    if "data_ingest" in SELECTED_WORKFLOW:
910        executor = WorkflowExecutor(
911            SELECTED_WORKFLOW,
912            SELECTED_DATASET["name"],
913            dataset=None,
914            dataset_info=None,
915        )
916        executor.execute()
917
918        # Retrieve the dataset produced by the data_ingest workflow
919        dataset = executor.dataset
920        dataset_info = executor.dataset_info
921
922    else:
923        dataset, dataset_info = load_dataset(SELECTED_DATASET)
924
925        executor = WorkflowExecutor(
926            SELECTED_WORKFLOW,
927            SELECTED_DATASET["name"],
928            dataset,
929            dataset_info,
930        )
931        executor.execute()
932
933
934    if dataset is not None:
935        dataset.reload()
936        dataset.save()
937        arrange_fields_in_groups(dataset)
938        logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.")
939
940        # Dataset stats
941        logging.debug(dataset)
942        logging.debug(dataset.stats(include_media=True))
943
944        # V51 UI launch
945        session = fo.launch_app(
946            dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE
947        )
948    else:
949        logging.info("Skipping Voxel51 session.")
950
951
952
953    time_stop = time.time()
954    logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")
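
# Assumed module entry point (not shown in this listing): run main() when the file
# is executed directly.
if __name__ == "__main__":
    main()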
