main
1import datetime 2import logging 3import os 4import signal 5import sys 6import time 7import warnings 8from typing import Dict, List 9 10import torch 11 12IGNORE_FUTURE_WARNINGS = True 13if IGNORE_FUTURE_WARNINGS: 14 warnings.simplefilter("ignore", category=FutureWarning) 15 16import gc 17 18import fiftyone as fo 19import torch.multiprocessing as mp 20from tqdm import tqdm 21 22from config.config import ( 23 SELECTED_DATASET, 24 SELECTED_WORKFLOW, 25 V51_ADDRESS, 26 V51_PORT, 27 V51_REMOTE, 28 WORKFLOWS, 29) 30from utils.anomaly_detection_data_preparation import AnomalyDetectionDataPreparation 31from utils.data_loader import FiftyOneTorchDatasetCOCO, TorchToHFDatasetCOCO 32from utils.dataset_loader import load_dataset 33from utils.logging import configure_logging 34from utils.mp_distribution import ZeroShotDistributer 35from utils.sidebar_groups import arrange_fields_in_groups 36from utils.wandb_helper import wandb_close, wandb_init 37from workflows.anomaly_detection import Anodec 38from workflows.auto_labeling import ( 39 CustomCoDETRObjectDetection, 40 HuggingFaceObjectDetection, 41 UltralyticsObjectDetection, 42 ZeroShotObjectDetection, 43) 44from workflows.aws_download import AwsDownloader 45from workflows.class_mapping import ClassMapper 46from workflows.embedding_selection import EmbeddingSelection 47from workflows.ensemble_selection import EnsembleSelection 48from workflows.auto_label_mask import AutoLabelMask 49from workflows.class_mapping import ClassMapper 50 51wandb_run = None # Init globally to make sure it is available 52 53def signal_handler(sig, frame): 54 """Handle Ctrl+C signal by cleaning up resources and exiting.""" 55 logging.error("You pressed Ctrl+C!") 56 try: 57 wandb_close(exit_code=1) 58 cleanup_memory() 59 except: 60 pass 61 sys.exit(0) 62 63 64def workflow_aws_download(parameters, wandb_activate=True): 65 """Download and process data from AWS S3 bucket.""" 66 dataset = None 67 dataset_name = None 68 wandb_exit_code = 0 69 files_to_be_downloaded = 0 70 try: 71 # Config 72 bucket = parameters["bucket"] 73 prefix = parameters["prefix"] 74 download_path = parameters["download_path"] 75 test_run = parameters["test_run"] 76 77 # Logging 78 now = datetime.datetime.now() 79 datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S") 80 log_dir = f"logs/tensorboard/aws_{datetime_str}" 81 82 dataset_name = f"annarbor_rolling_{datetime_str}" 83 84 # Weights and Biases 85 wandb_run = wandb_init( 86 run_name=dataset_name, 87 project_name="AWS Download", 88 dataset_name=dataset_name, 89 log_dir=log_dir, 90 wandb_activate=wandb_activate, 91 ) 92 93 # Workflow 94 aws_downloader = AwsDownloader( 95 bucket=bucket, 96 prefix=prefix, 97 download_path=download_path, 98 test_run=test_run, 99 ) 100 101 ( 102 sub_folder, 103 files, 104 files_to_be_downloaded, 105 DOWNLOAD_NUMBER_SUCCESS, 106 DOWNLOAD_SIZE_SUCCESS, 107 ) = aws_downloader.download_files(log_dir=log_dir) 108 109 dataset = aws_downloader.decode_data( 110 sub_folder=sub_folder, 111 files=files, 112 log_dir=log_dir, 113 dataset_name=dataset_name, 114 ) 115 116 except Exception as e: 117 logging.error(f"AWS Download and Extraction failed: {e}") 118 wandb_exit_code = 1 119 120 finally: 121 wandb_close(wandb_exit_code) 122 123 return dataset, dataset_name, files_to_be_downloaded 124 125 126def workflow_anomaly_detection( 127 dataset_normal, 128 dataset_ano_dec, 129 dataset_info, 130 eval_metrics, 131 run_config, 132 wandb_activate=True, 133): 134 """Run anomaly detection workflow using specified models and configurations.""" 135 try: 136 # Weights and Biases 137 wandb_exit_code = 0 138 wandb_run, log_dir = wandb_init( 139 run_name=run_config["model_name"], 140 project_name="Selection by Anomaly Detection", 141 dataset_name=dataset_info["name"], 142 config=run_config, 143 wandb_activate=wandb_activate, 144 ) 145 146 # Workflow 147 148 SUPPORTED_MODES = ["train", "inference"] 149 # Check if all selected modes are supported 150 for mode in run_config["mode"]: 151 if mode not in SUPPORTED_MODES: 152 logging.error(f"Selected mode {mode} is not supported.") 153 if SUPPORTED_MODES[0] in run_config["mode"]: 154 ano_dec = Anodec( 155 dataset=dataset_ano_dec, 156 eval_metrics=eval_metrics, 157 dataset_info=dataset_info, 158 config=run_config, 159 tensorboard_output=log_dir, 160 ) 161 ano_dec.train_and_export_model() 162 ano_dec.run_inference(mode=SUPPORTED_MODES[0]) 163 ano_dec.eval_v51() 164 if SUPPORTED_MODES[1] in run_config["mode"]: 165 ano_dec = Anodec( 166 dataset=dataset_normal, 167 eval_metrics=eval_metrics, 168 dataset_info=dataset_info, 169 config=run_config, 170 tensorboard_output=log_dir, 171 ) 172 ano_dec.run_inference(mode=SUPPORTED_MODES[1]) 173 174 except Exception as e: 175 logging.error( 176 f"Error in Anomaly Detection for model {run_config['model_name']}: {e}" 177 ) 178 wandb_exit_code = 1 179 finally: 180 wandb_close(exit_code=wandb_exit_code) 181 182 return True 183 184 185def workflow_embedding_selection( 186 dataset, dataset_info, MODEL_NAME, config, wandb_activate=True 187): 188 """Compute embeddings and find representative and rare images for dataset selection.""" 189 try: 190 wandb_exit_code = 0 191 wandb_run, log_dir = wandb_init( 192 run_name=MODEL_NAME, 193 project_name="Selection by Embedding", 194 dataset_name=dataset_info["name"], 195 config=config, 196 wandb_activate=wandb_activate, 197 ) 198 embedding_selector = EmbeddingSelection( 199 dataset, dataset_info, MODEL_NAME, log_dir 200 ) 201 202 if embedding_selector.model_already_used == False: 203 204 embedding_selector.compute_embeddings(config["mode"]) 205 embedding_selector.compute_similarity() 206 207 # Find representative and unique samples as center points for further selections 208 thresholds = config["parameters"] 209 embedding_selector.compute_representativeness( 210 thresholds["compute_representativeness"] 211 ) 212 embedding_selector.compute_unique_images_greedy( 213 thresholds["compute_unique_images_greedy"] 214 ) 215 embedding_selector.compute_unique_images_deterministic( 216 thresholds["compute_unique_images_deterministic"] 217 ) 218 219 # Select samples similar to the center points to enlarge the dataset 220 embedding_selector.compute_similar_images( 221 thresholds["compute_similar_images"], thresholds["neighbour_count"] 222 ) 223 else: 224 logging.warning( 225 f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection." 226 ) 227 228 except Exception as e: 229 logging.error(f"An error occurred with model {MODEL_NAME}: {e}") 230 wandb_exit_code = 1 231 finally: 232 wandb_close(wandb_exit_code) 233 234 return True 235 236 237def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True): 238 """Auto-labeling workflow using Ultralytics models with optional training and inference.""" 239 try: 240 wandb_exit_code = 0 241 wandb_run = wandb_init( 242 run_name=run_config["model_name"], 243 project_name="Auto Labeling Ultralytics", 244 dataset_name=run_config["v51_dataset_name"], 245 config=run_config, 246 wandb_activate=wandb_activate, 247 ) 248 249 detector = UltralyticsObjectDetection(dataset=dataset, config=run_config) 250 251 # Check if all selected modes are supported 252 SUPPORTED_MODES = ["train", "inference"] 253 for mode in run_config["mode"]: 254 if mode not in SUPPORTED_MODES: 255 logging.error(f"Selected mode {mode} is not supported.") 256 257 if SUPPORTED_MODES[0] in run_config["mode"]: 258 logging.info(f"Training model {run_config['model_name']}") 259 detector.train() 260 if SUPPORTED_MODES[1] in run_config["mode"]: 261 logging.info(f"Running inference for model {run_config['model_name']}") 262 detector.inference() 263 264 except Exception as e: 265 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 266 wandb_exit_code = 1 267 268 finally: 269 wandb_close(wandb_exit_code) 270 271 return True 272 273 274def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True): 275 """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.""" 276 try: 277 wandb_exit_code = 0 278 wandb_run = wandb_init( 279 run_name=run_config["model_name"], 280 project_name="Auto Labeling Hugging Face", 281 dataset_name=run_config["v51_dataset_name"], 282 config=run_config, 283 wandb_activate=wandb_activate, 284 ) 285 286 detector = HuggingFaceObjectDetection( 287 dataset=dataset, 288 config=run_config, 289 ) 290 SUPPORTED_MODES = ["train", "inference"] 291 292 # Check if all selected modes are supported 293 for mode in run_config["mode"]: 294 if mode not in SUPPORTED_MODES: 295 logging.error(f"Selected mode {mode} is not supported.") 296 if SUPPORTED_MODES[0] in run_config["mode"]: 297 logging.info(f"Training model {run_config['model_name']}") 298 detector.train(hf_dataset) 299 if SUPPORTED_MODES[1] in run_config["mode"]: 300 logging.info(f"Running inference for model {run_config['model_name']}") 301 detector.inference(inference_settings=run_config["inference_settings"]) 302 303 except Exception as e: 304 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 305 wandb_exit_code = 1 306 307 finally: 308 wandb_close(wandb_exit_code) 309 310 return True 311 312 313def workflow_auto_labeling_custom_codetr( 314 dataset, dataset_info, run_config, wandb_activate=True 315): 316 """Auto labeling workflow using Co-DETR model supporting training and inference modes.""" 317 318 try: 319 wandb_exit_code = 0 320 wandb_run = wandb_init( 321 run_name=run_config["config"], 322 project_name="Co-DETR Auto Labeling", 323 dataset_name=dataset_info["name"], 324 config=run_config, 325 wandb_activate=wandb_activate, 326 ) 327 328 mode = run_config["mode"] 329 330 detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config) 331 detector.convert_data() 332 if "train" in mode: 333 detector.update_config_file( 334 dataset_name=dataset_info["name"], 335 config_file=run_config["config"], 336 max_epochs=run_config["epochs"], 337 ) 338 detector.train( 339 run_config["config"], run_config["n_gpus"], run_config["container_tool"] 340 ) 341 if "inference" in mode: 342 detector.run_inference( 343 dataset, 344 run_config["config"], 345 run_config["n_gpus"], 346 run_config["container_tool"], 347 run_config["inference_settings"], 348 ) 349 except Exception as e: 350 logging.error(f"Error during CoDETR training: {e}") 351 wandb_exit_code = 1 352 finally: 353 wandb_close(wandb_exit_code) 354 355 return True 356 357 358def workflow_zero_shot_object_detection(dataset, dataset_info, config): 359 """Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.""" 360 # Set multiprocessing mode for CUDA multiprocessing 361 try: 362 mp.set_start_method("spawn", force=True) 363 logging.debug("Successfully set multiprocessing start method to 'spawn'") 364 except RuntimeError as e: 365 # This is expected if the start method was already set 366 logging.debug(f"Multiprocessing start method was already set: {e}") 367 except Exception as e: 368 # Handle any other unexpected errors 369 logging.error(f"Failed to set multiprocessing start method: {e}") 370 371 # Zero-shot object detector models from Huggingface 372 # Optimized for parallel multi-GPU inference, also supports single GPU 373 dataset_torch = FiftyOneTorchDatasetCOCO(dataset) 374 detector = ZeroShotObjectDetection( 375 dataset_torch=dataset_torch, dataset_info=dataset_info, config=config 376 ) 377 378 # Check if model detections are already stored in V51 dataset or on disk 379 models_splits_dict = detector.exclude_stored_predictions( 380 dataset_v51=dataset, config=config 381 ) 382 if len(models_splits_dict) > 0: 383 config["hf_models_zeroshot_objectdetection"] = models_splits_dict 384 distributor = ZeroShotDistributer( 385 config=config, 386 n_samples=len(dataset_torch), 387 dataset_info=dataset_info, 388 detector=detector, 389 ) 390 distributor.distribute_and_run() 391 else: 392 logging.info( 393 "All zero shot models already have predictions stored in the dataset." 394 ) 395 396 # To make new fields available to follow-up processes 397 dataset.reload() 398 dataset.save() 399 400 return True 401 402 403def workflow_auto_label_mask(dataset, dataset_info, config): 404 try: 405 depth_config = config["depth_estimation"] 406 seg_config = config["semantic_segmentation"] 407 408 for architecture_name, architecture_info in depth_config.items(): 409 auto_labeler = AutoLabelMask( 410 dataset=dataset, 411 dataset_info=dataset_info, 412 model_name=architecture_name, 413 task_type="depth_estimation", 414 model_config=architecture_info, 415 ) 416 auto_labeler.run_inference() 417 418 for architecture_name, architecture_info in seg_config.items(): 419 auto_labeler = AutoLabelMask( 420 dataset=dataset, 421 dataset_info=dataset_info, 422 model_name=architecture_name, 423 task_type="semantic_segmentation", 424 model_config=architecture_info, 425 ) 426 auto_labeler.run_inference() 427 428 except Exception as e: 429 logging.error(f"Auto-labeling mask workflow failed: {e}") 430 raise 431 432 433def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True): 434 """Runs ensemble selection workflow on given dataset using provided configuration.""" 435 try: 436 wandb_exit_code = 0 437 438 wandb_run = wandb_init( 439 run_name="Selection by Ensemble", 440 project_name="Ensemble Selection", 441 dataset_name=dataset_info["name"], 442 config=run_config, 443 wandb_activate=wandb_activate, 444 ) 445 ensemble_selecter = EnsembleSelection(dataset, run_config) 446 ensemble_selecter.ensemble_selection() 447 except Exception as e: 448 logging.error(f"An error occured during Ensemble Selection: {e}") 449 wandb_exit_code = 1 450 451 finally: 452 wandb_close(wandb_exit_code) 453 454 return True 455 456 457def workflow_class_mapping( 458 dataset, 459 dataset_info, 460 run_config, 461 wandb_activate=True, 462 test_dataset_source=None, 463 test_dataset_target=None, 464): 465 """Runs class mapping workflow to align labels between the source dataset and target dataset.""" 466 try: 467 wandb_exit_code = 0 468 # Initialize a wandb run for class mapping 469 wandb_run = wandb_init( 470 run_name="class_mapping", 471 project_name="Class Mapping", 472 dataset_name=dataset_info["name"], 473 config=run_config, 474 wandb_activate=wandb_activate, 475 ) 476 class_mapping_models = run_config["hf_models_zeroshot_classification"] 477 478 for model_name in ( 479 pbar := tqdm(class_mapping_models, desc="Processing Class Mapping") 480 ): 481 pbar.set_description(f"Zero Shot Classification model {model_name}") 482 mapper = ClassMapper(dataset, model_name, run_config) 483 try: 484 stats = mapper.run_mapping(test_dataset_source, test_dataset_target) 485 # Display statistics only if mapping was successful 486 source_class_counts = stats["total_processed"] 487 logging.info("\nClassification Results for Source Dataset:") 488 for source_class, count in stats["source_class_counts"].items(): 489 percentage = ( 490 (count / source_class_counts) * 100 491 if source_class_counts > 0 492 else 0 493 ) 494 logging.info( 495 f"{source_class}: {count} samples processed ({percentage:.1f}%)" 496 ) 497 498 # Display statistics for tags added to Target Dataset 499 logging.info("\nTag Addition Results (Target Dataset Tags):") 500 logging.info(f"Total new tags added: {stats['changes_made']}") 501 for target_class, tag_count in stats["tags_added_per_category"].items(): 502 logging.info(f"{target_class} tags added: {tag_count}") 503 504 except Exception as e: 505 logging.error(f"Error during mapping with model {model_name}: {e}") 506 507 except Exception as e: 508 logging.error(f"Error in class_mapping workflow: {e}") 509 wandb_exit_code = 1 510 finally: 511 wandb_close(wandb_exit_code) 512 513 return True 514 515 516def cleanup_memory(do_extensive_cleanup=False): 517 """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.""" 518 logging.info("Starting memory cleanup") 519 # Clear CUDA cache 520 if torch.cuda.is_available(): 521 torch.cuda.empty_cache() 522 523 # Force garbage collection 524 gc.collect() 525 526 if do_extensive_cleanup: 527 528 # Clear any leftover tensors 529 n_deleted_torch_objects = 0 530 for obj in tqdm( 531 gc.get_objects(), desc="Deleting objects from Python Garbage Collector" 532 ): 533 try: 534 if torch.is_tensor(obj): 535 del obj 536 n_deleted_torch_objects += 1 537 except: 538 pass 539 540 logging.info(f"Deleted {n_deleted_torch_objects} torch objects") 541 542 # Final garbage collection 543 gc.collect() 544 545 546class WorkflowExecutor: 547 """Orchestrates the execution of multiple data processing workflows in sequence.""" 548 549 def __init__( 550 self, 551 workflows: List[str], 552 selected_dataset: str, 553 dataset: fo.Dataset, 554 dataset_info: Dict, 555 ): 556 """Initializes with specified workflows, dataset selection, and dataset metadata.""" 557 self.workflows = workflows 558 self.selected_dataset = selected_dataset 559 self.dataset = dataset 560 self.dataset_info = dataset_info 561 562 def execute(self) -> bool: 563 """Execute all configured workflows in sequence and handle errors.""" 564 if len(self.workflows) == 0: 565 logging.error("No workflows selected.") 566 return False 567 568 logging.info(f"Selected workflows: {self.workflows}") 569 for workflow in self.workflows: 570 logging.info( 571 f"Running workflow {workflow} for dataset {self.selected_dataset}" 572 ) 573 try: 574 if workflow == "aws_download": 575 parameter_group = "mcity" 576 parameters = WORKFLOWS["aws_download"].get(parameter_group, None) 577 if parameter_group == "mcity": 578 dataset, dataset_name, _ = workflow_aws_download(parameters) 579 else: 580 logging.error( 581 f"The parameter group {parameter_group} is not supported. As AWS are highly specific, please provide a separate set of parameters and a workflow." 582 ) 583 584 # Select downloaded dataset for further workflows if configured 585 if dataset is not None: 586 if parameters["selected_dataset_overwrite"] == True: 587 588 dataset_info = { 589 "name": dataset_name, 590 "v51_type": "FiftyOneDataset", 591 "splits": [], 592 } 593 594 self.dataset = dataset 595 self.dataset_info = dataset_info 596 logging.warning( 597 f"Overwritting selected dataset {self.selected_dataset} with {dataset_name}" 598 ) 599 self.selected_dataset = dataset_name 600 601 elif workflow == "embedding_selection": 602 embedding_models = WORKFLOWS["embedding_selection"][ 603 "embedding_models" 604 ] 605 606 for MODEL_NAME in ( 607 pbar := tqdm(embedding_models, desc="Selection by Embeddings") 608 ): 609 610 # Status 611 pbar.set_description( 612 f"Selection by embeddings with model {MODEL_NAME}." 613 ) 614 615 # Config 616 mode = WORKFLOWS["embedding_selection"]["mode"] 617 parameters = WORKFLOWS["embedding_selection"]["parameters"] 618 config = {"mode": mode, "parameters": parameters} 619 620 # Workflow 621 workflow_embedding_selection( 622 self.dataset, 623 self.dataset_info, 624 MODEL_NAME, 625 config, 626 ) 627 628 elif workflow == "anomaly_detection": 629 630 # Config 631 ano_dec_config = WORKFLOWS["anomaly_detection"] 632 anomalib_image_models = ano_dec_config["anomalib_image_models"] 633 eval_metrics = ano_dec_config["anomalib_eval_metrics"] 634 635 dataset_ano_dec = None 636 data_root = None 637 if "train" in ano_dec_config["mode"]: 638 try: 639 data_preparer = AnomalyDetectionDataPreparation( 640 self.dataset, self.selected_dataset 641 ) 642 dataset_ano_dec = data_preparer.dataset_ano_dec 643 data_root = data_preparer.export_root 644 except Exception as e: 645 logging.error( 646 f"Error during data preparation for Anomaly Detection: {e}" 647 ) 648 649 for MODEL_NAME in ( 650 pbar := tqdm(anomalib_image_models, desc="Anomalib") 651 ): 652 # Status 653 pbar.set_description(f"Anomalib model {MODEL_NAME}") 654 655 # Config 656 run_config = { 657 "model_name": MODEL_NAME, 658 "image_size": anomalib_image_models[MODEL_NAME].get( 659 "image_size", None 660 ), 661 "batch_size": anomalib_image_models[MODEL_NAME].get( 662 "batch_size", 1 663 ), 664 "epochs": ano_dec_config["epochs"], 665 "early_stop_patience": ano_dec_config[ 666 "early_stop_patience" 667 ], 668 "data_root": data_root, 669 "mode": ano_dec_config["mode"], 670 } 671 672 # Workflow 673 workflow_anomaly_detection( 674 self.dataset, 675 dataset_ano_dec, 676 self.dataset_info, 677 eval_metrics, 678 run_config, 679 ) 680 681 elif workflow == "auto_labeling": 682 683 # Config 684 SUPPORTED_MODEL_SOURCES = [ 685 "hf_models_objectdetection", 686 "ultralytics", 687 "custom_codetr", 688 ] 689 690 # Common parameters between models 691 config_autolabel = WORKFLOWS["auto_labeling"] 692 mode = config_autolabel["mode"] 693 epochs = config_autolabel["epochs"] 694 selected_model_source = config_autolabel["model_source"] 695 696 # Check if all selected modes are supported 697 for model_source in selected_model_source: 698 if model_source not in SUPPORTED_MODEL_SOURCES: 699 logging.error( 700 f"Selected model source {model_source} is not supported." 701 ) 702 703 if SUPPORTED_MODEL_SOURCES[0] in selected_model_source: 704 # Hugging Face Models 705 # Single GPU mode (https://github.com/huggingface/transformers/issues/28740) 706 os.environ["CUDA_VISIBLE_DEVICES"] = "0" 707 hf_models = config_autolabel["hf_models_objectdetection"] 708 709 # Dataset Conversion 710 try: 711 logging.info("Converting dataset into Hugging Face format.") 712 pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset) 713 pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset) 714 hf_dataset = pt_to_hf_converter.convert() 715 except Exception as e: 716 logging.error(f"Error during dataset conversion: {e}") 717 718 for MODEL_NAME in ( 719 pbar := tqdm(hf_models, desc="Auto Labeling Models") 720 ): 721 # Status Update 722 pbar.set_description( 723 f"Processing Hugging Face model {MODEL_NAME}" 724 ) 725 726 # Config 727 config_model = config_autolabel[ 728 "hf_models_objectdetection" 729 ][MODEL_NAME] 730 731 run_config = { 732 "mode": mode, 733 "model_name": MODEL_NAME, 734 "v51_dataset_name": self.selected_dataset, 735 "epochs": epochs, 736 "early_stop_threshold": config_autolabel[ 737 "early_stop_threshold" 738 ], 739 "early_stop_patience": config_autolabel[ 740 "early_stop_patience" 741 ], 742 "learning_rate": config_autolabel["learning_rate"], 743 "weight_decay": config_autolabel["weight_decay"], 744 "max_grad_norm": config_autolabel["max_grad_norm"], 745 "batch_size": config_model.get("batch_size", 1), 746 "image_size": config_model.get("image_size", None), 747 "n_worker_dataloader": config_autolabel[ 748 "n_worker_dataloader" 749 ], 750 "inference_settings": config_autolabel[ 751 "inference_settings" 752 ], 753 } 754 755 # Workflow 756 workflow_auto_labeling_hf( 757 self.dataset, 758 hf_dataset, 759 run_config, 760 ) 761 762 if SUPPORTED_MODEL_SOURCES[1] in selected_model_source: 763 # Ultralytics Models 764 config_ultralytics = config_autolabel["ultralytics"] 765 models_ultralytics = config_ultralytics["models"] 766 export_dataset_root = config_ultralytics["export_dataset_root"] 767 768 # Export data into necessary format 769 if "train" in mode: 770 try: 771 UltralyticsObjectDetection.export_data( 772 self.dataset, 773 self.dataset_info, 774 export_dataset_root, 775 ) 776 except Exception as e: 777 logging.error( 778 f"Error during Ultralytics dataset export: {e}" 779 ) 780 781 for model_name in ( 782 pbar := tqdm( 783 models_ultralytics, desc="Ultralytics training" 784 ) 785 ): 786 pbar.set_description(f"Ultralytics model {model_name}") 787 run_config = { 788 "mode": mode, 789 "model_name": model_name, 790 "v51_dataset_name": self.dataset_info["name"], 791 "epochs": epochs, 792 "patience": config_autolabel["early_stop_patience"], 793 "batch_size": models_ultralytics[model_name][ 794 "batch_size" 795 ], 796 "img_size": models_ultralytics[model_name]["img_size"], 797 "export_dataset_root": export_dataset_root, 798 "inference_settings": config_autolabel[ 799 "inference_settings" 800 ], 801 "multi_scale": config_ultralytics["multi_scale"], 802 "cos_lr": config_ultralytics["cos_lr"], 803 } 804 805 workflow_auto_labeling_ultralytics(self.dataset, run_config) 806 807 if SUPPORTED_MODEL_SOURCES[2] in selected_model_source: 808 # Custom Co-DETR 809 config_codetr = config_autolabel["custom_codetr"] 810 run_config = { 811 "export_dataset_root": config_codetr["export_dataset_root"], 812 "container_tool": config_codetr["container_tool"], 813 "n_gpus": config_codetr["n_gpus"], 814 "mode": config_autolabel["mode"], 815 "epochs": config_autolabel["epochs"], 816 "inference_settings": config_autolabel[ 817 "inference_settings" 818 ], 819 "config": None, 820 } 821 codetr_configs = config_codetr["configs"] 822 823 for config in ( 824 pbar := tqdm( 825 codetr_configs, desc="Processing Co-DETR configurations" 826 ) 827 ): 828 pbar.set_description(f"Co-DETR model {config}") 829 run_config["config"] = config 830 workflow_auto_labeling_custom_codetr( 831 self.dataset, self.dataset_info, run_config 832 ) 833 834 elif workflow == "auto_labeling_zero_shot": 835 config = WORKFLOWS["auto_labeling_zero_shot"] 836 workflow_zero_shot_object_detection( 837 self.dataset, self.dataset_info, config 838 ) 839 840 elif workflow == "ensemble_selection": 841 # Config 842 run_config = WORKFLOWS["ensemble_selection"] 843 844 # Workflow 845 workflow_ensemble_selection( 846 self.dataset, self.dataset_info, run_config 847 ) 848 849 elif workflow == "auto_label_mask": 850 config = WORKFLOWS["auto_label_mask"] 851 workflow_auto_label_mask(self.dataset, self.dataset_info, config) 852 853 elif workflow == "class_mapping": 854 # Config 855 run_config = WORKFLOWS["class_mapping"] 856 857 # Workflow 858 workflow_class_mapping( 859 self.dataset, 860 self.dataset_info, 861 run_config, 862 test_dataset_source=None, 863 test_dataset_target=None, 864 ) 865 866 else: 867 logging.error( 868 f"Workflow {workflow} not found. Check available workflows in config.py." 869 ) 870 return False 871 872 cleanup_memory() # Clean after each workflow 873 logging.info(f"Completed workflow {workflow} and cleaned up memory") 874 875 except Exception as e: 876 logging.error(f"Workflow {workflow}: An error occurred: {e}") 877 wandb_close(exit_code=1) 878 cleanup_memory() # Clean up even after failure 879 880 return True 881 882 883def main(): 884 """Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface.""" 885 time_start = time.time() 886 configure_logging() 887 888 # Signal handler for CTRL + C 889 signal.signal(signal.SIGINT, signal_handler) 890 891 # Execute workflows 892 dataset, dataset_info = load_dataset(SELECTED_DATASET) 893 894 executor = WorkflowExecutor( 895 SELECTED_WORKFLOW, SELECTED_DATASET["name"], dataset, dataset_info 896 ) 897 executor.execute() 898 899 # Launch V51 session 900 dataset.reload() 901 dataset.save() 902 arrange_fields_in_groups(dataset) 903 logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.") 904 905 # Dataset stats 906 logging.debug(dataset) 907 logging.debug(dataset.stats(include_media=True)) 908 909 # V51 UI launch 910 session = fo.launch_app( 911 dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE 912 ) 913 914 time_stop = time.time() 915 logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds") 916 917 918if __name__ == "__main__": 919 cleanup_memory() 920 main()
54def signal_handler(sig, frame): 55 """Handle Ctrl+C signal by cleaning up resources and exiting.""" 56 logging.error("You pressed Ctrl+C!") 57 try: 58 wandb_close(exit_code=1) 59 cleanup_memory() 60 except: 61 pass 62 sys.exit(0)
Handle Ctrl+C signal by cleaning up resources and exiting.
65def workflow_aws_download(parameters, wandb_activate=True): 66 """Download and process data from AWS S3 bucket.""" 67 dataset = None 68 dataset_name = None 69 wandb_exit_code = 0 70 files_to_be_downloaded = 0 71 try: 72 # Config 73 bucket = parameters["bucket"] 74 prefix = parameters["prefix"] 75 download_path = parameters["download_path"] 76 test_run = parameters["test_run"] 77 78 # Logging 79 now = datetime.datetime.now() 80 datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S") 81 log_dir = f"logs/tensorboard/aws_{datetime_str}" 82 83 dataset_name = f"annarbor_rolling_{datetime_str}" 84 85 # Weights and Biases 86 wandb_run = wandb_init( 87 run_name=dataset_name, 88 project_name="AWS Download", 89 dataset_name=dataset_name, 90 log_dir=log_dir, 91 wandb_activate=wandb_activate, 92 ) 93 94 # Workflow 95 aws_downloader = AwsDownloader( 96 bucket=bucket, 97 prefix=prefix, 98 download_path=download_path, 99 test_run=test_run, 100 ) 101 102 ( 103 sub_folder, 104 files, 105 files_to_be_downloaded, 106 DOWNLOAD_NUMBER_SUCCESS, 107 DOWNLOAD_SIZE_SUCCESS, 108 ) = aws_downloader.download_files(log_dir=log_dir) 109 110 dataset = aws_downloader.decode_data( 111 sub_folder=sub_folder, 112 files=files, 113 log_dir=log_dir, 114 dataset_name=dataset_name, 115 ) 116 117 except Exception as e: 118 logging.error(f"AWS Download and Extraction failed: {e}") 119 wandb_exit_code = 1 120 121 finally: 122 wandb_close(wandb_exit_code) 123 124 return dataset, dataset_name, files_to_be_downloaded
Download and process data from AWS S3 bucket.
127def workflow_anomaly_detection( 128 dataset_normal, 129 dataset_ano_dec, 130 dataset_info, 131 eval_metrics, 132 run_config, 133 wandb_activate=True, 134): 135 """Run anomaly detection workflow using specified models and configurations.""" 136 try: 137 # Weights and Biases 138 wandb_exit_code = 0 139 wandb_run, log_dir = wandb_init( 140 run_name=run_config["model_name"], 141 project_name="Selection by Anomaly Detection", 142 dataset_name=dataset_info["name"], 143 config=run_config, 144 wandb_activate=wandb_activate, 145 ) 146 147 # Workflow 148 149 SUPPORTED_MODES = ["train", "inference"] 150 # Check if all selected modes are supported 151 for mode in run_config["mode"]: 152 if mode not in SUPPORTED_MODES: 153 logging.error(f"Selected mode {mode} is not supported.") 154 if SUPPORTED_MODES[0] in run_config["mode"]: 155 ano_dec = Anodec( 156 dataset=dataset_ano_dec, 157 eval_metrics=eval_metrics, 158 dataset_info=dataset_info, 159 config=run_config, 160 tensorboard_output=log_dir, 161 ) 162 ano_dec.train_and_export_model() 163 ano_dec.run_inference(mode=SUPPORTED_MODES[0]) 164 ano_dec.eval_v51() 165 if SUPPORTED_MODES[1] in run_config["mode"]: 166 ano_dec = Anodec( 167 dataset=dataset_normal, 168 eval_metrics=eval_metrics, 169 dataset_info=dataset_info, 170 config=run_config, 171 tensorboard_output=log_dir, 172 ) 173 ano_dec.run_inference(mode=SUPPORTED_MODES[1]) 174 175 except Exception as e: 176 logging.error( 177 f"Error in Anomaly Detection for model {run_config['model_name']}: {e}" 178 ) 179 wandb_exit_code = 1 180 finally: 181 wandb_close(exit_code=wandb_exit_code) 182 183 return True
Run anomaly detection workflow using specified models and configurations.
186def workflow_embedding_selection( 187 dataset, dataset_info, MODEL_NAME, config, wandb_activate=True 188): 189 """Compute embeddings and find representative and rare images for dataset selection.""" 190 try: 191 wandb_exit_code = 0 192 wandb_run, log_dir = wandb_init( 193 run_name=MODEL_NAME, 194 project_name="Selection by Embedding", 195 dataset_name=dataset_info["name"], 196 config=config, 197 wandb_activate=wandb_activate, 198 ) 199 embedding_selector = EmbeddingSelection( 200 dataset, dataset_info, MODEL_NAME, log_dir 201 ) 202 203 if embedding_selector.model_already_used == False: 204 205 embedding_selector.compute_embeddings(config["mode"]) 206 embedding_selector.compute_similarity() 207 208 # Find representative and unique samples as center points for further selections 209 thresholds = config["parameters"] 210 embedding_selector.compute_representativeness( 211 thresholds["compute_representativeness"] 212 ) 213 embedding_selector.compute_unique_images_greedy( 214 thresholds["compute_unique_images_greedy"] 215 ) 216 embedding_selector.compute_unique_images_deterministic( 217 thresholds["compute_unique_images_deterministic"] 218 ) 219 220 # Select samples similar to the center points to enlarge the dataset 221 embedding_selector.compute_similar_images( 222 thresholds["compute_similar_images"], thresholds["neighbour_count"] 223 ) 224 else: 225 logging.warning( 226 f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection." 227 ) 228 229 except Exception as e: 230 logging.error(f"An error occurred with model {MODEL_NAME}: {e}") 231 wandb_exit_code = 1 232 finally: 233 wandb_close(wandb_exit_code) 234 235 return True
Compute embeddings and find representative and rare images for dataset selection.
238def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True): 239 """Auto-labeling workflow using Ultralytics models with optional training and inference.""" 240 try: 241 wandb_exit_code = 0 242 wandb_run = wandb_init( 243 run_name=run_config["model_name"], 244 project_name="Auto Labeling Ultralytics", 245 dataset_name=run_config["v51_dataset_name"], 246 config=run_config, 247 wandb_activate=wandb_activate, 248 ) 249 250 detector = UltralyticsObjectDetection(dataset=dataset, config=run_config) 251 252 # Check if all selected modes are supported 253 SUPPORTED_MODES = ["train", "inference"] 254 for mode in run_config["mode"]: 255 if mode not in SUPPORTED_MODES: 256 logging.error(f"Selected mode {mode} is not supported.") 257 258 if SUPPORTED_MODES[0] in run_config["mode"]: 259 logging.info(f"Training model {run_config['model_name']}") 260 detector.train() 261 if SUPPORTED_MODES[1] in run_config["mode"]: 262 logging.info(f"Running inference for model {run_config['model_name']}") 263 detector.inference() 264 265 except Exception as e: 266 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 267 wandb_exit_code = 1 268 269 finally: 270 wandb_close(wandb_exit_code) 271 272 return True
Auto-labeling workflow using Ultralytics models with optional training and inference.
275def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True): 276 """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.""" 277 try: 278 wandb_exit_code = 0 279 wandb_run = wandb_init( 280 run_name=run_config["model_name"], 281 project_name="Auto Labeling Hugging Face", 282 dataset_name=run_config["v51_dataset_name"], 283 config=run_config, 284 wandb_activate=wandb_activate, 285 ) 286 287 detector = HuggingFaceObjectDetection( 288 dataset=dataset, 289 config=run_config, 290 ) 291 SUPPORTED_MODES = ["train", "inference"] 292 293 # Check if all selected modes are supported 294 for mode in run_config["mode"]: 295 if mode not in SUPPORTED_MODES: 296 logging.error(f"Selected mode {mode} is not supported.") 297 if SUPPORTED_MODES[0] in run_config["mode"]: 298 logging.info(f"Training model {run_config['model_name']}") 299 detector.train(hf_dataset) 300 if SUPPORTED_MODES[1] in run_config["mode"]: 301 logging.info(f"Running inference for model {run_config['model_name']}") 302 detector.inference(inference_settings=run_config["inference_settings"]) 303 304 except Exception as e: 305 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 306 wandb_exit_code = 1 307 308 finally: 309 wandb_close(wandb_exit_code) 310 311 return True
Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.
314def workflow_auto_labeling_custom_codetr( 315 dataset, dataset_info, run_config, wandb_activate=True 316): 317 """Auto labeling workflow using Co-DETR model supporting training and inference modes.""" 318 319 try: 320 wandb_exit_code = 0 321 wandb_run = wandb_init( 322 run_name=run_config["config"], 323 project_name="Co-DETR Auto Labeling", 324 dataset_name=dataset_info["name"], 325 config=run_config, 326 wandb_activate=wandb_activate, 327 ) 328 329 mode = run_config["mode"] 330 331 detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config) 332 detector.convert_data() 333 if "train" in mode: 334 detector.update_config_file( 335 dataset_name=dataset_info["name"], 336 config_file=run_config["config"], 337 max_epochs=run_config["epochs"], 338 ) 339 detector.train( 340 run_config["config"], run_config["n_gpus"], run_config["container_tool"] 341 ) 342 if "inference" in mode: 343 detector.run_inference( 344 dataset, 345 run_config["config"], 346 run_config["n_gpus"], 347 run_config["container_tool"], 348 run_config["inference_settings"], 349 ) 350 except Exception as e: 351 logging.error(f"Error during CoDETR training: {e}") 352 wandb_exit_code = 1 353 finally: 354 wandb_close(wandb_exit_code) 355 356 return True
Auto labeling workflow using Co-DETR model supporting training and inference modes.
359def workflow_zero_shot_object_detection(dataset, dataset_info, config): 360 """Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.""" 361 # Set multiprocessing mode for CUDA multiprocessing 362 try: 363 mp.set_start_method("spawn", force=True) 364 logging.debug("Successfully set multiprocessing start method to 'spawn'") 365 except RuntimeError as e: 366 # This is expected if the start method was already set 367 logging.debug(f"Multiprocessing start method was already set: {e}") 368 except Exception as e: 369 # Handle any other unexpected errors 370 logging.error(f"Failed to set multiprocessing start method: {e}") 371 372 # Zero-shot object detector models from Huggingface 373 # Optimized for parallel multi-GPU inference, also supports single GPU 374 dataset_torch = FiftyOneTorchDatasetCOCO(dataset) 375 detector = ZeroShotObjectDetection( 376 dataset_torch=dataset_torch, dataset_info=dataset_info, config=config 377 ) 378 379 # Check if model detections are already stored in V51 dataset or on disk 380 models_splits_dict = detector.exclude_stored_predictions( 381 dataset_v51=dataset, config=config 382 ) 383 if len(models_splits_dict) > 0: 384 config["hf_models_zeroshot_objectdetection"] = models_splits_dict 385 distributor = ZeroShotDistributer( 386 config=config, 387 n_samples=len(dataset_torch), 388 dataset_info=dataset_info, 389 detector=detector, 390 ) 391 distributor.distribute_and_run() 392 else: 393 logging.info( 394 "All zero shot models already have predictions stored in the dataset." 395 ) 396 397 # To make new fields available to follow-up processes 398 dataset.reload() 399 dataset.save() 400 401 return True
Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.
404def workflow_auto_label_mask(dataset, dataset_info, config): 405 try: 406 depth_config = config["depth_estimation"] 407 seg_config = config["semantic_segmentation"] 408 409 for architecture_name, architecture_info in depth_config.items(): 410 auto_labeler = AutoLabelMask( 411 dataset=dataset, 412 dataset_info=dataset_info, 413 model_name=architecture_name, 414 task_type="depth_estimation", 415 model_config=architecture_info, 416 ) 417 auto_labeler.run_inference() 418 419 for architecture_name, architecture_info in seg_config.items(): 420 auto_labeler = AutoLabelMask( 421 dataset=dataset, 422 dataset_info=dataset_info, 423 model_name=architecture_name, 424 task_type="semantic_segmentation", 425 model_config=architecture_info, 426 ) 427 auto_labeler.run_inference() 428 429 except Exception as e: 430 logging.error(f"Auto-labeling mask workflow failed: {e}") 431 raise
434def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True): 435 """Runs ensemble selection workflow on given dataset using provided configuration.""" 436 try: 437 wandb_exit_code = 0 438 439 wandb_run = wandb_init( 440 run_name="Selection by Ensemble", 441 project_name="Ensemble Selection", 442 dataset_name=dataset_info["name"], 443 config=run_config, 444 wandb_activate=wandb_activate, 445 ) 446 ensemble_selecter = EnsembleSelection(dataset, run_config) 447 ensemble_selecter.ensemble_selection() 448 except Exception as e: 449 logging.error(f"An error occured during Ensemble Selection: {e}") 450 wandb_exit_code = 1 451 452 finally: 453 wandb_close(wandb_exit_code) 454 455 return True
Runs ensemble selection workflow on given dataset using provided configuration.
458def workflow_class_mapping( 459 dataset, 460 dataset_info, 461 run_config, 462 wandb_activate=True, 463 test_dataset_source=None, 464 test_dataset_target=None, 465): 466 """Runs class mapping workflow to align labels between the source dataset and target dataset.""" 467 try: 468 wandb_exit_code = 0 469 # Initialize a wandb run for class mapping 470 wandb_run = wandb_init( 471 run_name="class_mapping", 472 project_name="Class Mapping", 473 dataset_name=dataset_info["name"], 474 config=run_config, 475 wandb_activate=wandb_activate, 476 ) 477 class_mapping_models = run_config["hf_models_zeroshot_classification"] 478 479 for model_name in ( 480 pbar := tqdm(class_mapping_models, desc="Processing Class Mapping") 481 ): 482 pbar.set_description(f"Zero Shot Classification model {model_name}") 483 mapper = ClassMapper(dataset, model_name, run_config) 484 try: 485 stats = mapper.run_mapping(test_dataset_source, test_dataset_target) 486 # Display statistics only if mapping was successful 487 source_class_counts = stats["total_processed"] 488 logging.info("\nClassification Results for Source Dataset:") 489 for source_class, count in stats["source_class_counts"].items(): 490 percentage = ( 491 (count / source_class_counts) * 100 492 if source_class_counts > 0 493 else 0 494 ) 495 logging.info( 496 f"{source_class}: {count} samples processed ({percentage:.1f}%)" 497 ) 498 499 # Display statistics for tags added to Target Dataset 500 logging.info("\nTag Addition Results (Target Dataset Tags):") 501 logging.info(f"Total new tags added: {stats['changes_made']}") 502 for target_class, tag_count in stats["tags_added_per_category"].items(): 503 logging.info(f"{target_class} tags added: {tag_count}") 504 505 except Exception as e: 506 logging.error(f"Error during mapping with model {model_name}: {e}") 507 508 except Exception as e: 509 logging.error(f"Error in class_mapping workflow: {e}") 510 wandb_exit_code = 1 511 finally: 512 wandb_close(wandb_exit_code) 513 514 return True
Runs class mapping workflow to align labels between the source dataset and target dataset.
517def cleanup_memory(do_extensive_cleanup=False): 518 """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.""" 519 logging.info("Starting memory cleanup") 520 # Clear CUDA cache 521 if torch.cuda.is_available(): 522 torch.cuda.empty_cache() 523 524 # Force garbage collection 525 gc.collect() 526 527 if do_extensive_cleanup: 528 529 # Clear any leftover tensors 530 n_deleted_torch_objects = 0 531 for obj in tqdm( 532 gc.get_objects(), desc="Deleting objects from Python Garbage Collector" 533 ): 534 try: 535 if torch.is_tensor(obj): 536 del obj 537 n_deleted_torch_objects += 1 538 except: 539 pass 540 541 logging.info(f"Deleted {n_deleted_torch_objects} torch objects") 542 543 # Final garbage collection 544 gc.collect()
Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.
547class WorkflowExecutor: 548 """Orchestrates the execution of multiple data processing workflows in sequence.""" 549 550 def __init__( 551 self, 552 workflows: List[str], 553 selected_dataset: str, 554 dataset: fo.Dataset, 555 dataset_info: Dict, 556 ): 557 """Initializes with specified workflows, dataset selection, and dataset metadata.""" 558 self.workflows = workflows 559 self.selected_dataset = selected_dataset 560 self.dataset = dataset 561 self.dataset_info = dataset_info 562 563 def execute(self) -> bool: 564 """Execute all configured workflows in sequence and handle errors.""" 565 if len(self.workflows) == 0: 566 logging.error("No workflows selected.") 567 return False 568 569 logging.info(f"Selected workflows: {self.workflows}") 570 for workflow in self.workflows: 571 logging.info( 572 f"Running workflow {workflow} for dataset {self.selected_dataset}" 573 ) 574 try: 575 if workflow == "aws_download": 576 parameter_group = "mcity" 577 parameters = WORKFLOWS["aws_download"].get(parameter_group, None) 578 if parameter_group == "mcity": 579 dataset, dataset_name, _ = workflow_aws_download(parameters) 580 else: 581 logging.error( 582 f"The parameter group {parameter_group} is not supported. As AWS are highly specific, please provide a separate set of parameters and a workflow." 583 ) 584 585 # Select downloaded dataset for further workflows if configured 586 if dataset is not None: 587 if parameters["selected_dataset_overwrite"] == True: 588 589 dataset_info = { 590 "name": dataset_name, 591 "v51_type": "FiftyOneDataset", 592 "splits": [], 593 } 594 595 self.dataset = dataset 596 self.dataset_info = dataset_info 597 logging.warning( 598 f"Overwritting selected dataset {self.selected_dataset} with {dataset_name}" 599 ) 600 self.selected_dataset = dataset_name 601 602 elif workflow == "embedding_selection": 603 embedding_models = WORKFLOWS["embedding_selection"][ 604 "embedding_models" 605 ] 606 607 for MODEL_NAME in ( 608 pbar := tqdm(embedding_models, desc="Selection by Embeddings") 609 ): 610 611 # Status 612 pbar.set_description( 613 f"Selection by embeddings with model {MODEL_NAME}." 614 ) 615 616 # Config 617 mode = WORKFLOWS["embedding_selection"]["mode"] 618 parameters = WORKFLOWS["embedding_selection"]["parameters"] 619 config = {"mode": mode, "parameters": parameters} 620 621 # Workflow 622 workflow_embedding_selection( 623 self.dataset, 624 self.dataset_info, 625 MODEL_NAME, 626 config, 627 ) 628 629 elif workflow == "anomaly_detection": 630 631 # Config 632 ano_dec_config = WORKFLOWS["anomaly_detection"] 633 anomalib_image_models = ano_dec_config["anomalib_image_models"] 634 eval_metrics = ano_dec_config["anomalib_eval_metrics"] 635 636 dataset_ano_dec = None 637 data_root = None 638 if "train" in ano_dec_config["mode"]: 639 try: 640 data_preparer = AnomalyDetectionDataPreparation( 641 self.dataset, self.selected_dataset 642 ) 643 dataset_ano_dec = data_preparer.dataset_ano_dec 644 data_root = data_preparer.export_root 645 except Exception as e: 646 logging.error( 647 f"Error during data preparation for Anomaly Detection: {e}" 648 ) 649 650 for MODEL_NAME in ( 651 pbar := tqdm(anomalib_image_models, desc="Anomalib") 652 ): 653 # Status 654 pbar.set_description(f"Anomalib model {MODEL_NAME}") 655 656 # Config 657 run_config = { 658 "model_name": MODEL_NAME, 659 "image_size": anomalib_image_models[MODEL_NAME].get( 660 "image_size", None 661 ), 662 "batch_size": anomalib_image_models[MODEL_NAME].get( 663 "batch_size", 1 664 ), 665 "epochs": ano_dec_config["epochs"], 666 "early_stop_patience": ano_dec_config[ 667 "early_stop_patience" 668 ], 669 "data_root": data_root, 670 "mode": ano_dec_config["mode"], 671 } 672 673 # Workflow 674 workflow_anomaly_detection( 675 self.dataset, 676 dataset_ano_dec, 677 self.dataset_info, 678 eval_metrics, 679 run_config, 680 ) 681 682 elif workflow == "auto_labeling": 683 684 # Config 685 SUPPORTED_MODEL_SOURCES = [ 686 "hf_models_objectdetection", 687 "ultralytics", 688 "custom_codetr", 689 ] 690 691 # Common parameters between models 692 config_autolabel = WORKFLOWS["auto_labeling"] 693 mode = config_autolabel["mode"] 694 epochs = config_autolabel["epochs"] 695 selected_model_source = config_autolabel["model_source"] 696 697 # Check if all selected modes are supported 698 for model_source in selected_model_source: 699 if model_source not in SUPPORTED_MODEL_SOURCES: 700 logging.error( 701 f"Selected model source {model_source} is not supported." 702 ) 703 704 if SUPPORTED_MODEL_SOURCES[0] in selected_model_source: 705 # Hugging Face Models 706 # Single GPU mode (https://github.com/huggingface/transformers/issues/28740) 707 os.environ["CUDA_VISIBLE_DEVICES"] = "0" 708 hf_models = config_autolabel["hf_models_objectdetection"] 709 710 # Dataset Conversion 711 try: 712 logging.info("Converting dataset into Hugging Face format.") 713 pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset) 714 pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset) 715 hf_dataset = pt_to_hf_converter.convert() 716 except Exception as e: 717 logging.error(f"Error during dataset conversion: {e}") 718 719 for MODEL_NAME in ( 720 pbar := tqdm(hf_models, desc="Auto Labeling Models") 721 ): 722 # Status Update 723 pbar.set_description( 724 f"Processing Hugging Face model {MODEL_NAME}" 725 ) 726 727 # Config 728 config_model = config_autolabel[ 729 "hf_models_objectdetection" 730 ][MODEL_NAME] 731 732 run_config = { 733 "mode": mode, 734 "model_name": MODEL_NAME, 735 "v51_dataset_name": self.selected_dataset, 736 "epochs": epochs, 737 "early_stop_threshold": config_autolabel[ 738 "early_stop_threshold" 739 ], 740 "early_stop_patience": config_autolabel[ 741 "early_stop_patience" 742 ], 743 "learning_rate": config_autolabel["learning_rate"], 744 "weight_decay": config_autolabel["weight_decay"], 745 "max_grad_norm": config_autolabel["max_grad_norm"], 746 "batch_size": config_model.get("batch_size", 1), 747 "image_size": config_model.get("image_size", None), 748 "n_worker_dataloader": config_autolabel[ 749 "n_worker_dataloader" 750 ], 751 "inference_settings": config_autolabel[ 752 "inference_settings" 753 ], 754 } 755 756 # Workflow 757 workflow_auto_labeling_hf( 758 self.dataset, 759 hf_dataset, 760 run_config, 761 ) 762 763 if SUPPORTED_MODEL_SOURCES[1] in selected_model_source: 764 # Ultralytics Models 765 config_ultralytics = config_autolabel["ultralytics"] 766 models_ultralytics = config_ultralytics["models"] 767 export_dataset_root = config_ultralytics["export_dataset_root"] 768 769 # Export data into necessary format 770 if "train" in mode: 771 try: 772 UltralyticsObjectDetection.export_data( 773 self.dataset, 774 self.dataset_info, 775 export_dataset_root, 776 ) 777 except Exception as e: 778 logging.error( 779 f"Error during Ultralytics dataset export: {e}" 780 ) 781 782 for model_name in ( 783 pbar := tqdm( 784 models_ultralytics, desc="Ultralytics training" 785 ) 786 ): 787 pbar.set_description(f"Ultralytics model {model_name}") 788 run_config = { 789 "mode": mode, 790 "model_name": model_name, 791 "v51_dataset_name": self.dataset_info["name"], 792 "epochs": epochs, 793 "patience": config_autolabel["early_stop_patience"], 794 "batch_size": models_ultralytics[model_name][ 795 "batch_size" 796 ], 797 "img_size": models_ultralytics[model_name]["img_size"], 798 "export_dataset_root": export_dataset_root, 799 "inference_settings": config_autolabel[ 800 "inference_settings" 801 ], 802 "multi_scale": config_ultralytics["multi_scale"], 803 "cos_lr": config_ultralytics["cos_lr"], 804 } 805 806 workflow_auto_labeling_ultralytics(self.dataset, run_config) 807 808 if SUPPORTED_MODEL_SOURCES[2] in selected_model_source: 809 # Custom Co-DETR 810 config_codetr = config_autolabel["custom_codetr"] 811 run_config = { 812 "export_dataset_root": config_codetr["export_dataset_root"], 813 "container_tool": config_codetr["container_tool"], 814 "n_gpus": config_codetr["n_gpus"], 815 "mode": config_autolabel["mode"], 816 "epochs": config_autolabel["epochs"], 817 "inference_settings": config_autolabel[ 818 "inference_settings" 819 ], 820 "config": None, 821 } 822 codetr_configs = config_codetr["configs"] 823 824 for config in ( 825 pbar := tqdm( 826 codetr_configs, desc="Processing Co-DETR configurations" 827 ) 828 ): 829 pbar.set_description(f"Co-DETR model {config}") 830 run_config["config"] = config 831 workflow_auto_labeling_custom_codetr( 832 self.dataset, self.dataset_info, run_config 833 ) 834 835 elif workflow == "auto_labeling_zero_shot": 836 config = WORKFLOWS["auto_labeling_zero_shot"] 837 workflow_zero_shot_object_detection( 838 self.dataset, self.dataset_info, config 839 ) 840 841 elif workflow == "ensemble_selection": 842 # Config 843 run_config = WORKFLOWS["ensemble_selection"] 844 845 # Workflow 846 workflow_ensemble_selection( 847 self.dataset, self.dataset_info, run_config 848 ) 849 850 elif workflow == "auto_label_mask": 851 config = WORKFLOWS["auto_label_mask"] 852 workflow_auto_label_mask(self.dataset, self.dataset_info, config) 853 854 elif workflow == "class_mapping": 855 # Config 856 run_config = WORKFLOWS["class_mapping"] 857 858 # Workflow 859 workflow_class_mapping( 860 self.dataset, 861 self.dataset_info, 862 run_config, 863 test_dataset_source=None, 864 test_dataset_target=None, 865 ) 866 867 else: 868 logging.error( 869 f"Workflow {workflow} not found. Check available workflows in config.py." 870 ) 871 return False 872 873 cleanup_memory() # Clean after each workflow 874 logging.info(f"Completed workflow {workflow} and cleaned up memory") 875 876 except Exception as e: 877 logging.error(f"Workflow {workflow}: An error occurred: {e}") 878 wandb_close(exit_code=1) 879 cleanup_memory() # Clean up even after failure 880 881 return True
Orchestrates the execution of multiple data processing workflows in sequence.
550 def __init__( 551 self, 552 workflows: List[str], 553 selected_dataset: str, 554 dataset: fo.Dataset, 555 dataset_info: Dict, 556 ): 557 """Initializes with specified workflows, dataset selection, and dataset metadata.""" 558 self.workflows = workflows 559 self.selected_dataset = selected_dataset 560 self.dataset = dataset 561 self.dataset_info = dataset_info
Initializes with specified workflows, dataset selection, and dataset metadata.
563 def execute(self) -> bool: 564 """Execute all configured workflows in sequence and handle errors.""" 565 if len(self.workflows) == 0: 566 logging.error("No workflows selected.") 567 return False 568 569 logging.info(f"Selected workflows: {self.workflows}") 570 for workflow in self.workflows: 571 logging.info( 572 f"Running workflow {workflow} for dataset {self.selected_dataset}" 573 ) 574 try: 575 if workflow == "aws_download": 576 parameter_group = "mcity" 577 parameters = WORKFLOWS["aws_download"].get(parameter_group, None) 578 if parameter_group == "mcity": 579 dataset, dataset_name, _ = workflow_aws_download(parameters) 580 else: 581 logging.error( 582 f"The parameter group {parameter_group} is not supported. As AWS are highly specific, please provide a separate set of parameters and a workflow." 583 ) 584 585 # Select downloaded dataset for further workflows if configured 586 if dataset is not None: 587 if parameters["selected_dataset_overwrite"] == True: 588 589 dataset_info = { 590 "name": dataset_name, 591 "v51_type": "FiftyOneDataset", 592 "splits": [], 593 } 594 595 self.dataset = dataset 596 self.dataset_info = dataset_info 597 logging.warning( 598 f"Overwritting selected dataset {self.selected_dataset} with {dataset_name}" 599 ) 600 self.selected_dataset = dataset_name 601 602 elif workflow == "embedding_selection": 603 embedding_models = WORKFLOWS["embedding_selection"][ 604 "embedding_models" 605 ] 606 607 for MODEL_NAME in ( 608 pbar := tqdm(embedding_models, desc="Selection by Embeddings") 609 ): 610 611 # Status 612 pbar.set_description( 613 f"Selection by embeddings with model {MODEL_NAME}." 614 ) 615 616 # Config 617 mode = WORKFLOWS["embedding_selection"]["mode"] 618 parameters = WORKFLOWS["embedding_selection"]["parameters"] 619 config = {"mode": mode, "parameters": parameters} 620 621 # Workflow 622 workflow_embedding_selection( 623 self.dataset, 624 self.dataset_info, 625 MODEL_NAME, 626 config, 627 ) 628 629 elif workflow == "anomaly_detection": 630 631 # Config 632 ano_dec_config = WORKFLOWS["anomaly_detection"] 633 anomalib_image_models = ano_dec_config["anomalib_image_models"] 634 eval_metrics = ano_dec_config["anomalib_eval_metrics"] 635 636 dataset_ano_dec = None 637 data_root = None 638 if "train" in ano_dec_config["mode"]: 639 try: 640 data_preparer = AnomalyDetectionDataPreparation( 641 self.dataset, self.selected_dataset 642 ) 643 dataset_ano_dec = data_preparer.dataset_ano_dec 644 data_root = data_preparer.export_root 645 except Exception as e: 646 logging.error( 647 f"Error during data preparation for Anomaly Detection: {e}" 648 ) 649 650 for MODEL_NAME in ( 651 pbar := tqdm(anomalib_image_models, desc="Anomalib") 652 ): 653 # Status 654 pbar.set_description(f"Anomalib model {MODEL_NAME}") 655 656 # Config 657 run_config = { 658 "model_name": MODEL_NAME, 659 "image_size": anomalib_image_models[MODEL_NAME].get( 660 "image_size", None 661 ), 662 "batch_size": anomalib_image_models[MODEL_NAME].get( 663 "batch_size", 1 664 ), 665 "epochs": ano_dec_config["epochs"], 666 "early_stop_patience": ano_dec_config[ 667 "early_stop_patience" 668 ], 669 "data_root": data_root, 670 "mode": ano_dec_config["mode"], 671 } 672 673 # Workflow 674 workflow_anomaly_detection( 675 self.dataset, 676 dataset_ano_dec, 677 self.dataset_info, 678 eval_metrics, 679 run_config, 680 ) 681 682 elif workflow == "auto_labeling": 683 684 # Config 685 SUPPORTED_MODEL_SOURCES = [ 686 "hf_models_objectdetection", 687 "ultralytics", 688 "custom_codetr", 689 ] 690 691 # Common parameters between models 692 config_autolabel = WORKFLOWS["auto_labeling"] 693 mode = config_autolabel["mode"] 694 epochs = config_autolabel["epochs"] 695 selected_model_source = config_autolabel["model_source"] 696 697 # Check if all selected modes are supported 698 for model_source in selected_model_source: 699 if model_source not in SUPPORTED_MODEL_SOURCES: 700 logging.error( 701 f"Selected model source {model_source} is not supported." 702 ) 703 704 if SUPPORTED_MODEL_SOURCES[0] in selected_model_source: 705 # Hugging Face Models 706 # Single GPU mode (https://github.com/huggingface/transformers/issues/28740) 707 os.environ["CUDA_VISIBLE_DEVICES"] = "0" 708 hf_models = config_autolabel["hf_models_objectdetection"] 709 710 # Dataset Conversion 711 try: 712 logging.info("Converting dataset into Hugging Face format.") 713 pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset) 714 pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset) 715 hf_dataset = pt_to_hf_converter.convert() 716 except Exception as e: 717 logging.error(f"Error during dataset conversion: {e}") 718 719 for MODEL_NAME in ( 720 pbar := tqdm(hf_models, desc="Auto Labeling Models") 721 ): 722 # Status Update 723 pbar.set_description( 724 f"Processing Hugging Face model {MODEL_NAME}" 725 ) 726 727 # Config 728 config_model = config_autolabel[ 729 "hf_models_objectdetection" 730 ][MODEL_NAME] 731 732 run_config = { 733 "mode": mode, 734 "model_name": MODEL_NAME, 735 "v51_dataset_name": self.selected_dataset, 736 "epochs": epochs, 737 "early_stop_threshold": config_autolabel[ 738 "early_stop_threshold" 739 ], 740 "early_stop_patience": config_autolabel[ 741 "early_stop_patience" 742 ], 743 "learning_rate": config_autolabel["learning_rate"], 744 "weight_decay": config_autolabel["weight_decay"], 745 "max_grad_norm": config_autolabel["max_grad_norm"], 746 "batch_size": config_model.get("batch_size", 1), 747 "image_size": config_model.get("image_size", None), 748 "n_worker_dataloader": config_autolabel[ 749 "n_worker_dataloader" 750 ], 751 "inference_settings": config_autolabel[ 752 "inference_settings" 753 ], 754 } 755 756 # Workflow 757 workflow_auto_labeling_hf( 758 self.dataset, 759 hf_dataset, 760 run_config, 761 ) 762 763 if SUPPORTED_MODEL_SOURCES[1] in selected_model_source: 764 # Ultralytics Models 765 config_ultralytics = config_autolabel["ultralytics"] 766 models_ultralytics = config_ultralytics["models"] 767 export_dataset_root = config_ultralytics["export_dataset_root"] 768 769 # Export data into necessary format 770 if "train" in mode: 771 try: 772 UltralyticsObjectDetection.export_data( 773 self.dataset, 774 self.dataset_info, 775 export_dataset_root, 776 ) 777 except Exception as e: 778 logging.error( 779 f"Error during Ultralytics dataset export: {e}" 780 ) 781 782 for model_name in ( 783 pbar := tqdm( 784 models_ultralytics, desc="Ultralytics training" 785 ) 786 ): 787 pbar.set_description(f"Ultralytics model {model_name}") 788 run_config = { 789 "mode": mode, 790 "model_name": model_name, 791 "v51_dataset_name": self.dataset_info["name"], 792 "epochs": epochs, 793 "patience": config_autolabel["early_stop_patience"], 794 "batch_size": models_ultralytics[model_name][ 795 "batch_size" 796 ], 797 "img_size": models_ultralytics[model_name]["img_size"], 798 "export_dataset_root": export_dataset_root, 799 "inference_settings": config_autolabel[ 800 "inference_settings" 801 ], 802 "multi_scale": config_ultralytics["multi_scale"], 803 "cos_lr": config_ultralytics["cos_lr"], 804 } 805 806 workflow_auto_labeling_ultralytics(self.dataset, run_config) 807 808 if SUPPORTED_MODEL_SOURCES[2] in selected_model_source: 809 # Custom Co-DETR 810 config_codetr = config_autolabel["custom_codetr"] 811 run_config = { 812 "export_dataset_root": config_codetr["export_dataset_root"], 813 "container_tool": config_codetr["container_tool"], 814 "n_gpus": config_codetr["n_gpus"], 815 "mode": config_autolabel["mode"], 816 "epochs": config_autolabel["epochs"], 817 "inference_settings": config_autolabel[ 818 "inference_settings" 819 ], 820 "config": None, 821 } 822 codetr_configs = config_codetr["configs"] 823 824 for config in ( 825 pbar := tqdm( 826 codetr_configs, desc="Processing Co-DETR configurations" 827 ) 828 ): 829 pbar.set_description(f"Co-DETR model {config}") 830 run_config["config"] = config 831 workflow_auto_labeling_custom_codetr( 832 self.dataset, self.dataset_info, run_config 833 ) 834 835 elif workflow == "auto_labeling_zero_shot": 836 config = WORKFLOWS["auto_labeling_zero_shot"] 837 workflow_zero_shot_object_detection( 838 self.dataset, self.dataset_info, config 839 ) 840 841 elif workflow == "ensemble_selection": 842 # Config 843 run_config = WORKFLOWS["ensemble_selection"] 844 845 # Workflow 846 workflow_ensemble_selection( 847 self.dataset, self.dataset_info, run_config 848 ) 849 850 elif workflow == "auto_label_mask": 851 config = WORKFLOWS["auto_label_mask"] 852 workflow_auto_label_mask(self.dataset, self.dataset_info, config) 853 854 elif workflow == "class_mapping": 855 # Config 856 run_config = WORKFLOWS["class_mapping"] 857 858 # Workflow 859 workflow_class_mapping( 860 self.dataset, 861 self.dataset_info, 862 run_config, 863 test_dataset_source=None, 864 test_dataset_target=None, 865 ) 866 867 else: 868 logging.error( 869 f"Workflow {workflow} not found. Check available workflows in config.py." 870 ) 871 return False 872 873 cleanup_memory() # Clean after each workflow 874 logging.info(f"Completed workflow {workflow} and cleaned up memory") 875 876 except Exception as e: 877 logging.error(f"Workflow {workflow}: An error occurred: {e}") 878 wandb_close(exit_code=1) 879 cleanup_memory() # Clean up even after failure 880 881 return True
Execute all configured workflows in sequence and handle errors.
884def main(): 885 """Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface.""" 886 time_start = time.time() 887 configure_logging() 888 889 # Signal handler for CTRL + C 890 signal.signal(signal.SIGINT, signal_handler) 891 892 # Execute workflows 893 dataset, dataset_info = load_dataset(SELECTED_DATASET) 894 895 executor = WorkflowExecutor( 896 SELECTED_WORKFLOW, SELECTED_DATASET["name"], dataset, dataset_info 897 ) 898 executor.execute() 899 900 # Launch V51 session 901 dataset.reload() 902 dataset.save() 903 arrange_fields_in_groups(dataset) 904 logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.") 905 906 # Dataset stats 907 logging.debug(dataset) 908 logging.debug(dataset.stats(include_media=True)) 909 910 # V51 UI launch 911 session = fo.launch_app( 912 dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE 913 ) 914 915 time_stop = time.time() 916 logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")
Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface.