main
1import datetime 2import logging 3import os 4import signal 5import sys 6import time 7import warnings 8from typing import Dict, List 9 10import torch 11 12IGNORE_FUTURE_WARNINGS = True 13if IGNORE_FUTURE_WARNINGS: 14 warnings.simplefilter("ignore", category=FutureWarning) 15 16import gc 17 18import fiftyone as fo 19import torch.multiprocessing as mp 20from tqdm import tqdm 21 22from config.config import ( 23 SELECTED_DATASET, 24 SELECTED_WORKFLOW, 25 V51_ADDRESS, 26 V51_PORT, 27 V51_REMOTE, 28 WORKFLOWS, 29) 30from utils.anomaly_detection_data_preparation import AnomalyDetectionDataPreparation 31from utils.data_loader import FiftyOneTorchDatasetCOCO, TorchToHFDatasetCOCO 32from utils.dataset_loader import load_dataset 33from utils.logging import configure_logging 34from utils.mp_distribution import ZeroShotDistributer 35from utils.sidebar_groups import arrange_fields_in_groups 36from utils.wandb_helper import wandb_close, wandb_init 37from workflows.anomaly_detection import Anodec 38from workflows.auto_labeling import ( 39 CustomCoDETRObjectDetection, 40 CustomRFDETRObjectDetection, 41 HuggingFaceObjectDetection, 42 UltralyticsObjectDetection, 43 ZeroShotObjectDetection, 44) 45from workflows.aws_download import AwsDownloader 46from workflows.class_mapping import ClassMapper 47from workflows.embedding_selection import EmbeddingSelection 48from workflows.ensemble_selection import EnsembleSelection 49from workflows.auto_label_mask import AutoLabelMask 50from workflows.class_mapping import ClassMapper 51from workflows.data_ingest import run_data_ingest 52 53wandb_run = None # Init globally to make sure it is available 54 55def signal_handler(sig, frame): 56 """Handle Ctrl+C signal by cleaning up resources and exiting.""" 57 logging.error("You pressed Ctrl+C!") 58 try: 59 wandb_close(exit_code=1) 60 cleanup_memory() 61 except: 62 pass 63 sys.exit(0) 64 65 66def workflow_aws_download(parameters, wandb_activate=True): 67 """Download and process data from AWS S3 bucket.""" 68 dataset = None 69 dataset_name = None 70 wandb_exit_code = 0 71 files_to_be_downloaded = 0 72 try: 73 # Config 74 bucket = parameters["bucket"] 75 prefix = parameters["prefix"] 76 download_path = parameters["download_path"] 77 test_run = parameters["test_run"] 78 79 # Logging 80 now = datetime.datetime.now() 81 datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S") 82 log_dir = f"logs/tensorboard/aws_{datetime_str}" 83 84 dataset_name = f"annarbor_rolling_{datetime_str}" 85 86 # Weights and Biases 87 wandb_run = wandb_init( 88 run_name=dataset_name, 89 project_name="AWS Download", 90 dataset_name=dataset_name, 91 log_dir=log_dir, 92 wandb_activate=wandb_activate, 93 ) 94 95 # Workflow 96 aws_downloader = AwsDownloader( 97 bucket=bucket, 98 prefix=prefix, 99 download_path=download_path, 100 test_run=test_run, 101 ) 102 103 ( 104 sub_folder, 105 files, 106 files_to_be_downloaded, 107 DOWNLOAD_NUMBER_SUCCESS, 108 DOWNLOAD_SIZE_SUCCESS, 109 ) = aws_downloader.download_files(log_dir=log_dir) 110 111 dataset = aws_downloader.decode_data( 112 sub_folder=sub_folder, 113 files=files, 114 log_dir=log_dir, 115 dataset_name=dataset_name, 116 ) 117 118 except Exception as e: 119 logging.error(f"AWS Download and Extraction failed: {e}") 120 wandb_exit_code = 1 121 122 finally: 123 wandb_close(wandb_exit_code) 124 125 return dataset, dataset_name, files_to_be_downloaded 126 127 128def workflow_anomaly_detection( 129 dataset_normal, 130 dataset_ano_dec, 131 dataset_info, 132 eval_metrics, 133 run_config, 134 wandb_activate=True, 135): 136 """Run anomaly detection workflow using specified models and configurations.""" 137 try: 138 # Weights and Biases 139 wandb_exit_code = 0 140 wandb_run, log_dir = wandb_init( 141 run_name=run_config["model_name"], 142 project_name="Selection by Anomaly Detection", 143 dataset_name=dataset_info["name"], 144 config=run_config, 145 wandb_activate=wandb_activate, 146 ) 147 148 # Workflow 149 150 SUPPORTED_MODES = ["train", "inference"] 151 # Check if all selected modes are supported 152 for mode in run_config["mode"]: 153 if mode not in SUPPORTED_MODES: 154 logging.error(f"Selected mode {mode} is not supported.") 155 if SUPPORTED_MODES[0] in run_config["mode"]: 156 ano_dec = Anodec( 157 dataset=dataset_ano_dec, 158 eval_metrics=eval_metrics, 159 dataset_info=dataset_info, 160 config=run_config, 161 tensorboard_output=log_dir, 162 ) 163 ano_dec.train_and_export_model() 164 ano_dec.run_inference(mode=SUPPORTED_MODES[0]) 165 ano_dec.eval_v51() 166 if SUPPORTED_MODES[1] in run_config["mode"]: 167 ano_dec = Anodec( 168 dataset=dataset_normal, 169 eval_metrics=eval_metrics, 170 dataset_info=dataset_info, 171 config=run_config, 172 tensorboard_output=log_dir, 173 ) 174 ano_dec.run_inference(mode=SUPPORTED_MODES[1]) 175 176 except Exception as e: 177 logging.error( 178 f"Error in Anomaly Detection for model {run_config['model_name']}: {e}" 179 ) 180 wandb_exit_code = 1 181 finally: 182 wandb_close(exit_code=wandb_exit_code) 183 184 return True 185 186 187def workflow_embedding_selection( 188 dataset, dataset_info, MODEL_NAME, config, wandb_activate=True 189): 190 """Compute embeddings and find representative and rare images for dataset selection.""" 191 try: 192 wandb_exit_code = 0 193 wandb_run, log_dir = wandb_init( 194 run_name=MODEL_NAME, 195 project_name="Selection by Embedding", 196 dataset_name=dataset_info["name"], 197 config=config, 198 wandb_activate=wandb_activate, 199 ) 200 embedding_selector = EmbeddingSelection( 201 dataset, dataset_info, MODEL_NAME, log_dir 202 ) 203 204 if embedding_selector.model_already_used == False: 205 206 embedding_selector.compute_embeddings(config["mode"]) 207 embedding_selector.compute_similarity() 208 209 # Find representative and unique samples as center points for further selections 210 thresholds = config["parameters"] 211 embedding_selector.compute_representativeness( 212 thresholds["compute_representativeness"] 213 ) 214 embedding_selector.compute_unique_images_greedy( 215 thresholds["compute_unique_images_greedy"] 216 ) 217 embedding_selector.compute_unique_images_deterministic( 218 thresholds["compute_unique_images_deterministic"] 219 ) 220 221 # Select samples similar to the center points to enlarge the dataset 222 embedding_selector.compute_similar_images( 223 thresholds["compute_similar_images"], thresholds["neighbour_count"] 224 ) 225 else: 226 logging.warning( 227 f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection." 228 ) 229 230 except Exception as e: 231 logging.error(f"An error occurred with model {MODEL_NAME}: {e}") 232 wandb_exit_code = 1 233 finally: 234 wandb_close(wandb_exit_code) 235 236 return True 237 238 239def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True): 240 """Auto-labeling workflow using Ultralytics models with optional training and inference.""" 241 try: 242 wandb_exit_code = 0 243 wandb_run = wandb_init( 244 run_name=run_config["model_name"], 245 project_name="Auto Labeling Ultralytics", 246 dataset_name=run_config["v51_dataset_name"], 247 config=run_config, 248 wandb_activate=wandb_activate, 249 ) 250 251 detector = UltralyticsObjectDetection(dataset=dataset, config=run_config) 252 253 # Check if all selected modes are supported 254 SUPPORTED_MODES = ["train", "inference"] 255 for mode in run_config["mode"]: 256 if mode not in SUPPORTED_MODES: 257 logging.error(f"Selected mode {mode} is not supported.") 258 259 if SUPPORTED_MODES[0] in run_config["mode"]: 260 logging.info(f"Training model {run_config['model_name']}") 261 detector.train() 262 if SUPPORTED_MODES[1] in run_config["mode"]: 263 logging.info(f"Running inference for model {run_config['model_name']}") 264 detector.inference() 265 266 except Exception as e: 267 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 268 wandb_exit_code = 1 269 270 finally: 271 wandb_close(wandb_exit_code) 272 273 return True 274 275 276def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True): 277 """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.""" 278 try: 279 wandb_exit_code = 0 280 wandb_run = wandb_init( 281 run_name=run_config["model_name"], 282 project_name="Auto Labeling Hugging Face", 283 dataset_name=run_config["v51_dataset_name"], 284 config=run_config, 285 wandb_activate=wandb_activate, 286 ) 287 288 detector = HuggingFaceObjectDetection( 289 dataset=dataset, 290 config=run_config, 291 ) 292 SUPPORTED_MODES = ["train", "inference"] 293 294 # Check if all selected modes are supported 295 for mode in run_config["mode"]: 296 if mode not in SUPPORTED_MODES: 297 logging.error(f"Selected mode {mode} is not supported.") 298 if SUPPORTED_MODES[0] in run_config["mode"]: 299 logging.info(f"Training model {run_config['model_name']}") 300 detector.train(hf_dataset) 301 if SUPPORTED_MODES[1] in run_config["mode"]: 302 logging.info(f"Running inference for model {run_config['model_name']}") 303 detector.inference(inference_settings=run_config["inference_settings"]) 304 305 except Exception as e: 306 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 307 wandb_exit_code = 1 308 309 finally: 310 wandb_close(wandb_exit_code) 311 312 return True 313 314 315def workflow_auto_labeling_custom_codetr( 316 dataset, dataset_info, run_config, wandb_activate=True 317): 318 """Auto labeling workflow using Co-DETR model supporting training and inference modes.""" 319 320 try: 321 wandb_exit_code = 0 322 wandb_run = wandb_init( 323 run_name=run_config["config"], 324 project_name="Co-DETR Auto Labeling", 325 dataset_name=dataset_info["name"], 326 config=run_config, 327 wandb_activate=wandb_activate, 328 ) 329 330 mode = run_config["mode"] 331 332 detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config) 333 detector.convert_data() 334 if "train" in mode: 335 detector.update_config_file( 336 dataset_name=dataset_info["name"], 337 config_file=run_config["config"], 338 max_epochs=run_config["epochs"], 339 ) 340 detector.train( 341 run_config["config"], run_config["n_gpus"], run_config["container_tool"] 342 ) 343 if "inference" in mode: 344 detector.run_inference( 345 dataset, 346 run_config["config"], 347 run_config["n_gpus"], 348 run_config["container_tool"], 349 run_config["inference_settings"], 350 ) 351 except Exception as e: 352 logging.error(f"Error during CoDETR training: {e}") 353 wandb_exit_code = 1 354 finally: 355 wandb_close(wandb_exit_code) 356 357 return True 358 359 360def workflow_zero_shot_object_detection(dataset, dataset_info, config): 361 """Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.""" 362 # Set multiprocessing mode for CUDA multiprocessing 363 try: 364 mp.set_start_method("spawn", force=True) 365 logging.debug("Successfully set multiprocessing start method to 'spawn'") 366 except RuntimeError as e: 367 # This is expected if the start method was already set 368 logging.debug(f"Multiprocessing start method was already set: {e}") 369 except Exception as e: 370 # Handle any other unexpected errors 371 logging.error(f"Failed to set multiprocessing start method: {e}") 372 373 # Zero-shot object detector models from Huggingface 374 # Optimized for parallel multi-GPU inference, also supports single GPU 375 dataset_torch = FiftyOneTorchDatasetCOCO(dataset) 376 detector = ZeroShotObjectDetection( 377 dataset_torch=dataset_torch, dataset_info=dataset_info, config=config 378 ) 379 380 # Check if model detections are already stored in V51 dataset or on disk 381 models_splits_dict = detector.exclude_stored_predictions( 382 dataset_v51=dataset, config=config 383 ) 384 if len(models_splits_dict) > 0: 385 config["hf_models_zeroshot_objectdetection"] = models_splits_dict 386 distributor = ZeroShotDistributer( 387 config=config, 388 n_samples=len(dataset_torch), 389 dataset_info=dataset_info, 390 detector=detector, 391 ) 392 distributor.distribute_and_run() 393 else: 394 logging.info( 395 "All zero shot models already have predictions stored in the dataset." 396 ) 397 398 # To make new fields available to follow-up processes 399 dataset.reload() 400 dataset.save() 401 402 return True 403 404 405def workflow_auto_label_mask(dataset, dataset_info, config): 406 try: 407 depth_config = config["depth_estimation"] 408 seg_config = config["semantic_segmentation"] 409 410 for architecture_name, architecture_info in depth_config.items(): 411 auto_labeler = AutoLabelMask( 412 dataset=dataset, 413 dataset_info=dataset_info, 414 model_name=architecture_name, 415 task_type="depth_estimation", 416 model_config=architecture_info, 417 ) 418 auto_labeler.run_inference() 419 420 for architecture_name, architecture_info in seg_config.items(): 421 auto_labeler = AutoLabelMask( 422 dataset=dataset, 423 dataset_info=dataset_info, 424 model_name=architecture_name, 425 task_type="semantic_segmentation", 426 model_config=architecture_info, 427 ) 428 auto_labeler.run_inference() 429 430 except Exception as e: 431 logging.error(f"Auto-labeling mask workflow failed: {e}") 432 raise 433 434 435def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True): 436 """Runs ensemble selection workflow on given dataset using provided configuration.""" 437 try: 438 wandb_exit_code = 0 439 440 wandb_run = wandb_init( 441 run_name="Selection by Ensemble", 442 project_name="Ensemble Selection", 443 dataset_name=dataset_info["name"], 444 config=run_config, 445 wandb_activate=wandb_activate, 446 ) 447 ensemble_selecter = EnsembleSelection(dataset, run_config) 448 ensemble_selecter.ensemble_selection() 449 except Exception as e: 450 logging.error(f"An error occured during Ensemble Selection: {e}") 451 wandb_exit_code = 1 452 453 finally: 454 wandb_close(wandb_exit_code) 455 456 return True 457 458 459def workflow_class_mapping( 460 dataset, 461 dataset_info, 462 run_config, 463 wandb_activate=True, 464 test_dataset_source=None, 465 test_dataset_target=None, 466): 467 """Runs class mapping workflow to align labels between the source dataset and target dataset.""" 468 try: 469 wandb_exit_code = 0 470 # Initialize a wandb run for class mapping 471 wandb_run = wandb_init( 472 run_name="class_mapping", 473 project_name="Class Mapping", 474 dataset_name=dataset_info["name"], 475 config=run_config, 476 wandb_activate=wandb_activate, 477 ) 478 class_mapping_models = run_config["hf_models_zeroshot_classification"] 479 480 for model_name in ( 481 pbar := tqdm(class_mapping_models, desc="Processing Class Mapping") 482 ): 483 pbar.set_description(f"Zero Shot Classification model {model_name}") 484 mapper = ClassMapper(dataset, model_name, run_config) 485 try: 486 stats = mapper.run_mapping(test_dataset_source, test_dataset_target) 487 # Display statistics only if mapping was successful 488 source_class_counts = stats["total_processed"] 489 logging.info("\nClassification Results for Source Dataset:") 490 for source_class, count in stats["source_class_counts"].items(): 491 percentage = ( 492 (count / source_class_counts) * 100 493 if source_class_counts > 0 494 else 0 495 ) 496 logging.info( 497 f"{source_class}: {count} samples processed ({percentage:.1f}%)" 498 ) 499 500 # Display statistics for tags added to Target Dataset 501 logging.info("\nTag Addition Results (Target Dataset Tags):") 502 logging.info(f"Total new tags added: {stats['changes_made']}") 503 for target_class, tag_count in stats["tags_added_per_category"].items(): 504 logging.info(f"{target_class} tags added: {tag_count}") 505 506 except Exception as e: 507 logging.error(f"Error during mapping with model {model_name}: {e}") 508 509 except Exception as e: 510 logging.error(f"Error in class_mapping workflow: {e}") 511 wandb_exit_code = 1 512 finally: 513 wandb_close(wandb_exit_code) 514 515 return True 516 517 518def cleanup_memory(do_extensive_cleanup=False): 519 """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.""" 520 logging.info("Starting memory cleanup") 521 # Clear CUDA cache 522 if torch.cuda.is_available(): 523 torch.cuda.empty_cache() 524 525 # Force garbage collection 526 gc.collect() 527 528 if do_extensive_cleanup: 529 530 # Clear any leftover tensors 531 n_deleted_torch_objects = 0 532 for obj in tqdm( 533 gc.get_objects(), desc="Deleting objects from Python Garbage Collector" 534 ): 535 try: 536 if torch.is_tensor(obj): 537 del obj 538 n_deleted_torch_objects += 1 539 except: 540 pass 541 542 logging.info(f"Deleted {n_deleted_torch_objects} torch objects") 543 544 # Final garbage collection 545 gc.collect() 546 547 548class WorkflowExecutor: 549 """Orchestrates the execution of multiple data processing workflows in sequence.""" 550 551 def __init__( 552 self, 553 workflows: List[str], 554 selected_dataset: str, 555 dataset: fo.Dataset, 556 dataset_info: Dict, 557 ): 558 """Initializes with specified workflows, dataset selection, and dataset metadata.""" 559 self.workflows = workflows 560 self.selected_dataset = selected_dataset 561 self.dataset = dataset 562 self.dataset_info = dataset_info 563 564 def execute(self) -> bool: 565 """Execute all configured workflows in sequence and handle errors.""" 566 if len(self.workflows) == 0: 567 logging.error("No workflows selected.") 568 return False 569 570 logging.info(f"Selected workflows: {self.workflows}") 571 for workflow in self.workflows: 572 logging.info( 573 f"Running workflow {workflow} for dataset {self.selected_dataset}" 574 ) 575 try: 576 if workflow == "aws_download": 577 parameter_group = "mcity" 578 parameters = WORKFLOWS["aws_download"].get(parameter_group, None) 579 if parameter_group == "mcity": 580 dataset, dataset_name, _ = workflow_aws_download(parameters) 581 else: 582 logging.error( 583 f"The parameter group {parameter_group} is not supported. As AWS are highly specific, please provide a separate set of parameters and a workflow." 584 ) 585 586 # Select downloaded dataset for further workflows if configured 587 if dataset is not None: 588 if parameters["selected_dataset_overwrite"] == True: 589 590 dataset_info = { 591 "name": dataset_name, 592 "v51_type": "FiftyOneDataset", 593 "splits": [], 594 } 595 596 self.dataset = dataset 597 self.dataset_info = dataset_info 598 logging.warning( 599 f"Overwritting selected dataset {self.selected_dataset} with {dataset_name}" 600 ) 601 self.selected_dataset = dataset_name 602 603 elif workflow == "embedding_selection": 604 embedding_models = WORKFLOWS["embedding_selection"][ 605 "embedding_models" 606 ] 607 608 for MODEL_NAME in ( 609 pbar := tqdm(embedding_models, desc="Selection by Embeddings") 610 ): 611 612 # Status 613 pbar.set_description( 614 f"Selection by embeddings with model {MODEL_NAME}." 615 ) 616 617 # Config 618 mode = WORKFLOWS["embedding_selection"]["mode"] 619 parameters = WORKFLOWS["embedding_selection"]["parameters"] 620 config = {"mode": mode, "parameters": parameters} 621 622 # Workflow 623 workflow_embedding_selection( 624 self.dataset, 625 self.dataset_info, 626 MODEL_NAME, 627 config, 628 ) 629 630 elif workflow == "anomaly_detection": 631 632 # Config 633 ano_dec_config = WORKFLOWS["anomaly_detection"] 634 anomalib_image_models = ano_dec_config["anomalib_image_models"] 635 eval_metrics = ano_dec_config["anomalib_eval_metrics"] 636 637 dataset_ano_dec = None 638 data_root = None 639 if "train" in ano_dec_config["mode"]: 640 try: 641 data_preparer = AnomalyDetectionDataPreparation( 642 self.dataset, self.selected_dataset 643 ) 644 dataset_ano_dec = data_preparer.dataset_ano_dec 645 data_root = data_preparer.export_root 646 except Exception as e: 647 logging.error( 648 f"Error during data preparation for Anomaly Detection: {e}" 649 ) 650 651 for MODEL_NAME in ( 652 pbar := tqdm(anomalib_image_models, desc="Anomalib") 653 ): 654 # Status 655 pbar.set_description(f"Anomalib model {MODEL_NAME}") 656 657 # Config 658 run_config = { 659 "model_name": MODEL_NAME, 660 "image_size": anomalib_image_models[MODEL_NAME].get( 661 "image_size", None 662 ), 663 "batch_size": anomalib_image_models[MODEL_NAME].get( 664 "batch_size", 1 665 ), 666 "epochs": ano_dec_config["epochs"], 667 "early_stop_patience": ano_dec_config[ 668 "early_stop_patience" 669 ], 670 "data_root": data_root, 671 "mode": ano_dec_config["mode"], 672 } 673 674 # Workflow 675 workflow_anomaly_detection( 676 self.dataset, 677 dataset_ano_dec, 678 self.dataset_info, 679 eval_metrics, 680 run_config, 681 ) 682 683 elif workflow == "auto_labeling": 684 685 # Config 686 SUPPORTED_MODEL_SOURCES = [ 687 "hf_models_objectdetection", 688 "ultralytics", 689 "custom_codetr", 690 "roboflow", 691 ] 692 693 # Common parameters between models 694 config_autolabel = WORKFLOWS["auto_labeling"] 695 mode = config_autolabel["mode"] 696 epochs = config_autolabel["epochs"] 697 selected_model_source = config_autolabel["model_source"] 698 699 # Check if all selected modes are supported 700 for model_source in selected_model_source: 701 if model_source not in SUPPORTED_MODEL_SOURCES: 702 logging.error( 703 f"Selected model source {model_source} is not supported." 704 ) 705 706 if SUPPORTED_MODEL_SOURCES[0] in selected_model_source: 707 # Hugging Face Models 708 # Single GPU mode (https://github.com/huggingface/transformers/issues/28740) 709 os.environ["CUDA_VISIBLE_DEVICES"] = "0" 710 hf_models = config_autolabel["hf_models_objectdetection"] 711 712 # Dataset Conversion 713 try: 714 logging.info("Converting dataset into Hugging Face format.") 715 pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset) 716 pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset) 717 hf_dataset = pt_to_hf_converter.convert() 718 except Exception as e: 719 logging.error(f"Error during dataset conversion: {e}") 720 721 for MODEL_NAME in ( 722 pbar := tqdm(hf_models, desc="Auto Labeling Models") 723 ): 724 # Status Update 725 pbar.set_description( 726 f"Processing Hugging Face model {MODEL_NAME}" 727 ) 728 729 # Config 730 config_model = config_autolabel[ 731 "hf_models_objectdetection" 732 ][MODEL_NAME] 733 734 run_config = { 735 "mode": mode, 736 "model_name": MODEL_NAME, 737 "v51_dataset_name": self.selected_dataset, 738 "epochs": epochs, 739 "early_stop_threshold": config_autolabel[ 740 "early_stop_threshold" 741 ], 742 "early_stop_patience": config_autolabel[ 743 "early_stop_patience" 744 ], 745 "learning_rate": config_autolabel["learning_rate"], 746 "weight_decay": config_autolabel["weight_decay"], 747 "max_grad_norm": config_autolabel["max_grad_norm"], 748 "batch_size": config_model.get("batch_size", 1), 749 "image_size": config_model.get("image_size", None), 750 "n_worker_dataloader": config_autolabel[ 751 "n_worker_dataloader" 752 ], 753 "inference_settings": config_autolabel[ 754 "inference_settings" 755 ], 756 } 757 758 # Workflow 759 workflow_auto_labeling_hf( 760 self.dataset, 761 hf_dataset, 762 run_config, 763 ) 764 765 if SUPPORTED_MODEL_SOURCES[1] in selected_model_source: 766 # Ultralytics Models 767 config_ultralytics = config_autolabel["ultralytics"] 768 models_ultralytics = config_ultralytics["models"] 769 export_dataset_root = config_ultralytics["export_dataset_root"] 770 771 # Export data into necessary format 772 if "train" in mode: 773 try: 774 UltralyticsObjectDetection.export_data( 775 self.dataset, 776 self.dataset_info, 777 export_dataset_root, 778 ) 779 except Exception as e: 780 logging.error( 781 f"Error during Ultralytics dataset export: {e}" 782 ) 783 784 for model_name in ( 785 pbar := tqdm( 786 models_ultralytics, desc="Ultralytics training" 787 ) 788 ): 789 pbar.set_description(f"Ultralytics model {model_name}") 790 run_config = { 791 "mode": mode, 792 "model_name": model_name, 793 "v51_dataset_name": self.dataset_info["name"], 794 "epochs": epochs, 795 "patience": config_autolabel["early_stop_patience"], 796 "batch_size": models_ultralytics[model_name][ 797 "batch_size" 798 ], 799 "img_size": models_ultralytics[model_name]["img_size"], 800 "export_dataset_root": export_dataset_root, 801 "inference_settings": config_autolabel[ 802 "inference_settings" 803 ], 804 "multi_scale": config_ultralytics["multi_scale"], 805 "cos_lr": config_ultralytics["cos_lr"], 806 } 807 808 workflow_auto_labeling_ultralytics(self.dataset, run_config) 809 810 if SUPPORTED_MODEL_SOURCES[2] in selected_model_source: 811 # Custom Co-DETR 812 config_codetr = config_autolabel["custom_codetr"] 813 run_config = { 814 "export_dataset_root": config_codetr["export_dataset_root"], 815 "container_tool": config_codetr["container_tool"], 816 "n_gpus": config_codetr["n_gpus"], 817 "mode": config_autolabel["mode"], 818 "epochs": config_autolabel["epochs"], 819 "inference_settings": config_autolabel[ 820 "inference_settings" 821 ], 822 "config": None, 823 } 824 codetr_configs = config_codetr["configs"] 825 826 for config in ( 827 pbar := tqdm( 828 codetr_configs, desc="Processing Co-DETR configurations" 829 ) 830 ): 831 pbar.set_description(f"Co-DETR model {config}") 832 run_config["config"] = config 833 workflow_auto_labeling_custom_codetr( 834 self.dataset, self.dataset_info, run_config 835 ) 836 837 if SUPPORTED_MODEL_SOURCES[3] in selected_model_source: 838 839 config_rfdetr = config_autolabel["roboflow"] 840 841 # Shared config parameters 842 shared_config = { 843 "epochs": config_autolabel["epochs"], 844 "learning_rate": config_autolabel["learning_rate"], 845 "weight_decay": config_autolabel["weight_decay"], 846 "early_stop_patience": config_autolabel["early_stop_patience"], 847 "early_stop_threshold": config_autolabel["early_stop_threshold"], 848 } 849 850 run_config = { 851 "export_dataset_root": config_rfdetr["export_dataset_root"], 852 "mode": config_autolabel["mode"], 853 "inference_settings": config_autolabel["inference_settings"], 854 "config": None, 855 # RF-DETR specific parameters 856 "batch_size": config_rfdetr["batch_size"], 857 "grad_accum_steps": config_rfdetr["grad_accum_steps"], 858 "lr_encoder": config_rfdetr["lr_encoder"], 859 "resolution": config_rfdetr["resolution"], 860 "use_ema": config_rfdetr["use_ema"], 861 "gradient_checkpointing": config_rfdetr["gradient_checkpointing"], 862 "early_stopping_min_delta": config_rfdetr["early_stopping_min_delta"], 863 "early_stopping_use_ema": config_rfdetr["early_stopping_use_ema"], 864 } 865 866 rfdetr_configs = config_rfdetr["configs"] 867 868 for config in ( 869 pbar := tqdm(rfdetr_configs, desc="Processing RF-DETR configurations") 870 ): 871 pbar.set_description(f"RF-DETR model {config}") 872 run_config["config"] = config 873 874 try: 875 wandb_exit_code = 0 876 wandb_run = wandb_init( 877 run_name=config, 878 project_name="RF-DETR Auto Labeling", 879 dataset_name=self.dataset_info["name"], 880 config=run_config, 881 wandb_activate=True, 882 ) 883 884 detector = CustomRFDETRObjectDetection( 885 self.dataset, self.dataset_info, run_config 886 ) 887 888 # Convert data to RF-DETR format 889 detector.convert_data() 890 891 # Training 892 if "train" in mode: 893 logging.info(f"Training RF-DETR model: {config}") 894 detector.train(run_config, shared_config) 895 896 # Inference 897 if "inference" in mode: 898 logging.info(f"Running inference for RF-DETR model: {config}") 899 detector.inference( 900 inference_settings=config_autolabel["inference_settings"] 901 ) 902 903 except Exception as e: 904 logging.error(f"Error during RF-DETR workflow with {config}: {e}") 905 wandb_exit_code = 1 906 finally: 907 wandb_close(wandb_exit_code) 908 909 910 elif workflow == "auto_labeling_zero_shot": 911 config = WORKFLOWS["auto_labeling_zero_shot"] 912 workflow_zero_shot_object_detection( 913 self.dataset, self.dataset_info, config 914 ) 915 916 elif workflow == "ensemble_selection": 917 # Config 918 run_config = WORKFLOWS["ensemble_selection"] 919 920 # Workflow 921 workflow_ensemble_selection( 922 self.dataset, self.dataset_info, run_config 923 ) 924 925 elif workflow == "auto_label_mask": 926 config = WORKFLOWS["auto_label_mask"] 927 workflow_auto_label_mask(self.dataset, self.dataset_info, config) 928 929 elif workflow == "class_mapping": 930 # Config 931 run_config = WORKFLOWS["class_mapping"] 932 933 # Workflow 934 workflow_class_mapping( 935 self.dataset, 936 self.dataset_info, 937 run_config, 938 test_dataset_source=None, 939 test_dataset_target=None, 940 ) 941 elif workflow == "data_ingest": 942 dataset = run_data_ingest() 943 944 dataset_info = { 945 "name": "custom_dataset", 946 "v51_type": "FiftyOneDataset", 947 "splits": ["train", "val", "test"], 948 } 949 950 self.dataset = dataset 951 self.dataset_info = dataset_info 952 self.selected_dataset = "custom_dataset" 953 954 logging.info(f"Data ingestion completed successfully.") 955 956 957 else: 958 logging.error( 959 f"Workflow {workflow} not found. Check available workflows in config.py." 960 ) 961 return False 962 963 cleanup_memory() # Clean after each workflow 964 logging.info(f"Completed workflow {workflow} and cleaned up memory") 965 966 except Exception as e: 967 logging.error(f"Workflow {workflow}: An error occurred: {e}") 968 wandb_close(exit_code=1) 969 cleanup_memory() # Clean up even after failure 970 971 return True 972 973 974def main(): 975 """Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface.""" 976 time_start = time.time() 977 configure_logging() 978 979 # Signal handler for CTRL + C 980 signal.signal(signal.SIGINT, signal_handler) 981 982 # Execute workflows 983 if "data_ingest" in SELECTED_WORKFLOW: 984 executor = WorkflowExecutor( 985 SELECTED_WORKFLOW, 986 SELECTED_DATASET["name"], 987 dataset=None, 988 dataset_info=None, 989 ) 990 executor.execute() 991 992 # FIX: Pull back outputs after ingestion 993 dataset = executor.dataset 994 dataset_info = executor.dataset_info 995 996 else: 997 dataset, dataset_info = load_dataset(SELECTED_DATASET) 998 999 executor = WorkflowExecutor( 1000 SELECTED_WORKFLOW, 1001 SELECTED_DATASET["name"], 1002 dataset, 1003 dataset_info, 1004 ) 1005 executor.execute() 1006 1007 1008 if dataset is not None: 1009 dataset.reload() 1010 dataset.save() 1011 arrange_fields_in_groups(dataset) 1012 logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.") 1013 1014 # Dataset stats 1015 logging.debug(dataset) 1016 logging.debug(dataset.stats(include_media=True)) 1017 1018 # V51 UI launch 1019 session = fo.launch_app( 1020 dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE 1021 ) 1022 else: 1023 logging.info("Skipping Voxel51 session.") 1024 1025 1026 1027 time_stop = time.time() 1028 logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds") 1029 1030 1031if __name__ == "__main__": 1032 cleanup_memory() 1033 main()
56def signal_handler(sig, frame): 57 """Handle Ctrl+C signal by cleaning up resources and exiting.""" 58 logging.error("You pressed Ctrl+C!") 59 try: 60 wandb_close(exit_code=1) 61 cleanup_memory() 62 except: 63 pass 64 sys.exit(0)
Handle Ctrl+C signal by cleaning up resources and exiting.
67def workflow_aws_download(parameters, wandb_activate=True): 68 """Download and process data from AWS S3 bucket.""" 69 dataset = None 70 dataset_name = None 71 wandb_exit_code = 0 72 files_to_be_downloaded = 0 73 try: 74 # Config 75 bucket = parameters["bucket"] 76 prefix = parameters["prefix"] 77 download_path = parameters["download_path"] 78 test_run = parameters["test_run"] 79 80 # Logging 81 now = datetime.datetime.now() 82 datetime_str = now.strftime("%Y-%m-%d_%H-%M-%S") 83 log_dir = f"logs/tensorboard/aws_{datetime_str}" 84 85 dataset_name = f"annarbor_rolling_{datetime_str}" 86 87 # Weights and Biases 88 wandb_run = wandb_init( 89 run_name=dataset_name, 90 project_name="AWS Download", 91 dataset_name=dataset_name, 92 log_dir=log_dir, 93 wandb_activate=wandb_activate, 94 ) 95 96 # Workflow 97 aws_downloader = AwsDownloader( 98 bucket=bucket, 99 prefix=prefix, 100 download_path=download_path, 101 test_run=test_run, 102 ) 103 104 ( 105 sub_folder, 106 files, 107 files_to_be_downloaded, 108 DOWNLOAD_NUMBER_SUCCESS, 109 DOWNLOAD_SIZE_SUCCESS, 110 ) = aws_downloader.download_files(log_dir=log_dir) 111 112 dataset = aws_downloader.decode_data( 113 sub_folder=sub_folder, 114 files=files, 115 log_dir=log_dir, 116 dataset_name=dataset_name, 117 ) 118 119 except Exception as e: 120 logging.error(f"AWS Download and Extraction failed: {e}") 121 wandb_exit_code = 1 122 123 finally: 124 wandb_close(wandb_exit_code) 125 126 return dataset, dataset_name, files_to_be_downloaded
Download and process data from AWS S3 bucket.
129def workflow_anomaly_detection( 130 dataset_normal, 131 dataset_ano_dec, 132 dataset_info, 133 eval_metrics, 134 run_config, 135 wandb_activate=True, 136): 137 """Run anomaly detection workflow using specified models and configurations.""" 138 try: 139 # Weights and Biases 140 wandb_exit_code = 0 141 wandb_run, log_dir = wandb_init( 142 run_name=run_config["model_name"], 143 project_name="Selection by Anomaly Detection", 144 dataset_name=dataset_info["name"], 145 config=run_config, 146 wandb_activate=wandb_activate, 147 ) 148 149 # Workflow 150 151 SUPPORTED_MODES = ["train", "inference"] 152 # Check if all selected modes are supported 153 for mode in run_config["mode"]: 154 if mode not in SUPPORTED_MODES: 155 logging.error(f"Selected mode {mode} is not supported.") 156 if SUPPORTED_MODES[0] in run_config["mode"]: 157 ano_dec = Anodec( 158 dataset=dataset_ano_dec, 159 eval_metrics=eval_metrics, 160 dataset_info=dataset_info, 161 config=run_config, 162 tensorboard_output=log_dir, 163 ) 164 ano_dec.train_and_export_model() 165 ano_dec.run_inference(mode=SUPPORTED_MODES[0]) 166 ano_dec.eval_v51() 167 if SUPPORTED_MODES[1] in run_config["mode"]: 168 ano_dec = Anodec( 169 dataset=dataset_normal, 170 eval_metrics=eval_metrics, 171 dataset_info=dataset_info, 172 config=run_config, 173 tensorboard_output=log_dir, 174 ) 175 ano_dec.run_inference(mode=SUPPORTED_MODES[1]) 176 177 except Exception as e: 178 logging.error( 179 f"Error in Anomaly Detection for model {run_config['model_name']}: {e}" 180 ) 181 wandb_exit_code = 1 182 finally: 183 wandb_close(exit_code=wandb_exit_code) 184 185 return True
Run anomaly detection workflow using specified models and configurations.
188def workflow_embedding_selection( 189 dataset, dataset_info, MODEL_NAME, config, wandb_activate=True 190): 191 """Compute embeddings and find representative and rare images for dataset selection.""" 192 try: 193 wandb_exit_code = 0 194 wandb_run, log_dir = wandb_init( 195 run_name=MODEL_NAME, 196 project_name="Selection by Embedding", 197 dataset_name=dataset_info["name"], 198 config=config, 199 wandb_activate=wandb_activate, 200 ) 201 embedding_selector = EmbeddingSelection( 202 dataset, dataset_info, MODEL_NAME, log_dir 203 ) 204 205 if embedding_selector.model_already_used == False: 206 207 embedding_selector.compute_embeddings(config["mode"]) 208 embedding_selector.compute_similarity() 209 210 # Find representative and unique samples as center points for further selections 211 thresholds = config["parameters"] 212 embedding_selector.compute_representativeness( 213 thresholds["compute_representativeness"] 214 ) 215 embedding_selector.compute_unique_images_greedy( 216 thresholds["compute_unique_images_greedy"] 217 ) 218 embedding_selector.compute_unique_images_deterministic( 219 thresholds["compute_unique_images_deterministic"] 220 ) 221 222 # Select samples similar to the center points to enlarge the dataset 223 embedding_selector.compute_similar_images( 224 thresholds["compute_similar_images"], thresholds["neighbour_count"] 225 ) 226 else: 227 logging.warning( 228 f"Skipping model {embedding_selector.model_name_key}. It was already used for sample selection." 229 ) 230 231 except Exception as e: 232 logging.error(f"An error occurred with model {MODEL_NAME}: {e}") 233 wandb_exit_code = 1 234 finally: 235 wandb_close(wandb_exit_code) 236 237 return True
Compute embeddings and find representative and rare images for dataset selection.
240def workflow_auto_labeling_ultralytics(dataset, run_config, wandb_activate=True): 241 """Auto-labeling workflow using Ultralytics models with optional training and inference.""" 242 try: 243 wandb_exit_code = 0 244 wandb_run = wandb_init( 245 run_name=run_config["model_name"], 246 project_name="Auto Labeling Ultralytics", 247 dataset_name=run_config["v51_dataset_name"], 248 config=run_config, 249 wandb_activate=wandb_activate, 250 ) 251 252 detector = UltralyticsObjectDetection(dataset=dataset, config=run_config) 253 254 # Check if all selected modes are supported 255 SUPPORTED_MODES = ["train", "inference"] 256 for mode in run_config["mode"]: 257 if mode not in SUPPORTED_MODES: 258 logging.error(f"Selected mode {mode} is not supported.") 259 260 if SUPPORTED_MODES[0] in run_config["mode"]: 261 logging.info(f"Training model {run_config['model_name']}") 262 detector.train() 263 if SUPPORTED_MODES[1] in run_config["mode"]: 264 logging.info(f"Running inference for model {run_config['model_name']}") 265 detector.inference() 266 267 except Exception as e: 268 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 269 wandb_exit_code = 1 270 271 finally: 272 wandb_close(wandb_exit_code) 273 274 return True
Auto-labeling workflow using Ultralytics models with optional training and inference.
277def workflow_auto_labeling_hf(dataset, hf_dataset, run_config, wandb_activate=True): 278 """Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.""" 279 try: 280 wandb_exit_code = 0 281 wandb_run = wandb_init( 282 run_name=run_config["model_name"], 283 project_name="Auto Labeling Hugging Face", 284 dataset_name=run_config["v51_dataset_name"], 285 config=run_config, 286 wandb_activate=wandb_activate, 287 ) 288 289 detector = HuggingFaceObjectDetection( 290 dataset=dataset, 291 config=run_config, 292 ) 293 SUPPORTED_MODES = ["train", "inference"] 294 295 # Check if all selected modes are supported 296 for mode in run_config["mode"]: 297 if mode not in SUPPORTED_MODES: 298 logging.error(f"Selected mode {mode} is not supported.") 299 if SUPPORTED_MODES[0] in run_config["mode"]: 300 logging.info(f"Training model {run_config['model_name']}") 301 detector.train(hf_dataset) 302 if SUPPORTED_MODES[1] in run_config["mode"]: 303 logging.info(f"Running inference for model {run_config['model_name']}") 304 detector.inference(inference_settings=run_config["inference_settings"]) 305 306 except Exception as e: 307 logging.error(f"An error occurred with model {run_config['model_name']}: {e}") 308 wandb_exit_code = 1 309 310 finally: 311 wandb_close(wandb_exit_code) 312 313 return True
Auto-labeling using Hugging Face models on a dataset, including training and/or inference based on the provided configuration.
316def workflow_auto_labeling_custom_codetr( 317 dataset, dataset_info, run_config, wandb_activate=True 318): 319 """Auto labeling workflow using Co-DETR model supporting training and inference modes.""" 320 321 try: 322 wandb_exit_code = 0 323 wandb_run = wandb_init( 324 run_name=run_config["config"], 325 project_name="Co-DETR Auto Labeling", 326 dataset_name=dataset_info["name"], 327 config=run_config, 328 wandb_activate=wandb_activate, 329 ) 330 331 mode = run_config["mode"] 332 333 detector = CustomCoDETRObjectDetection(dataset, dataset_info, run_config) 334 detector.convert_data() 335 if "train" in mode: 336 detector.update_config_file( 337 dataset_name=dataset_info["name"], 338 config_file=run_config["config"], 339 max_epochs=run_config["epochs"], 340 ) 341 detector.train( 342 run_config["config"], run_config["n_gpus"], run_config["container_tool"] 343 ) 344 if "inference" in mode: 345 detector.run_inference( 346 dataset, 347 run_config["config"], 348 run_config["n_gpus"], 349 run_config["container_tool"], 350 run_config["inference_settings"], 351 ) 352 except Exception as e: 353 logging.error(f"Error during CoDETR training: {e}") 354 wandb_exit_code = 1 355 finally: 356 wandb_close(wandb_exit_code) 357 358 return True
Auto labeling workflow using Co-DETR model supporting training and inference modes.
361def workflow_zero_shot_object_detection(dataset, dataset_info, config): 362 """Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.""" 363 # Set multiprocessing mode for CUDA multiprocessing 364 try: 365 mp.set_start_method("spawn", force=True) 366 logging.debug("Successfully set multiprocessing start method to 'spawn'") 367 except RuntimeError as e: 368 # This is expected if the start method was already set 369 logging.debug(f"Multiprocessing start method was already set: {e}") 370 except Exception as e: 371 # Handle any other unexpected errors 372 logging.error(f"Failed to set multiprocessing start method: {e}") 373 374 # Zero-shot object detector models from Huggingface 375 # Optimized for parallel multi-GPU inference, also supports single GPU 376 dataset_torch = FiftyOneTorchDatasetCOCO(dataset) 377 detector = ZeroShotObjectDetection( 378 dataset_torch=dataset_torch, dataset_info=dataset_info, config=config 379 ) 380 381 # Check if model detections are already stored in V51 dataset or on disk 382 models_splits_dict = detector.exclude_stored_predictions( 383 dataset_v51=dataset, config=config 384 ) 385 if len(models_splits_dict) > 0: 386 config["hf_models_zeroshot_objectdetection"] = models_splits_dict 387 distributor = ZeroShotDistributer( 388 config=config, 389 n_samples=len(dataset_torch), 390 dataset_info=dataset_info, 391 detector=detector, 392 ) 393 distributor.distribute_and_run() 394 else: 395 logging.info( 396 "All zero shot models already have predictions stored in the dataset." 397 ) 398 399 # To make new fields available to follow-up processes 400 dataset.reload() 401 dataset.save() 402 403 return True
Run zero-shot object detection on a dataset using models from Huggingface, supporting both single and multi-GPU inference.
406def workflow_auto_label_mask(dataset, dataset_info, config): 407 try: 408 depth_config = config["depth_estimation"] 409 seg_config = config["semantic_segmentation"] 410 411 for architecture_name, architecture_info in depth_config.items(): 412 auto_labeler = AutoLabelMask( 413 dataset=dataset, 414 dataset_info=dataset_info, 415 model_name=architecture_name, 416 task_type="depth_estimation", 417 model_config=architecture_info, 418 ) 419 auto_labeler.run_inference() 420 421 for architecture_name, architecture_info in seg_config.items(): 422 auto_labeler = AutoLabelMask( 423 dataset=dataset, 424 dataset_info=dataset_info, 425 model_name=architecture_name, 426 task_type="semantic_segmentation", 427 model_config=architecture_info, 428 ) 429 auto_labeler.run_inference() 430 431 except Exception as e: 432 logging.error(f"Auto-labeling mask workflow failed: {e}") 433 raise
436def workflow_ensemble_selection(dataset, dataset_info, run_config, wandb_activate=True): 437 """Runs ensemble selection workflow on given dataset using provided configuration.""" 438 try: 439 wandb_exit_code = 0 440 441 wandb_run = wandb_init( 442 run_name="Selection by Ensemble", 443 project_name="Ensemble Selection", 444 dataset_name=dataset_info["name"], 445 config=run_config, 446 wandb_activate=wandb_activate, 447 ) 448 ensemble_selecter = EnsembleSelection(dataset, run_config) 449 ensemble_selecter.ensemble_selection() 450 except Exception as e: 451 logging.error(f"An error occured during Ensemble Selection: {e}") 452 wandb_exit_code = 1 453 454 finally: 455 wandb_close(wandb_exit_code) 456 457 return True
Runs ensemble selection workflow on given dataset using provided configuration.
460def workflow_class_mapping( 461 dataset, 462 dataset_info, 463 run_config, 464 wandb_activate=True, 465 test_dataset_source=None, 466 test_dataset_target=None, 467): 468 """Runs class mapping workflow to align labels between the source dataset and target dataset.""" 469 try: 470 wandb_exit_code = 0 471 # Initialize a wandb run for class mapping 472 wandb_run = wandb_init( 473 run_name="class_mapping", 474 project_name="Class Mapping", 475 dataset_name=dataset_info["name"], 476 config=run_config, 477 wandb_activate=wandb_activate, 478 ) 479 class_mapping_models = run_config["hf_models_zeroshot_classification"] 480 481 for model_name in ( 482 pbar := tqdm(class_mapping_models, desc="Processing Class Mapping") 483 ): 484 pbar.set_description(f"Zero Shot Classification model {model_name}") 485 mapper = ClassMapper(dataset, model_name, run_config) 486 try: 487 stats = mapper.run_mapping(test_dataset_source, test_dataset_target) 488 # Display statistics only if mapping was successful 489 source_class_counts = stats["total_processed"] 490 logging.info("\nClassification Results for Source Dataset:") 491 for source_class, count in stats["source_class_counts"].items(): 492 percentage = ( 493 (count / source_class_counts) * 100 494 if source_class_counts > 0 495 else 0 496 ) 497 logging.info( 498 f"{source_class}: {count} samples processed ({percentage:.1f}%)" 499 ) 500 501 # Display statistics for tags added to Target Dataset 502 logging.info("\nTag Addition Results (Target Dataset Tags):") 503 logging.info(f"Total new tags added: {stats['changes_made']}") 504 for target_class, tag_count in stats["tags_added_per_category"].items(): 505 logging.info(f"{target_class} tags added: {tag_count}") 506 507 except Exception as e: 508 logging.error(f"Error during mapping with model {model_name}: {e}") 509 510 except Exception as e: 511 logging.error(f"Error in class_mapping workflow: {e}") 512 wandb_exit_code = 1 513 finally: 514 wandb_close(wandb_exit_code) 515 516 return True
Runs class mapping workflow to align labels between the source dataset and target dataset.
519def cleanup_memory(do_extensive_cleanup=False): 520 """Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.""" 521 logging.info("Starting memory cleanup") 522 # Clear CUDA cache 523 if torch.cuda.is_available(): 524 torch.cuda.empty_cache() 525 526 # Force garbage collection 527 gc.collect() 528 529 if do_extensive_cleanup: 530 531 # Clear any leftover tensors 532 n_deleted_torch_objects = 0 533 for obj in tqdm( 534 gc.get_objects(), desc="Deleting objects from Python Garbage Collector" 535 ): 536 try: 537 if torch.is_tensor(obj): 538 del obj 539 n_deleted_torch_objects += 1 540 except: 541 pass 542 543 logging.info(f"Deleted {n_deleted_torch_objects} torch objects") 544 545 # Final garbage collection 546 gc.collect()
Clean up memory after workflow execution. 'do_extensive_cleanup' recommended for multiple training sessions in a row.
549class WorkflowExecutor: 550 """Orchestrates the execution of multiple data processing workflows in sequence.""" 551 552 def __init__( 553 self, 554 workflows: List[str], 555 selected_dataset: str, 556 dataset: fo.Dataset, 557 dataset_info: Dict, 558 ): 559 """Initializes with specified workflows, dataset selection, and dataset metadata.""" 560 self.workflows = workflows 561 self.selected_dataset = selected_dataset 562 self.dataset = dataset 563 self.dataset_info = dataset_info 564 565 def execute(self) -> bool: 566 """Execute all configured workflows in sequence and handle errors.""" 567 if len(self.workflows) == 0: 568 logging.error("No workflows selected.") 569 return False 570 571 logging.info(f"Selected workflows: {self.workflows}") 572 for workflow in self.workflows: 573 logging.info( 574 f"Running workflow {workflow} for dataset {self.selected_dataset}" 575 ) 576 try: 577 if workflow == "aws_download": 578 parameter_group = "mcity" 579 parameters = WORKFLOWS["aws_download"].get(parameter_group, None) 580 if parameter_group == "mcity": 581 dataset, dataset_name, _ = workflow_aws_download(parameters) 582 else: 583 logging.error( 584 f"The parameter group {parameter_group} is not supported. As AWS are highly specific, please provide a separate set of parameters and a workflow." 585 ) 586 587 # Select downloaded dataset for further workflows if configured 588 if dataset is not None: 589 if parameters["selected_dataset_overwrite"] == True: 590 591 dataset_info = { 592 "name": dataset_name, 593 "v51_type": "FiftyOneDataset", 594 "splits": [], 595 } 596 597 self.dataset = dataset 598 self.dataset_info = dataset_info 599 logging.warning( 600 f"Overwritting selected dataset {self.selected_dataset} with {dataset_name}" 601 ) 602 self.selected_dataset = dataset_name 603 604 elif workflow == "embedding_selection": 605 embedding_models = WORKFLOWS["embedding_selection"][ 606 "embedding_models" 607 ] 608 609 for MODEL_NAME in ( 610 pbar := tqdm(embedding_models, desc="Selection by Embeddings") 611 ): 612 613 # Status 614 pbar.set_description( 615 f"Selection by embeddings with model {MODEL_NAME}." 616 ) 617 618 # Config 619 mode = WORKFLOWS["embedding_selection"]["mode"] 620 parameters = WORKFLOWS["embedding_selection"]["parameters"] 621 config = {"mode": mode, "parameters": parameters} 622 623 # Workflow 624 workflow_embedding_selection( 625 self.dataset, 626 self.dataset_info, 627 MODEL_NAME, 628 config, 629 ) 630 631 elif workflow == "anomaly_detection": 632 633 # Config 634 ano_dec_config = WORKFLOWS["anomaly_detection"] 635 anomalib_image_models = ano_dec_config["anomalib_image_models"] 636 eval_metrics = ano_dec_config["anomalib_eval_metrics"] 637 638 dataset_ano_dec = None 639 data_root = None 640 if "train" in ano_dec_config["mode"]: 641 try: 642 data_preparer = AnomalyDetectionDataPreparation( 643 self.dataset, self.selected_dataset 644 ) 645 dataset_ano_dec = data_preparer.dataset_ano_dec 646 data_root = data_preparer.export_root 647 except Exception as e: 648 logging.error( 649 f"Error during data preparation for Anomaly Detection: {e}" 650 ) 651 652 for MODEL_NAME in ( 653 pbar := tqdm(anomalib_image_models, desc="Anomalib") 654 ): 655 # Status 656 pbar.set_description(f"Anomalib model {MODEL_NAME}") 657 658 # Config 659 run_config = { 660 "model_name": MODEL_NAME, 661 "image_size": anomalib_image_models[MODEL_NAME].get( 662 "image_size", None 663 ), 664 "batch_size": anomalib_image_models[MODEL_NAME].get( 665 "batch_size", 1 666 ), 667 "epochs": ano_dec_config["epochs"], 668 "early_stop_patience": ano_dec_config[ 669 "early_stop_patience" 670 ], 671 "data_root": data_root, 672 "mode": ano_dec_config["mode"], 673 } 674 675 # Workflow 676 workflow_anomaly_detection( 677 self.dataset, 678 dataset_ano_dec, 679 self.dataset_info, 680 eval_metrics, 681 run_config, 682 ) 683 684 elif workflow == "auto_labeling": 685 686 # Config 687 SUPPORTED_MODEL_SOURCES = [ 688 "hf_models_objectdetection", 689 "ultralytics", 690 "custom_codetr", 691 "roboflow", 692 ] 693 694 # Common parameters between models 695 config_autolabel = WORKFLOWS["auto_labeling"] 696 mode = config_autolabel["mode"] 697 epochs = config_autolabel["epochs"] 698 selected_model_source = config_autolabel["model_source"] 699 700 # Check if all selected modes are supported 701 for model_source in selected_model_source: 702 if model_source not in SUPPORTED_MODEL_SOURCES: 703 logging.error( 704 f"Selected model source {model_source} is not supported." 705 ) 706 707 if SUPPORTED_MODEL_SOURCES[0] in selected_model_source: 708 # Hugging Face Models 709 # Single GPU mode (https://github.com/huggingface/transformers/issues/28740) 710 os.environ["CUDA_VISIBLE_DEVICES"] = "0" 711 hf_models = config_autolabel["hf_models_objectdetection"] 712 713 # Dataset Conversion 714 try: 715 logging.info("Converting dataset into Hugging Face format.") 716 pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset) 717 pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset) 718 hf_dataset = pt_to_hf_converter.convert() 719 except Exception as e: 720 logging.error(f"Error during dataset conversion: {e}") 721 722 for MODEL_NAME in ( 723 pbar := tqdm(hf_models, desc="Auto Labeling Models") 724 ): 725 # Status Update 726 pbar.set_description( 727 f"Processing Hugging Face model {MODEL_NAME}" 728 ) 729 730 # Config 731 config_model = config_autolabel[ 732 "hf_models_objectdetection" 733 ][MODEL_NAME] 734 735 run_config = { 736 "mode": mode, 737 "model_name": MODEL_NAME, 738 "v51_dataset_name": self.selected_dataset, 739 "epochs": epochs, 740 "early_stop_threshold": config_autolabel[ 741 "early_stop_threshold" 742 ], 743 "early_stop_patience": config_autolabel[ 744 "early_stop_patience" 745 ], 746 "learning_rate": config_autolabel["learning_rate"], 747 "weight_decay": config_autolabel["weight_decay"], 748 "max_grad_norm": config_autolabel["max_grad_norm"], 749 "batch_size": config_model.get("batch_size", 1), 750 "image_size": config_model.get("image_size", None), 751 "n_worker_dataloader": config_autolabel[ 752 "n_worker_dataloader" 753 ], 754 "inference_settings": config_autolabel[ 755 "inference_settings" 756 ], 757 } 758 759 # Workflow 760 workflow_auto_labeling_hf( 761 self.dataset, 762 hf_dataset, 763 run_config, 764 ) 765 766 if SUPPORTED_MODEL_SOURCES[1] in selected_model_source: 767 # Ultralytics Models 768 config_ultralytics = config_autolabel["ultralytics"] 769 models_ultralytics = config_ultralytics["models"] 770 export_dataset_root = config_ultralytics["export_dataset_root"] 771 772 # Export data into necessary format 773 if "train" in mode: 774 try: 775 UltralyticsObjectDetection.export_data( 776 self.dataset, 777 self.dataset_info, 778 export_dataset_root, 779 ) 780 except Exception as e: 781 logging.error( 782 f"Error during Ultralytics dataset export: {e}" 783 ) 784 785 for model_name in ( 786 pbar := tqdm( 787 models_ultralytics, desc="Ultralytics training" 788 ) 789 ): 790 pbar.set_description(f"Ultralytics model {model_name}") 791 run_config = { 792 "mode": mode, 793 "model_name": model_name, 794 "v51_dataset_name": self.dataset_info["name"], 795 "epochs": epochs, 796 "patience": config_autolabel["early_stop_patience"], 797 "batch_size": models_ultralytics[model_name][ 798 "batch_size" 799 ], 800 "img_size": models_ultralytics[model_name]["img_size"], 801 "export_dataset_root": export_dataset_root, 802 "inference_settings": config_autolabel[ 803 "inference_settings" 804 ], 805 "multi_scale": config_ultralytics["multi_scale"], 806 "cos_lr": config_ultralytics["cos_lr"], 807 } 808 809 workflow_auto_labeling_ultralytics(self.dataset, run_config) 810 811 if SUPPORTED_MODEL_SOURCES[2] in selected_model_source: 812 # Custom Co-DETR 813 config_codetr = config_autolabel["custom_codetr"] 814 run_config = { 815 "export_dataset_root": config_codetr["export_dataset_root"], 816 "container_tool": config_codetr["container_tool"], 817 "n_gpus": config_codetr["n_gpus"], 818 "mode": config_autolabel["mode"], 819 "epochs": config_autolabel["epochs"], 820 "inference_settings": config_autolabel[ 821 "inference_settings" 822 ], 823 "config": None, 824 } 825 codetr_configs = config_codetr["configs"] 826 827 for config in ( 828 pbar := tqdm( 829 codetr_configs, desc="Processing Co-DETR configurations" 830 ) 831 ): 832 pbar.set_description(f"Co-DETR model {config}") 833 run_config["config"] = config 834 workflow_auto_labeling_custom_codetr( 835 self.dataset, self.dataset_info, run_config 836 ) 837 838 if SUPPORTED_MODEL_SOURCES[3] in selected_model_source: 839 840 config_rfdetr = config_autolabel["roboflow"] 841 842 # Shared config parameters 843 shared_config = { 844 "epochs": config_autolabel["epochs"], 845 "learning_rate": config_autolabel["learning_rate"], 846 "weight_decay": config_autolabel["weight_decay"], 847 "early_stop_patience": config_autolabel["early_stop_patience"], 848 "early_stop_threshold": config_autolabel["early_stop_threshold"], 849 } 850 851 run_config = { 852 "export_dataset_root": config_rfdetr["export_dataset_root"], 853 "mode": config_autolabel["mode"], 854 "inference_settings": config_autolabel["inference_settings"], 855 "config": None, 856 # RF-DETR specific parameters 857 "batch_size": config_rfdetr["batch_size"], 858 "grad_accum_steps": config_rfdetr["grad_accum_steps"], 859 "lr_encoder": config_rfdetr["lr_encoder"], 860 "resolution": config_rfdetr["resolution"], 861 "use_ema": config_rfdetr["use_ema"], 862 "gradient_checkpointing": config_rfdetr["gradient_checkpointing"], 863 "early_stopping_min_delta": config_rfdetr["early_stopping_min_delta"], 864 "early_stopping_use_ema": config_rfdetr["early_stopping_use_ema"], 865 } 866 867 rfdetr_configs = config_rfdetr["configs"] 868 869 for config in ( 870 pbar := tqdm(rfdetr_configs, desc="Processing RF-DETR configurations") 871 ): 872 pbar.set_description(f"RF-DETR model {config}") 873 run_config["config"] = config 874 875 try: 876 wandb_exit_code = 0 877 wandb_run = wandb_init( 878 run_name=config, 879 project_name="RF-DETR Auto Labeling", 880 dataset_name=self.dataset_info["name"], 881 config=run_config, 882 wandb_activate=True, 883 ) 884 885 detector = CustomRFDETRObjectDetection( 886 self.dataset, self.dataset_info, run_config 887 ) 888 889 # Convert data to RF-DETR format 890 detector.convert_data() 891 892 # Training 893 if "train" in mode: 894 logging.info(f"Training RF-DETR model: {config}") 895 detector.train(run_config, shared_config) 896 897 # Inference 898 if "inference" in mode: 899 logging.info(f"Running inference for RF-DETR model: {config}") 900 detector.inference( 901 inference_settings=config_autolabel["inference_settings"] 902 ) 903 904 except Exception as e: 905 logging.error(f"Error during RF-DETR workflow with {config}: {e}") 906 wandb_exit_code = 1 907 finally: 908 wandb_close(wandb_exit_code) 909 910 911 elif workflow == "auto_labeling_zero_shot": 912 config = WORKFLOWS["auto_labeling_zero_shot"] 913 workflow_zero_shot_object_detection( 914 self.dataset, self.dataset_info, config 915 ) 916 917 elif workflow == "ensemble_selection": 918 # Config 919 run_config = WORKFLOWS["ensemble_selection"] 920 921 # Workflow 922 workflow_ensemble_selection( 923 self.dataset, self.dataset_info, run_config 924 ) 925 926 elif workflow == "auto_label_mask": 927 config = WORKFLOWS["auto_label_mask"] 928 workflow_auto_label_mask(self.dataset, self.dataset_info, config) 929 930 elif workflow == "class_mapping": 931 # Config 932 run_config = WORKFLOWS["class_mapping"] 933 934 # Workflow 935 workflow_class_mapping( 936 self.dataset, 937 self.dataset_info, 938 run_config, 939 test_dataset_source=None, 940 test_dataset_target=None, 941 ) 942 elif workflow == "data_ingest": 943 dataset = run_data_ingest() 944 945 dataset_info = { 946 "name": "custom_dataset", 947 "v51_type": "FiftyOneDataset", 948 "splits": ["train", "val", "test"], 949 } 950 951 self.dataset = dataset 952 self.dataset_info = dataset_info 953 self.selected_dataset = "custom_dataset" 954 955 logging.info(f"Data ingestion completed successfully.") 956 957 958 else: 959 logging.error( 960 f"Workflow {workflow} not found. Check available workflows in config.py." 961 ) 962 return False 963 964 cleanup_memory() # Clean after each workflow 965 logging.info(f"Completed workflow {workflow} and cleaned up memory") 966 967 except Exception as e: 968 logging.error(f"Workflow {workflow}: An error occurred: {e}") 969 wandb_close(exit_code=1) 970 cleanup_memory() # Clean up even after failure 971 972 return True
Orchestrates the execution of multiple data processing workflows in sequence.
552 def __init__( 553 self, 554 workflows: List[str], 555 selected_dataset: str, 556 dataset: fo.Dataset, 557 dataset_info: Dict, 558 ): 559 """Initializes with specified workflows, dataset selection, and dataset metadata.""" 560 self.workflows = workflows 561 self.selected_dataset = selected_dataset 562 self.dataset = dataset 563 self.dataset_info = dataset_info
Initializes with specified workflows, dataset selection, and dataset metadata.
565 def execute(self) -> bool: 566 """Execute all configured workflows in sequence and handle errors.""" 567 if len(self.workflows) == 0: 568 logging.error("No workflows selected.") 569 return False 570 571 logging.info(f"Selected workflows: {self.workflows}") 572 for workflow in self.workflows: 573 logging.info( 574 f"Running workflow {workflow} for dataset {self.selected_dataset}" 575 ) 576 try: 577 if workflow == "aws_download": 578 parameter_group = "mcity" 579 parameters = WORKFLOWS["aws_download"].get(parameter_group, None) 580 if parameter_group == "mcity": 581 dataset, dataset_name, _ = workflow_aws_download(parameters) 582 else: 583 logging.error( 584 f"The parameter group {parameter_group} is not supported. As AWS are highly specific, please provide a separate set of parameters and a workflow." 585 ) 586 587 # Select downloaded dataset for further workflows if configured 588 if dataset is not None: 589 if parameters["selected_dataset_overwrite"] == True: 590 591 dataset_info = { 592 "name": dataset_name, 593 "v51_type": "FiftyOneDataset", 594 "splits": [], 595 } 596 597 self.dataset = dataset 598 self.dataset_info = dataset_info 599 logging.warning( 600 f"Overwritting selected dataset {self.selected_dataset} with {dataset_name}" 601 ) 602 self.selected_dataset = dataset_name 603 604 elif workflow == "embedding_selection": 605 embedding_models = WORKFLOWS["embedding_selection"][ 606 "embedding_models" 607 ] 608 609 for MODEL_NAME in ( 610 pbar := tqdm(embedding_models, desc="Selection by Embeddings") 611 ): 612 613 # Status 614 pbar.set_description( 615 f"Selection by embeddings with model {MODEL_NAME}." 616 ) 617 618 # Config 619 mode = WORKFLOWS["embedding_selection"]["mode"] 620 parameters = WORKFLOWS["embedding_selection"]["parameters"] 621 config = {"mode": mode, "parameters": parameters} 622 623 # Workflow 624 workflow_embedding_selection( 625 self.dataset, 626 self.dataset_info, 627 MODEL_NAME, 628 config, 629 ) 630 631 elif workflow == "anomaly_detection": 632 633 # Config 634 ano_dec_config = WORKFLOWS["anomaly_detection"] 635 anomalib_image_models = ano_dec_config["anomalib_image_models"] 636 eval_metrics = ano_dec_config["anomalib_eval_metrics"] 637 638 dataset_ano_dec = None 639 data_root = None 640 if "train" in ano_dec_config["mode"]: 641 try: 642 data_preparer = AnomalyDetectionDataPreparation( 643 self.dataset, self.selected_dataset 644 ) 645 dataset_ano_dec = data_preparer.dataset_ano_dec 646 data_root = data_preparer.export_root 647 except Exception as e: 648 logging.error( 649 f"Error during data preparation for Anomaly Detection: {e}" 650 ) 651 652 for MODEL_NAME in ( 653 pbar := tqdm(anomalib_image_models, desc="Anomalib") 654 ): 655 # Status 656 pbar.set_description(f"Anomalib model {MODEL_NAME}") 657 658 # Config 659 run_config = { 660 "model_name": MODEL_NAME, 661 "image_size": anomalib_image_models[MODEL_NAME].get( 662 "image_size", None 663 ), 664 "batch_size": anomalib_image_models[MODEL_NAME].get( 665 "batch_size", 1 666 ), 667 "epochs": ano_dec_config["epochs"], 668 "early_stop_patience": ano_dec_config[ 669 "early_stop_patience" 670 ], 671 "data_root": data_root, 672 "mode": ano_dec_config["mode"], 673 } 674 675 # Workflow 676 workflow_anomaly_detection( 677 self.dataset, 678 dataset_ano_dec, 679 self.dataset_info, 680 eval_metrics, 681 run_config, 682 ) 683 684 elif workflow == "auto_labeling": 685 686 # Config 687 SUPPORTED_MODEL_SOURCES = [ 688 "hf_models_objectdetection", 689 "ultralytics", 690 "custom_codetr", 691 "roboflow", 692 ] 693 694 # Common parameters between models 695 config_autolabel = WORKFLOWS["auto_labeling"] 696 mode = config_autolabel["mode"] 697 epochs = config_autolabel["epochs"] 698 selected_model_source = config_autolabel["model_source"] 699 700 # Check if all selected modes are supported 701 for model_source in selected_model_source: 702 if model_source not in SUPPORTED_MODEL_SOURCES: 703 logging.error( 704 f"Selected model source {model_source} is not supported." 705 ) 706 707 if SUPPORTED_MODEL_SOURCES[0] in selected_model_source: 708 # Hugging Face Models 709 # Single GPU mode (https://github.com/huggingface/transformers/issues/28740) 710 os.environ["CUDA_VISIBLE_DEVICES"] = "0" 711 hf_models = config_autolabel["hf_models_objectdetection"] 712 713 # Dataset Conversion 714 try: 715 logging.info("Converting dataset into Hugging Face format.") 716 pytorch_dataset = FiftyOneTorchDatasetCOCO(self.dataset) 717 pt_to_hf_converter = TorchToHFDatasetCOCO(pytorch_dataset) 718 hf_dataset = pt_to_hf_converter.convert() 719 except Exception as e: 720 logging.error(f"Error during dataset conversion: {e}") 721 722 for MODEL_NAME in ( 723 pbar := tqdm(hf_models, desc="Auto Labeling Models") 724 ): 725 # Status Update 726 pbar.set_description( 727 f"Processing Hugging Face model {MODEL_NAME}" 728 ) 729 730 # Config 731 config_model = config_autolabel[ 732 "hf_models_objectdetection" 733 ][MODEL_NAME] 734 735 run_config = { 736 "mode": mode, 737 "model_name": MODEL_NAME, 738 "v51_dataset_name": self.selected_dataset, 739 "epochs": epochs, 740 "early_stop_threshold": config_autolabel[ 741 "early_stop_threshold" 742 ], 743 "early_stop_patience": config_autolabel[ 744 "early_stop_patience" 745 ], 746 "learning_rate": config_autolabel["learning_rate"], 747 "weight_decay": config_autolabel["weight_decay"], 748 "max_grad_norm": config_autolabel["max_grad_norm"], 749 "batch_size": config_model.get("batch_size", 1), 750 "image_size": config_model.get("image_size", None), 751 "n_worker_dataloader": config_autolabel[ 752 "n_worker_dataloader" 753 ], 754 "inference_settings": config_autolabel[ 755 "inference_settings" 756 ], 757 } 758 759 # Workflow 760 workflow_auto_labeling_hf( 761 self.dataset, 762 hf_dataset, 763 run_config, 764 ) 765 766 if SUPPORTED_MODEL_SOURCES[1] in selected_model_source: 767 # Ultralytics Models 768 config_ultralytics = config_autolabel["ultralytics"] 769 models_ultralytics = config_ultralytics["models"] 770 export_dataset_root = config_ultralytics["export_dataset_root"] 771 772 # Export data into necessary format 773 if "train" in mode: 774 try: 775 UltralyticsObjectDetection.export_data( 776 self.dataset, 777 self.dataset_info, 778 export_dataset_root, 779 ) 780 except Exception as e: 781 logging.error( 782 f"Error during Ultralytics dataset export: {e}" 783 ) 784 785 for model_name in ( 786 pbar := tqdm( 787 models_ultralytics, desc="Ultralytics training" 788 ) 789 ): 790 pbar.set_description(f"Ultralytics model {model_name}") 791 run_config = { 792 "mode": mode, 793 "model_name": model_name, 794 "v51_dataset_name": self.dataset_info["name"], 795 "epochs": epochs, 796 "patience": config_autolabel["early_stop_patience"], 797 "batch_size": models_ultralytics[model_name][ 798 "batch_size" 799 ], 800 "img_size": models_ultralytics[model_name]["img_size"], 801 "export_dataset_root": export_dataset_root, 802 "inference_settings": config_autolabel[ 803 "inference_settings" 804 ], 805 "multi_scale": config_ultralytics["multi_scale"], 806 "cos_lr": config_ultralytics["cos_lr"], 807 } 808 809 workflow_auto_labeling_ultralytics(self.dataset, run_config) 810 811 if SUPPORTED_MODEL_SOURCES[2] in selected_model_source: 812 # Custom Co-DETR 813 config_codetr = config_autolabel["custom_codetr"] 814 run_config = { 815 "export_dataset_root": config_codetr["export_dataset_root"], 816 "container_tool": config_codetr["container_tool"], 817 "n_gpus": config_codetr["n_gpus"], 818 "mode": config_autolabel["mode"], 819 "epochs": config_autolabel["epochs"], 820 "inference_settings": config_autolabel[ 821 "inference_settings" 822 ], 823 "config": None, 824 } 825 codetr_configs = config_codetr["configs"] 826 827 for config in ( 828 pbar := tqdm( 829 codetr_configs, desc="Processing Co-DETR configurations" 830 ) 831 ): 832 pbar.set_description(f"Co-DETR model {config}") 833 run_config["config"] = config 834 workflow_auto_labeling_custom_codetr( 835 self.dataset, self.dataset_info, run_config 836 ) 837 838 if SUPPORTED_MODEL_SOURCES[3] in selected_model_source: 839 840 config_rfdetr = config_autolabel["roboflow"] 841 842 # Shared config parameters 843 shared_config = { 844 "epochs": config_autolabel["epochs"], 845 "learning_rate": config_autolabel["learning_rate"], 846 "weight_decay": config_autolabel["weight_decay"], 847 "early_stop_patience": config_autolabel["early_stop_patience"], 848 "early_stop_threshold": config_autolabel["early_stop_threshold"], 849 } 850 851 run_config = { 852 "export_dataset_root": config_rfdetr["export_dataset_root"], 853 "mode": config_autolabel["mode"], 854 "inference_settings": config_autolabel["inference_settings"], 855 "config": None, 856 # RF-DETR specific parameters 857 "batch_size": config_rfdetr["batch_size"], 858 "grad_accum_steps": config_rfdetr["grad_accum_steps"], 859 "lr_encoder": config_rfdetr["lr_encoder"], 860 "resolution": config_rfdetr["resolution"], 861 "use_ema": config_rfdetr["use_ema"], 862 "gradient_checkpointing": config_rfdetr["gradient_checkpointing"], 863 "early_stopping_min_delta": config_rfdetr["early_stopping_min_delta"], 864 "early_stopping_use_ema": config_rfdetr["early_stopping_use_ema"], 865 } 866 867 rfdetr_configs = config_rfdetr["configs"] 868 869 for config in ( 870 pbar := tqdm(rfdetr_configs, desc="Processing RF-DETR configurations") 871 ): 872 pbar.set_description(f"RF-DETR model {config}") 873 run_config["config"] = config 874 875 try: 876 wandb_exit_code = 0 877 wandb_run = wandb_init( 878 run_name=config, 879 project_name="RF-DETR Auto Labeling", 880 dataset_name=self.dataset_info["name"], 881 config=run_config, 882 wandb_activate=True, 883 ) 884 885 detector = CustomRFDETRObjectDetection( 886 self.dataset, self.dataset_info, run_config 887 ) 888 889 # Convert data to RF-DETR format 890 detector.convert_data() 891 892 # Training 893 if "train" in mode: 894 logging.info(f"Training RF-DETR model: {config}") 895 detector.train(run_config, shared_config) 896 897 # Inference 898 if "inference" in mode: 899 logging.info(f"Running inference for RF-DETR model: {config}") 900 detector.inference( 901 inference_settings=config_autolabel["inference_settings"] 902 ) 903 904 except Exception as e: 905 logging.error(f"Error during RF-DETR workflow with {config}: {e}") 906 wandb_exit_code = 1 907 finally: 908 wandb_close(wandb_exit_code) 909 910 911 elif workflow == "auto_labeling_zero_shot": 912 config = WORKFLOWS["auto_labeling_zero_shot"] 913 workflow_zero_shot_object_detection( 914 self.dataset, self.dataset_info, config 915 ) 916 917 elif workflow == "ensemble_selection": 918 # Config 919 run_config = WORKFLOWS["ensemble_selection"] 920 921 # Workflow 922 workflow_ensemble_selection( 923 self.dataset, self.dataset_info, run_config 924 ) 925 926 elif workflow == "auto_label_mask": 927 config = WORKFLOWS["auto_label_mask"] 928 workflow_auto_label_mask(self.dataset, self.dataset_info, config) 929 930 elif workflow == "class_mapping": 931 # Config 932 run_config = WORKFLOWS["class_mapping"] 933 934 # Workflow 935 workflow_class_mapping( 936 self.dataset, 937 self.dataset_info, 938 run_config, 939 test_dataset_source=None, 940 test_dataset_target=None, 941 ) 942 elif workflow == "data_ingest": 943 dataset = run_data_ingest() 944 945 dataset_info = { 946 "name": "custom_dataset", 947 "v51_type": "FiftyOneDataset", 948 "splits": ["train", "val", "test"], 949 } 950 951 self.dataset = dataset 952 self.dataset_info = dataset_info 953 self.selected_dataset = "custom_dataset" 954 955 logging.info(f"Data ingestion completed successfully.") 956 957 958 else: 959 logging.error( 960 f"Workflow {workflow} not found. Check available workflows in config.py." 961 ) 962 return False 963 964 cleanup_memory() # Clean after each workflow 965 logging.info(f"Completed workflow {workflow} and cleaned up memory") 966 967 except Exception as e: 968 logging.error(f"Workflow {workflow}: An error occurred: {e}") 969 wandb_close(exit_code=1) 970 cleanup_memory() # Clean up even after failure 971 972 return True
Execute all configured workflows in sequence and handle errors.
975def main(): 976 """Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface.""" 977 time_start = time.time() 978 configure_logging() 979 980 # Signal handler for CTRL + C 981 signal.signal(signal.SIGINT, signal_handler) 982 983 # Execute workflows 984 if "data_ingest" in SELECTED_WORKFLOW: 985 executor = WorkflowExecutor( 986 SELECTED_WORKFLOW, 987 SELECTED_DATASET["name"], 988 dataset=None, 989 dataset_info=None, 990 ) 991 executor.execute() 992 993 # FIX: Pull back outputs after ingestion 994 dataset = executor.dataset 995 dataset_info = executor.dataset_info 996 997 else: 998 dataset, dataset_info = load_dataset(SELECTED_DATASET) 999 1000 executor = WorkflowExecutor( 1001 SELECTED_WORKFLOW, 1002 SELECTED_DATASET["name"], 1003 dataset, 1004 dataset_info, 1005 ) 1006 executor.execute() 1007 1008 1009 if dataset is not None: 1010 dataset.reload() 1011 dataset.save() 1012 arrange_fields_in_groups(dataset) 1013 logging.info(f"Launching Voxel51 session for dataset {dataset_info['name']}.") 1014 1015 # Dataset stats 1016 logging.debug(dataset) 1017 logging.debug(dataset.stats(include_media=True)) 1018 1019 # V51 UI launch 1020 session = fo.launch_app( 1021 dataset, address=V51_ADDRESS, port=V51_PORT, remote=V51_REMOTE 1022 ) 1023 else: 1024 logging.info("Skipping Voxel51 session.") 1025 1026 1027 1028 time_stop = time.time() 1029 logging.info(f"Elapsed time: {time_stop - time_start:.2f} seconds")
Executes the data processing workflow, loads dataset, and launches Voxel51 visualization interface.