main
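"""Main entry point for the data processing pipeline.

Loads the dataset and workflow selection from config/config.py, runs the
selected workflows (AWS download, embedding selection, anomaly detection,
auto labeling, zero-shot detection, ensemble selection, auto-label masks,
class mapping, data ingest) through WorkflowExecutor, and finally launches
the Voxel51 app to inspect the results.
"""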
import datetime
import logging
import os
import signal
import sys
import time
import warnings
from typing import Dict, List

import torch

IGNORE_FUTURE_WARNINGS = True
if IGNORE_FUTURE_WARNINGS:
    warnings.simplefilter("ignore", category=FutureWarning)

import gc

import fiftyone as fo
import torch.multiprocessing as mp
from tqdm import tqdm

from config.config import (
    SELECTED_DATASET,
    SELECTED_WORKFLOW,
    V51_ADDRESS,
    V51_PORT,
    V51_REMOTE,
    WORKFLOWS,
)
from utils.anomaly_detection_data_preparation import AnomalyDetectionDataPreparation
from utils.data_loader import FiftyOneTorchDatasetCOCO, TorchToHFDatasetCOCO
from utils.dataset_loader import load_dataset
from utils.logging import configure_logging
from utils.mp_distribution import ZeroShotDistributer
from utils.sidebar_groups import arrange_fields_in_groups
from utils.wandb_helper import wandb_close, wandb_init
from workflows.anomaly_detection import Anodec
from workflows.auto_labeling import (
    CustomCoDETRObjectDetection,
    HuggingFaceObjectDetection,
    UltralyticsObjectDetection,
    ZeroShotObjectDetection,
)
from workflows.aws_download import AwsDownloader
from workflows.auto_label_mask import AutoLabelMask
from workflows.class_mapping import ClassMapper
from workflows.data_ingest import run_data_ingest
from workflows.embedding_selection import EmbeddingSelection
from workflows.ensemble_selection import EnsembleSelection

wandb_run = None  # Initialized globally to make sure it is available


def signal_handler(sig, frame):
    """Handle Ctrl+C signal by cleaning up resources and exiting."""
    logging.error("You pressed Ctrl+C!")
    try:
        wandb_close(exit_code=1)
        cleanup_memory()
    except Exception:
        pass
    sys.exit(0)


def workflow_aws_download(parameters, wandb_activate=True):
    """Download and process data from an AWS S3 bucket."""
    dataset = None
    dataset_name = None
    wandb_exit_code = 0
    files_to_be_downloaded = 0
    try:
        # Config
        bucket = parameters["bucket"]
        prefix = parameters["prefix"]
        download_path = parameters["download_path"]
        test_run = parameters["test_run"]

        # Logging
        now = datetime.datetime.now()
        datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S")
        log_dir = f"logs/tensorboard/aws_{datetime_str}"

        dataset_name = f"annarbor_rolling_{datetime_str}"

        # Weights and Biases
        wandb_run = wandb_init(
            run_name=dataset_name,
            project_name="AWS Download",
            dataset_name=dataset_name,
            log_dir=log_dir,
            wandb_activate=wandb_activate,
        )

        # Workflow
        aws_downloader = AwsDownloader(
            bucket=bucket,
            prefix=prefix,
            download_path=download_path,
            test_run=test_run,
        )

        (
            sub_folder,
            files,
            files_to_be_downloaded,
            DOWNLOAD_NUMBER_SUCCESS,
            DOWNLOAD_SIZE_SUCCESS,
        ) = aws_downloader.download_files(log_dir=log_dir)

        dataset = aws_downloader.decode_data(
            sub_folder=sub_folder,
            files=files,
            log_dir=log_dir,
            dataset_name=dataset_name,
        )

    except Exception as e:
        logging.error(f"AWS Download and Extraction failed: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return dataset, dataset_name, files_to_be_downloaded


def workflow_anomaly_detection(
    dataset_normal,
    dataset_ano_dec,
    dataset_info,
    eval_metrics,
    run_config,
    wandb_activate=True,
):
    """Run anomaly detection workflow using specified models and configurations."""
    try:
        # Weights and Biases
        wandb_exit_code = 0
        wandb_run, log_dir = wandb_init(
            run_name=run_config["model_name"],
            project_name="Selection by Anomaly Detection",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        # Workflow
        SUPPORTED_MODES = ["train", "inference"]
        # Check if all selected modes are supported
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")
        if SUPPORTED_MODES[0] in run_config["mode"]:
            ano_dec = Anodec(
                dataset=dataset_ano_dec,
                eval_metrics=eval_metrics,
                dataset_info=dataset_info,
                config=run_config,
                tensorboard_output=log_dir,
            )
            ano_dec.train_and_export_model()
            ano_dec.run_inference(mode=SUPPORTED_MODES[0])
            ano_dec.eval_v51()
        if SUPPORTED_MODES[1] in run_config["mode"]:
            ano_dec = Anodec(
                dataset=dataset_normal,
                eval_metrics=eval_metrics,
                dataset_info=dataset_info,
                config=run_config,
                tensorboard_output=log_dir,
            )
            ano_dec.run_inference(mode=SUPPORTED_MODES[1])

    except Exception as e:
        logging.error(
            f"Error in Anomaly Detection for model {run_config['model_name']}: {e}"
        )
        wandb_exit_code = 1
    finally:
        wandb_close(exit_code=wandb_exit_code)

    return True


def workflow_embedding_selection(
    dataset, dataset_info, MODEL_NAME, config, wandb_activate=True
):
    """Compute embeddings and find representative and rare images for dataset selection."""
    try:
        wandb_exit_code = 0
        wandb_run, log_dir = wandb_init(
            run_name=MODEL_NAME,
            project_name="Selection by Embedding",
            dataset_name=dataset_info["name"],
            config=config,
            wandb_activate=wandb_activate,
        )
        embedding_selector = EmbeddingSelection(
            dataset, dataset_info, MODEL_NAME, log_dir
        )

        if not embedding_selector.model_already_used:
            embedding_selector.compute_embeddings(config["mode"])
            embedding_selector.compute_similarity()

            # Find representative and unique samples as center points for further selections
            thresholds = config["parameters"]
            embedding_selector.compute_representativeness(
                thresholds["compute_representativeness"]
            )
            embedding_selector.compute_unique_images_greedy(
                thresholds["compute_unique_images_greedy"]
            )
            embedding_selector.compute_unique_images_deterministic(
                thresholds["compute_unique_images_deterministic"]
            )

            # Select samples similar to the center points to enlarge the dataset
            embedding_selector.compute_similar_images(
                thresholds["compute_similar_images"], thresholds["neighbour_count"]
            )
        else:
            logging.warning(
                f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection."
            )

    except Exception as e:
        logging.error(f"An error occurred with model {MODEL_NAME}: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True):
    """Auto-labeling workflow using Ultralytics models with optional training and inference."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["model_name"],
            project_name="Auto Labeling Ultralytics",
            dataset_name=run_config["v51_dataset_name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        detector = UltralyticsObjectDetection(dataset=dataset, config=run_config)

        # Check if all selected modes are supported
        SUPPORTED_MODES = ["train", "inference"]
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")

        if SUPPORTED_MODES[0] in run_config["mode"]:
            logging.info(f"Training model {run_config['model_name']}")
            detector.train()
        if SUPPORTED_MODES[1] in run_config["mode"]:
            logging.info(f"Running inference for model {run_config['model_name']}")
            detector.inference()

    except Exception as e:
        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True):
    """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["model_name"],
            project_name="Auto Labeling Hugging Face",
            dataset_name=run_config["v51_dataset_name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        detector = HuggingFaceObjectDetection(
            dataset=dataset,
            config=run_config,
        )
        SUPPORTED_MODES = ["train", "inference"]

        # Check if all selected modes are supported
        for mode in run_config["mode"]:
            if mode not in SUPPORTED_MODES:
                logging.error(f"Selected mode {mode} is not supported.")
        if SUPPORTED_MODES[0] in run_config["mode"]:
            logging.info(f"Training model {run_config['model_name']}")
            detector.train(hf_dataset)
        if SUPPORTED_MODES[1] in run_config["mode"]:
            logging.info(f"Running inference for model {run_config['model_name']}")
            detector.inference(inference_settings=run_config["inference_settings"])

    except Exception as e:
        logging.error(f"An error occurred with model {run_config['model_name']}: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_auto_labeling_custom_codetr(
    dataset, dataset_info, run_config, wandb_activate=True
):
    """Auto-labeling workflow using the Co-DETR model, supporting training and inference modes."""
    try:
        wandb_exit_code = 0
        wandb_run = wandb_init(
            run_name=run_config["config"],
            project_name="Co-DETR Auto Labeling",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )

        mode = run_config["mode"]

        detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config)
        detector.convert_data()
        if "train" in mode:
            detector.update_config_file(
                dataset_name=dataset_info["name"],
                config_file=run_config["config"],
                max_epochs=run_config["epochs"],
            )
            detector.train(
                run_config["config"], run_config["n_gpus"], run_config["container_tool"]
            )
        if "inference" in mode:
            detector.run_inference(
                dataset,
                run_config["config"],
                run_config["n_gpus"],
                run_config["container_tool"],
                run_config["inference_settings"],
            )
    except Exception as e:
        logging.error(f"Error during Co-DETR auto labeling: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_zero_shot_object_detection(dataset, dataset_info, config):
    """Run zero-shot object detection on a dataset using models from Hugging Face, supporting both single and multi-GPU inference."""
    # Set multiprocessing start method for CUDA multiprocessing
    try:
        mp.set_start_method("spawn", force=True)
        logging.debug("Successfully set multiprocessing start method to 'spawn'")
    except RuntimeError as e:
        # This is expected if the start method was already set
        logging.debug(f"Multiprocessing start method was already set: {e}")
    except Exception as e:
        # Handle any other unexpected errors
        logging.error(f"Failed to set multiprocessing start method: {e}")

    # Zero-shot object detector models from Hugging Face
    # Optimized for parallel multi-GPU inference, also supports single GPU
    dataset_torch = FiftyOneTorchDatasetCOCO(dataset)
    detector = ZeroShotObjectDetection(
        dataset_torch=dataset_torch, dataset_info=dataset_info, config=config
    )

    # Check if model detections are already stored in the V51 dataset or on disk
    models_splits_dict = detector.exclude_stored_predictions(
        dataset_v51=dataset, config=config
    )
    if len(models_splits_dict) > 0:
        config["hf_models_zeroshot_objectdetection"] = models_splits_dict
        distributor = ZeroShotDistributer(
            config=config,
            n_samples=len(dataset_torch),
            dataset_info=dataset_info,
            detector=detector,
        )
        distributor.distribute_and_run()
    else:
        logging.info(
            "All zero-shot models already have predictions stored in the dataset."
        )

    # Make new fields available to follow-up processes
    dataset.reload()
    dataset.save()

    return True


def workflow_auto_label_mask(dataset, dataset_info, config):
    """Run depth estimation and semantic segmentation models to auto-label masks."""
    try:
        depth_config = config["depth_estimation"]
        seg_config = config["semantic_segmentation"]

        for architecture_name, architecture_info in depth_config.items():
            auto_labeler = AutoLabelMask(
                dataset=dataset,
                dataset_info=dataset_info,
                model_name=architecture_name,
                task_type="depth_estimation",
                model_config=architecture_info,
            )
            auto_labeler.run_inference()

        for architecture_name, architecture_info in seg_config.items():
            auto_labeler = AutoLabelMask(
                dataset=dataset,
                dataset_info=dataset_info,
                model_name=architecture_name,
                task_type="semantic_segmentation",
                model_config=architecture_info,
            )
            auto_labeler.run_inference()

    except Exception as e:
        logging.error(f"Auto-labeling mask workflow failed: {e}")
        raise


def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True):
    """Runs the ensemble selection workflow on the given dataset using the provided configuration."""
    try:
        wandb_exit_code = 0

        wandb_run = wandb_init(
            run_name="Selection by Ensemble",
            project_name="Ensemble Selection",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )
        ensemble_selector = EnsembleSelection(dataset, run_config)
        ensemble_selector.ensemble_selection()
    except Exception as e:
        logging.error(f"An error occurred during Ensemble Selection: {e}")
        wandb_exit_code = 1

    finally:
        wandb_close(wandb_exit_code)

    return True


def workflow_class_mapping(
    dataset,
    dataset_info,
    run_config,
    wandb_activate=True,
    test_dataset_source=None,
    test_dataset_target=None,
):
    """Runs the class mapping workflow to align labels between the source dataset and target dataset."""
    try:
        wandb_exit_code = 0
        # Initialize a wandb run for class mapping
        wandb_run = wandb_init(
            run_name="class_mapping",
            project_name="Class Mapping",
            dataset_name=dataset_info["name"],
            config=run_config,
            wandb_activate=wandb_activate,
        )
        class_mapping_models = run_config["hf_models_zeroshot_classification"]

        for model_name in (
            pbar := tqdm(class_mapping_models, desc="Processing Class Mapping")
        ):
            pbar.set_description(f"Zero Shot Classification model {model_name}")
            mapper = ClassMapper(dataset, model_name, run_config)
            try:
                stats = mapper.run_mapping(test_dataset_source, test_dataset_target)
                # Display statistics only if mapping was successful
                source_class_counts = stats["total_processed"]
                logging.info("\nClassification Results for Source Dataset:")
                for source_class, count in stats["source_class_counts"].items():
                    percentage = (
                        (count / source_class_counts) * 100
                        if source_class_counts > 0
                        else 0
                    )
                    logging.info(
                        f"{source_class}: {count} samples processed ({percentage:.1f}%)"
                    )

                # Display statistics for tags added to the target dataset
                logging.info("\nTag Addition Results (Target Dataset Tags):")
                logging.info(f"Total new tags added: {stats['changes_made']}")
                for target_class, tag_count in stats["tags_added_per_category"].items():
                    logging.info(f"{target_class} tags added: {tag_count}")

            except Exception as e:
                logging.error(f"Error during mapping with model {model_name}: {e}")

    except Exception as e:
        logging.error(f"Error in class_mapping workflow: {e}")
        wandb_exit_code = 1
    finally:
        wandb_close(wandb_exit_code)

    return True


def cleanup_memory(do_extensive_cleanup=False):
    """Clean up memory after workflow execution. 'do_extensive_cleanup' is recommended for multiple training sessions in a row."""
    logging.info("Starting memory cleanup")
    # Clear CUDA cache
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Force garbage collection
    gc.collect()

    if do_extensive_cleanup:
        # Clear any leftover tensors
        n_deleted_torch_objects = 0
        for obj in tqdm(
            gc.get_objects(), desc="Deleting objects from Python Garbage Collector"
        ):
            try:
                if torch.is_tensor(obj):
                    del obj
                    n_deleted_torch_objects += 1
            except Exception:
                pass

        logging.info(f"Deleted {n_deleted_torch_objects} torch objects")

        # Final garbage collection
        gc.collect()


class WorkflowExecutor:
    """Orchestrates the execution of multiple data processing workflows in sequence."""

    def __init__(
        self,
        workflows: List[str],
        selected_dataset: str,
        dataset: fo.Dataset,
        dataset_info: Dict,
    ):
        """Initializes with specified workflows, dataset selection, and dataset metadata."""
        self.workflows = workflows
        self.selected_dataset = selected_dataset
        self.dataset = dataset
        self.dataset_info = dataset_info

    def execute(self) -> bool:
        """Execute all configured workflows in sequence and handle errors."""
        if len(self.workflows) == 0:
            logging.error("No workflows selected.")
            return False

        logging.info(f"Selected workflows: {self.workflows}")
        for workflow in self.workflows:
            logging.info(
                f"Running workflow {workflow} for dataset {self.selected_dataset}"
            )
            try:
                if workflow == "aws_download":
                    parameter_group = "mcity"
                    parameters = WORKFLOWS["aws_download"].get(parameter_group, None)
                    if parameter_group == "mcity":
                        dataset, dataset_name, _ = workflow_aws_download(parameters)
                    else:
                        logging.error(
                            f"The parameter group {parameter_group} is not supported. As AWS downloads are highly specific, please provide a separate set of parameters and a workflow."
                        )

                    # Select downloaded dataset for further workflows if configured
                    if dataset is not None:
                        if parameters["selected_dataset_overwrite"]:
                            dataset_info = {
                                "name": dataset_name,
                                "v51_type": "FiftyOneDataset",
                                "splits": [],
                            }

                            self.dataset = dataset
                            self.dataset_info = dataset_info
                            logging.warning(
                                f"Overwriting selected dataset {self.selected_dataset} with {dataset_name}"
                            )
                            self.selected_dataset = dataset_name

                elif workflow == "embedding_selection":
                    embedding_models = WORKFLOWS["embedding_selection"][
                        "embedding_models"
                    ]

                    for MODEL_NAME in (
                        pbar := tqdm(embedding_models, desc="Selection by Embeddings")
                    ):
                        # Status
                        pbar.set_description(
                            f"Selection by embeddings with model {MODEL_NAME}."
                        )

                        # Config
                        mode = WORKFLOWS["embedding_selection"]["mode"]
                        parameters = WORKFLOWS["embedding_selection"]["parameters"]
                        config = {"mode": mode, "parameters": parameters}

                        # Workflow
                        workflow_embedding_selection(
                            self.dataset,
                            self.dataset_info,
                            MODEL_NAME,
                            config,
                        )

                elif workflow == "anomaly_detection":
                    # Config
                    ano_dec_config = WORKFLOWS["anomaly_detection"]
                    anomalib_image_models = ano_dec_config["anomalib_image_models"]
                    eval_metrics = ano_dec_config["anomalib_eval_metrics"]

                    dataset_ano_dec = None
                    data_root = None
                    if "train" in ano_dec_config["mode"]:
                        try:
                            data_preparer = AnomalyDetectionDataPreparation(
                                self.dataset, self.selected_dataset
                            )
                            dataset_ano_dec = data_preparer.dataset_ano_dec
                            data_root = data_preparer.export_root
                        except Exception as e:
                            logging.error(
                                f"Error during data preparation for Anomaly Detection: {e}"
                            )

                    for MODEL_NAME in (
                        pbar := tqdm(anomalib_image_models, desc="Anomalib")
                    ):
                        # Status
                        pbar.set_description(f"Anomalib model {MODEL_NAME}")

                        # Config
                        run_config = {
                            "model_name": MODEL_NAME,
                            "image_size": anomalib_image_models[MODEL_NAME].get(
                                "image_size", None
                            ),
                            "batch_size": anomalib_image_models[MODEL_NAME].get(
                                "batch_size", 1
                            ),
                            "epochs": ano_dec_config["epochs"],
                            "early_stop_patience": ano_dec_config[
                                "early_stop_patience"
                            ],
                            "data_root": data_root,
                            "mode": ano_dec_config["mode"],
                        }

                        # Workflow
                        workflow_anomaly_detection(
                            self.dataset,
                            dataset_ano_dec,
                            self.dataset_info,
                            eval_metrics,
                            run_config,
                        )

                elif workflow == "auto_labeling":
                    # Config
                    SUPPORTED_MODEL_SOURCES = [
                        "hf_models_objectdetection",
                        "ultralytics",
                        "custom_codetr",
                    ]

                    # Common parameters between models
                    config_autolabel = WORKFLOWS["auto_labeling"]
                    mode = config_autolabel["mode"]
                    epochs = config_autolabel["epochs"]
                    selected_model_source = config_autolabel["model_source"]

                    # Check if all selected model sources are supported
                    for model_source in selected_model_source:
                        if model_source not in SUPPORTED_MODEL_SOURCES:
                            logging.error(
                                f"Selected model source {model_source} is not supported."
                            )

                    if SUPPORTED_MODEL_SOURCES[0] in selected_model_source:
                        # Hugging Face Models
                        # Single GPU mode (https://github.com/huggingface/transformers/issues/28740)
                        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
                        hf_models = config_autolabel["hf_models_objectdetection"]

                        # Dataset Conversion
                        try:
                            logging.info("Converting dataset into Hugging Face format.")
                            pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset)
                            pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset)
                            hf_dataset = pt_to_hf_converter.convert()
                        except Exception as e:
                            logging.error(f"Error during dataset conversion: {e}")

                        for MODEL_NAME in (
                            pbar := tqdm(hf_models, desc="Auto Labeling Models")
                        ):
                            # Status Update
                            pbar.set_description(
                                f"Processing Hugging Face model {MODEL_NAME}"
                            )

                            # Config
                            config_model = config_autolabel[
                                "hf_models_objectdetection"
                            ][MODEL_NAME]

                            run_config = {
                                "mode": mode,
                                "model_name": MODEL_NAME,
                                "v51_dataset_name": self.selected_dataset,
                                "epochs": epochs,
                                "early_stop_threshold": config_autolabel[
                                    "early_stop_threshold"
                                ],
                                "early_stop_patience": config_autolabel[
                                    "early_stop_patience"
                                ],
                                "learning_rate": config_autolabel["learning_rate"],
                                "weight_decay": config_autolabel["weight_decay"],
                                "max_grad_norm": config_autolabel["max_grad_norm"],
                                "batch_size": config_model.get("batch_size", 1),
                                "image_size": config_model.get("image_size", None),
                                "n_worker_dataloader": config_autolabel[
                                    "n_worker_dataloader"
                                ],
                                "inference_settings": config_autolabel[
                                    "inference_settings"
                                ],
                            }

                            # Workflow
                            workflow_auto_labeling_hf(
                                self.dataset,
                                hf_dataset,
                                run_config,
                            )

                    if SUPPORTED_MODEL_SOURCES[1] in selected_model_source:
                        # Ultralytics Models
                        config_ultralytics = config_autolabel["ultralytics"]
                        models_ultralytics = config_ultralytics["models"]
                        export_dataset_root = config_ultralytics["export_dataset_root"]

                        # Export data into the necessary format
                        if "train" in mode:
                            try:
                                UltralyticsObjectDetection.export_data(
                                    self.dataset,
                                    self.dataset_info,
                                    export_dataset_root,
                                )
                            except Exception as e:
                                logging.error(
                                    f"Error during Ultralytics dataset export: {e}"
                                )

                        for model_name in (
                            pbar := tqdm(
                                models_ultralytics, desc="Ultralytics training"
                            )
                        ):
                            pbar.set_description(f"Ultralytics model {model_name}")
                            run_config = {
                                "mode": mode,
                                "model_name": model_name,
                                "v51_dataset_name": self.dataset_info["name"],
                                "epochs": epochs,
                                "patience": config_autolabel["early_stop_patience"],
                                "batch_size": models_ultralytics[model_name][
                                    "batch_size"
                                ],
                                "img_size": models_ultralytics[model_name]["img_size"],
                                "export_dataset_root": export_dataset_root,
                                "inference_settings": config_autolabel[
                                    "inference_settings"
                                ],
                                "multi_scale": config_ultralytics["multi_scale"],
                                "cos_lr": config_ultralytics["cos_lr"],
                            }

                            workflow_auto_labeling_ultralytics(self.dataset, run_config)

                    if SUPPORTED_MODEL_SOURCES[2] in selected_model_source:
                        # Custom Co-DETR
                        config_codetr = config_autolabel["custom_codetr"]
                        run_config = {
                            "export_dataset_root": config_codetr["export_dataset_root"],
                            "container_tool": config_codetr["container_tool"],
                            "n_gpus": config_codetr["n_gpus"],
                            "mode": config_autolabel["mode"],
                            "epochs": config_autolabel["epochs"],
                            "inference_settings": config_autolabel[
                                "inference_settings"
                            ],
                            "config": None,
                        }
                        codetr_configs = config_codetr["configs"]

                        for config in (
                            pbar := tqdm(
                                codetr_configs, desc="Processing Co-DETR configurations"
                            )
                        ):
                            pbar.set_description(f"Co-DETR model {config}")
                            run_config["config"] = config
                            workflow_auto_labeling_custom_codetr(
                                self.dataset, self.dataset_info, run_config
                            )

                elif workflow == "auto_labeling_zero_shot":
                    config = WORKFLOWS["auto_labeling_zero_shot"]
                    workflow_zero_shot_object_detection(
                        self.dataset, self.dataset_info, config
                    )

                elif workflow == "ensemble_selection":
                    # Config
                    run_config = WORKFLOWS["ensemble_selection"]

                    # Workflow
                    workflow_ensemble_selection(
                        self.dataset, self.dataset_info, run_config
                    )

                elif workflow == "auto_label_mask":
                    config = WORKFLOWS["auto_label_mask"]
                    workflow_auto_label_mask(self.dataset, self.dataset_info, config)

                elif workflow == "class_mapping":
                    # Config
                    run_config = WORKFLOWS["class_mapping"]

                    # Workflow
                    workflow_class_mapping(
                        self.dataset,
                        self.dataset_info,
                        run_config,
                        test_dataset_source=None,
                        test_dataset_target=None,
                    )

                elif workflow == "data_ingest":
                    dataset = run_data_ingest()

                    dataset_info = {
                        "name": "custom_dataset",
                        "v51_type": "FiftyOneDataset",
                        "splits": ["train", "val", "test"],
                    }

                    self.dataset = dataset
                    self.dataset_info = dataset_info
                    self.selected_dataset = "custom_dataset"

                    logging.info("Data ingestion completed successfully.")

                else:
                    logging.error(
                        f"Workflow {workflow} not found. Check available workflows in config.py."
                    )
                    return False

                cleanup_memory()  # Clean up after each workflow
                logging.info(f"Completed workflow {workflow} and cleaned up memory")

            except Exception as e:
                logging.error(f"Workflow {workflow}: An error occurred: {e}")
                wandb_close(exit_code=1)
                cleanup_memory()  # Clean up even after failure

        return True


def main():
    """Executes the data processing workflows, loads the dataset, and launches the Voxel51 visualization interface."""
    time_start = time.time()
    configure_logging()

    # Signal handler for CTRL + C
    signal.signal(signal.SIGINT, signal_handler)

    # Execute workflows
    if "data_ingest" in SELECTED_WORKFLOW:
        executor = WorkflowExecutor(
            SELECTED_WORKFLOW,
            SELECTED_DATASET["name"],
            dataset=None,
            dataset_info=None,
        )
        executor.execute()

        # Pull back outputs after ingestion
        dataset = executor.dataset
        dataset_info = executor.dataset_info

    else:
        dataset, dataset_info = load_dataset(SELECTED_DATASET)

        executor = WorkflowExecutor(
            SELECTED_WORKFLOW,
            SELECTED_DATASET["name"],
            dataset,
            dataset_info,
        )
        executor.execute()

    if dataset is not None:
        dataset.reload()
        dataset.save()
        arrange_fields_in_groups(dataset)
        logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.")

        # Dataset stats
        logging.debug(dataset)
        logging.debug(dataset.stats(include_media=True))

        # V51 UI launch
        session = fo.launch_app(
            dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE
        )
    else:
        logging.info("Skipping Voxel51 session.")

    time_stop = time.time()
    logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")


if __name__ == "__main__":
    cleanup_memory()
    main()
55def signal_handler(sig, frame): 56 """Handle Ctrl+C signal by cleaning up resources and exiting.""" 57 logging.error("You pressed Ctrl+C!") 58 try: 59 wandb_close(exit_code=1) 60 cleanup_memory() 61 except: 62 pass 63 sys.exit(0)
Handle Ctrl+C signal by cleaning up resources and exiting.
66def workflow_aws_download(parameters, wandb_activate=True): 67 """Download and process data from AWS S3 bucket.""" 68 dataset = None 69 dataset_name = None 70 wandb_exit_code = 0 71 files_to_be_downloaded = 0 72 try: 73 # Config 74 bucket = parameters["bucket"] 75 prefix = parameters["prefix"] 76 download_path = parameters["download_path"] 77 test_run = parameters["test_run"] 78 79 # Logging 80 now = datetime.datetime.now() 81 datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S") 82 log_dir = f"logs/tensorboard/aws_{datetime_str}" 83 84 dataset_name = f"annarbor_rolling_{datetime_str}" 85 86 # Weights and Biases 87 wandb_run = wandb_init( 88 run_name=dataset_name, 89 project_name="AWS Download", 90 dataset_name=dataset_name, 91 log_dir=log_dir, 92 wandb_activate=wandb_activate, 93 ) 94 95 # Workflow 96 aws_downloader = AwsDownloader( 97 bucket=bucket, 98 prefix=prefix, 99 download_path=download_path, 100 test_run=test_run, 101 ) 102 103 ( 104 sub_folder, 105 files, 106 files_to_be_downloaded, 107 DOWNLOAD_NUMBER_SUCCESS, 108 DOWNLOAD_SIZE_SUCCESS, 109 ) = aws_downloader.download_files(log_dir=log_dir) 110 111 dataset = aws_downloader.decode_data( 112 sub_folder=sub_folder, 113 files=files, 114 log_dir=log_dir, 115 dataset_name=dataset_name, 116 ) 117 118 except Exception as e: 119 logging.error(f"AWS Download and Extraction failed: {e}") 120 wandb_exit_code = 1 121 122 finally: 123 wandb_close(wandb_exit_code) 124 125 return dataset, dataset_name, files_to_be_downloaded
Download and process data from AWS S3 bucket.
128def workflow_anomaly_detection( 129 dataset_normal, 130 dataset_ano_dec, 131 dataset_info, 132 eval_metrics, 133 run_config, 134 wandb_activate=True, 135): 136 """Run anomaly detection workflow using specified models and configurations.""" 137 try: 138 # Weights and Biases 139 wandb_exit_code = 0 140 wandb_run, log_dir = wandb_init( 141 run_name=run_config["model_name"], 142 project_name="Selection by Anomaly Detection", 143 dataset_name=dataset_info["name"], 144 config=run_config, 145 wandb_activate=wandb_activate, 146 ) 147 148 # Workflow 149 150 SUPPORTED_MODES = ["train", "inference"] 151 # Check if all selected modes are supported 152 for mode in run_config["mode"]: 153 if mode not in SUPPORTED_MODES: 154 logging.error(f"Selected mode {mode} is not supported.") 155 if SUPPORTED_MODES[0] in run_config["mode"]: 156 ano_dec = Anodec( 157 dataset=dataset_ano_dec, 158 eval_metrics=eval_metrics, 159 dataset_info=dataset_info, 160 config=run_config, 161 tensorboard_output=log_dir, 162 ) 163 ano_dec.train_and_export_model() 164 ano_dec.run_inference(mode=SUPPORTED_MODES[0]) 165 ano_dec.eval_v51() 166 if SUPPORTED_MODES[1] in run_config["mode"]: 167 ano_dec = Anodec( 168 dataset=dataset_normal, 169 eval_metrics=eval_metrics, 170 dataset_info=dataset_info, 171 config=run_config, 172 tensorboard_output=log_dir, 173 ) 174 ano_dec.run_inference(mode=SUPPORTED_MODES[1]) 175 176 except Exception as e: 177 logging.error( 178 f"Error in Anomaly Detection for model {run_config['model_name']}: {e}" 179 ) 180 wandb_exit_code = 1 181 finally: 182 wandb_close(exit_code=wandb_exit_code) 183 184 return True
Run anomaly detection workflow using specified models and configurations.
187def workflow_embedding_selection( 188 dataset, dataset_info, MODEL_NAME, config, wandb_activate=True 189): 190 """Compute embeddings and find representative and rare images for dataset selection.""" 191 try: 192 wandb_exit_code = 0 193 wandb_run, log_dir = wandb_init( 194 run_name=MODEL_NAME, 195 project_name="Selection by Embedding", 196 dataset_name=dataset_info["name"], 197 config=config, 198 wandb_activate=wandb_activate, 199 ) 200 embedding_selector = EmbeddingSelection( 201 dataset, dataset_info, MODEL_NAME, log_dir 202 ) 203 204 if embedding_selector.model_already_used == False: 205 206 embedding_selector.compute_embeddings(config["mode"]) 207 embedding_selector.compute_similarity() 208 209 # Find representative and unique samples as center points for further selections 210 thresholds = config["parameters"] 211 embedding_selector.compute_representativeness( 212 thresholds["compute_representativeness"] 213 ) 214 embedding_selector.compute_unique_images_greedy( 215 thresholds["compute_unique_images_greedy"] 216 ) 217 embedding_selector.compute_unique_images_deterministic( 218 thresholds["compute_unique_images_deterministic"] 219 ) 220 221 # Select samples similar to the center points to enlarge the dataset 222 embedding_selector.compute_similar_images( 223 thresholds["compute_similar_images"], thresholds["neighbour_count"] 224 ) 225 else: 226 logging.warning( 227 f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection." 228 ) 229 230 except Exception as e: 231 logging.error(f"An error occurred with model {MODEL_NAME}: {e}") 232 wandb_exit_code = 1 233 finally: 234 wandb_close(wandb_exit_code) 235 236 return True
Compute embeddings and find representative and rare images for dataset selection.
239def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True): 240 """Auto-labeling workflow using Ultralytics models with optional training and inference.""" 241 try: 242 wandb_exit_code = 0 243 wandb_run = wandb_init( 244 run_name=run_config["model_name"], 245 project_name="Auto Labeling Ultralytics", 246 dataset_name=run_config["v51_dataset_name"], 247 config=run_config, 248 wandb_activate=wandb_activate, 249 ) 250 251 detector = UltralyticsObjectDetection(dataset=dataset, config=run_config) 252 253 # Check if all selected modes are supported 254 SUPPORTED_MODES = ["train", "inference"] 255 for mode in run_config["mode"]: 256 if mode not in SUPPORTED_MODES: 257 logging.error(f"Selected mode {mode} is not supported.") 258 259 if SUPPORTED_MODES[0] in run_config["mode"]: 260 logging.info(f"Training model {run_config['model_name']}") 261 detector.train() 262 if SUPPORTED_MODES[1] in run_config["mode"]: 263 logging.info(f"Running inference for model {run_config['model_name']}") 264 detector.inference() 265 266 except Exception as e: 267 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 268 wandb_exit_code = 1 269 270 finally: 271 wandb_close(wandb_exit_code) 272 273 return True
Auto-labeling workflow using Ultralytics models with optional training and inference.
276def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True): 277 """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.""" 278 try: 279 wandb_exit_code = 0 280 wandb_run = wandb_init( 281 run_name=run_config["model_name"], 282 project_name="Auto Labeling Hugging Face", 283 dataset_name=run_config["v51_dataset_name"], 284 config=run_config, 285 wandb_activate=wandb_activate, 286 ) 287 288 detector = HuggingFaceObjectDetection( 289 dataset=dataset, 290 config=run_config, 291 ) 292 SUPPORTED_MODES = ["train", "inference"] 293 294 # Check if all selected modes are supported 295 for mode in run_config["mode"]: 296 if mode not in SUPPORTED_MODES: 297 logging.error(f"Selected mode {mode} is not supported.") 298 if SUPPORTED_MODES[0] in run_config["mode"]: 299 logging.info(f"Training model {run_config['model_name']}") 300 detector.train(hf_dataset) 301 if SUPPORTED_MODES[1] in run_config["mode"]: 302 logging.info(f"Running inference for model {run_config['model_name']}") 303 detector.inference(inference_settings=run_config["inference_settings"]) 304 305 except Exception as e: 306 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 307 wandb_exit_code = 1 308 309 finally: 310 wandb_close(wandb_exit_code) 311 312 return True
Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.
315def workflow_auto_labeling_custom_codetr( 316 dataset, dataset_info, run_config, wandb_activate=True 317): 318 """Auto labeling workflow using Co-DETR model supporting training and inference modes.""" 319 320 try: 321 wandb_exit_code = 0 322 wandb_run = wandb_init( 323 run_name=run_config["config"], 324 project_name="Co-DETR Auto Labeling", 325 dataset_name=dataset_info["name"], 326 config=run_config, 327 wandb_activate=wandb_activate, 328 ) 329 330 mode = run_config["mode"] 331 332 detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config) 333 detector.convert_data() 334 if "train" in mode: 335 detector.update_config_file( 336 dataset_name=dataset_info["name"], 337 config_file=run_config["config"], 338 max_epochs=run_config["epochs"], 339 ) 340 detector.train( 341 run_config["config"], run_config["n_gpus"], run_config["container_tool"] 342 ) 343 if "inference" in mode: 344 detector.run_inference( 345 dataset, 346 run_config["config"], 347 run_config["n_gpus"], 348 run_config["container_tool"], 349 run_config["inference_settings"], 350 ) 351 except Exception as e: 352 logging.error(f"Error during CoDETR training: {e}") 353 wandb_exit_code = 1 354 finally: 355 wandb_close(wandb_exit_code) 356 357 return True
Auto labeling workflow using Co-DETR model supporting training and inference modes.
360def workflow_zero_shot_object_detection(dataset, dataset_info, config): 361 """Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.""" 362 # Set multiprocessing mode for CUDA multiprocessing 363 try: 364 mp.set_start_method("spawn", force=True) 365 logging.debug("Successfully set multiprocessing start method to 'spawn'") 366 except RuntimeError as e: 367 # This is expected if the start method was already set 368 logging.debug(f"Multiprocessing start method was already set: {e}") 369 except Exception as e: 370 # Handle any other unexpected errors 371 logging.error(f"Failed to set multiprocessing start method: {e}") 372 373 # Zero-shot object detector models from Huggingface 374 # Optimized for parallel multi-GPU inference, also supports single GPU 375 dataset_torch = FiftyOneTorchDatasetCOCO(dataset) 376 detector = ZeroShotObjectDetection( 377 dataset_torch=dataset_torch, dataset_info=dataset_info, config=config 378 ) 379 380 # Check if model detections are already stored in V51 dataset or on disk 381 models_splits_dict = detector.exclude_stored_predictions( 382 dataset_v51=dataset, config=config 383 ) 384 if len(models_splits_dict) > 0: 385 config["hf_models_zeroshot_objectdetection"] = models_splits_dict 386 distributor = ZeroShotDistributer( 387 config=config, 388 n_samples=len(dataset_torch), 389 dataset_info=dataset_info, 390 detector=detector, 391 ) 392 distributor.distribute_and_run() 393 else: 394 logging.info( 395 "All zero shot models already have predictions stored in the dataset." 396 ) 397 398 # To make new fields available to follow-up processes 399 dataset.reload() 400 dataset.save() 401 402 return True
Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.
405def workflow_auto_label_mask(dataset, dataset_info, config): 406 try: 407 depth_config = config["depth_estimation"] 408 seg_config = config["semantic_segmentation"] 409 410 for architecture_name, architecture_info in depth_config.items(): 411 auto_labeler = AutoLabelMask( 412 dataset=dataset, 413 dataset_info=dataset_info, 414 model_name=architecture_name, 415 task_type="depth_estimation", 416 model_config=architecture_info, 417 ) 418 auto_labeler.run_inference() 419 420 for architecture_name, architecture_info in seg_config.items(): 421 auto_labeler = AutoLabelMask( 422 dataset=dataset, 423 dataset_info=dataset_info, 424 model_name=architecture_name, 425 task_type="semantic_segmentation", 426 model_config=architecture_info, 427 ) 428 auto_labeler.run_inference() 429 430 except Exception as e: 431 logging.error(f"Auto-labeling mask workflow failed: {e}") 432 raise
435def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True): 436 """Runs ensemble selection workflow on given dataset using provided configuration.""" 437 try: 438 wandb_exit_code = 0 439 440 wandb_run = wandb_init( 441 run_name="Selection by Ensemble", 442 project_name="Ensemble Selection", 443 dataset_name=dataset_info["name"], 444 config=run_config, 445 wandb_activate=wandb_activate, 446 ) 447 ensemble_selecter = EnsembleSelection(dataset, run_config) 448 ensemble_selecter.ensemble_selection() 449 except Exception as e: 450 logging.error(f"An error occured during Ensemble Selection: {e}") 451 wandb_exit_code = 1 452 453 finally: 454 wandb_close(wandb_exit_code) 455 456 return True
Runs ensemble selection workflow on given dataset using provided configuration.
459def workflow_class_mapping( 460 dataset, 461 dataset_info, 462 run_config, 463 wandb_activate=True, 464 test_dataset_source=None, 465 test_dataset_target=None, 466): 467 """Runs class mapping workflow to align labels between the source dataset and target dataset.""" 468 try: 469 wandb_exit_code = 0 470 # Initialize a wandb run for class mapping 471 wandb_run = wandb_init( 472 run_name="class_mapping", 473 project_name="Class Mapping", 474 dataset_name=dataset_info["name"], 475 config=run_config, 476 wandb_activate=wandb_activate, 477 ) 478 class_mapping_models = run_config["hf_models_zeroshot_classification"] 479 480 for model_name in ( 481 pbar := tqdm(class_mapping_models, desc="Processing Class Mapping") 482 ): 483 pbar.set_description(f"Zero Shot Classification model {model_name}") 484 mapper = ClassMapper(dataset, model_name, run_config) 485 try: 486 stats = mapper.run_mapping(test_dataset_source, test_dataset_target) 487 # Display statistics only if mapping was successful 488 source_class_counts = stats["total_processed"] 489 logging.info("\nClassification Results for Source Dataset:") 490 for source_class, count in stats["source_class_counts"].items(): 491 percentage = ( 492 (count / source_class_counts) * 100 493 if source_class_counts > 0 494 else 0 495 ) 496 logging.info( 497 f"{source_class}: {count} samples processed ({percentage:.1f}%)" 498 ) 499 500 # Display statistics for tags added to Target Dataset 501 logging.info("\nTag Addition Results (Target Dataset Tags):") 502 logging.info(f"Total new tags added: {stats['changes_made']}") 503 for target_class, tag_count in stats["tags_added_per_category"].items(): 504 logging.info(f"{target_class} tags added: {tag_count}") 505 506 except Exception as e: 507 logging.error(f"Error during mapping with model {model_name}: {e}") 508 509 except Exception as e: 510 logging.error(f"Error in class_mapping workflow: {e}") 511 wandb_exit_code = 1 512 finally: 513 wandb_close(wandb_exit_code) 514 515 return True
Runs class mapping workflow to align labels between the source dataset and target dataset.
518def cleanup_memory(do_extensive_cleanup=False): 519 """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.""" 520 logging.info("Starting memory cleanup") 521 # Clear CUDA cache 522 if torch.cuda.is_available(): 523 torch.cuda.empty_cache() 524 525 # Force garbage collection 526 gc.collect() 527 528 if do_extensive_cleanup: 529 530 # Clear any leftover tensors 531 n_deleted_torch_objects = 0 532 for obj in tqdm( 533 gc.get_objects(), desc="Deleting objects from Python Garbage Collector" 534 ): 535 try: 536 if torch.is_tensor(obj): 537 del obj 538 n_deleted_torch_objects += 1 539 except: 540 pass 541 542 logging.info(f"Deleted {n_deleted_torch_objects} torch objects") 543 544 # Final garbage collection 545 gc.collect()
Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.
548class WorkflowExecutor: 549 """Orchestrates the execution of multiple data processing workflows in sequence.""" 550 551 def __init__( 552 self, 553 workflows: List[str], 554 selected_dataset: str, 555 dataset: fo.Dataset, 556 dataset_info: Dict, 557 ): 558 """Initializes with specified workflows, dataset selection, and dataset metadata.""" 559 self.workflows = workflows 560 self.selected_dataset = selected_dataset 561 self.dataset = dataset 562 self.dataset_info = dataset_info 563 564 def execute(self) -> bool: 565 """Execute all configured workflows in sequence and handle errors.""" 566 if len(self.workflows) == 0: 567 logging.error("No workflows selected.") 568 return False 569 570 logging.info(f"Selected workflows: {self.workflows}") 571 for workflow in self.workflows: 572 logging.info( 573 f"Running workflow {workflow} for dataset {self.selected_dataset}" 574 ) 575 try: 576 if workflow == "aws_download": 577 parameter_group = "mcity" 578 parameters = WORKFLOWS["aws_download"].get(parameter_group, None) 579 if parameter_group == "mcity": 580 dataset, dataset_name, _ = workflow_aws_download(parameters) 581 else: 582 logging.error( 583 f"The parameter group {parameter_group} is not supported. As AWS are highly specific, please provide a separate set of parameters and a workflow." 584 ) 585 586 # Select downloaded dataset for further workflows if configured 587 if dataset is not None: 588 if parameters["selected_dataset_overwrite"] == True: 589 590 dataset_info = { 591 "name": dataset_name, 592 "v51_type": "FiftyOneDataset", 593 "splits": [], 594 } 595 596 self.dataset = dataset 597 self.dataset_info = dataset_info 598 logging.warning( 599 f"Overwritting selected dataset {self.selected_dataset} with {dataset_name}" 600 ) 601 self.selected_dataset = dataset_name 602 603 elif workflow == "embedding_selection": 604 embedding_models = WORKFLOWS["embedding_selection"][ 605 "embedding_models" 606 ] 607 608 for MODEL_NAME in ( 609 pbar := tqdm(embedding_models, desc="Selection by Embeddings") 610 ): 611 612 # Status 613 pbar.set_description( 614 f"Selection by embeddings with model {MODEL_NAME}." 
615 ) 616 617 # Config 618 mode = WORKFLOWS["embedding_selection"]["mode"] 619 parameters = WORKFLOWS["embedding_selection"]["parameters"] 620 config = {"mode": mode, "parameters": parameters} 621 622 # Workflow 623 workflow_embedding_selection( 624 self.dataset, 625 self.dataset_info, 626 MODEL_NAME, 627 config, 628 ) 629 630 elif workflow == "anomaly_detection": 631 632 # Config 633 ano_dec_config = WORKFLOWS["anomaly_detection"] 634 anomalib_image_models = ano_dec_config["anomalib_image_models"] 635 eval_metrics = ano_dec_config["anomalib_eval_metrics"] 636 637 dataset_ano_dec = None 638 data_root = None 639 if "train" in ano_dec_config["mode"]: 640 try: 641 data_preparer = AnomalyDetectionDataPreparation( 642 self.dataset, self.selected_dataset 643 ) 644 dataset_ano_dec = data_preparer.dataset_ano_dec 645 data_root = data_preparer.export_root 646 except Exception as e: 647 logging.error( 648 f"Error during data preparation for Anomaly Detection: {e}" 649 ) 650 651 for MODEL_NAME in ( 652 pbar := tqdm(anomalib_image_models, desc="Anomalib") 653 ): 654 # Status 655 pbar.set_description(f"Anomalib model {MODEL_NAME}") 656 657 # Config 658 run_config = { 659 "model_name": MODEL_NAME, 660 "image_size": anomalib_image_models[MODEL_NAME].get( 661 "image_size", None 662 ), 663 "batch_size": anomalib_image_models[MODEL_NAME].get( 664 "batch_size", 1 665 ), 666 "epochs": ano_dec_config["epochs"], 667 "early_stop_patience": ano_dec_config[ 668 "early_stop_patience" 669 ], 670 "data_root": data_root, 671 "mode": ano_dec_config["mode"], 672 } 673 674 # Workflow 675 workflow_anomaly_detection( 676 self.dataset, 677 dataset_ano_dec, 678 self.dataset_info, 679 eval_metrics, 680 run_config, 681 ) 682 683 elif workflow == "auto_labeling": 684 685 # Config 686 SUPPORTED_MODEL_SOURCES = [ 687 "hf_models_objectdetection", 688 "ultralytics", 689 "custom_codetr", 690 ] 691 692 # Common parameters between models 693 config_autolabel = WORKFLOWS["auto_labeling"] 694 mode = config_autolabel["mode"] 695 epochs = config_autolabel["epochs"] 696 selected_model_source = config_autolabel["model_source"] 697 698 # Check if all selected modes are supported 699 for model_source in selected_model_source: 700 if model_source not in SUPPORTED_MODEL_SOURCES: 701 logging.error( 702 f"Selected model source {model_source} is not supported." 
703 ) 704 705 if SUPPORTED_MODEL_SOURCES[0] in selected_model_source: 706 # Hugging Face Models 707 # Single GPU mode (https://github.com/huggingface/transformers/issues/28740) 708 os.environ["CUDA_VISIBLE_DEVICES"] = "0" 709 hf_models = config_autolabel["hf_models_objectdetection"] 710 711 # Dataset Conversion 712 try: 713 logging.info("Converting dataset into Hugging Face format.") 714 pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset) 715 pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset) 716 hf_dataset = pt_to_hf_converter.convert() 717 except Exception as e: 718 logging.error(f"Error during dataset conversion: {e}") 719 720 for MODEL_NAME in ( 721 pbar := tqdm(hf_models, desc="Auto Labeling Models") 722 ): 723 # Status Update 724 pbar.set_description( 725 f"Processing Hugging Face model {MODEL_NAME}" 726 ) 727 728 # Config 729 config_model = config_autolabel[ 730 "hf_models_objectdetection" 731 ][MODEL_NAME] 732 733 run_config = { 734 "mode": mode, 735 "model_name": MODEL_NAME, 736 "v51_dataset_name": self.selected_dataset, 737 "epochs": epochs, 738 "early_stop_threshold": config_autolabel[ 739 "early_stop_threshold" 740 ], 741 "early_stop_patience": config_autolabel[ 742 "early_stop_patience" 743 ], 744 "learning_rate": config_autolabel["learning_rate"], 745 "weight_decay": config_autolabel["weight_decay"], 746 "max_grad_norm": config_autolabel["max_grad_norm"], 747 "batch_size": config_model.get("batch_size", 1), 748 "image_size": config_model.get("image_size", None), 749 "n_worker_dataloader": config_autolabel[ 750 "n_worker_dataloader" 751 ], 752 "inference_settings": config_autolabel[ 753 "inference_settings" 754 ], 755 } 756 757 # Workflow 758 workflow_auto_labeling_hf( 759 self.dataset, 760 hf_dataset, 761 run_config, 762 ) 763 764 if SUPPORTED_MODEL_SOURCES[1] in selected_model_source: 765 # Ultralytics Models 766 config_ultralytics = config_autolabel["ultralytics"] 767 models_ultralytics = config_ultralytics["models"] 768 export_dataset_root = config_ultralytics["export_dataset_root"] 769 770 # Export data into necessary format 771 if "train" in mode: 772 try: 773 UltralyticsObjectDetection.export_data( 774 self.dataset, 775 self.dataset_info, 776 export_dataset_root, 777 ) 778 except Exception as e: 779 logging.error( 780 f"Error during Ultralytics dataset export: {e}" 781 ) 782 783 for model_name in ( 784 pbar := tqdm( 785 models_ultralytics, desc="Ultralytics training" 786 ) 787 ): 788 pbar.set_description(f"Ultralytics model {model_name}") 789 run_config = { 790 "mode": mode, 791 "model_name": model_name, 792 "v51_dataset_name": self.dataset_info["name"], 793 "epochs": epochs, 794 "patience": config_autolabel["early_stop_patience"], 795 "batch_size": models_ultralytics[model_name][ 796 "batch_size" 797 ], 798 "img_size": models_ultralytics[model_name]["img_size"], 799 "export_dataset_root": export_dataset_root, 800 "inference_settings": config_autolabel[ 801 "inference_settings" 802 ], 803 "multi_scale": config_ultralytics["multi_scale"], 804 "cos_lr": config_ultralytics["cos_lr"], 805 } 806 807 workflow_auto_labeling_ultralytics(self.dataset, run_config) 808 809 if SUPPORTED_MODEL_SOURCES[2] in selected_model_source: 810 # Custom Co-DETR 811 config_codetr = config_autolabel["custom_codetr"] 812 run_config = { 813 "export_dataset_root": config_codetr["export_dataset_root"], 814 "container_tool": config_codetr["container_tool"], 815 "n_gpus": config_codetr["n_gpus"], 816 "mode": config_autolabel["mode"], 817 "epochs": config_autolabel["epochs"], 818 
"inference_settings": config_autolabel[ 819 "inference_settings" 820 ], 821 "config": None, 822 } 823 codetr_configs = config_codetr["configs"] 824 825 for config in ( 826 pbar := tqdm( 827 codetr_configs, desc="Processing Co-DETR configurations" 828 ) 829 ): 830 pbar.set_description(f"Co-DETR model {config}") 831 run_config["config"] = config 832 workflow_auto_labeling_custom_codetr( 833 self.dataset, self.dataset_info, run_config 834 ) 835 836 elif workflow == "auto_labeling_zero_shot": 837 config = WORKFLOWS["auto_labeling_zero_shot"] 838 workflow_zero_shot_object_detection( 839 self.dataset, self.dataset_info, config 840 ) 841 842 elif workflow == "ensemble_selection": 843 # Config 844 run_config = WORKFLOWS["ensemble_selection"] 845 846 # Workflow 847 workflow_ensemble_selection( 848 self.dataset, self.dataset_info, run_config 849 ) 850 851 elif workflow == "auto_label_mask": 852 config = WORKFLOWS["auto_label_mask"] 853 workflow_auto_label_mask(self.dataset, self.dataset_info, config) 854 855 elif workflow == "class_mapping": 856 # Config 857 run_config = WORKFLOWS["class_mapping"] 858 859 # Workflow 860 workflow_class_mapping( 861 self.dataset, 862 self.dataset_info, 863 run_config, 864 test_dataset_source=None, 865 test_dataset_target=None, 866 ) 867 elif workflow == "data_ingest": 868 dataset = run_data_ingest() 869 870 dataset_info = { 871 "name": "custom_dataset", 872 "v51_type": "FiftyOneDataset", 873 "splits": ["train", "val", "test"], 874 } 875 876 self.dataset = dataset 877 self.dataset_info = dataset_info 878 self.selected_dataset = "custom_dataset" 879 880 logging.info(f"Data ingestion completed successfully.") 881 882 883 else: 884 logging.error( 885 f"Workflow {workflow} not found. Check available workflows in config.py." 886 ) 887 return False 888 889 cleanup_memory() # Clean after each workflow 890 logging.info(f"Completed workflow {workflow} and cleaned up memory") 891 892 except Exception as e: 893 logging.error(f"Workflow {workflow}: An error occurred: {e}") 894 wandb_close(exit_code=1) 895 cleanup_memory() # Clean up even after failure 896 897 return True
Orchestrates the execution of multiple data processing workflows in sequence.
551 def __init__( 552 self, 553 workflows: List[str], 554 selected_dataset: str, 555 dataset: fo.Dataset, 556 dataset_info: Dict, 557 ): 558 """Initializes with specified workflows, dataset selection, and dataset metadata.""" 559 self.workflows = workflows 560 self.selected_dataset = selected_dataset 561 self.dataset = dataset 562 self.dataset_info = dataset_info
Initializes with specified workflows, dataset selection, and dataset metadata.
    def execute(self) -> bool:
        """Execute all configured workflows in sequence and handle errors."""
        if len(self.workflows) == 0:
            logging.error("No workflows selected.")
            return False

        logging.info(f"Selected workflows: {self.workflows}")
        for workflow in self.workflows:
            logging.info(
                f"Running workflow {workflow} for dataset {self.selected_dataset}"
            )
            try:
                if workflow == "aws_download":
                    # Initialize to avoid unbound references if the download is skipped
                    dataset, dataset_name = None, None
                    parameter_group = "mcity"
                    parameters = WORKFLOWS["aws_download"].get(parameter_group, None)
                    if parameter_group == "mcity":
                        dataset, dataset_name, _ = workflow_aws_download(parameters)
                    else:
                        logging.error(
                            f"The parameter group {parameter_group} is not supported. "
                            "As AWS setups are highly specific, please provide a separate set of parameters and a workflow."
                        )

                    # Select downloaded dataset for further workflows if configured
                    if dataset is not None:
                        if parameters["selected_dataset_overwrite"]:
                            dataset_info = {
                                "name": dataset_name,
                                "v51_type": "FiftyOneDataset",
                                "splits": [],
                            }

                            self.dataset = dataset
                            self.dataset_info = dataset_info
                            logging.warning(
                                f"Overwriting selected dataset {self.selected_dataset} with {dataset_name}"
                            )
                            self.selected_dataset = dataset_name

                elif workflow == "embedding_selection":
                    embedding_models = WORKFLOWS["embedding_selection"]["embedding_models"]

                    for MODEL_NAME in (
                        pbar := tqdm(embedding_models, desc="Selection by Embeddings")
                    ):
                        # Status
                        pbar.set_description(
                            f"Selection by embeddings with model {MODEL_NAME}."
                        )

                        # Config
                        mode = WORKFLOWS["embedding_selection"]["mode"]
                        parameters = WORKFLOWS["embedding_selection"]["parameters"]
                        config = {"mode": mode, "parameters": parameters}

                        # Workflow
                        workflow_embedding_selection(
                            self.dataset,
                            self.dataset_info,
                            MODEL_NAME,
                            config,
                        )

                elif workflow == "anomaly_detection":
                    # Config
                    ano_dec_config = WORKFLOWS["anomaly_detection"]
                    anomalib_image_models = ano_dec_config["anomalib_image_models"]
                    eval_metrics = ano_dec_config["anomalib_eval_metrics"]

                    dataset_ano_dec = None
                    data_root = None
                    if "train" in ano_dec_config["mode"]:
                        try:
                            data_preparer = AnomalyDetectionDataPreparation(
                                self.dataset, self.selected_dataset
                            )
                            dataset_ano_dec = data_preparer.dataset_ano_dec
                            data_root = data_preparer.export_root
                        except Exception as e:
                            logging.error(
                                f"Error during data preparation for Anomaly Detection: {e}"
                            )

                    for MODEL_NAME in (
                        pbar := tqdm(anomalib_image_models, desc="Anomalib")
                    ):
                        # Status
                        pbar.set_description(f"Anomalib model {MODEL_NAME}")

                        # Config
                        run_config = {
                            "model_name": MODEL_NAME,
                            "image_size": anomalib_image_models[MODEL_NAME].get("image_size", None),
                            "batch_size": anomalib_image_models[MODEL_NAME].get("batch_size", 1),
                            "epochs": ano_dec_config["epochs"],
                            "early_stop_patience": ano_dec_config["early_stop_patience"],
                            "data_root": data_root,
                            "mode": ano_dec_config["mode"],
                        }

                        # Workflow
                        workflow_anomaly_detection(
                            self.dataset,
                            dataset_ano_dec,
                            self.dataset_info,
                            eval_metrics,
                            run_config,
                        )

                elif workflow == "auto_labeling":
                    # Config
                    SUPPORTED_MODEL_SOURCES = [
                        "hf_models_objectdetection",
                        "ultralytics",
                        "custom_codetr",
                    ]

                    # Common parameters between models
                    config_autolabel = WORKFLOWS["auto_labeling"]
                    mode = config_autolabel["mode"]
                    epochs = config_autolabel["epochs"]
                    selected_model_source = config_autolabel["model_source"]

                    # Check if all selected model sources are supported
                    for model_source in selected_model_source:
                        if model_source not in SUPPORTED_MODEL_SOURCES:
                            logging.error(
                                f"Selected model source {model_source} is not supported."
                            )

                    if SUPPORTED_MODEL_SOURCES[0] in selected_model_source:
                        # Hugging Face Models
                        # Single GPU mode (https://github.com/huggingface/transformers/issues/28740)
                        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
                        hf_models = config_autolabel["hf_models_objectdetection"]

                        # Dataset Conversion
                        try:
                            logging.info("Converting dataset into Hugging Face format.")
                            pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset)
                            pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset)
                            hf_dataset = pt_to_hf_converter.convert()
                        except Exception as e:
                            logging.error(f"Error during dataset conversion: {e}")

                        for MODEL_NAME in (
                            pbar := tqdm(hf_models, desc="Auto Labeling Models")
                        ):
                            # Status Update
                            pbar.set_description(
                                f"Processing Hugging Face model {MODEL_NAME}"
                            )

                            # Config
                            config_model = config_autolabel["hf_models_objectdetection"][MODEL_NAME]

                            run_config = {
                                "mode": mode,
                                "model_name": MODEL_NAME,
                                "v51_dataset_name": self.selected_dataset,
                                "epochs": epochs,
                                "early_stop_threshold": config_autolabel["early_stop_threshold"],
                                "early_stop_patience": config_autolabel["early_stop_patience"],
                                "learning_rate": config_autolabel["learning_rate"],
                                "weight_decay": config_autolabel["weight_decay"],
                                "max_grad_norm": config_autolabel["max_grad_norm"],
                                "batch_size": config_model.get("batch_size", 1),
                                "image_size": config_model.get("image_size", None),
                                "n_worker_dataloader": config_autolabel["n_worker_dataloader"],
                                "inference_settings": config_autolabel["inference_settings"],
                            }

                            # Workflow
                            workflow_auto_labeling_hf(
                                self.dataset,
                                hf_dataset,
                                run_config,
                            )

                    if SUPPORTED_MODEL_SOURCES[1] in selected_model_source:
                        # Ultralytics Models
                        config_ultralytics = config_autolabel["ultralytics"]
                        models_ultralytics = config_ultralytics["models"]
                        export_dataset_root = config_ultralytics["export_dataset_root"]

                        # Export data into the necessary format
                        if "train" in mode:
                            try:
                                UltralyticsObjectDetection.export_data(
                                    self.dataset,
                                    self.dataset_info,
                                    export_dataset_root,
                                )
                            except Exception as e:
                                logging.error(f"Error during Ultralytics dataset export: {e}")

                        for model_name in (
                            pbar := tqdm(models_ultralytics, desc="Ultralytics training")
                        ):
                            pbar.set_description(f"Ultralytics model {model_name}")
                            run_config = {
                                "mode": mode,
                                "model_name": model_name,
                                "v51_dataset_name": self.dataset_info["name"],
                                "epochs": epochs,
                                "patience": config_autolabel["early_stop_patience"],
                                "batch_size": models_ultralytics[model_name]["batch_size"],
                                "img_size": models_ultralytics[model_name]["img_size"],
                                "export_dataset_root": export_dataset_root,
                                "inference_settings": config_autolabel["inference_settings"],
                                "multi_scale": config_ultralytics["multi_scale"],
                                "cos_lr": config_ultralytics["cos_lr"],
                            }

                            workflow_auto_labeling_ultralytics(self.dataset, run_config)

                    if SUPPORTED_MODEL_SOURCES[2] in selected_model_source:
                        # Custom Co-DETR
                        config_codetr = config_autolabel["custom_codetr"]
                        run_config = {
                            "export_dataset_root": config_codetr["export_dataset_root"],
                            "container_tool": config_codetr["container_tool"],
                            "n_gpus": config_codetr["n_gpus"],
                            "mode": config_autolabel["mode"],
                            "epochs": config_autolabel["epochs"],
                            "inference_settings": config_autolabel["inference_settings"],
                            "config": None,
                        }
                        codetr_configs = config_codetr["configs"]

                        for config in (
                            pbar := tqdm(codetr_configs, desc="Processing Co-DETR configurations")
                        ):
                            pbar.set_description(f"Co-DETR model {config}")
                            run_config["config"] = config
                            workflow_auto_labeling_custom_codetr(
                                self.dataset, self.dataset_info, run_config
                            )

                elif workflow == "auto_labeling_zero_shot":
                    config = WORKFLOWS["auto_labeling_zero_shot"]
                    workflow_zero_shot_object_detection(
                        self.dataset, self.dataset_info, config
                    )

                elif workflow == "ensemble_selection":
                    # Config
                    run_config = WORKFLOWS["ensemble_selection"]

                    # Workflow
                    workflow_ensemble_selection(
                        self.dataset, self.dataset_info, run_config
                    )

                elif workflow == "auto_label_mask":
                    config = WORKFLOWS["auto_label_mask"]
                    workflow_auto_label_mask(self.dataset, self.dataset_info, config)

                elif workflow == "class_mapping":
                    # Config
                    run_config = WORKFLOWS["class_mapping"]

                    # Workflow
                    workflow_class_mapping(
                        self.dataset,
                        self.dataset_info,
                        run_config,
                        test_dataset_source=None,
                        test_dataset_target=None,
                    )

                elif workflow == "data_ingest":
                    dataset = run_data_ingest()

                    dataset_info = {
                        "name": "custom_dataset",
                        "v51_type": "FiftyOneDataset",
                        "splits": ["train", "val", "test"],
                    }

                    self.dataset = dataset
                    self.dataset_info = dataset_info
                    self.selected_dataset = "custom_dataset"

                    logging.info("Data ingestion completed successfully.")

                else:
                    logging.error(
                        f"Workflow {workflow} not found. Check available workflows in config.py."
                    )
                    return False

                cleanup_memory()  # Clean up after each workflow
                logging.info(f"Completed workflow {workflow} and cleaned up memory")

            except Exception as e:
                logging.error(f"Workflow {workflow}: An error occurred: {e}")
                wandb_close(exit_code=1)
                cleanup_memory()  # Clean up even after a failure

        return True
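
# Illustrative sketch only (an assumption, not the project's actual config/config.py):
# the "embedding_selection" and "anomaly_detection" branches of execute() above read
# WORKFLOWS entries shaped roughly as below. The keys mirror the lookups in the code;
# the model names and values are hypothetical placeholders.
_EXAMPLE_WORKFLOWS_SHAPE = {
    "embedding_selection": {
        "mode": "compute",  # read via WORKFLOWS["embedding_selection"]["mode"]
        "parameters": {},  # passed through to workflow_embedding_selection()
        "embedding_models": ["clip-vit-base-patch32"],  # hypothetical model name
    },
    "anomaly_detection": {
        "mode": ["train"],  # execute() checks `"train" in ano_dec_config["mode"]`
        "epochs": 10,
        "early_stop_patience": 5,
        "anomalib_eval_metrics": ["AUROC"],
        "anomalib_image_models": {
            "Padim": {"image_size": 256, "batch_size": 8},  # hypothetical entry
        },
    },
}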
def main():
    """Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface."""
    time_start = time.time()
    configure_logging()

    # Signal handler for CTRL + C
    signal.signal(signal.SIGINT, signal_handler)

    # Execute workflows
    if "data_ingest" in SELECTED_WORKFLOW:
        executor = WorkflowExecutor(
            SELECTED_WORKFLOW,
            SELECTED_DATASET["name"],
            dataset=None,
            dataset_info=None,
        )
        executor.execute()

        # Pull outputs back from the executor after ingestion
        dataset = executor.dataset
        dataset_info = executor.dataset_info

    else:
        dataset, dataset_info = load_dataset(SELECTED_DATASET)

        executor = WorkflowExecutor(
            SELECTED_WORKFLOW,
            SELECTED_DATASET["name"],
            dataset,
            dataset_info,
        )
        executor.execute()

    if dataset is not None:
        dataset.reload()
        dataset.save()
        arrange_fields_in_groups(dataset)
        logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.")

        # Dataset stats
        logging.debug(dataset)
        logging.debug(dataset.stats(include_media=True))

        # V51 UI launch
        session = fo.launch_app(
            dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE
        )
    else:
        logging.info("Skipping Voxel51 session.")

    time_stop = time.time()
    logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")
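
# Entry point. This guard is not shown in the excerpt above; it is assumed here as a
# minimal sketch that simply runs main() when the script is executed directly
# (e.g., `python main.py`).
if __name__ == "__main__":
    main()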