workflows.auto_labeling
1import datetime 2import json 3import logging 4import os 5import queue 6import random 7import re 8import shutil 9import signal 10import subprocess 11import sys 12import time 13from difflib import get_close_matches 14from functools import partial 15from pathlib import Path 16from typing import Union 17 18import albumentations as A 19import fiftyone as fo 20import numpy as np 21import psutil 22import torch 23import torch.multiprocessing as mp 24from accelerate.test_utils.testing import get_backend 25from datasets import Split 26from fiftyone import ViewField as F 27from huggingface_hub import HfApi, hf_hub_download 28from PIL import Image 29from torch.utils.data import DataLoader, Subset 30from torch.utils.tensorboard import SummaryWriter 31from torchvision.transforms.functional import to_pil_image 32from tqdm import tqdm 33from transformers import ( 34 AutoConfig, 35 AutoModelForObjectDetection, 36 AutoModelForZeroShotObjectDetection, 37 AutoProcessor, 38 EarlyStoppingCallback, 39 Trainer, 40 TrainingArguments, 41) 42from ultralytics import YOLO 43 44import wandb 45from config.config import ( 46 ACCEPTED_SPLITS, 47 GLOBAL_SEED, 48 HF_DO_UPLOAD, 49 HF_ROOT, 50 NUM_WORKERS, 51 WANDB_ACTIVE, 52 WORKFLOWS, 53) 54from utils.dataset_loader import get_supported_datasets 55from utils.logging import configure_logging 56from utils.sample_field_operations import add_sample_field 57 58 59def get_dataset_and_model_from_hf_id(hf_id: str): 60 """Extract dataset and model name from HuggingFace ID by matching against supported datasets.""" 61 # HF ID follows structure organization/dataset_model 62 # Both dataset and model can contain "_" as well 63 64 # Remove organization (everything before the first "/") 65 hf_id = hf_id.split("/", 1)[-1] 66 67 # Find all dataset names that appear in hf_id 68 supported_datasets = get_supported_datasets() 69 matches = [ 70 dataset_name for dataset_name in supported_datasets if dataset_name in hf_id 71 ] 72 73 if not matches: 74 logging.warning( 75 f"Dataset name could not be extracted from Hugging Face ID {hf_id}" 76 ) 77 dataset_name = "no_dataset_name" 78 else: 79 # Return the longest match (most specific) 80 dataset_name = max(matches, key=len) 81 82 # Get model name by removing dataset name from hf_id 83 model_name = hf_id.replace(dataset_name, "").strip("_") 84 if not model_name: 85 logging.warning( 86 f"Model name could not be extracted from Hugging Face ID {hf_id}" 87 ) 88 model_name = "no_model_name" 89 90 return dataset_name, model_name 91 92 93# Handling timeouts 94class TimeoutException(Exception): 95 """Custom exception for handling dataloader timeouts.""" 96 97 pass 98 99 100def timeout_handler(signum, frame): 101 raise TimeoutException("Dataloader creation timed out") 102 103 104class ZeroShotInferenceCollateFn: 105 """Collate function for zero-shot inference that prepares batches for model input.""" 106 107 def __init__( 108 self, 109 hf_model_config_name, 110 hf_processor, 111 batch_size, 112 object_classes, 113 batch_classes, 114 ): 115 """Initialize the auto labeling model with the Hugging Face model config, processor, batch size, object classes, and batch classes.""" 116 try: 117 self.hf_model_config_name = hf_model_config_name 118 self.processor = hf_processor 119 self.batch_size = batch_size 120 self.object_classes = object_classes 121 self.batch_classes = batch_classes 122 except Exception as e: 123 logging.error(f"Error in collate init of DataLoader: {e}") 124 125 def __call__(self, batch): 126 """Processes a batch of data by preparing images and labels for model input.""" 127 try: 128 images, labels = zip(*batch) 129 target_sizes = [tuple(img.shape[1:]) for img in images] 130 131 # Adjustments for final batch 132 n_images = len(images) 133 if n_images < self.batch_size: 134 self.batch_classes = [self.object_classes] * n_images 135 136 # Apply PIL transformation for specific models 137 if self.hf_model_config_name == "OmDetTurboConfig": 138 images = [to_pil_image(image) for image in images] 139 140 inputs = self.processor( 141 text=self.batch_classes, 142 images=images, 143 return_tensors="pt", 144 padding=True, # Allow for differently sized images 145 ) 146 147 return inputs, labels, target_sizes, self.batch_classes 148 except Exception as e: 149 logging.error(f"Error in collate function of DataLoader: {e}") 150 151 152class ZeroShotObjectDetection: 153 """Zero-shot object detection using various HuggingFace models with multi-GPU support.""" 154 155 def __init__( 156 self, 157 dataset_torch: torch.utils.data.Dataset, 158 dataset_info, 159 config, 160 detections_path="./output/detections/", 161 log_root="./logs/", 162 ): 163 """Initialize the zero-shot object detection labeler with dataset, configuration, and path settings.""" 164 self.dataset_torch = dataset_torch 165 self.dataset_info = dataset_info 166 self.dataset_name = dataset_info["name"] 167 self.object_classes = config["object_classes"] 168 self.detection_threshold = config["detection_threshold"] 169 self.detections_root = os.path.join(detections_path, self.dataset_name) 170 self.tensorboard_root = os.path.join( 171 log_root, "tensorboard/zeroshot_object_detection" 172 ) 173 174 logging.info(f"Zero-shot models will look for {self.object_classes}") 175 176 def exclude_stored_predictions( 177 self, dataset_v51: fo.Dataset, config, do_exclude=False 178 ): 179 """Checks for existing predictions and loads them from disk if available.""" 180 dataset_schema = dataset_v51.get_field_schema() 181 models_splits_dict = {} 182 for model_name, value in config["hf_models_zeroshot_objectdetection"].items(): 183 model_name_key = re.sub(r"[\W-]+", "_", model_name) 184 pred_key = re.sub( 185 r"[\W-]+", "_", "pred_zsod_" + model_name 186 ) # od for Object Detection 187 # Check if data already stored in V51 dataset 188 if pred_key in dataset_schema and do_exclude is True: 189 logging.warning( 190 f"Skipping model {model_name}. Predictions already stored in Voxel51 dataset." 191 ) 192 # Check if data already stored on disk 193 elif ( 194 os.path.isdir(os.path.join(self.detections_root, model_name_key)) 195 and do_exclude is True 196 ): 197 try: 198 logging.info(f"Loading {model_name} predictions from disk.") 199 temp_dataset = fo.Dataset.from_dir( 200 dataset_dir=os.path.join(self.detections_root, model_name_key), 201 dataset_type=fo.types.COCODetectionDataset, 202 name="temp_dataset", 203 data_path="data.json", 204 ) 205 206 # Copy all detections from stored dataset into our dataset 207 detections = temp_dataset.values("detections.detections") 208 add_sample_field( 209 dataset_v51, 210 pred_key, 211 fo.EmbeddedDocumentField, 212 embedded_doc_type=fo.Detections, 213 ) 214 dataset_v51.set_values(f"{pred_key}.detections", detections) 215 except Exception as e: 216 logging.error( 217 f"Data in {os.path.join(self.detections_root, model_name_key)} could not be loaded. Error: {e}" 218 ) 219 finally: 220 fo.delete_dataset("temp_dataset") 221 # Assign model to be run 222 else: 223 models_splits_dict[model_name] = value 224 225 logging.info(f"Models to be run: {models_splits_dict}") 226 return models_splits_dict 227 228 # Worker functions 229 def update_queue_sizes_worker( 230 self, queues, queue_sizes, largest_queue_index, max_queue_size 231 ): 232 """Monitor and manage multiple result queues for balanced processing.""" 233 experiment_name = f"queue_size_monitor_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" 234 log_directory = os.path.join( 235 self.tensorboard_root, self.dataset_name, experiment_name 236 ) 237 wandb.tensorboard.patch(root_logdir=log_directory) 238 if WANDB_ACTIVE: 239 wandb.init( 240 name=f"queue_size_monitor_{os.getpid()}", 241 job_type="inference", 242 project="Zero Shot Object Detection", 243 ) 244 writer = SummaryWriter(log_dir=log_directory) 245 246 step = 0 247 248 while True: 249 for i, queue in enumerate(queues): 250 queue_sizes[i] = queue.qsize() 251 writer.add_scalar(f"queue_size/items/{i}", queue_sizes[i], step) 252 253 step += 1 254 255 # Find the index of the largest queue 256 max_size = max(queue_sizes) 257 max_index = queue_sizes.index(max_size) 258 259 # Calculate the total size of all queues 260 total_size = sum(queue_sizes) 261 262 # If total_size is greater than 0, calculate the probabilities 263 if total_size > 0: 264 # Normalize the queue sizes by the max_queue_size 265 normalized_sizes = [size / max_queue_size for size in queue_sizes] 266 267 # Calculate probabilities based on normalized sizes 268 probabilities = [ 269 size / sum(normalized_sizes) for size in normalized_sizes 270 ] 271 272 # Use random.choices with weights (probabilities) 273 chosen_queue_index = random.choices( 274 range(len(queues)), weights=probabilities, k=1 275 )[0] 276 277 largest_queue_index.value = chosen_queue_index 278 else: 279 largest_queue_index.value = max_index 280 281 time.sleep(0.1) 282 283 def process_outputs_worker( 284 self, 285 result_queues, 286 largest_queue_index, 287 inference_finished, 288 max_queue_size, 289 wandb_activate=False, 290 ): 291 """Process model outputs from result queues and save to dataset.""" 292 configure_logging() 293 logging.info(f"Process ID: {os.getpid()}. Results processing process started") 294 dataset_v51 = fo.load_dataset(self.dataset_name) 295 processing_successful = None 296 297 # Logging 298 experiment_name = f"post_process_{os.getpid()}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" 299 log_directory = os.path.join( 300 self.tensorboard_root, self.dataset_name, experiment_name 301 ) 302 wandb.tensorboard.patch(root_logdir=log_directory) 303 if WANDB_ACTIVE and wandb_activate: 304 wandb.init( 305 name=f"post_process_{os.getpid()}", 306 job_type="inference", 307 project="Zero Shot Object Detection", 308 ) 309 writer = SummaryWriter(log_dir=log_directory) 310 n_processed_images = 0 311 312 logging.info(f"Post-Processor {os.getpid()} starting loop.") 313 314 while True: 315 results_queue = result_queues[largest_queue_index.value] 316 writer.add_scalar( 317 f"post_processing/selected_queue", 318 largest_queue_index.value, 319 n_processed_images, 320 ) 321 322 if results_queue.qsize() == max_queue_size: 323 logging.warning( 324 f"Queue full: {results_queue.qsize()}. Consider increasing number of post-processing workers." 325 ) 326 327 # Exit only when inference is finished and the queue is empty 328 if inference_finished.value and results_queue.empty(): 329 dataset_v51.save() 330 logging.info( 331 f"Post-processing worker {os.getpid()} has finished all outputs." 332 ) 333 break 334 335 # Process results from the queue if available 336 if not results_queue.empty(): 337 try: 338 time_start = time.time() 339 340 result = results_queue.get_nowait() 341 342 processing_successful = self.process_outputs( 343 dataset_v51, 344 result, 345 self.object_classes, 346 self.detection_threshold, 347 ) 348 349 # Performance logging 350 n_images = len(result["labels"]) 351 time_end = time.time() 352 duration = time_end - time_start 353 batches_per_second = 1 / duration 354 frames_per_second = batches_per_second * n_images 355 n_processed_images += n_images 356 writer.add_scalar( 357 f"post_processing/frames_per_second", 358 frames_per_second, 359 n_processed_images, 360 ) 361 362 del result # Explicit removal from device 363 364 except Exception as e: 365 continue 366 367 else: 368 continue 369 370 writer.close() 371 wandb.finish(exit_code=0) 372 return processing_successful # Return last processing status 373 374 def gpu_worker( 375 self, 376 gpu_id, 377 cpu_cores, 378 task_queue, 379 results_queue, 380 done_event, 381 post_processing_finished, 382 set_cpu_affinity=False, 383 ): 384 """Run model inference on specified GPU with dedicated CPU cores.""" 385 dataset_v51 = fo.load_dataset( 386 self.dataset_name 387 ) # NOTE Only for the case of sequential processing 388 configure_logging() 389 # Set CPU 390 if set_cpu_affinity: 391 # Allow only certain CPU cores 392 psutil.Process().cpu_affinity(cpu_cores) 393 logging.info(f"Available CPU cores: {psutil.Process().cpu_affinity()}") 394 max_n_cpus = len(cpu_cores) 395 torch.set_num_threads(max_n_cpus) 396 397 # Set GPU 398 logging.info(f"GPU {gpu_id}: {torch.cuda.get_device_name(gpu_id)}") 399 device = torch.device(f"cuda:{gpu_id}") 400 401 run_successful = None 402 with torch.cuda.device(gpu_id): 403 while True: 404 if post_processing_finished.value and task_queue.empty(): 405 # Keep alive until post-processing is done 406 break 407 408 if task_queue.empty(): 409 done_event.set() 410 411 if not task_queue.empty(): 412 try: 413 task_metadata = task_queue.get( 414 timeout=5 415 ) # Timeout to prevent indefinite blocking 416 except Exception as e: 417 break # Exit if no more tasks 418 run_successful = self.model_inference( 419 task_metadata, 420 device, 421 self.dataset_torch, 422 dataset_v51, 423 self.object_classes, 424 results_queue, 425 self.tensorboard_root, 426 ) 427 logging.info( 428 f"Worker for GPU {gpu_id} finished run successful: {run_successful}" 429 ) 430 else: 431 continue 432 return run_successful # Return last processing status 433 434 def eval_and_export_worker(self, models_ready_queue, n_models): 435 """Evaluate model performance and export results for completed models.""" 436 configure_logging() 437 logging.info(f"Process ID: {os.getpid()}. Eval-and-export process started") 438 439 dataset = fo.load_dataset(self.dataset_name) 440 run_successful = None 441 models_done = 0 442 443 while True: 444 if not models_ready_queue.empty(): 445 try: 446 dict = models_ready_queue.get( 447 timeout=5 448 ) # Timeout to prevent indefinite blocking 449 model_name = dict["model_name"] 450 pred_key = re.sub(r"[\W-]+", "_", "pred_zsod_" + model_name) 451 eval_key = re.sub(r"[\W-]+", "_", "eval_zsod_" + model_name) 452 dataset.reload() 453 run_successful = self.eval_and_export( 454 dataset, model_name, pred_key, eval_key 455 ) 456 models_done += 1 457 logging.info( 458 f"Evaluation and export of {models_done}/{n_models} models done." 459 ) 460 except Exception as e: 461 logging.error(f"Error in eval-and-export worker: {e}") 462 continue 463 464 if models_done == n_models: 465 break 466 467 return run_successful 468 469 # Functionality functions 470 def model_inference( 471 self, 472 metadata: dict, 473 device: str, 474 dataset: torch.utils.data.Dataset, 475 dataset_v51: fo.Dataset, 476 object_classes: list, 477 results_queue: Union[queue.Queue, mp.Queue], 478 root_log_dir: str, 479 persistent_workers: bool = False, 480 ): 481 """Model inference method running zero-shot object detection on provided dataset and device, returning success status.""" 482 writer = None 483 run_successful = True 484 processor, model, inputs, outputs, result, dataloader = ( 485 None, 486 None, 487 None, 488 None, 489 None, 490 None, 491 ) # For finally block 492 493 # Timeout handler 494 dataloader_timeout = 60 495 signal.signal(signal.SIGALRM, timeout_handler) 496 497 try: 498 # Metadata 499 run_id = metadata["run_id"] 500 model_name = metadata["model_name"] 501 dataset_name = metadata["dataset_name"] 502 is_subset = metadata["is_subset"] 503 batch_size = metadata["batch_size"] 504 505 logging.info( 506 f"Process ID: {os.getpid()}, Run ID: {run_id}, Device: {device}, Model: {model_name}" 507 ) 508 509 # Load the model 510 logging.info(f"Loading model {model_name}") 511 processor = AutoProcessor.from_pretrained(model_name, use_fast=True) 512 model = AutoModelForZeroShotObjectDetection.from_pretrained(model_name) 513 model = model.to(device, non_blocking=True) 514 model.eval() 515 hf_model_config = AutoConfig.from_pretrained(model_name) 516 hf_model_config_name = type(hf_model_config).__name__ 517 batch_classes = [object_classes] * batch_size 518 logging.info(f"Loaded model type {hf_model_config_name}") 519 520 # Dataloader 521 logging.info("Generating dataloader") 522 if is_subset: 523 chunk_index_start = metadata["chunk_index_start"] 524 chunk_index_end = metadata["chunk_index_end"] 525 logging.info(f"Length of dataset: {len(dataset)}") 526 logging.info(f"Subset start index: {chunk_index_start}") 527 logging.info(f"Subset stop index: {chunk_index_end}") 528 dataset = Subset(dataset, range(chunk_index_start, chunk_index_end)) 529 530 zero_shot_inference_preprocessing = ZeroShotInferenceCollateFn( 531 hf_model_config_name=hf_model_config_name, 532 hf_processor=processor, 533 object_classes=object_classes, 534 batch_size=batch_size, 535 batch_classes=batch_classes, 536 ) 537 num_workers = WORKFLOWS["auto_labeling_zero_shot"]["n_worker_dataloader"] 538 prefetch_factor = WORKFLOWS["auto_labeling_zero_shot"][ 539 "prefetch_factor_dataloader" 540 ] 541 dataloader = DataLoader( 542 dataset, 543 batch_size=batch_size, 544 shuffle=False, 545 num_workers=num_workers, 546 persistent_workers=persistent_workers, 547 pin_memory=True, 548 prefetch_factor=prefetch_factor, 549 collate_fn=zero_shot_inference_preprocessing, 550 ) 551 552 dataloader_length = len(dataloader) 553 if dataloader_length < 1: 554 logging.error( 555 f"Dataloader has insufficient data: {dataloader_length} entries. Please check your dataset and DataLoader configuration." 556 ) 557 558 # Logging 559 experiment_name = f"{model_name}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{device}" 560 log_directory = os.path.join(root_log_dir, dataset_name, experiment_name) 561 wandb.tensorboard.patch(root_logdir=log_directory) 562 if WANDB_ACTIVE: 563 wandb.init( 564 name=f"{model_name}_{device}", 565 job_type="inference", 566 project="Zero Shot Object Detection", 567 config=metadata, 568 ) 569 writer = SummaryWriter(log_dir=log_directory) 570 571 # Inference Loop 572 logging.info(f"{os.getpid()}: Starting inference loop5") 573 n_processed_images = 0 574 for inputs, labels, target_sizes, batch_classes in tqdm( 575 dataloader, desc="Inference Loop" 576 ): 577 signal.alarm(dataloader_timeout) 578 try: 579 time_start = time.time() 580 n_images = len(labels) 581 inputs = inputs.to(device, non_blocking=True) 582 583 with torch.amp.autocast("cuda"), torch.inference_mode(): 584 outputs = model(**inputs) 585 586 result = { 587 "inputs": inputs, 588 "outputs": outputs, 589 "processor": processor, 590 "target_sizes": target_sizes, 591 "labels": labels, 592 "model_name": model_name, 593 "hf_model_config_name": hf_model_config_name, 594 "batch_classes": batch_classes, 595 } 596 597 logging.debug(f"{os.getpid()}: Putting result into queue") 598 599 results_queue.put( 600 result, timeout=60 601 ) # Ditch data only after 60 seconds 602 603 # Logging 604 time_end = time.time() 605 duration = time_end - time_start 606 batches_per_second = 1 / duration 607 frames_per_second = batches_per_second * n_images 608 n_processed_images += n_images 609 logging.debug( 610 f"{os.getpid()}: Number of processes images: {n_processed_images}" 611 ) 612 writer.add_scalar( 613 f"inference/frames_per_second", 614 frames_per_second, 615 n_processed_images, 616 ) 617 618 except TimeoutException: 619 logging.warning( 620 f"Dataloader loop got stuck. Continuing with next batch." 621 ) 622 continue 623 624 finally: 625 signal.alarm(0) # Cancel the alarm 626 627 # Flawless execution 628 wandb_exit_code = 0 629 630 except Exception as e: 631 wandb_exit_code = 1 632 run_successful = False 633 logging.error(f"Error in Process {os.getpid()}: {e}") 634 finally: 635 try: 636 wandb.finish(exit_code=wandb_exit_code) 637 except: 638 pass 639 640 # Explicit removal from device 641 del ( 642 processor, 643 model, 644 inputs, 645 outputs, 646 result, 647 dataloader, 648 ) 649 650 torch.cuda.empty_cache() 651 wandb.tensorboard.unpatch() 652 if writer: 653 writer.close() 654 return run_successful 655 656 def process_outputs(self, dataset_v51, result, object_classes, detection_threshold): 657 """Process outputs from object detection models, extracting bounding boxes and labels to save to the dataset.""" 658 try: 659 inputs = result["inputs"] 660 outputs = result["outputs"] 661 target_sizes = result["target_sizes"] 662 labels = result["labels"] 663 model_name = result["model_name"] 664 hf_model_config_name = result["hf_model_config_name"] 665 batch_classes = result["batch_classes"] 666 processor = result["processor"] 667 668 # Processing output 669 if hf_model_config_name == "GroundingDinoConfig": 670 results = processor.post_process_grounded_object_detection( 671 outputs, 672 inputs.input_ids, 673 box_threshold=detection_threshold, 674 text_threshold=detection_threshold, 675 ) 676 elif hf_model_config_name in ["Owlv2Config", "OwlViTConfig"]: 677 results = processor.post_process_grounded_object_detection( 678 outputs=outputs, 679 threshold=detection_threshold, 680 target_sizes=target_sizes, 681 text_labels=batch_classes, 682 ) 683 elif hf_model_config_name == "OmDetTurboConfig": 684 results = processor.post_process_grounded_object_detection( 685 outputs, 686 text_labels=batch_classes, 687 threshold=detection_threshold, 688 nms_threshold=detection_threshold, 689 target_sizes=target_sizes, 690 ) 691 else: 692 logging.error(f"Invalid model name: {hf_model_config_name}") 693 694 if not len(results) == len(target_sizes) == len(labels): 695 logging.error( 696 f"Lengths of results, target_sizes, and labels do not match: {len(results)}, {len(target_sizes)}, {len(labels)}" 697 ) 698 for result, size, target in zip(results, target_sizes, labels): 699 boxes, scores, labels = ( 700 result["boxes"], 701 result["scores"], 702 result["text_labels"], 703 ) 704 705 img_height = size[0] 706 img_width = size[1] 707 708 detections = [] 709 for box, score, label in zip(boxes, scores, labels): 710 processing_successful = True 711 if hf_model_config_name == "GroundingDinoConfig": 712 # Outputs do not comply with given labels 713 # Grounding DINO outputs multiple pairs of object boxes and noun phrases for a given (Image, Text) pair 714 # There can be either multiple labels per output ("bike van"), incomplete ones ("motorcyc"), or broken ones ("##cic") 715 processed_label = label.split()[ 716 0 717 ] # Assume first output is the best output 718 if processed_label in object_classes: 719 label = processed_label 720 top_left_x = box[0].item() 721 top_left_y = box[1].item() 722 box_width = (box[2] - box[0]).item() 723 box_height = (box[3] - box[1]).item() 724 else: 725 matches = get_close_matches( 726 processed_label, object_classes, n=1, cutoff=0.6 727 ) 728 selected_label = matches[0] if matches else None 729 if selected_label: 730 logging.debug( 731 f"Mapped output '{processed_label}' to class '{selected_label}'" 732 ) 733 label = selected_label 734 top_left_x = box[0].item() 735 top_left_y = box[1].item() 736 box_width = (box[2] - box[0]).item() 737 box_height = (box[3] - box[1]).item() 738 else: 739 logging.debug( 740 f"Skipped detection with {hf_model_config_name} due to unclear output: {label}" 741 ) 742 processing_successful = False 743 744 elif hf_model_config_name in [ 745 "Owlv2Config", 746 "OwlViTConfig", 747 "OmDetTurboConfig", 748 ]: 749 top_left_x = box[0].item() / img_width 750 top_left_y = box[1].item() / img_height 751 box_width = (box[2].item() - box[0].item()) / img_width 752 box_height = (box[3].item() - box[1].item()) / img_height 753 754 if ( 755 processing_successful 756 ): # Skip GroundingDinoConfig labels that could not be processed 757 detection = fo.Detection( 758 label=label, 759 bounding_box=[ 760 top_left_x, 761 top_left_y, 762 box_width, 763 box_height, 764 ], 765 confidence=score.item(), 766 ) 767 detection["bbox_area"] = ( 768 detection["bounding_box"][2] * detection["bounding_box"][3] 769 ) 770 detections.append(detection) 771 772 # Attach label to V51 dataset 773 pred_key = re.sub( 774 r"[\W-]+", "_", "pred_zsod_" + model_name 775 ) # zsod Zero-Shot Object Deection 776 sample = dataset_v51[target["image_id"]] 777 sample[pred_key] = fo.Detections(detections=detections) 778 sample.save() 779 780 except Exception as e: 781 logging.error(f"Error in processing outputs: {e}") 782 processing_successful = False 783 finally: 784 return processing_successful 785 786 def eval_and_export(self, dataset_v51, model_name, pred_key, eval_key): 787 """Populate dataset with evaluation results (if ground_truth available)""" 788 try: 789 dataset_v51.evaluate_detections( 790 pred_key, 791 gt_field="ground_truth", 792 eval_key=eval_key, 793 compute_mAP=True, 794 ) 795 except Exception as e: 796 logging.warning(f"Evaluation not possible: {e}") 797 798 # Store labels https://docs.voxel51.com/api/fiftyone.core.collections.html#fiftyone.core.collections.SampleCollection.export 799 model_name_key = re.sub(r"[\W-]+", "_", model_name) 800 dataset_v51.export( 801 export_dir=os.path.join(self.detections_root, model_name_key), 802 dataset_type=fo.types.COCODetectionDataset, 803 data_path="data.json", 804 export_media=None, # "manifest", 805 label_field=pred_key, 806 progress=True, 807 ) 808 return True 809 810 811class UltralyticsObjectDetection: 812 """Object detection using Ultralytics YOLO models with training and inference support.""" 813 814 def __init__(self, dataset, config): 815 """Initialize with dataset, config, and setup paths for model and data.""" 816 self.dataset = dataset 817 self.config = config 818 self.ultralytics_data_path = os.path.join( 819 config["export_dataset_root"], config["v51_dataset_name"] 820 ) 821 822 self.hf_hub_model_id = ( 823 f"{HF_ROOT}/" 824 + f"{config['v51_dataset_name']}_{config['model_name']}".replace("/", "_") 825 ) 826 827 self.export_root = "output/models/ultralytics/" 828 self.export_folder = os.path.join( 829 self.export_root, self.config["v51_dataset_name"] 830 ) 831 832 self.model_path = os.path.join( 833 self.export_folder, self.config["model_name"], "weights", "best.pt" 834 ) 835 836 @staticmethod 837 def export_data( 838 dataset, dataset_info, export_dataset_root, label_field="ground_truth" 839 ): 840 """Export dataset to YOLO format for Ultralytics training.""" 841 ultralytics_data_path = os.path.join(export_dataset_root, dataset_info["name"]) 842 # Delete export directory if it already exists 843 if os.path.exists(ultralytics_data_path): 844 shutil.rmtree(ultralytics_data_path) 845 846 logging.info("Exporting data for training with Ultralytics") 847 classes = dataset.distinct(f"{label_field}.detections.label") 848 849 # Make directory 850 os.makedirs(ultralytics_data_path, exist_ok=False) 851 852 for split in ACCEPTED_SPLITS: 853 split_view = dataset.match_tags(split) 854 855 if split == "val" or split == "train": # YOLO expects train and val 856 split_view.export( 857 export_dir=ultralytics_data_path, 858 dataset_type=fo.types.YOLOv5Dataset, 859 label_field=label_field, 860 classes=classes, 861 split=split, 862 ) 863 864 def train(self): 865 """Train the YOLO model for object detection using Ultralytics and optionally upload to Hugging Face.""" 866 model = YOLO(self.config["model_name"], task="detect") 867 # https://docs.ultralytics.com/modes/train/#train-settings 868 869 # Use all available GPUs 870 device = "0" # Default to GPU 0 871 if torch.cuda.device_count() > 1: 872 device = ",".join(map(str, range(torch.cuda.device_count()))) 873 874 results = model.train( 875 data=f"{self.ultralytics_data_path}/dataset.yaml", 876 epochs=self.config["epochs"], 877 project=self.export_folder, 878 name=self.config["model_name"], 879 patience=self.config["patience"], 880 batch=self.config["batch_size"], 881 imgsz=self.config["img_size"], 882 multi_scale=self.config["multi_scale"], 883 cos_lr=self.config["cos_lr"], 884 seed=GLOBAL_SEED, 885 optimizer="AdamW", # "auto" as default 886 pretrained=True, 887 exist_ok=True, 888 amp=True, 889 device=device 890 ) 891 metrics = model.val() 892 logging.info(f"Model Performance: {metrics}") 893 894 # Upload model to Hugging Face 895 if HF_DO_UPLOAD: 896 logging.info(f"Uploading model {self.model_path} to Hugging Face.") 897 api = HfApi() 898 api.create_repo( 899 self.hf_hub_model_id, private=True, repo_type="model", exist_ok=True 900 ) 901 api.upload_file( 902 path_or_fileobj=self.model_path, 903 path_in_repo="best.pt", 904 repo_id=self.hf_hub_model_id, 905 repo_type="model", 906 ) 907 908 def inference(self, gt_field="ground_truth"): 909 """Performs inference using YOLO model on a dataset, with options to evaluate results.""" 910 logging.info(f"Running inference on dataset {self.config['v51_dataset_name']}") 911 inference_settings = self.config["inference_settings"] 912 913 dataset_name = None 914 model_name = self.config["model_name"] 915 916 model_hf = inference_settings["model_hf"] 917 if model_hf is not None: 918 # Use model manually defined in config. 919 # This way models can be used for inference which were trained on a different dataset 920 dataset_name, _ = get_dataset_and_model_from_hf_id(model_hf) 921 922 # Set up directories 923 download_dir = os.path.join( 924 self.export_root, dataset_name, model_name, "weights" 925 ) 926 os.makedirs(os.path.join(download_dir), exist_ok=True) 927 928 self.model_path = os.path.join(download_dir, "best.pt") 929 930 # Create directories if they don't exist 931 932 file_path = hf_hub_download( 933 repo_id=model_hf, 934 filename="best.pt", 935 local_dir=download_dir, 936 ) 937 else: 938 # Automatically determine model based on dataset 939 dataset_name = self.config["v51_dataset_name"] 940 941 try: 942 if os.path.exists(self.model_path): 943 file_path = self.model_path 944 logging.info(f"Loading model {model_name} from disk: {file_path}") 945 else: 946 download_dir = self.model_path.replace("best.pt", "") 947 os.makedirs(download_dir, exist_ok=True) 948 logging.info( 949 f"Downloading model {self.hf_hub_model_id} from Hugging Face to {download_dir}" 950 ) 951 file_path = hf_hub_download( 952 repo_id=self.hf_hub_model_id, 953 filename="best.pt", 954 local_dir=download_dir, 955 ) 956 except Exception as e: 957 logging.error(f"Failed to load or download model: {str(e)}.") 958 return False 959 960 pred_key = f"pred_od_{model_name}-{dataset_name}" 961 logging.info(f"Using model {self.model_path} for inference.") 962 model = YOLO(self.model_path) 963 964 detection_threshold = inference_settings["detection_threshold"] 965 if inference_settings["inference_on_test"] is True: 966 dataset_eval_view = self.dataset.match_tags("test") 967 if len(dataset_eval_view) == 0: 968 logging.error("Dataset misses split 'test'") 969 dataset_eval_view.apply_model( 970 model, label_field=pred_key, confidence_thresh=detection_threshold 971 ) 972 else: 973 self.dataset.apply_model( 974 model, label_field=pred_key, confidence_thresh=detection_threshold 975 ) 976 977 if inference_settings["do_eval"]: 978 eval_key = f"eval_{self.config['model_name']}_{dataset_name}" 979 980 if inference_settings["inference_on_test"] is True: 981 dataset_view = self.dataset.match_tags(["test"]) 982 else: 983 dataset_view = self.dataset 984 985 dataset_view.evaluate_detections( 986 pred_key, 987 gt_field=gt_field, 988 eval_key=eval_key, 989 compute_mAP=True, 990 ) 991 992 993def transform_batch_standalone( 994 batch, 995 image_processor, 996 do_convert_annotations=True, 997 return_pixel_mask=False, 998): 999 """Apply format annotations in COCO format for object detection task. Outside of class so it can be pickled.""" 1000 images = [] 1001 annotations = [] 1002 1003 for image_path, annotation in zip(batch["image_path"], batch["objects"]): 1004 image = Image.open(image_path).convert("RGB") 1005 image_np = np.array(image) 1006 images.append(image_np) 1007 1008 coco_annotations = [] 1009 for i, bbox in enumerate(annotation["bbox"]): 1010 1011 # Conversion from HF dataset bounding boxes to DETR: 1012 # Input: HF dataset bbox is COCO (top_left_x, top_left_y, width, height) in absolute coordinates 1013 # Output: 1014 # DETR expects COCO (top_left_x, top_left_y, width, height) in absolute coordinates if 'do_convert_annotations == True' 1015 # DETR expects YOLO (center_x, center_y, width, height) in relative coordinates between [0,1] if 'do_convert_annotations == False' 1016 1017 if do_convert_annotations == False: 1018 x, y, w, h = bbox 1019 img_height, img_width = image_np.shape[:2] 1020 center_x = (x + w / 2) / img_width 1021 center_y = (y + h / 2) / img_height 1022 width = w / img_width 1023 height = h / img_height 1024 bbox = [center_x, center_y, width, height] 1025 1026 # Ensure bbox values are within the expected range 1027 assert all(0 <= coord <= 1 for coord in bbox), f"Invalid bbox: {bbox}" 1028 1029 logging.debug( 1030 f"Converted {[x, y, w, h]} to {[center_x, center_y, width, height]} with 'do_convert_annotations' = {do_convert_annotations}" 1031 ) 1032 1033 coco_annotation = { 1034 "image_id": annotation["image_id"], 1035 "bbox": bbox, 1036 "category_id": annotation["category_id"][i], 1037 "area": annotation["area"][i], 1038 "iscrowd": 0, 1039 } 1040 coco_annotations.append(coco_annotation) 1041 detr_annotation = { 1042 "image_id": annotation["image_id"], 1043 "annotations": coco_annotations, 1044 } 1045 annotations.append(detr_annotation) 1046 1047 # Apply the image processor transformations: resizing, rescaling, normalization 1048 result = image_processor( 1049 images=images, annotations=annotations, return_tensors="pt" 1050 ) 1051 1052 if not return_pixel_mask: 1053 result.pop("pixel_mask", None) 1054 1055 return result 1056 1057 1058class HuggingFaceObjectDetection: 1059 """Object detection using HuggingFace models with support for training and inference.""" 1060 1061 def __init__( 1062 self, 1063 dataset, 1064 config, 1065 output_model_path="./output/models/object_detection_hf", 1066 output_detections_path="./output/detections/", 1067 gt_field="ground_truth", 1068 ): 1069 """Initialize with dataset, config, and optional output paths.""" 1070 self.dataset = dataset 1071 self.config = config 1072 self.model_name = config["model_name"] 1073 self.model_name_key = re.sub(r"[\W-]+", "_", self.model_name) 1074 self.dataset_name = config["v51_dataset_name"] 1075 self.do_convert_annotations = True # HF can convert (top_left_x, top_left_y, bottom_right_x, bottom_right_y) in abs. coordinates to (x_min, y_min, width, height) in rel. coordinates https://github.com/huggingface/transformers/blob/v4.48.2/src/transformers/models/conditional_detr/image_processing_conditional_detr.py#L1497 1076 1077 self.detections_root = os.path.join( 1078 output_detections_path, self.dataset_name, self.model_name_key 1079 ) 1080 1081 self.model_root = os.path.join( 1082 output_model_path, self.dataset_name, self.model_name_key 1083 ) 1084 1085 self.hf_hub_model_id = ( 1086 f"{HF_ROOT}/" + f"{self.dataset_name}_{self.model_name}".replace("/", "_") 1087 ) 1088 1089 self.categories = dataset.distinct(f"{gt_field}.detections.label") 1090 self.id2label = {index: x for index, x in enumerate(self.categories, start=0)} 1091 self.label2id = {v: k for k, v in self.id2label.items()} 1092 1093 def collate_fn(self, batch): 1094 """Collate function for batching data during training and inference.""" 1095 data = {} 1096 data["pixel_values"] = torch.stack([x["pixel_values"] for x in batch]) 1097 data["labels"] = [x["labels"] for x in batch] 1098 if "pixel_mask" in batch[0]: 1099 data["pixel_mask"] = torch.stack([x["pixel_mask"] for x in batch]) 1100 return data 1101 1102 def train(self, hf_dataset, overwrite_output=True): 1103 """Train models for object detection tasks with support for custom image sizes and transformations.""" 1104 torch.cuda.empty_cache() 1105 img_size_target = self.config.get("image_size", None) 1106 if img_size_target is None: 1107 image_processor = AutoProcessor.from_pretrained( 1108 self.model_name, 1109 do_resize=False, 1110 do_pad=True, 1111 use_fast=True, 1112 do_convert_annotations=self.do_convert_annotations, 1113 ) 1114 else: 1115 logging.warning(f"Resizing images to target size {img_size_target}.") 1116 image_processor = AutoProcessor.from_pretrained( 1117 self.model_name, 1118 do_resize=True, 1119 size={ 1120 "max_height": img_size_target[1], 1121 "max_width": img_size_target[0], 1122 }, 1123 do_pad=True, 1124 pad_size={"height": img_size_target[1], "width": img_size_target[0]}, 1125 use_fast=True, 1126 do_convert_annotations=self.do_convert_annotations, 1127 ) 1128 1129 train_transform_batch = partial( 1130 transform_batch_standalone, 1131 image_processor=image_processor, 1132 do_convert_annotations=self.do_convert_annotations, 1133 ) 1134 val_test_transform_batch = partial( 1135 transform_batch_standalone, 1136 image_processor=image_processor, 1137 do_convert_annotations=self.do_convert_annotations, 1138 ) 1139 1140 hf_dataset[Split.TRAIN] = hf_dataset[Split.TRAIN].with_transform( 1141 train_transform_batch 1142 ) 1143 hf_dataset[Split.VALIDATION] = hf_dataset[Split.VALIDATION].with_transform( 1144 val_test_transform_batch 1145 ) 1146 hf_dataset[Split.TEST] = hf_dataset[Split.TEST].with_transform( 1147 val_test_transform_batch 1148 ) 1149 1150 hf_model_config = AutoConfig.from_pretrained(self.model_name) 1151 hf_model_config_name = type(hf_model_config).__name__ 1152 1153 if type(hf_model_config) in AutoModelForObjectDetection._model_mapping: 1154 model = AutoModelForObjectDetection.from_pretrained( 1155 self.model_name, 1156 id2label=self.id2label, 1157 label2id=self.label2id, 1158 ignore_mismatched_sizes=True, 1159 ) 1160 else: 1161 model = None 1162 logging.error( 1163 "Hugging Face AutoModel does not support " + str(type(hf_model_config)) 1164 ) 1165 1166 if ( 1167 overwrite_output == True 1168 and os.path.exists(self.model_root) 1169 and os.listdir(self.model_root) 1170 ): 1171 logging.warning( 1172 f"Training will overwrite existing results in {self.model_root}" 1173 ) 1174 1175 training_args = TrainingArguments( 1176 run_name=self.model_name, 1177 output_dir=self.model_root, 1178 overwrite_output_dir=overwrite_output, 1179 num_train_epochs=self.config["epochs"], 1180 fp16=True, 1181 per_device_train_batch_size=self.config["batch_size"], 1182 auto_find_batch_size=True, 1183 dataloader_num_workers=min(self.config["n_worker_dataloader"], NUM_WORKERS), 1184 learning_rate=self.config["learning_rate"], 1185 lr_scheduler_type="cosine", 1186 weight_decay=self.config["weight_decay"], 1187 max_grad_norm=self.config["max_grad_norm"], 1188 metric_for_best_model="eval_loss", 1189 greater_is_better=False, 1190 load_best_model_at_end=True, 1191 eval_strategy="epoch", 1192 save_strategy="best", 1193 save_total_limit=1, 1194 remove_unused_columns=False, 1195 eval_do_concat_batches=False, 1196 save_safetensors=False, # Does not work with all models 1197 hub_model_id=self.hf_hub_model_id, 1198 hub_private_repo=True, 1199 push_to_hub=HF_DO_UPLOAD, 1200 seed=GLOBAL_SEED, 1201 data_seed=GLOBAL_SEED, 1202 ) 1203 1204 early_stopping_callback = EarlyStoppingCallback( 1205 early_stopping_patience=self.config["early_stop_patience"], 1206 early_stopping_threshold=self.config["early_stop_threshold"], 1207 ) 1208 1209 trainer = Trainer( 1210 model=model, 1211 args=training_args, 1212 train_dataset=hf_dataset[Split.TRAIN], 1213 eval_dataset=hf_dataset[Split.VALIDATION], 1214 tokenizer=image_processor, 1215 data_collator=self.collate_fn, 1216 callbacks=[early_stopping_callback], 1217 # compute_metrics=eval_compute_metrics_fn, 1218 ) 1219 1220 logging.info(f"Starting training of model {self.model_name}.") 1221 trainer.train() 1222 if HF_DO_UPLOAD: 1223 trainer.push_to_hub() 1224 1225 metrics = trainer.evaluate(eval_dataset=hf_dataset[Split.TEST]) 1226 logging.info(f"Model training completed. Evaluation results: {metrics}") 1227 1228 def inference(self, inference_settings, load_from_hf=True, gt_field="ground_truth"): 1229 """Performs model inference on a dataset, loading from Hugging Face or disk, and optionally evaluates detection results.""" 1230 1231 model_hf = inference_settings["model_hf"] 1232 dataset_name = None 1233 if model_hf is not None: 1234 self.hf_hub_model_id = model_hf 1235 dataset_name, model_name = get_dataset_and_model_from_hf_id(model_hf) 1236 else: 1237 dataset_name = self.dataset_name 1238 torch.cuda.empty_cache() 1239 # Load trained model from Hugging Face 1240 load_from_hf_successful = None 1241 if load_from_hf: 1242 try: 1243 logging.info(f"Loading model from Hugging Face: {self.hf_hub_model_id}") 1244 image_processor = AutoProcessor.from_pretrained(self.hf_hub_model_id) 1245 model = AutoModelForObjectDetection.from_pretrained( 1246 self.hf_hub_model_id 1247 ) 1248 load_from_hf_successful = True 1249 except Exception as e: 1250 load_from_hf_successful = False 1251 logging.warning( 1252 f"Model {self.model_name} could not be loaded from Hugging Face {self.hf_hub_model_id}. Attempting loading from disk." 1253 ) 1254 if load_from_hf == False or load_from_hf_successful == False: 1255 try: 1256 # Select folder in self.model_root that include 'checkpoint-' 1257 checkpoint_dirs = [ 1258 d 1259 for d in os.listdir(self.model_root) 1260 if "checkpoint-" in d 1261 and os.path.isdir(os.path.join(self.model_root, d)) 1262 ] 1263 1264 if not checkpoint_dirs: 1265 logging.error( 1266 f"No checkpoint directory found in {self.model_root}!" 1267 ) 1268 model_path = None 1269 else: 1270 # Sort by modification time (latest first) 1271 checkpoint_dirs.sort( 1272 key=lambda d: os.path.getmtime( 1273 os.path.join(self.model_root, d) 1274 ), 1275 reverse=True, 1276 ) 1277 1278 if len(checkpoint_dirs) > 1: 1279 logging.warning( 1280 f"Multiple checkpoint directories found: {checkpoint_dirs}. Selecting the latest one: {checkpoint_dirs[0]}." 1281 ) 1282 1283 selected_checkpoint = checkpoint_dirs[0] 1284 logging.info( 1285 f"Loading model from disk: {self.model_root}/{selected_checkpoint}" 1286 ) 1287 model_path = os.path.join(self.model_root, selected_checkpoint) 1288 1289 image_processor = AutoProcessor.from_pretrained(model_path) 1290 model = AutoModelForObjectDetection.from_pretrained(model_path) 1291 except Exception as e: 1292 logging.error( 1293 f"Model {self.model_name} could not be loaded from folder {self.model_root}/{selected_checkpoint}. Inference not possible." 1294 ) 1295 1296 device, _, _ = get_backend() 1297 logging.info(f"Using device {device} for inference.") 1298 model = model.to(device) 1299 model.eval() 1300 1301 pred_key = f"pred_od_{self.model_name_key}-{dataset_name}" 1302 1303 if inference_settings["inference_on_test"] is True: 1304 INFERENCE_SPLITS = ["test"] 1305 dataset_eval_view = self.dataset.match_tags(INFERENCE_SPLITS) 1306 else: 1307 dataset_eval_view = self.dataset 1308 1309 detection_threshold = inference_settings["detection_threshold"] 1310 1311 with torch.amp.autocast("cuda"), torch.inference_mode(): 1312 for sample in dataset_eval_view.iter_samples(progress=True, autosave=True): 1313 image_width = sample.metadata.width 1314 image_height = sample.metadata.height 1315 img_filepath = sample.filepath 1316 1317 image = Image.open(img_filepath) 1318 inputs = image_processor(images=[image], return_tensors="pt") 1319 outputs = model(**inputs.to(device)) 1320 target_sizes = torch.tensor([[image.size[1], image.size[0]]]) 1321 1322 results = image_processor.post_process_object_detection( 1323 outputs, threshold=detection_threshold, target_sizes=target_sizes 1324 )[0] 1325 1326 detections = [] 1327 for score, label, box in zip( 1328 results["scores"], results["labels"], results["boxes"] 1329 ): 1330 # Bbox is in absolute coordinates x, y, x2, y2 1331 box = box.tolist() 1332 text_label = model.config.id2label[label.item()] 1333 1334 # Voxel51 requires relative coordinates between 0 and 1 1335 top_left_x = box[0] / image_width 1336 top_left_y = box[1] / image_height 1337 box_width = (box[2] - box[0]) / image_width 1338 box_height = (box[3] - box[1]) / image_height 1339 detection = fo.Detection( 1340 label=text_label, 1341 bounding_box=[ 1342 top_left_x, 1343 top_left_y, 1344 box_width, 1345 box_height, 1346 ], 1347 confidence=score.item(), 1348 ) 1349 detections.append(detection) 1350 1351 sample[pred_key] = fo.Detections(detections=detections) 1352 1353 if inference_settings["do_eval"] is True: 1354 eval_key = re.sub( 1355 r"[\W-]+", "_", "eval_" + self.model_name + "_" + self.dataset_name 1356 ) 1357 1358 if inference_settings["inference_on_test"] is True: 1359 dataset_view = self.dataset.match_tags(["test"]) 1360 else: 1361 dataset_view = self.dataset 1362 1363 dataset_view.evaluate_detections( 1364 pred_key, 1365 gt_field=gt_field, 1366 eval_key=eval_key, 1367 compute_mAP=True, 1368 ) 1369 1370 1371class CustomCoDETRObjectDetection: 1372 """Interface for running Co-DETR object detection model training and inference in containers""" 1373 1374 def __init__(self, dataset, dataset_info, run_config): 1375 """Initialize Co-DETR interface with dataset and configuration""" 1376 self.root_codetr = "./custom_models/CoDETR/Co-DETR" 1377 self.root_codetr_models = "output/models/codetr" 1378 self.dataset = dataset 1379 self.dataset_name = dataset_info["name"] 1380 self.export_dir_root = run_config["export_dataset_root"] 1381 self.config_key = os.path.splitext(os.path.basename(run_config["config"]))[0] 1382 self.hf_repo_name = f"{HF_ROOT}/{self.dataset_name}_{self.config_key}" 1383 1384 def convert_data(self): 1385 """Convert dataset to COCO format required by Co-DETR""" 1386 1387 export_dir = os.path.join(self.export_dir_root, self.dataset_name, "coco") 1388 1389 # Check if folder already exists 1390 if not os.path.exists(export_dir): 1391 # Make directory 1392 os.makedirs(export_dir, exist_ok=True) 1393 logging.info(f"Exporting data to {export_dir}") 1394 splits = [ 1395 "train", 1396 "val", 1397 "test", 1398 ] # CoDETR expects data in 'train' and 'val' folder 1399 for split in splits: 1400 split_view = self.dataset.match_tags(split) 1401 split_view.export( 1402 dataset_type=fo.types.COCODetectionDataset, 1403 data_path=os.path.join(export_dir, f"{split}2017"), 1404 labels_path=os.path.join( 1405 export_dir, "annotations", f"instances_{split}2017.json" 1406 ), 1407 label_field="ground_truth", 1408 ) 1409 else: 1410 logging.warning( 1411 f"Folder {export_dir} already exists, skipping data export." 1412 ) 1413 1414 def update_config_file(self, dataset_name, config_file, max_epochs): 1415 """Update Co-DETR config file with dataset-specific parameters""" 1416 1417 config_path = os.path.join(self.root_codetr, config_file) 1418 1419 # Get classes from exported data 1420 annotations_json = os.path.join( 1421 self.export_dir_root, 1422 dataset_name, 1423 "coco/annotations/instances_train2017.json", 1424 ) 1425 # Read the JSON file 1426 with open(annotations_json, "r") as file: 1427 data = json.load(file) 1428 1429 # Extract the value associated with the key "categories" 1430 categories = data.get("categories") 1431 class_names = tuple(category["name"] for category in categories) 1432 num_classes = len(class_names) 1433 1434 # Update configuration file 1435 # This assumes that 'classes = '('a','b',...)' are already defined and will be overwritten. 1436 with open(config_path, "r") as file: 1437 content = file.read() 1438 1439 # Update the classes tuple 1440 content = re.sub(r"classes\s*=\s*\(.*?\)", f"classes = {class_names}", content) 1441 1442 # Update all instances of num_classes 1443 content = re.sub(r"num_classes=\d+", f"num_classes={num_classes}", content) 1444 1445 # Update all instances of max_epochs 1446 content = re.sub(r"max_epochs=\d+", f"max_epochs={max_epochs}", content) 1447 1448 with open(config_path, "w") as file: 1449 file.write(content) 1450 1451 logging.warning( 1452 f"Updated {config_path} with classes={class_names} and num_classes={num_classes} and max_epochs={max_epochs}" 1453 ) 1454 1455 def train(self, param_config, param_n_gpus, container_tool, param_function="train"): 1456 """Train Co-DETR model using containerized environment""" 1457 1458 # Check if model already exists 1459 output_folder_codetr = os.path.join(self.root_codetr, "output") 1460 os.makedirs(output_folder_codetr, exist_ok=True) 1461 param_config_name = os.path.splitext(os.path.basename(param_config))[0] 1462 best_models_dir = os.path.join(output_folder_codetr, "best") 1463 os.makedirs(best_models_dir, exist_ok=True) 1464 # Best model files follow the naming scheme "config_dataset.pth" 1465 pth_model_files = ( 1466 [f for f in os.listdir(best_models_dir) if f.endswith(".pth")] 1467 if os.path.exists(best_models_dir) and os.path.isdir(best_models_dir) 1468 else [] 1469 ) 1470 1471 # Best model files are stored in the format "config_dataset.pth" 1472 matching_files = [ 1473 f 1474 for f in pth_model_files 1475 if f.startswith(param_config_name) 1476 and self.dataset_name in f 1477 and f.endswith(".pth") 1478 ] 1479 if len(matching_files) > 0: 1480 logging.warning( 1481 f"Model {param_config_name} already trained on dataset {self.dataset_name}. Skipping training." 1482 ) 1483 if len(matching_files) > 1: 1484 logging.warning(f"Multiple weights found: {matching_files}") 1485 else: 1486 logging.info( 1487 f"Launching training for Co-DETR config {param_config} and dataset {self.dataset_name}." 1488 ) 1489 volume_data = os.path.join(self.export_dir_root, self.dataset_name) 1490 1491 # Train model, store checkpoints in 'output_folder_codetr' 1492 train_result = self._run_container( 1493 volume_data=volume_data, 1494 param_function=param_function, 1495 param_config=param_config, 1496 param_n_gpus=param_n_gpus, 1497 container_tool=container_tool, 1498 ) 1499 1500 # Find the best_bbox checkpoint file 1501 checkpoint_files = [ 1502 f 1503 for f in os.listdir(output_folder_codetr) 1504 if "best_bbox" in f and f.endswith(".pth") 1505 ] 1506 if not checkpoint_files: 1507 logging.error( 1508 "Co-DETR was not trained, model pth file missing. No checkpoint file with 'best_bbox' found." 1509 ) 1510 else: 1511 if len(checkpoint_files) > 1: 1512 logging.warning( 1513 f"Found {len(checkpoint_files)} checkpoint files. Selecting {checkpoint_files[0]}." 1514 ) 1515 checkpoint = checkpoint_files[0] 1516 checkpoint_path = os.path.join(output_folder_codetr, checkpoint) 1517 logging.info("Co-DETR was trained successfully.") 1518 1519 # Upload best model to Hugging Face 1520 if HF_DO_UPLOAD == True: 1521 logging.info("Uploading Co-DETR model to Hugging Face.") 1522 api = HfApi() 1523 api.create_repo( 1524 self.hf_repo_name, 1525 private=True, 1526 repo_type="model", 1527 exist_ok=True, 1528 ) 1529 api.upload_file( 1530 path_or_fileobj=checkpoint_path, 1531 path_in_repo="model.pth", 1532 repo_id=self.hf_repo_name, 1533 repo_type="model", 1534 ) 1535 1536 # Move best model file and clear output folder 1537 self._run_container( 1538 volume_data=volume_data, 1539 param_function="clear-output", 1540 param_config=param_config, 1541 param_dataset_name=self.dataset_name, 1542 container_tool=container_tool, 1543 ) 1544 1545 @staticmethod 1546 def _find_file_iteratively(start_path, filename): 1547 """Direct access or recursively search for a file in a directory structure.""" 1548 # Convert start_path to a Path object 1549 start_path = Path(start_path) 1550 1551 # Check if the file exists in the start_path directly (very fast) 1552 file_path = start_path / filename 1553 if file_path.exists(): 1554 return str(file_path) 1555 1556 # Start with the highest directory and go up iteratively 1557 current_dir = start_path 1558 checked_dirs = set() 1559 1560 while current_dir != current_dir.root: 1561 # Check if the file is in the current directory 1562 file_path = current_dir / filename 1563 if file_path.exists(): 1564 return str(file_path) 1565 1566 # If we haven't checked the sibling directories, check them as well 1567 parent_dir = current_dir.parent 1568 if parent_dir not in checked_dirs: 1569 # Check sibling directories 1570 for sibling in parent_dir.iterdir(): 1571 if sibling != current_dir and sibling.is_dir(): 1572 sibling_file_path = sibling / filename 1573 if sibling_file_path.exists(): 1574 return str(sibling_file_path) 1575 checked_dirs.add(parent_dir) 1576 1577 # Otherwise, go one level up 1578 current_dir = current_dir.parent 1579 1580 # If file is not found after traversing all levels, return None 1581 logging.error(f"File {filename} could not be found.") 1582 return None 1583 1584 def run_inference( 1585 self, 1586 dataset, 1587 param_config, 1588 param_n_gpus, 1589 container_tool, 1590 inference_settings, 1591 param_function="inference", 1592 inference_output_folder="custom_models/CoDETR/Co-DETR/output/inference/", 1593 gt_field="ground_truth", 1594 ): 1595 """Run inference using trained Co-DETR model and convert results to FiftyOne format""" 1596 1597 logging.info(f"Launching inference for Co-DETR config {param_config}.") 1598 volume_data = os.path.join(self.export_dir_root, self.dataset_name) 1599 1600 if inference_settings["inference_on_test"] is True: 1601 folder_inference = os.path.join("coco", "test2017") 1602 else: 1603 folder_inference = os.path.join("coco") 1604 1605 # Get model from Hugging Face 1606 dataset_name = None 1607 config_key = None 1608 try: 1609 if inference_settings["model_hf"] is None: 1610 hf_path = self.hf_repo_name 1611 else: 1612 hf_path = inference_settings["model_hf"] 1613 1614 dataset_name, config_key = get_dataset_and_model_from_hf_id(hf_path) 1615 1616 download_folder = os.path.join( 1617 self.root_codetr_models, dataset_name, config_key 1618 ) 1619 1620 logging.info( 1621 f"Downloading model {hf_path} from Hugging Face into {download_folder}" 1622 ) 1623 os.makedirs(download_folder, exist_ok=True) 1624 1625 file_path = hf_hub_download( 1626 repo_id=hf_path, 1627 filename="model.pth", 1628 local_dir=download_folder, 1629 ) 1630 except Exception as e: 1631 logging.error(f"An error occured during model download: {e}") 1632 1633 model_path = os.path.join(dataset_name, config_key, "model.pth") 1634 logging.info(f"Starting inference for model {model_path}") 1635 1636 inference_result = self._run_container( 1637 volume_data=volume_data, 1638 param_function=param_function, 1639 param_config=param_config, 1640 param_n_gpus=param_n_gpus, 1641 container_tool=container_tool, 1642 param_inference_dataset_folder=folder_inference, 1643 param_inference_model_checkpoint=model_path, 1644 ) 1645 1646 # Convert results from JSON output into V51 dataset 1647 # Files follow format inference_results_{timestamp}.json (run_inference.py) 1648 os.makedirs(inference_output_folder, exist_ok=True) 1649 output_files = [ 1650 f 1651 for f in os.listdir(inference_output_folder) 1652 if f.startswith("inference_results_") and f.endswith(".json") 1653 ] 1654 logging.debug(f"Found files with inference content: {output_files}") 1655 1656 if not output_files: 1657 logging.error( 1658 f"No inference result files found in {inference_output_folder}" 1659 ) 1660 1661 # Get full path for each file 1662 file_paths = [os.path.join(inference_output_folder, f) for f in output_files] 1663 1664 # Extract timestamp from the filename and sort based on the timestamp 1665 file_paths_sorted = sorted( 1666 file_paths, 1667 key=lambda f: datetime.datetime.strptime( 1668 f.split("_")[-2] + "_" + f.split("_")[-1].replace(".json", ""), 1669 "%Y%m%d_%H%M%S", 1670 ), 1671 reverse=True, 1672 ) 1673 1674 # Use the most recent file based on timestamp 1675 latest_file = file_paths_sorted[0] 1676 logging.info(f"Using inference results from: {latest_file}") 1677 with open(latest_file, "r") as file: 1678 data = json.load(file) 1679 1680 # Get conversion for annotated classes 1681 annotations_path = os.path.join( 1682 volume_data, "coco", "annotations", "instances_train2017.json" 1683 ) 1684 1685 with open(annotations_path, "r") as file: 1686 data_annotations = json.load(file) 1687 1688 class_ids_and_names = [ 1689 (category["id"], category["name"]) 1690 for category in data_annotations["categories"] 1691 ] 1692 1693 # Match sample filepaths (from exported Co-DETR COCO format) to V51 filepaths 1694 sample = dataset.first() 1695 root_dir_samples = sample.filepath 1696 1697 # Convert results into V51 file format 1698 detection_threshold = inference_settings["detection_threshold"] 1699 pred_key = f"pred_od_{config_key}-{dataset_name}" 1700 for key, value in tqdm(data.items(), desc="Processing Co-DETR detection"): 1701 try: 1702 # Get filename 1703 filepath = CustomCoDETRObjectDetection._find_file_iteratively( 1704 root_dir_samples, os.path.basename(key) 1705 ) 1706 sample = dataset[filepath] 1707 1708 img_width = sample.metadata.width 1709 img_height = sample.metadata.height 1710 1711 detections_v51 = [] 1712 for class_id, class_detections in enumerate(data[key]): # Starts with 0 1713 if len(class_detections) > 0: 1714 objects_class = class_ids_and_names[class_id] 1715 for detection in class_detections: 1716 confidence = detection[4] 1717 detection_v51 = fo.Detection( 1718 label=objects_class[1], 1719 bounding_box=[ 1720 detection[0] / img_width, 1721 detection[1] / img_height, 1722 (detection[2] - detection[0]) / img_width, 1723 (detection[3] - detection[1]) / img_height, 1724 ], 1725 confidence=confidence, 1726 ) 1727 if confidence >= detection_threshold: 1728 detections_v51.append(detection_v51) 1729 1730 sample[pred_key] = fo.Detections(detections=detections_v51) 1731 sample.save() 1732 except Exception as e: 1733 logging.error( 1734 f"An error occured during the conversion of Co-DETR inference results to the V51 dataset: {e}" 1735 ) 1736 1737 # Run V51 evaluation 1738 if inference_settings["do_eval"] is True: 1739 eval_key = pred_key.replace("pred_", "eval_").replace("-", "_") 1740 1741 if inference_settings["inference_on_test"] is True: 1742 dataset_view = dataset.match_tags(["test"]) 1743 else: 1744 dataset_view = dataset 1745 1746 logging.info( 1747 f"Starting evaluation for {pred_key} in evaluation key {eval_key}." 1748 ) 1749 dataset_view.evaluate_detections( 1750 pred_key, 1751 gt_field=gt_field, 1752 eval_key=eval_key, 1753 compute_mAP=True, 1754 ) 1755 1756 def _run_container( 1757 self, 1758 volume_data, 1759 param_function, 1760 param_config="", 1761 param_n_gpus="1", 1762 param_dataset_name="", 1763 param_inference_dataset_folder="", 1764 param_inference_model_checkpoint="", 1765 image="dbogdollresearch/codetr", 1766 workdir="/launch", 1767 container_tool="docker", 1768 ): 1769 """Execute Co-DETR container with specified parameters using Docker or Singularity""" 1770 1771 try: 1772 # Convert relative paths to absolute paths (necessary under WSL2) 1773 root_codetr_abs = os.path.abspath(self.root_codetr) 1774 volume_data_abs = os.path.abspath(volume_data) 1775 root_codetr_models_abs = os.path.abspath(self.root_codetr_models) 1776 1777 # Check if using Docker or Singularity and define the appropriate command 1778 if container_tool == "docker": 1779 command = [ 1780 "docker", 1781 "run", 1782 "--gpus", 1783 "all", 1784 "--workdir", 1785 workdir, 1786 "--volume", 1787 f"{root_codetr_abs}:{workdir}", 1788 "--volume", 1789 f"{volume_data_abs}:{workdir}/data:ro", 1790 "--volume", 1791 f"{root_codetr_models_abs}:{workdir}/hf_models:ro", 1792 "--shm-size=8g", 1793 image, 1794 param_function, 1795 param_config, 1796 param_n_gpus, 1797 param_dataset_name, 1798 param_inference_dataset_folder, 1799 param_inference_model_checkpoint, 1800 ] 1801 elif container_tool == "singularity": 1802 command = [ 1803 "singularity", 1804 "run", 1805 "--nv", 1806 "--pwd", 1807 workdir, 1808 "--bind", 1809 f"{self.root_codetr}:{workdir}", 1810 "--bind", 1811 f"{volume_data}:{workdir}/data:ro", 1812 "--bind", 1813 f"{self.root_codetr_models}:{workdir}/hf_models:ro", 1814 f"docker://{image}", 1815 param_function, 1816 param_config, 1817 param_n_gpus, 1818 param_dataset_name, 1819 param_inference_dataset_folder, 1820 param_inference_model_checkpoint, 1821 ] 1822 else: 1823 raise ValueError( 1824 f"Invalid container tool specified: {container_tool}. Choose 'docker' or 'singularity'." 1825 ) 1826 1827 # Start the process and stream outputs to the console 1828 logging.info(f"Launching terminal command {command}") 1829 with subprocess.Popen( 1830 command, stdout=sys.stdout, stderr=sys.stderr, text=True 1831 ) as proc: 1832 proc.wait() # Wait for the process to complete 1833 return True 1834 except Exception as e: 1835 logging.error(f"Error during Co-DETR container run: {e}") 1836 return False
60def get_dataset_and_model_from_hf_id(hf_id: str): 61 """Extract dataset and model name from HuggingFace ID by matching against supported datasets.""" 62 # HF ID follows structure organization/dataset_model 63 # Both dataset and model can contain "_" as well 64 65 # Remove organization (everything before the first "/") 66 hf_id = hf_id.split("/", 1)[-1] 67 68 # Find all dataset names that appear in hf_id 69 supported_datasets = get_supported_datasets() 70 matches = [ 71 dataset_name for dataset_name in supported_datasets if dataset_name in hf_id 72 ] 73 74 if not matches: 75 logging.warning( 76 f"Dataset name could not be extracted from Hugging Face ID {hf_id}" 77 ) 78 dataset_name = "no_dataset_name" 79 else: 80 # Return the longest match (most specific) 81 dataset_name = max(matches, key=len) 82 83 # Get model name by removing dataset name from hf_id 84 model_name = hf_id.replace(dataset_name, "").strip("_") 85 if not model_name: 86 logging.warning( 87 f"Model name could not be extracted from Hugging Face ID {hf_id}" 88 ) 89 model_name = "no_model_name" 90 91 return dataset_name, model_name
Extract dataset and model name from HuggingFace ID by matching against supported datasets.
95class TimeoutException(Exception): 96 """Custom exception for handling dataloader timeouts.""" 97 98 pass
Custom exception for handling dataloader timeouts.
105class ZeroShotInferenceCollateFn: 106 """Collate function for zero-shot inference that prepares batches for model input.""" 107 108 def __init__( 109 self, 110 hf_model_config_name, 111 hf_processor, 112 batch_size, 113 object_classes, 114 batch_classes, 115 ): 116 """Initialize the auto labeling model with the Hugging Face model config, processor, batch size, object classes, and batch classes.""" 117 try: 118 self.hf_model_config_name = hf_model_config_name 119 self.processor = hf_processor 120 self.batch_size = batch_size 121 self.object_classes = object_classes 122 self.batch_classes = batch_classes 123 except Exception as e: 124 logging.error(f"Error in collate init of DataLoader: {e}") 125 126 def __call__(self, batch): 127 """Processes a batch of data by preparing images and labels for model input.""" 128 try: 129 images, labels = zip(*batch) 130 target_sizes = [tuple(img.shape[1:]) for img in images] 131 132 # Adjustments for final batch 133 n_images = len(images) 134 if n_images < self.batch_size: 135 self.batch_classes = [self.object_classes] * n_images 136 137 # Apply PIL transformation for specific models 138 if self.hf_model_config_name == "OmDetTurboConfig": 139 images = [to_pil_image(image) for image in images] 140 141 inputs = self.processor( 142 text=self.batch_classes, 143 images=images, 144 return_tensors="pt", 145 padding=True, # Allow for differently sized images 146 ) 147 148 return inputs, labels, target_sizes, self.batch_classes 149 except Exception as e: 150 logging.error(f"Error in collate function of DataLoader: {e}")
Collate function for zero-shot inference that prepares batches for model input.
108 def __init__( 109 self, 110 hf_model_config_name, 111 hf_processor, 112 batch_size, 113 object_classes, 114 batch_classes, 115 ): 116 """Initialize the auto labeling model with the Hugging Face model config, processor, batch size, object classes, and batch classes.""" 117 try: 118 self.hf_model_config_name = hf_model_config_name 119 self.processor = hf_processor 120 self.batch_size = batch_size 121 self.object_classes = object_classes 122 self.batch_classes = batch_classes 123 except Exception as e: 124 logging.error(f"Error in collate init of DataLoader: {e}")
Initialize the auto labeling model with the Hugging Face model config, processor, batch size, object classes, and batch classes.
153class ZeroShotObjectDetection: 154 """Zero-shot object detection using various HuggingFace models with multi-GPU support.""" 155 156 def __init__( 157 self, 158 dataset_torch: torch.utils.data.Dataset, 159 dataset_info, 160 config, 161 detections_path="./output/detections/", 162 log_root="./logs/", 163 ): 164 """Initialize the zero-shot object detection labeler with dataset, configuration, and path settings.""" 165 self.dataset_torch = dataset_torch 166 self.dataset_info = dataset_info 167 self.dataset_name = dataset_info["name"] 168 self.object_classes = config["object_classes"] 169 self.detection_threshold = config["detection_threshold"] 170 self.detections_root = os.path.join(detections_path, self.dataset_name) 171 self.tensorboard_root = os.path.join( 172 log_root, "tensorboard/zeroshot_object_detection" 173 ) 174 175 logging.info(f"Zero-shot models will look for {self.object_classes}") 176 177 def exclude_stored_predictions( 178 self, dataset_v51: fo.Dataset, config, do_exclude=False 179 ): 180 """Checks for existing predictions and loads them from disk if available.""" 181 dataset_schema = dataset_v51.get_field_schema() 182 models_splits_dict = {} 183 for model_name, value in config["hf_models_zeroshot_objectdetection"].items(): 184 model_name_key = re.sub(r"[\W-]+", "_", model_name) 185 pred_key = re.sub( 186 r"[\W-]+", "_", "pred_zsod_" + model_name 187 ) # od for Object Detection 188 # Check if data already stored in V51 dataset 189 if pred_key in dataset_schema and do_exclude is True: 190 logging.warning( 191 f"Skipping model {model_name}. Predictions already stored in Voxel51 dataset." 192 ) 193 # Check if data already stored on disk 194 elif ( 195 os.path.isdir(os.path.join(self.detections_root, model_name_key)) 196 and do_exclude is True 197 ): 198 try: 199 logging.info(f"Loading {model_name} predictions from disk.") 200 temp_dataset = fo.Dataset.from_dir( 201 dataset_dir=os.path.join(self.detections_root, model_name_key), 202 dataset_type=fo.types.COCODetectionDataset, 203 name="temp_dataset", 204 data_path="data.json", 205 ) 206 207 # Copy all detections from stored dataset into our dataset 208 detections = temp_dataset.values("detections.detections") 209 add_sample_field( 210 dataset_v51, 211 pred_key, 212 fo.EmbeddedDocumentField, 213 embedded_doc_type=fo.Detections, 214 ) 215 dataset_v51.set_values(f"{pred_key}.detections", detections) 216 except Exception as e: 217 logging.error( 218 f"Data in {os.path.join(self.detections_root, model_name_key)} could not be loaded. Error: {e}" 219 ) 220 finally: 221 fo.delete_dataset("temp_dataset") 222 # Assign model to be run 223 else: 224 models_splits_dict[model_name] = value 225 226 logging.info(f"Models to be run: {models_splits_dict}") 227 return models_splits_dict 228 229 # Worker functions 230 def update_queue_sizes_worker( 231 self, queues, queue_sizes, largest_queue_index, max_queue_size 232 ): 233 """Monitor and manage multiple result queues for balanced processing.""" 234 experiment_name = f"queue_size_monitor_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" 235 log_directory = os.path.join( 236 self.tensorboard_root, self.dataset_name, experiment_name 237 ) 238 wandb.tensorboard.patch(root_logdir=log_directory) 239 if WANDB_ACTIVE: 240 wandb.init( 241 name=f"queue_size_monitor_{os.getpid()}", 242 job_type="inference", 243 project="Zero Shot Object Detection", 244 ) 245 writer = SummaryWriter(log_dir=log_directory) 246 247 step = 0 248 249 while True: 250 for i, queue in enumerate(queues): 251 queue_sizes[i] = queue.qsize() 252 writer.add_scalar(f"queue_size/items/{i}", queue_sizes[i], step) 253 254 step += 1 255 256 # Find the index of the largest queue 257 max_size = max(queue_sizes) 258 max_index = queue_sizes.index(max_size) 259 260 # Calculate the total size of all queues 261 total_size = sum(queue_sizes) 262 263 # If total_size is greater than 0, calculate the probabilities 264 if total_size > 0: 265 # Normalize the queue sizes by the max_queue_size 266 normalized_sizes = [size / max_queue_size for size in queue_sizes] 267 268 # Calculate probabilities based on normalized sizes 269 probabilities = [ 270 size / sum(normalized_sizes) for size in normalized_sizes 271 ] 272 273 # Use random.choices with weights (probabilities) 274 chosen_queue_index = random.choices( 275 range(len(queues)), weights=probabilities, k=1 276 )[0] 277 278 largest_queue_index.value = chosen_queue_index 279 else: 280 largest_queue_index.value = max_index 281 282 time.sleep(0.1) 283 284 def process_outputs_worker( 285 self, 286 result_queues, 287 largest_queue_index, 288 inference_finished, 289 max_queue_size, 290 wandb_activate=False, 291 ): 292 """Process model outputs from result queues and save to dataset.""" 293 configure_logging() 294 logging.info(f"Process ID: {os.getpid()}. Results processing process started") 295 dataset_v51 = fo.load_dataset(self.dataset_name) 296 processing_successful = None 297 298 # Logging 299 experiment_name = f"post_process_{os.getpid()}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" 300 log_directory = os.path.join( 301 self.tensorboard_root, self.dataset_name, experiment_name 302 ) 303 wandb.tensorboard.patch(root_logdir=log_directory) 304 if WANDB_ACTIVE and wandb_activate: 305 wandb.init( 306 name=f"post_process_{os.getpid()}", 307 job_type="inference", 308 project="Zero Shot Object Detection", 309 ) 310 writer = SummaryWriter(log_dir=log_directory) 311 n_processed_images = 0 312 313 logging.info(f"Post-Processor {os.getpid()} starting loop.") 314 315 while True: 316 results_queue = result_queues[largest_queue_index.value] 317 writer.add_scalar( 318 f"post_processing/selected_queue", 319 largest_queue_index.value, 320 n_processed_images, 321 ) 322 323 if results_queue.qsize() == max_queue_size: 324 logging.warning( 325 f"Queue full: {results_queue.qsize()}. Consider increasing number of post-processing workers." 326 ) 327 328 # Exit only when inference is finished and the queue is empty 329 if inference_finished.value and results_queue.empty(): 330 dataset_v51.save() 331 logging.info( 332 f"Post-processing worker {os.getpid()} has finished all outputs." 333 ) 334 break 335 336 # Process results from the queue if available 337 if not results_queue.empty(): 338 try: 339 time_start = time.time() 340 341 result = results_queue.get_nowait() 342 343 processing_successful = self.process_outputs( 344 dataset_v51, 345 result, 346 self.object_classes, 347 self.detection_threshold, 348 ) 349 350 # Performance logging 351 n_images = len(result["labels"]) 352 time_end = time.time() 353 duration = time_end - time_start 354 batches_per_second = 1 / duration 355 frames_per_second = batches_per_second * n_images 356 n_processed_images += n_images 357 writer.add_scalar( 358 f"post_processing/frames_per_second", 359 frames_per_second, 360 n_processed_images, 361 ) 362 363 del result # Explicit removal from device 364 365 except Exception as e: 366 continue 367 368 else: 369 continue 370 371 writer.close() 372 wandb.finish(exit_code=0) 373 return processing_successful # Return last processing status 374 375 def gpu_worker( 376 self, 377 gpu_id, 378 cpu_cores, 379 task_queue, 380 results_queue, 381 done_event, 382 post_processing_finished, 383 set_cpu_affinity=False, 384 ): 385 """Run model inference on specified GPU with dedicated CPU cores.""" 386 dataset_v51 = fo.load_dataset( 387 self.dataset_name 388 ) # NOTE Only for the case of sequential processing 389 configure_logging() 390 # Set CPU 391 if set_cpu_affinity: 392 # Allow only certain CPU cores 393 psutil.Process().cpu_affinity(cpu_cores) 394 logging.info(f"Available CPU cores: {psutil.Process().cpu_affinity()}") 395 max_n_cpus = len(cpu_cores) 396 torch.set_num_threads(max_n_cpus) 397 398 # Set GPU 399 logging.info(f"GPU {gpu_id}: {torch.cuda.get_device_name(gpu_id)}") 400 device = torch.device(f"cuda:{gpu_id}") 401 402 run_successful = None 403 with torch.cuda.device(gpu_id): 404 while True: 405 if post_processing_finished.value and task_queue.empty(): 406 # Keep alive until post-processing is done 407 break 408 409 if task_queue.empty(): 410 done_event.set() 411 412 if not task_queue.empty(): 413 try: 414 task_metadata = task_queue.get( 415 timeout=5 416 ) # Timeout to prevent indefinite blocking 417 except Exception as e: 418 break # Exit if no more tasks 419 run_successful = self.model_inference( 420 task_metadata, 421 device, 422 self.dataset_torch, 423 dataset_v51, 424 self.object_classes, 425 results_queue, 426 self.tensorboard_root, 427 ) 428 logging.info( 429 f"Worker for GPU {gpu_id} finished run successful: {run_successful}" 430 ) 431 else: 432 continue 433 return run_successful # Return last processing status 434 435 def eval_and_export_worker(self, models_ready_queue, n_models): 436 """Evaluate model performance and export results for completed models.""" 437 configure_logging() 438 logging.info(f"Process ID: {os.getpid()}. Eval-and-export process started") 439 440 dataset = fo.load_dataset(self.dataset_name) 441 run_successful = None 442 models_done = 0 443 444 while True: 445 if not models_ready_queue.empty(): 446 try: 447 dict = models_ready_queue.get( 448 timeout=5 449 ) # Timeout to prevent indefinite blocking 450 model_name = dict["model_name"] 451 pred_key = re.sub(r"[\W-]+", "_", "pred_zsod_" + model_name) 452 eval_key = re.sub(r"[\W-]+", "_", "eval_zsod_" + model_name) 453 dataset.reload() 454 run_successful = self.eval_and_export( 455 dataset, model_name, pred_key, eval_key 456 ) 457 models_done += 1 458 logging.info( 459 f"Evaluation and export of {models_done}/{n_models} models done." 460 ) 461 except Exception as e: 462 logging.error(f"Error in eval-and-export worker: {e}") 463 continue 464 465 if models_done == n_models: 466 break 467 468 return run_successful 469 470 # Functionality functions 471 def model_inference( 472 self, 473 metadata: dict, 474 device: str, 475 dataset: torch.utils.data.Dataset, 476 dataset_v51: fo.Dataset, 477 object_classes: list, 478 results_queue: Union[queue.Queue, mp.Queue], 479 root_log_dir: str, 480 persistent_workers: bool = False, 481 ): 482 """Model inference method running zero-shot object detection on provided dataset and device, returning success status.""" 483 writer = None 484 run_successful = True 485 processor, model, inputs, outputs, result, dataloader = ( 486 None, 487 None, 488 None, 489 None, 490 None, 491 None, 492 ) # For finally block 493 494 # Timeout handler 495 dataloader_timeout = 60 496 signal.signal(signal.SIGALRM, timeout_handler) 497 498 try: 499 # Metadata 500 run_id = metadata["run_id"] 501 model_name = metadata["model_name"] 502 dataset_name = metadata["dataset_name"] 503 is_subset = metadata["is_subset"] 504 batch_size = metadata["batch_size"] 505 506 logging.info( 507 f"Process ID: {os.getpid()}, Run ID: {run_id}, Device: {device}, Model: {model_name}" 508 ) 509 510 # Load the model 511 logging.info(f"Loading model {model_name}") 512 processor = AutoProcessor.from_pretrained(model_name, use_fast=True) 513 model = AutoModelForZeroShotObjectDetection.from_pretrained(model_name) 514 model = model.to(device, non_blocking=True) 515 model.eval() 516 hf_model_config = AutoConfig.from_pretrained(model_name) 517 hf_model_config_name = type(hf_model_config).__name__ 518 batch_classes = [object_classes] * batch_size 519 logging.info(f"Loaded model type {hf_model_config_name}") 520 521 # Dataloader 522 logging.info("Generating dataloader") 523 if is_subset: 524 chunk_index_start = metadata["chunk_index_start"] 525 chunk_index_end = metadata["chunk_index_end"] 526 logging.info(f"Length of dataset: {len(dataset)}") 527 logging.info(f"Subset start index: {chunk_index_start}") 528 logging.info(f"Subset stop index: {chunk_index_end}") 529 dataset = Subset(dataset, range(chunk_index_start, chunk_index_end)) 530 531 zero_shot_inference_preprocessing = ZeroShotInferenceCollateFn( 532 hf_model_config_name=hf_model_config_name, 533 hf_processor=processor, 534 object_classes=object_classes, 535 batch_size=batch_size, 536 batch_classes=batch_classes, 537 ) 538 num_workers = WORKFLOWS["auto_labeling_zero_shot"]["n_worker_dataloader"] 539 prefetch_factor = WORKFLOWS["auto_labeling_zero_shot"][ 540 "prefetch_factor_dataloader" 541 ] 542 dataloader = DataLoader( 543 dataset, 544 batch_size=batch_size, 545 shuffle=False, 546 num_workers=num_workers, 547 persistent_workers=persistent_workers, 548 pin_memory=True, 549 prefetch_factor=prefetch_factor, 550 collate_fn=zero_shot_inference_preprocessing, 551 ) 552 553 dataloader_length = len(dataloader) 554 if dataloader_length < 1: 555 logging.error( 556 f"Dataloader has insufficient data: {dataloader_length} entries. Please check your dataset and DataLoader configuration." 557 ) 558 559 # Logging 560 experiment_name = f"{model_name}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{device}" 561 log_directory = os.path.join(root_log_dir, dataset_name, experiment_name) 562 wandb.tensorboard.patch(root_logdir=log_directory) 563 if WANDB_ACTIVE: 564 wandb.init( 565 name=f"{model_name}_{device}", 566 job_type="inference", 567 project="Zero Shot Object Detection", 568 config=metadata, 569 ) 570 writer = SummaryWriter(log_dir=log_directory) 571 572 # Inference Loop 573 logging.info(f"{os.getpid()}: Starting inference loop5") 574 n_processed_images = 0 575 for inputs, labels, target_sizes, batch_classes in tqdm( 576 dataloader, desc="Inference Loop" 577 ): 578 signal.alarm(dataloader_timeout) 579 try: 580 time_start = time.time() 581 n_images = len(labels) 582 inputs = inputs.to(device, non_blocking=True) 583 584 with torch.amp.autocast("cuda"), torch.inference_mode(): 585 outputs = model(**inputs) 586 587 result = { 588 "inputs": inputs, 589 "outputs": outputs, 590 "processor": processor, 591 "target_sizes": target_sizes, 592 "labels": labels, 593 "model_name": model_name, 594 "hf_model_config_name": hf_model_config_name, 595 "batch_classes": batch_classes, 596 } 597 598 logging.debug(f"{os.getpid()}: Putting result into queue") 599 600 results_queue.put( 601 result, timeout=60 602 ) # Ditch data only after 60 seconds 603 604 # Logging 605 time_end = time.time() 606 duration = time_end - time_start 607 batches_per_second = 1 / duration 608 frames_per_second = batches_per_second * n_images 609 n_processed_images += n_images 610 logging.debug( 611 f"{os.getpid()}: Number of processes images: {n_processed_images}" 612 ) 613 writer.add_scalar( 614 f"inference/frames_per_second", 615 frames_per_second, 616 n_processed_images, 617 ) 618 619 except TimeoutException: 620 logging.warning( 621 f"Dataloader loop got stuck. Continuing with next batch." 622 ) 623 continue 624 625 finally: 626 signal.alarm(0) # Cancel the alarm 627 628 # Flawless execution 629 wandb_exit_code = 0 630 631 except Exception as e: 632 wandb_exit_code = 1 633 run_successful = False 634 logging.error(f"Error in Process {os.getpid()}: {e}") 635 finally: 636 try: 637 wandb.finish(exit_code=wandb_exit_code) 638 except: 639 pass 640 641 # Explicit removal from device 642 del ( 643 processor, 644 model, 645 inputs, 646 outputs, 647 result, 648 dataloader, 649 ) 650 651 torch.cuda.empty_cache() 652 wandb.tensorboard.unpatch() 653 if writer: 654 writer.close() 655 return run_successful 656 657 def process_outputs(self, dataset_v51, result, object_classes, detection_threshold): 658 """Process outputs from object detection models, extracting bounding boxes and labels to save to the dataset.""" 659 try: 660 inputs = result["inputs"] 661 outputs = result["outputs"] 662 target_sizes = result["target_sizes"] 663 labels = result["labels"] 664 model_name = result["model_name"] 665 hf_model_config_name = result["hf_model_config_name"] 666 batch_classes = result["batch_classes"] 667 processor = result["processor"] 668 669 # Processing output 670 if hf_model_config_name == "GroundingDinoConfig": 671 results = processor.post_process_grounded_object_detection( 672 outputs, 673 inputs.input_ids, 674 box_threshold=detection_threshold, 675 text_threshold=detection_threshold, 676 ) 677 elif hf_model_config_name in ["Owlv2Config", "OwlViTConfig"]: 678 results = processor.post_process_grounded_object_detection( 679 outputs=outputs, 680 threshold=detection_threshold, 681 target_sizes=target_sizes, 682 text_labels=batch_classes, 683 ) 684 elif hf_model_config_name == "OmDetTurboConfig": 685 results = processor.post_process_grounded_object_detection( 686 outputs, 687 text_labels=batch_classes, 688 threshold=detection_threshold, 689 nms_threshold=detection_threshold, 690 target_sizes=target_sizes, 691 ) 692 else: 693 logging.error(f"Invalid model name: {hf_model_config_name}") 694 695 if not len(results) == len(target_sizes) == len(labels): 696 logging.error( 697 f"Lengths of results, target_sizes, and labels do not match: {len(results)}, {len(target_sizes)}, {len(labels)}" 698 ) 699 for result, size, target in zip(results, target_sizes, labels): 700 boxes, scores, labels = ( 701 result["boxes"], 702 result["scores"], 703 result["text_labels"], 704 ) 705 706 img_height = size[0] 707 img_width = size[1] 708 709 detections = [] 710 for box, score, label in zip(boxes, scores, labels): 711 processing_successful = True 712 if hf_model_config_name == "GroundingDinoConfig": 713 # Outputs do not comply with given labels 714 # Grounding DINO outputs multiple pairs of object boxes and noun phrases for a given (Image, Text) pair 715 # There can be either multiple labels per output ("bike van"), incomplete ones ("motorcyc"), or broken ones ("##cic") 716 processed_label = label.split()[ 717 0 718 ] # Assume first output is the best output 719 if processed_label in object_classes: 720 label = processed_label 721 top_left_x = box[0].item() 722 top_left_y = box[1].item() 723 box_width = (box[2] - box[0]).item() 724 box_height = (box[3] - box[1]).item() 725 else: 726 matches = get_close_matches( 727 processed_label, object_classes, n=1, cutoff=0.6 728 ) 729 selected_label = matches[0] if matches else None 730 if selected_label: 731 logging.debug( 732 f"Mapped output '{processed_label}' to class '{selected_label}'" 733 ) 734 label = selected_label 735 top_left_x = box[0].item() 736 top_left_y = box[1].item() 737 box_width = (box[2] - box[0]).item() 738 box_height = (box[3] - box[1]).item() 739 else: 740 logging.debug( 741 f"Skipped detection with {hf_model_config_name} due to unclear output: {label}" 742 ) 743 processing_successful = False 744 745 elif hf_model_config_name in [ 746 "Owlv2Config", 747 "OwlViTConfig", 748 "OmDetTurboConfig", 749 ]: 750 top_left_x = box[0].item() / img_width 751 top_left_y = box[1].item() / img_height 752 box_width = (box[2].item() - box[0].item()) / img_width 753 box_height = (box[3].item() - box[1].item()) / img_height 754 755 if ( 756 processing_successful 757 ): # Skip GroundingDinoConfig labels that could not be processed 758 detection = fo.Detection( 759 label=label, 760 bounding_box=[ 761 top_left_x, 762 top_left_y, 763 box_width, 764 box_height, 765 ], 766 confidence=score.item(), 767 ) 768 detection["bbox_area"] = ( 769 detection["bounding_box"][2] * detection["bounding_box"][3] 770 ) 771 detections.append(detection) 772 773 # Attach label to V51 dataset 774 pred_key = re.sub( 775 r"[\W-]+", "_", "pred_zsod_" + model_name 776 ) # zsod Zero-Shot Object Deection 777 sample = dataset_v51[target["image_id"]] 778 sample[pred_key] = fo.Detections(detections=detections) 779 sample.save() 780 781 except Exception as e: 782 logging.error(f"Error in processing outputs: {e}") 783 processing_successful = False 784 finally: 785 return processing_successful 786 787 def eval_and_export(self, dataset_v51, model_name, pred_key, eval_key): 788 """Populate dataset with evaluation results (if ground_truth available)""" 789 try: 790 dataset_v51.evaluate_detections( 791 pred_key, 792 gt_field="ground_truth", 793 eval_key=eval_key, 794 compute_mAP=True, 795 ) 796 except Exception as e: 797 logging.warning(f"Evaluation not possible: {e}") 798 799 # Store labels https://docs.voxel51.com/api/fiftyone.core.collections.html#fiftyone.core.collections.SampleCollection.export 800 model_name_key = re.sub(r"[\W-]+", "_", model_name) 801 dataset_v51.export( 802 export_dir=os.path.join(self.detections_root, model_name_key), 803 dataset_type=fo.types.COCODetectionDataset, 804 data_path="data.json", 805 export_media=None, # "manifest", 806 label_field=pred_key, 807 progress=True, 808 ) 809 return True
Zero-shot object detection using various HuggingFace models with multi-GPU support.
156 def __init__( 157 self, 158 dataset_torch: torch.utils.data.Dataset, 159 dataset_info, 160 config, 161 detections_path="./output/detections/", 162 log_root="./logs/", 163 ): 164 """Initialize the zero-shot object detection labeler with dataset, configuration, and path settings.""" 165 self.dataset_torch = dataset_torch 166 self.dataset_info = dataset_info 167 self.dataset_name = dataset_info["name"] 168 self.object_classes = config["object_classes"] 169 self.detection_threshold = config["detection_threshold"] 170 self.detections_root = os.path.join(detections_path, self.dataset_name) 171 self.tensorboard_root = os.path.join( 172 log_root, "tensorboard/zeroshot_object_detection" 173 ) 174 175 logging.info(f"Zero-shot models will look for {self.object_classes}")
Initialize the zero-shot object detection labeler with dataset, configuration, and path settings.
177 def exclude_stored_predictions( 178 self, dataset_v51: fo.Dataset, config, do_exclude=False 179 ): 180 """Checks for existing predictions and loads them from disk if available.""" 181 dataset_schema = dataset_v51.get_field_schema() 182 models_splits_dict = {} 183 for model_name, value in config["hf_models_zeroshot_objectdetection"].items(): 184 model_name_key = re.sub(r"[\W-]+", "_", model_name) 185 pred_key = re.sub( 186 r"[\W-]+", "_", "pred_zsod_" + model_name 187 ) # od for Object Detection 188 # Check if data already stored in V51 dataset 189 if pred_key in dataset_schema and do_exclude is True: 190 logging.warning( 191 f"Skipping model {model_name}. Predictions already stored in Voxel51 dataset." 192 ) 193 # Check if data already stored on disk 194 elif ( 195 os.path.isdir(os.path.join(self.detections_root, model_name_key)) 196 and do_exclude is True 197 ): 198 try: 199 logging.info(f"Loading {model_name} predictions from disk.") 200 temp_dataset = fo.Dataset.from_dir( 201 dataset_dir=os.path.join(self.detections_root, model_name_key), 202 dataset_type=fo.types.COCODetectionDataset, 203 name="temp_dataset", 204 data_path="data.json", 205 ) 206 207 # Copy all detections from stored dataset into our dataset 208 detections = temp_dataset.values("detections.detections") 209 add_sample_field( 210 dataset_v51, 211 pred_key, 212 fo.EmbeddedDocumentField, 213 embedded_doc_type=fo.Detections, 214 ) 215 dataset_v51.set_values(f"{pred_key}.detections", detections) 216 except Exception as e: 217 logging.error( 218 f"Data in {os.path.join(self.detections_root, model_name_key)} could not be loaded. Error: {e}" 219 ) 220 finally: 221 fo.delete_dataset("temp_dataset") 222 # Assign model to be run 223 else: 224 models_splits_dict[model_name] = value 225 226 logging.info(f"Models to be run: {models_splits_dict}") 227 return models_splits_dict
Checks for existing predictions and loads them from disk if available.
230 def update_queue_sizes_worker( 231 self, queues, queue_sizes, largest_queue_index, max_queue_size 232 ): 233 """Monitor and manage multiple result queues for balanced processing.""" 234 experiment_name = f"queue_size_monitor_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" 235 log_directory = os.path.join( 236 self.tensorboard_root, self.dataset_name, experiment_name 237 ) 238 wandb.tensorboard.patch(root_logdir=log_directory) 239 if WANDB_ACTIVE: 240 wandb.init( 241 name=f"queue_size_monitor_{os.getpid()}", 242 job_type="inference", 243 project="Zero Shot Object Detection", 244 ) 245 writer = SummaryWriter(log_dir=log_directory) 246 247 step = 0 248 249 while True: 250 for i, queue in enumerate(queues): 251 queue_sizes[i] = queue.qsize() 252 writer.add_scalar(f"queue_size/items/{i}", queue_sizes[i], step) 253 254 step += 1 255 256 # Find the index of the largest queue 257 max_size = max(queue_sizes) 258 max_index = queue_sizes.index(max_size) 259 260 # Calculate the total size of all queues 261 total_size = sum(queue_sizes) 262 263 # If total_size is greater than 0, calculate the probabilities 264 if total_size > 0: 265 # Normalize the queue sizes by the max_queue_size 266 normalized_sizes = [size / max_queue_size for size in queue_sizes] 267 268 # Calculate probabilities based on normalized sizes 269 probabilities = [ 270 size / sum(normalized_sizes) for size in normalized_sizes 271 ] 272 273 # Use random.choices with weights (probabilities) 274 chosen_queue_index = random.choices( 275 range(len(queues)), weights=probabilities, k=1 276 )[0] 277 278 largest_queue_index.value = chosen_queue_index 279 else: 280 largest_queue_index.value = max_index 281 282 time.sleep(0.1)
Monitor and manage multiple result queues for balanced processing.
284 def process_outputs_worker( 285 self, 286 result_queues, 287 largest_queue_index, 288 inference_finished, 289 max_queue_size, 290 wandb_activate=False, 291 ): 292 """Process model outputs from result queues and save to dataset.""" 293 configure_logging() 294 logging.info(f"Process ID: {os.getpid()}. Results processing process started") 295 dataset_v51 = fo.load_dataset(self.dataset_name) 296 processing_successful = None 297 298 # Logging 299 experiment_name = f"post_process_{os.getpid()}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" 300 log_directory = os.path.join( 301 self.tensorboard_root, self.dataset_name, experiment_name 302 ) 303 wandb.tensorboard.patch(root_logdir=log_directory) 304 if WANDB_ACTIVE and wandb_activate: 305 wandb.init( 306 name=f"post_process_{os.getpid()}", 307 job_type="inference", 308 project="Zero Shot Object Detection", 309 ) 310 writer = SummaryWriter(log_dir=log_directory) 311 n_processed_images = 0 312 313 logging.info(f"Post-Processor {os.getpid()} starting loop.") 314 315 while True: 316 results_queue = result_queues[largest_queue_index.value] 317 writer.add_scalar( 318 f"post_processing/selected_queue", 319 largest_queue_index.value, 320 n_processed_images, 321 ) 322 323 if results_queue.qsize() == max_queue_size: 324 logging.warning( 325 f"Queue full: {results_queue.qsize()}. Consider increasing number of post-processing workers." 326 ) 327 328 # Exit only when inference is finished and the queue is empty 329 if inference_finished.value and results_queue.empty(): 330 dataset_v51.save() 331 logging.info( 332 f"Post-processing worker {os.getpid()} has finished all outputs." 333 ) 334 break 335 336 # Process results from the queue if available 337 if not results_queue.empty(): 338 try: 339 time_start = time.time() 340 341 result = results_queue.get_nowait() 342 343 processing_successful = self.process_outputs( 344 dataset_v51, 345 result, 346 self.object_classes, 347 self.detection_threshold, 348 ) 349 350 # Performance logging 351 n_images = len(result["labels"]) 352 time_end = time.time() 353 duration = time_end - time_start 354 batches_per_second = 1 / duration 355 frames_per_second = batches_per_second * n_images 356 n_processed_images += n_images 357 writer.add_scalar( 358 f"post_processing/frames_per_second", 359 frames_per_second, 360 n_processed_images, 361 ) 362 363 del result # Explicit removal from device 364 365 except Exception as e: 366 continue 367 368 else: 369 continue 370 371 writer.close() 372 wandb.finish(exit_code=0) 373 return processing_successful # Return last processing status
Process model outputs from result queues and save to dataset.
375 def gpu_worker( 376 self, 377 gpu_id, 378 cpu_cores, 379 task_queue, 380 results_queue, 381 done_event, 382 post_processing_finished, 383 set_cpu_affinity=False, 384 ): 385 """Run model inference on specified GPU with dedicated CPU cores.""" 386 dataset_v51 = fo.load_dataset( 387 self.dataset_name 388 ) # NOTE Only for the case of sequential processing 389 configure_logging() 390 # Set CPU 391 if set_cpu_affinity: 392 # Allow only certain CPU cores 393 psutil.Process().cpu_affinity(cpu_cores) 394 logging.info(f"Available CPU cores: {psutil.Process().cpu_affinity()}") 395 max_n_cpus = len(cpu_cores) 396 torch.set_num_threads(max_n_cpus) 397 398 # Set GPU 399 logging.info(f"GPU {gpu_id}: {torch.cuda.get_device_name(gpu_id)}") 400 device = torch.device(f"cuda:{gpu_id}") 401 402 run_successful = None 403 with torch.cuda.device(gpu_id): 404 while True: 405 if post_processing_finished.value and task_queue.empty(): 406 # Keep alive until post-processing is done 407 break 408 409 if task_queue.empty(): 410 done_event.set() 411 412 if not task_queue.empty(): 413 try: 414 task_metadata = task_queue.get( 415 timeout=5 416 ) # Timeout to prevent indefinite blocking 417 except Exception as e: 418 break # Exit if no more tasks 419 run_successful = self.model_inference( 420 task_metadata, 421 device, 422 self.dataset_torch, 423 dataset_v51, 424 self.object_classes, 425 results_queue, 426 self.tensorboard_root, 427 ) 428 logging.info( 429 f"Worker for GPU {gpu_id} finished run successful: {run_successful}" 430 ) 431 else: 432 continue 433 return run_successful # Return last processing status
Run model inference on specified GPU with dedicated CPU cores.
435 def eval_and_export_worker(self, models_ready_queue, n_models): 436 """Evaluate model performance and export results for completed models.""" 437 configure_logging() 438 logging.info(f"Process ID: {os.getpid()}. Eval-and-export process started") 439 440 dataset = fo.load_dataset(self.dataset_name) 441 run_successful = None 442 models_done = 0 443 444 while True: 445 if not models_ready_queue.empty(): 446 try: 447 dict = models_ready_queue.get( 448 timeout=5 449 ) # Timeout to prevent indefinite blocking 450 model_name = dict["model_name"] 451 pred_key = re.sub(r"[\W-]+", "_", "pred_zsod_" + model_name) 452 eval_key = re.sub(r"[\W-]+", "_", "eval_zsod_" + model_name) 453 dataset.reload() 454 run_successful = self.eval_and_export( 455 dataset, model_name, pred_key, eval_key 456 ) 457 models_done += 1 458 logging.info( 459 f"Evaluation and export of {models_done}/{n_models} models done." 460 ) 461 except Exception as e: 462 logging.error(f"Error in eval-and-export worker: {e}") 463 continue 464 465 if models_done == n_models: 466 break 467 468 return run_successful
Evaluate model performance and export results for completed models.
471 def model_inference( 472 self, 473 metadata: dict, 474 device: str, 475 dataset: torch.utils.data.Dataset, 476 dataset_v51: fo.Dataset, 477 object_classes: list, 478 results_queue: Union[queue.Queue, mp.Queue], 479 root_log_dir: str, 480 persistent_workers: bool = False, 481 ): 482 """Model inference method running zero-shot object detection on provided dataset and device, returning success status.""" 483 writer = None 484 run_successful = True 485 processor, model, inputs, outputs, result, dataloader = ( 486 None, 487 None, 488 None, 489 None, 490 None, 491 None, 492 ) # For finally block 493 494 # Timeout handler 495 dataloader_timeout = 60 496 signal.signal(signal.SIGALRM, timeout_handler) 497 498 try: 499 # Metadata 500 run_id = metadata["run_id"] 501 model_name = metadata["model_name"] 502 dataset_name = metadata["dataset_name"] 503 is_subset = metadata["is_subset"] 504 batch_size = metadata["batch_size"] 505 506 logging.info( 507 f"Process ID: {os.getpid()}, Run ID: {run_id}, Device: {device}, Model: {model_name}" 508 ) 509 510 # Load the model 511 logging.info(f"Loading model {model_name}") 512 processor = AutoProcessor.from_pretrained(model_name, use_fast=True) 513 model = AutoModelForZeroShotObjectDetection.from_pretrained(model_name) 514 model = model.to(device, non_blocking=True) 515 model.eval() 516 hf_model_config = AutoConfig.from_pretrained(model_name) 517 hf_model_config_name = type(hf_model_config).__name__ 518 batch_classes = [object_classes] * batch_size 519 logging.info(f"Loaded model type {hf_model_config_name}") 520 521 # Dataloader 522 logging.info("Generating dataloader") 523 if is_subset: 524 chunk_index_start = metadata["chunk_index_start"] 525 chunk_index_end = metadata["chunk_index_end"] 526 logging.info(f"Length of dataset: {len(dataset)}") 527 logging.info(f"Subset start index: {chunk_index_start}") 528 logging.info(f"Subset stop index: {chunk_index_end}") 529 dataset = Subset(dataset, range(chunk_index_start, chunk_index_end)) 530 531 zero_shot_inference_preprocessing = ZeroShotInferenceCollateFn( 532 hf_model_config_name=hf_model_config_name, 533 hf_processor=processor, 534 object_classes=object_classes, 535 batch_size=batch_size, 536 batch_classes=batch_classes, 537 ) 538 num_workers = WORKFLOWS["auto_labeling_zero_shot"]["n_worker_dataloader"] 539 prefetch_factor = WORKFLOWS["auto_labeling_zero_shot"][ 540 "prefetch_factor_dataloader" 541 ] 542 dataloader = DataLoader( 543 dataset, 544 batch_size=batch_size, 545 shuffle=False, 546 num_workers=num_workers, 547 persistent_workers=persistent_workers, 548 pin_memory=True, 549 prefetch_factor=prefetch_factor, 550 collate_fn=zero_shot_inference_preprocessing, 551 ) 552 553 dataloader_length = len(dataloader) 554 if dataloader_length < 1: 555 logging.error( 556 f"Dataloader has insufficient data: {dataloader_length} entries. Please check your dataset and DataLoader configuration." 557 ) 558 559 # Logging 560 experiment_name = f"{model_name}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{device}" 561 log_directory = os.path.join(root_log_dir, dataset_name, experiment_name) 562 wandb.tensorboard.patch(root_logdir=log_directory) 563 if WANDB_ACTIVE: 564 wandb.init( 565 name=f"{model_name}_{device}", 566 job_type="inference", 567 project="Zero Shot Object Detection", 568 config=metadata, 569 ) 570 writer = SummaryWriter(log_dir=log_directory) 571 572 # Inference Loop 573 logging.info(f"{os.getpid()}: Starting inference loop5") 574 n_processed_images = 0 575 for inputs, labels, target_sizes, batch_classes in tqdm( 576 dataloader, desc="Inference Loop" 577 ): 578 signal.alarm(dataloader_timeout) 579 try: 580 time_start = time.time() 581 n_images = len(labels) 582 inputs = inputs.to(device, non_blocking=True) 583 584 with torch.amp.autocast("cuda"), torch.inference_mode(): 585 outputs = model(**inputs) 586 587 result = { 588 "inputs": inputs, 589 "outputs": outputs, 590 "processor": processor, 591 "target_sizes": target_sizes, 592 "labels": labels, 593 "model_name": model_name, 594 "hf_model_config_name": hf_model_config_name, 595 "batch_classes": batch_classes, 596 } 597 598 logging.debug(f"{os.getpid()}: Putting result into queue") 599 600 results_queue.put( 601 result, timeout=60 602 ) # Ditch data only after 60 seconds 603 604 # Logging 605 time_end = time.time() 606 duration = time_end - time_start 607 batches_per_second = 1 / duration 608 frames_per_second = batches_per_second * n_images 609 n_processed_images += n_images 610 logging.debug( 611 f"{os.getpid()}: Number of processes images: {n_processed_images}" 612 ) 613 writer.add_scalar( 614 f"inference/frames_per_second", 615 frames_per_second, 616 n_processed_images, 617 ) 618 619 except TimeoutException: 620 logging.warning( 621 f"Dataloader loop got stuck. Continuing with next batch." 622 ) 623 continue 624 625 finally: 626 signal.alarm(0) # Cancel the alarm 627 628 # Flawless execution 629 wandb_exit_code = 0 630 631 except Exception as e: 632 wandb_exit_code = 1 633 run_successful = False 634 logging.error(f"Error in Process {os.getpid()}: {e}") 635 finally: 636 try: 637 wandb.finish(exit_code=wandb_exit_code) 638 except: 639 pass 640 641 # Explicit removal from device 642 del ( 643 processor, 644 model, 645 inputs, 646 outputs, 647 result, 648 dataloader, 649 ) 650 651 torch.cuda.empty_cache() 652 wandb.tensorboard.unpatch() 653 if writer: 654 writer.close() 655 return run_successful
Model inference method running zero-shot object detection on provided dataset and device, returning success status.
657 def process_outputs(self, dataset_v51, result, object_classes, detection_threshold): 658 """Process outputs from object detection models, extracting bounding boxes and labels to save to the dataset.""" 659 try: 660 inputs = result["inputs"] 661 outputs = result["outputs"] 662 target_sizes = result["target_sizes"] 663 labels = result["labels"] 664 model_name = result["model_name"] 665 hf_model_config_name = result["hf_model_config_name"] 666 batch_classes = result["batch_classes"] 667 processor = result["processor"] 668 669 # Processing output 670 if hf_model_config_name == "GroundingDinoConfig": 671 results = processor.post_process_grounded_object_detection( 672 outputs, 673 inputs.input_ids, 674 box_threshold=detection_threshold, 675 text_threshold=detection_threshold, 676 ) 677 elif hf_model_config_name in ["Owlv2Config", "OwlViTConfig"]: 678 results = processor.post_process_grounded_object_detection( 679 outputs=outputs, 680 threshold=detection_threshold, 681 target_sizes=target_sizes, 682 text_labels=batch_classes, 683 ) 684 elif hf_model_config_name == "OmDetTurboConfig": 685 results = processor.post_process_grounded_object_detection( 686 outputs, 687 text_labels=batch_classes, 688 threshold=detection_threshold, 689 nms_threshold=detection_threshold, 690 target_sizes=target_sizes, 691 ) 692 else: 693 logging.error(f"Invalid model name: {hf_model_config_name}") 694 695 if not len(results) == len(target_sizes) == len(labels): 696 logging.error( 697 f"Lengths of results, target_sizes, and labels do not match: {len(results)}, {len(target_sizes)}, {len(labels)}" 698 ) 699 for result, size, target in zip(results, target_sizes, labels): 700 boxes, scores, labels = ( 701 result["boxes"], 702 result["scores"], 703 result["text_labels"], 704 ) 705 706 img_height = size[0] 707 img_width = size[1] 708 709 detections = [] 710 for box, score, label in zip(boxes, scores, labels): 711 processing_successful = True 712 if hf_model_config_name == "GroundingDinoConfig": 713 # Outputs do not comply with given labels 714 # Grounding DINO outputs multiple pairs of object boxes and noun phrases for a given (Image, Text) pair 715 # There can be either multiple labels per output ("bike van"), incomplete ones ("motorcyc"), or broken ones ("##cic") 716 processed_label = label.split()[ 717 0 718 ] # Assume first output is the best output 719 if processed_label in object_classes: 720 label = processed_label 721 top_left_x = box[0].item() 722 top_left_y = box[1].item() 723 box_width = (box[2] - box[0]).item() 724 box_height = (box[3] - box[1]).item() 725 else: 726 matches = get_close_matches( 727 processed_label, object_classes, n=1, cutoff=0.6 728 ) 729 selected_label = matches[0] if matches else None 730 if selected_label: 731 logging.debug( 732 f"Mapped output '{processed_label}' to class '{selected_label}'" 733 ) 734 label = selected_label 735 top_left_x = box[0].item() 736 top_left_y = box[1].item() 737 box_width = (box[2] - box[0]).item() 738 box_height = (box[3] - box[1]).item() 739 else: 740 logging.debug( 741 f"Skipped detection with {hf_model_config_name} due to unclear output: {label}" 742 ) 743 processing_successful = False 744 745 elif hf_model_config_name in [ 746 "Owlv2Config", 747 "OwlViTConfig", 748 "OmDetTurboConfig", 749 ]: 750 top_left_x = box[0].item() / img_width 751 top_left_y = box[1].item() / img_height 752 box_width = (box[2].item() - box[0].item()) / img_width 753 box_height = (box[3].item() - box[1].item()) / img_height 754 755 if ( 756 processing_successful 757 ): # Skip GroundingDinoConfig labels that could not be processed 758 detection = fo.Detection( 759 label=label, 760 bounding_box=[ 761 top_left_x, 762 top_left_y, 763 box_width, 764 box_height, 765 ], 766 confidence=score.item(), 767 ) 768 detection["bbox_area"] = ( 769 detection["bounding_box"][2] * detection["bounding_box"][3] 770 ) 771 detections.append(detection) 772 773 # Attach label to V51 dataset 774 pred_key = re.sub( 775 r"[\W-]+", "_", "pred_zsod_" + model_name 776 ) # zsod Zero-Shot Object Deection 777 sample = dataset_v51[target["image_id"]] 778 sample[pred_key] = fo.Detections(detections=detections) 779 sample.save() 780 781 except Exception as e: 782 logging.error(f"Error in processing outputs: {e}") 783 processing_successful = False 784 finally: 785 return processing_successful
Process outputs from object detection models, extracting bounding boxes and labels to save to the dataset.
787 def eval_and_export(self, dataset_v51, model_name, pred_key, eval_key): 788 """Populate dataset with evaluation results (if ground_truth available)""" 789 try: 790 dataset_v51.evaluate_detections( 791 pred_key, 792 gt_field="ground_truth", 793 eval_key=eval_key, 794 compute_mAP=True, 795 ) 796 except Exception as e: 797 logging.warning(f"Evaluation not possible: {e}") 798 799 # Store labels https://docs.voxel51.com/api/fiftyone.core.collections.html#fiftyone.core.collections.SampleCollection.export 800 model_name_key = re.sub(r"[\W-]+", "_", model_name) 801 dataset_v51.export( 802 export_dir=os.path.join(self.detections_root, model_name_key), 803 dataset_type=fo.types.COCODetectionDataset, 804 data_path="data.json", 805 export_media=None, # "manifest", 806 label_field=pred_key, 807 progress=True, 808 ) 809 return True
Populate dataset with evaluation results (if ground_truth available)
812class UltralyticsObjectDetection: 813 """Object detection using Ultralytics YOLO models with training and inference support.""" 814 815 def __init__(self, dataset, config): 816 """Initialize with dataset, config, and setup paths for model and data.""" 817 self.dataset = dataset 818 self.config = config 819 self.ultralytics_data_path = os.path.join( 820 config["export_dataset_root"], config["v51_dataset_name"] 821 ) 822 823 self.hf_hub_model_id = ( 824 f"{HF_ROOT}/" 825 + f"{config['v51_dataset_name']}_{config['model_name']}".replace("/", "_") 826 ) 827 828 self.export_root = "output/models/ultralytics/" 829 self.export_folder = os.path.join( 830 self.export_root, self.config["v51_dataset_name"] 831 ) 832 833 self.model_path = os.path.join( 834 self.export_folder, self.config["model_name"], "weights", "best.pt" 835 ) 836 837 @staticmethod 838 def export_data( 839 dataset, dataset_info, export_dataset_root, label_field="ground_truth" 840 ): 841 """Export dataset to YOLO format for Ultralytics training.""" 842 ultralytics_data_path = os.path.join(export_dataset_root, dataset_info["name"]) 843 # Delete export directory if it already exists 844 if os.path.exists(ultralytics_data_path): 845 shutil.rmtree(ultralytics_data_path) 846 847 logging.info("Exporting data for training with Ultralytics") 848 classes = dataset.distinct(f"{label_field}.detections.label") 849 850 # Make directory 851 os.makedirs(ultralytics_data_path, exist_ok=False) 852 853 for split in ACCEPTED_SPLITS: 854 split_view = dataset.match_tags(split) 855 856 if split == "val" or split == "train": # YOLO expects train and val 857 split_view.export( 858 export_dir=ultralytics_data_path, 859 dataset_type=fo.types.YOLOv5Dataset, 860 label_field=label_field, 861 classes=classes, 862 split=split, 863 ) 864 865 def train(self): 866 """Train the YOLO model for object detection using Ultralytics and optionally upload to Hugging Face.""" 867 model = YOLO(self.config["model_name"], task="detect") 868 # https://docs.ultralytics.com/modes/train/#train-settings 869 870 # Use all available GPUs 871 device = "0" # Default to GPU 0 872 if torch.cuda.device_count() > 1: 873 device = ",".join(map(str, range(torch.cuda.device_count()))) 874 875 results = model.train( 876 data=f"{self.ultralytics_data_path}/dataset.yaml", 877 epochs=self.config["epochs"], 878 project=self.export_folder, 879 name=self.config["model_name"], 880 patience=self.config["patience"], 881 batch=self.config["batch_size"], 882 imgsz=self.config["img_size"], 883 multi_scale=self.config["multi_scale"], 884 cos_lr=self.config["cos_lr"], 885 seed=GLOBAL_SEED, 886 optimizer="AdamW", # "auto" as default 887 pretrained=True, 888 exist_ok=True, 889 amp=True, 890 device=device 891 ) 892 metrics = model.val() 893 logging.info(f"Model Performance: {metrics}") 894 895 # Upload model to Hugging Face 896 if HF_DO_UPLOAD: 897 logging.info(f"Uploading model {self.model_path} to Hugging Face.") 898 api = HfApi() 899 api.create_repo( 900 self.hf_hub_model_id, private=True, repo_type="model", exist_ok=True 901 ) 902 api.upload_file( 903 path_or_fileobj=self.model_path, 904 path_in_repo="best.pt", 905 repo_id=self.hf_hub_model_id, 906 repo_type="model", 907 ) 908 909 def inference(self, gt_field="ground_truth"): 910 """Performs inference using YOLO model on a dataset, with options to evaluate results.""" 911 logging.info(f"Running inference on dataset {self.config['v51_dataset_name']}") 912 inference_settings = self.config["inference_settings"] 913 914 dataset_name = None 915 model_name = self.config["model_name"] 916 917 model_hf = inference_settings["model_hf"] 918 if model_hf is not None: 919 # Use model manually defined in config. 920 # This way models can be used for inference which were trained on a different dataset 921 dataset_name, _ = get_dataset_and_model_from_hf_id(model_hf) 922 923 # Set up directories 924 download_dir = os.path.join( 925 self.export_root, dataset_name, model_name, "weights" 926 ) 927 os.makedirs(os.path.join(download_dir), exist_ok=True) 928 929 self.model_path = os.path.join(download_dir, "best.pt") 930 931 # Create directories if they don't exist 932 933 file_path = hf_hub_download( 934 repo_id=model_hf, 935 filename="best.pt", 936 local_dir=download_dir, 937 ) 938 else: 939 # Automatically determine model based on dataset 940 dataset_name = self.config["v51_dataset_name"] 941 942 try: 943 if os.path.exists(self.model_path): 944 file_path = self.model_path 945 logging.info(f"Loading model {model_name} from disk: {file_path}") 946 else: 947 download_dir = self.model_path.replace("best.pt", "") 948 os.makedirs(download_dir, exist_ok=True) 949 logging.info( 950 f"Downloading model {self.hf_hub_model_id} from Hugging Face to {download_dir}" 951 ) 952 file_path = hf_hub_download( 953 repo_id=self.hf_hub_model_id, 954 filename="best.pt", 955 local_dir=download_dir, 956 ) 957 except Exception as e: 958 logging.error(f"Failed to load or download model: {str(e)}.") 959 return False 960 961 pred_key = f"pred_od_{model_name}-{dataset_name}" 962 logging.info(f"Using model {self.model_path} for inference.") 963 model = YOLO(self.model_path) 964 965 detection_threshold = inference_settings["detection_threshold"] 966 if inference_settings["inference_on_test"] is True: 967 dataset_eval_view = self.dataset.match_tags("test") 968 if len(dataset_eval_view) == 0: 969 logging.error("Dataset misses split 'test'") 970 dataset_eval_view.apply_model( 971 model, label_field=pred_key, confidence_thresh=detection_threshold 972 ) 973 else: 974 self.dataset.apply_model( 975 model, label_field=pred_key, confidence_thresh=detection_threshold 976 ) 977 978 if inference_settings["do_eval"]: 979 eval_key = f"eval_{self.config['model_name']}_{dataset_name}" 980 981 if inference_settings["inference_on_test"] is True: 982 dataset_view = self.dataset.match_tags(["test"]) 983 else: 984 dataset_view = self.dataset 985 986 dataset_view.evaluate_detections( 987 pred_key, 988 gt_field=gt_field, 989 eval_key=eval_key, 990 compute_mAP=True, 991 )
Object detection using Ultralytics YOLO models with training and inference support.
815 def __init__(self, dataset, config): 816 """Initialize with dataset, config, and setup paths for model and data.""" 817 self.dataset = dataset 818 self.config = config 819 self.ultralytics_data_path = os.path.join( 820 config["export_dataset_root"], config["v51_dataset_name"] 821 ) 822 823 self.hf_hub_model_id = ( 824 f"{HF_ROOT}/" 825 + f"{config['v51_dataset_name']}_{config['model_name']}".replace("/", "_") 826 ) 827 828 self.export_root = "output/models/ultralytics/" 829 self.export_folder = os.path.join( 830 self.export_root, self.config["v51_dataset_name"] 831 ) 832 833 self.model_path = os.path.join( 834 self.export_folder, self.config["model_name"], "weights", "best.pt" 835 )
Initialize with dataset, config, and setup paths for model and data.
837 @staticmethod 838 def export_data( 839 dataset, dataset_info, export_dataset_root, label_field="ground_truth" 840 ): 841 """Export dataset to YOLO format for Ultralytics training.""" 842 ultralytics_data_path = os.path.join(export_dataset_root, dataset_info["name"]) 843 # Delete export directory if it already exists 844 if os.path.exists(ultralytics_data_path): 845 shutil.rmtree(ultralytics_data_path) 846 847 logging.info("Exporting data for training with Ultralytics") 848 classes = dataset.distinct(f"{label_field}.detections.label") 849 850 # Make directory 851 os.makedirs(ultralytics_data_path, exist_ok=False) 852 853 for split in ACCEPTED_SPLITS: 854 split_view = dataset.match_tags(split) 855 856 if split == "val" or split == "train": # YOLO expects train and val 857 split_view.export( 858 export_dir=ultralytics_data_path, 859 dataset_type=fo.types.YOLOv5Dataset, 860 label_field=label_field, 861 classes=classes, 862 split=split, 863 )
Export dataset to YOLO format for Ultralytics training.
865 def train(self): 866 """Train the YOLO model for object detection using Ultralytics and optionally upload to Hugging Face.""" 867 model = YOLO(self.config["model_name"], task="detect") 868 # https://docs.ultralytics.com/modes/train/#train-settings 869 870 # Use all available GPUs 871 device = "0" # Default to GPU 0 872 if torch.cuda.device_count() > 1: 873 device = ",".join(map(str, range(torch.cuda.device_count()))) 874 875 results = model.train( 876 data=f"{self.ultralytics_data_path}/dataset.yaml", 877 epochs=self.config["epochs"], 878 project=self.export_folder, 879 name=self.config["model_name"], 880 patience=self.config["patience"], 881 batch=self.config["batch_size"], 882 imgsz=self.config["img_size"], 883 multi_scale=self.config["multi_scale"], 884 cos_lr=self.config["cos_lr"], 885 seed=GLOBAL_SEED, 886 optimizer="AdamW", # "auto" as default 887 pretrained=True, 888 exist_ok=True, 889 amp=True, 890 device=device 891 ) 892 metrics = model.val() 893 logging.info(f"Model Performance: {metrics}") 894 895 # Upload model to Hugging Face 896 if HF_DO_UPLOAD: 897 logging.info(f"Uploading model {self.model_path} to Hugging Face.") 898 api = HfApi() 899 api.create_repo( 900 self.hf_hub_model_id, private=True, repo_type="model", exist_ok=True 901 ) 902 api.upload_file( 903 path_or_fileobj=self.model_path, 904 path_in_repo="best.pt", 905 repo_id=self.hf_hub_model_id, 906 repo_type="model", 907 )
Train the YOLO model for object detection using Ultralytics and optionally upload to Hugging Face.
909 def inference(self, gt_field="ground_truth"): 910 """Performs inference using YOLO model on a dataset, with options to evaluate results.""" 911 logging.info(f"Running inference on dataset {self.config['v51_dataset_name']}") 912 inference_settings = self.config["inference_settings"] 913 914 dataset_name = None 915 model_name = self.config["model_name"] 916 917 model_hf = inference_settings["model_hf"] 918 if model_hf is not None: 919 # Use model manually defined in config. 920 # This way models can be used for inference which were trained on a different dataset 921 dataset_name, _ = get_dataset_and_model_from_hf_id(model_hf) 922 923 # Set up directories 924 download_dir = os.path.join( 925 self.export_root, dataset_name, model_name, "weights" 926 ) 927 os.makedirs(os.path.join(download_dir), exist_ok=True) 928 929 self.model_path = os.path.join(download_dir, "best.pt") 930 931 # Create directories if they don't exist 932 933 file_path = hf_hub_download( 934 repo_id=model_hf, 935 filename="best.pt", 936 local_dir=download_dir, 937 ) 938 else: 939 # Automatically determine model based on dataset 940 dataset_name = self.config["v51_dataset_name"] 941 942 try: 943 if os.path.exists(self.model_path): 944 file_path = self.model_path 945 logging.info(f"Loading model {model_name} from disk: {file_path}") 946 else: 947 download_dir = self.model_path.replace("best.pt", "") 948 os.makedirs(download_dir, exist_ok=True) 949 logging.info( 950 f"Downloading model {self.hf_hub_model_id} from Hugging Face to {download_dir}" 951 ) 952 file_path = hf_hub_download( 953 repo_id=self.hf_hub_model_id, 954 filename="best.pt", 955 local_dir=download_dir, 956 ) 957 except Exception as e: 958 logging.error(f"Failed to load or download model: {str(e)}.") 959 return False 960 961 pred_key = f"pred_od_{model_name}-{dataset_name}" 962 logging.info(f"Using model {self.model_path} for inference.") 963 model = YOLO(self.model_path) 964 965 detection_threshold = inference_settings["detection_threshold"] 966 if inference_settings["inference_on_test"] is True: 967 dataset_eval_view = self.dataset.match_tags("test") 968 if len(dataset_eval_view) == 0: 969 logging.error("Dataset misses split 'test'") 970 dataset_eval_view.apply_model( 971 model, label_field=pred_key, confidence_thresh=detection_threshold 972 ) 973 else: 974 self.dataset.apply_model( 975 model, label_field=pred_key, confidence_thresh=detection_threshold 976 ) 977 978 if inference_settings["do_eval"]: 979 eval_key = f"eval_{self.config['model_name']}_{dataset_name}" 980 981 if inference_settings["inference_on_test"] is True: 982 dataset_view = self.dataset.match_tags(["test"]) 983 else: 984 dataset_view = self.dataset 985 986 dataset_view.evaluate_detections( 987 pred_key, 988 gt_field=gt_field, 989 eval_key=eval_key, 990 compute_mAP=True, 991 )
Performs inference using YOLO model on a dataset, with options to evaluate results.
994def transform_batch_standalone( 995 batch, 996 image_processor, 997 do_convert_annotations=True, 998 return_pixel_mask=False, 999): 1000 """Apply format annotations in COCO format for object detection task. Outside of class so it can be pickled.""" 1001 images = [] 1002 annotations = [] 1003 1004 for image_path, annotation in zip(batch["image_path"], batch["objects"]): 1005 image = Image.open(image_path).convert("RGB") 1006 image_np = np.array(image) 1007 images.append(image_np) 1008 1009 coco_annotations = [] 1010 for i, bbox in enumerate(annotation["bbox"]): 1011 1012 # Conversion from HF dataset bounding boxes to DETR: 1013 # Input: HF dataset bbox is COCO (top_left_x, top_left_y, width, height) in absolute coordinates 1014 # Output: 1015 # DETR expects COCO (top_left_x, top_left_y, width, height) in absolute coordinates if 'do_convert_annotations == True' 1016 # DETR expects YOLO (center_x, center_y, width, height) in relative coordinates between [0,1] if 'do_convert_annotations == False' 1017 1018 if do_convert_annotations == False: 1019 x, y, w, h = bbox 1020 img_height, img_width = image_np.shape[:2] 1021 center_x = (x + w / 2) / img_width 1022 center_y = (y + h / 2) / img_height 1023 width = w / img_width 1024 height = h / img_height 1025 bbox = [center_x, center_y, width, height] 1026 1027 # Ensure bbox values are within the expected range 1028 assert all(0 <= coord <= 1 for coord in bbox), f"Invalid bbox: {bbox}" 1029 1030 logging.debug( 1031 f"Converted {[x, y, w, h]} to {[center_x, center_y, width, height]} with 'do_convert_annotations' = {do_convert_annotations}" 1032 ) 1033 1034 coco_annotation = { 1035 "image_id": annotation["image_id"], 1036 "bbox": bbox, 1037 "category_id": annotation["category_id"][i], 1038 "area": annotation["area"][i], 1039 "iscrowd": 0, 1040 } 1041 coco_annotations.append(coco_annotation) 1042 detr_annotation = { 1043 "image_id": annotation["image_id"], 1044 "annotations": coco_annotations, 1045 } 1046 annotations.append(detr_annotation) 1047 1048 # Apply the image processor transformations: resizing, rescaling, normalization 1049 result = image_processor( 1050 images=images, annotations=annotations, return_tensors="pt" 1051 ) 1052 1053 if not return_pixel_mask: 1054 result.pop("pixel_mask", None) 1055 1056 return result
Apply format annotations in COCO format for object detection task. Outside of class so it can be pickled.
1059class HuggingFaceObjectDetection: 1060 """Object detection using HuggingFace models with support for training and inference.""" 1061 1062 def __init__( 1063 self, 1064 dataset, 1065 config, 1066 output_model_path="./output/models/object_detection_hf", 1067 output_detections_path="./output/detections/", 1068 gt_field="ground_truth", 1069 ): 1070 """Initialize with dataset, config, and optional output paths.""" 1071 self.dataset = dataset 1072 self.config = config 1073 self.model_name = config["model_name"] 1074 self.model_name_key = re.sub(r"[\W-]+", "_", self.model_name) 1075 self.dataset_name = config["v51_dataset_name"] 1076 self.do_convert_annotations = True # HF can convert (top_left_x, top_left_y, bottom_right_x, bottom_right_y) in abs. coordinates to (x_min, y_min, width, height) in rel. coordinates https://github.com/huggingface/transformers/blob/v4.48.2/src/transformers/models/conditional_detr/image_processing_conditional_detr.py#L1497 1077 1078 self.detections_root = os.path.join( 1079 output_detections_path, self.dataset_name, self.model_name_key 1080 ) 1081 1082 self.model_root = os.path.join( 1083 output_model_path, self.dataset_name, self.model_name_key 1084 ) 1085 1086 self.hf_hub_model_id = ( 1087 f"{HF_ROOT}/" + f"{self.dataset_name}_{self.model_name}".replace("/", "_") 1088 ) 1089 1090 self.categories = dataset.distinct(f"{gt_field}.detections.label") 1091 self.id2label = {index: x for index, x in enumerate(self.categories, start=0)} 1092 self.label2id = {v: k for k, v in self.id2label.items()} 1093 1094 def collate_fn(self, batch): 1095 """Collate function for batching data during training and inference.""" 1096 data = {} 1097 data["pixel_values"] = torch.stack([x["pixel_values"] for x in batch]) 1098 data["labels"] = [x["labels"] for x in batch] 1099 if "pixel_mask" in batch[0]: 1100 data["pixel_mask"] = torch.stack([x["pixel_mask"] for x in batch]) 1101 return data 1102 1103 def train(self, hf_dataset, overwrite_output=True): 1104 """Train models for object detection tasks with support for custom image sizes and transformations.""" 1105 torch.cuda.empty_cache() 1106 img_size_target = self.config.get("image_size", None) 1107 if img_size_target is None: 1108 image_processor = AutoProcessor.from_pretrained( 1109 self.model_name, 1110 do_resize=False, 1111 do_pad=True, 1112 use_fast=True, 1113 do_convert_annotations=self.do_convert_annotations, 1114 ) 1115 else: 1116 logging.warning(f"Resizing images to target size {img_size_target}.") 1117 image_processor = AutoProcessor.from_pretrained( 1118 self.model_name, 1119 do_resize=True, 1120 size={ 1121 "max_height": img_size_target[1], 1122 "max_width": img_size_target[0], 1123 }, 1124 do_pad=True, 1125 pad_size={"height": img_size_target[1], "width": img_size_target[0]}, 1126 use_fast=True, 1127 do_convert_annotations=self.do_convert_annotations, 1128 ) 1129 1130 train_transform_batch = partial( 1131 transform_batch_standalone, 1132 image_processor=image_processor, 1133 do_convert_annotations=self.do_convert_annotations, 1134 ) 1135 val_test_transform_batch = partial( 1136 transform_batch_standalone, 1137 image_processor=image_processor, 1138 do_convert_annotations=self.do_convert_annotations, 1139 ) 1140 1141 hf_dataset[Split.TRAIN] = hf_dataset[Split.TRAIN].with_transform( 1142 train_transform_batch 1143 ) 1144 hf_dataset[Split.VALIDATION] = hf_dataset[Split.VALIDATION].with_transform( 1145 val_test_transform_batch 1146 ) 1147 hf_dataset[Split.TEST] = hf_dataset[Split.TEST].with_transform( 1148 val_test_transform_batch 1149 ) 1150 1151 hf_model_config = AutoConfig.from_pretrained(self.model_name) 1152 hf_model_config_name = type(hf_model_config).__name__ 1153 1154 if type(hf_model_config) in AutoModelForObjectDetection._model_mapping: 1155 model = AutoModelForObjectDetection.from_pretrained( 1156 self.model_name, 1157 id2label=self.id2label, 1158 label2id=self.label2id, 1159 ignore_mismatched_sizes=True, 1160 ) 1161 else: 1162 model = None 1163 logging.error( 1164 "Hugging Face AutoModel does not support " + str(type(hf_model_config)) 1165 ) 1166 1167 if ( 1168 overwrite_output == True 1169 and os.path.exists(self.model_root) 1170 and os.listdir(self.model_root) 1171 ): 1172 logging.warning( 1173 f"Training will overwrite existing results in {self.model_root}" 1174 ) 1175 1176 training_args = TrainingArguments( 1177 run_name=self.model_name, 1178 output_dir=self.model_root, 1179 overwrite_output_dir=overwrite_output, 1180 num_train_epochs=self.config["epochs"], 1181 fp16=True, 1182 per_device_train_batch_size=self.config["batch_size"], 1183 auto_find_batch_size=True, 1184 dataloader_num_workers=min(self.config["n_worker_dataloader"], NUM_WORKERS), 1185 learning_rate=self.config["learning_rate"], 1186 lr_scheduler_type="cosine", 1187 weight_decay=self.config["weight_decay"], 1188 max_grad_norm=self.config["max_grad_norm"], 1189 metric_for_best_model="eval_loss", 1190 greater_is_better=False, 1191 load_best_model_at_end=True, 1192 eval_strategy="epoch", 1193 save_strategy="best", 1194 save_total_limit=1, 1195 remove_unused_columns=False, 1196 eval_do_concat_batches=False, 1197 save_safetensors=False, # Does not work with all models 1198 hub_model_id=self.hf_hub_model_id, 1199 hub_private_repo=True, 1200 push_to_hub=HF_DO_UPLOAD, 1201 seed=GLOBAL_SEED, 1202 data_seed=GLOBAL_SEED, 1203 ) 1204 1205 early_stopping_callback = EarlyStoppingCallback( 1206 early_stopping_patience=self.config["early_stop_patience"], 1207 early_stopping_threshold=self.config["early_stop_threshold"], 1208 ) 1209 1210 trainer = Trainer( 1211 model=model, 1212 args=training_args, 1213 train_dataset=hf_dataset[Split.TRAIN], 1214 eval_dataset=hf_dataset[Split.VALIDATION], 1215 tokenizer=image_processor, 1216 data_collator=self.collate_fn, 1217 callbacks=[early_stopping_callback], 1218 # compute_metrics=eval_compute_metrics_fn, 1219 ) 1220 1221 logging.info(f"Starting training of model {self.model_name}.") 1222 trainer.train() 1223 if HF_DO_UPLOAD: 1224 trainer.push_to_hub() 1225 1226 metrics = trainer.evaluate(eval_dataset=hf_dataset[Split.TEST]) 1227 logging.info(f"Model training completed. Evaluation results: {metrics}") 1228 1229 def inference(self, inference_settings, load_from_hf=True, gt_field="ground_truth"): 1230 """Performs model inference on a dataset, loading from Hugging Face or disk, and optionally evaluates detection results.""" 1231 1232 model_hf = inference_settings["model_hf"] 1233 dataset_name = None 1234 if model_hf is not None: 1235 self.hf_hub_model_id = model_hf 1236 dataset_name, model_name = get_dataset_and_model_from_hf_id(model_hf) 1237 else: 1238 dataset_name = self.dataset_name 1239 torch.cuda.empty_cache() 1240 # Load trained model from Hugging Face 1241 load_from_hf_successful = None 1242 if load_from_hf: 1243 try: 1244 logging.info(f"Loading model from Hugging Face: {self.hf_hub_model_id}") 1245 image_processor = AutoProcessor.from_pretrained(self.hf_hub_model_id) 1246 model = AutoModelForObjectDetection.from_pretrained( 1247 self.hf_hub_model_id 1248 ) 1249 load_from_hf_successful = True 1250 except Exception as e: 1251 load_from_hf_successful = False 1252 logging.warning( 1253 f"Model {self.model_name} could not be loaded from Hugging Face {self.hf_hub_model_id}. Attempting loading from disk." 1254 ) 1255 if load_from_hf == False or load_from_hf_successful == False: 1256 try: 1257 # Select folder in self.model_root that include 'checkpoint-' 1258 checkpoint_dirs = [ 1259 d 1260 for d in os.listdir(self.model_root) 1261 if "checkpoint-" in d 1262 and os.path.isdir(os.path.join(self.model_root, d)) 1263 ] 1264 1265 if not checkpoint_dirs: 1266 logging.error( 1267 f"No checkpoint directory found in {self.model_root}!" 1268 ) 1269 model_path = None 1270 else: 1271 # Sort by modification time (latest first) 1272 checkpoint_dirs.sort( 1273 key=lambda d: os.path.getmtime( 1274 os.path.join(self.model_root, d) 1275 ), 1276 reverse=True, 1277 ) 1278 1279 if len(checkpoint_dirs) > 1: 1280 logging.warning( 1281 f"Multiple checkpoint directories found: {checkpoint_dirs}. Selecting the latest one: {checkpoint_dirs[0]}." 1282 ) 1283 1284 selected_checkpoint = checkpoint_dirs[0] 1285 logging.info( 1286 f"Loading model from disk: {self.model_root}/{selected_checkpoint}" 1287 ) 1288 model_path = os.path.join(self.model_root, selected_checkpoint) 1289 1290 image_processor = AutoProcessor.from_pretrained(model_path) 1291 model = AutoModelForObjectDetection.from_pretrained(model_path) 1292 except Exception as e: 1293 logging.error( 1294 f"Model {self.model_name} could not be loaded from folder {self.model_root}/{selected_checkpoint}. Inference not possible." 1295 ) 1296 1297 device, _, _ = get_backend() 1298 logging.info(f"Using device {device} for inference.") 1299 model = model.to(device) 1300 model.eval() 1301 1302 pred_key = f"pred_od_{self.model_name_key}-{dataset_name}" 1303 1304 if inference_settings["inference_on_test"] is True: 1305 INFERENCE_SPLITS = ["test"] 1306 dataset_eval_view = self.dataset.match_tags(INFERENCE_SPLITS) 1307 else: 1308 dataset_eval_view = self.dataset 1309 1310 detection_threshold = inference_settings["detection_threshold"] 1311 1312 with torch.amp.autocast("cuda"), torch.inference_mode(): 1313 for sample in dataset_eval_view.iter_samples(progress=True, autosave=True): 1314 image_width = sample.metadata.width 1315 image_height = sample.metadata.height 1316 img_filepath = sample.filepath 1317 1318 image = Image.open(img_filepath) 1319 inputs = image_processor(images=[image], return_tensors="pt") 1320 outputs = model(**inputs.to(device)) 1321 target_sizes = torch.tensor([[image.size[1], image.size[0]]]) 1322 1323 results = image_processor.post_process_object_detection( 1324 outputs, threshold=detection_threshold, target_sizes=target_sizes 1325 )[0] 1326 1327 detections = [] 1328 for score, label, box in zip( 1329 results["scores"], results["labels"], results["boxes"] 1330 ): 1331 # Bbox is in absolute coordinates x, y, x2, y2 1332 box = box.tolist() 1333 text_label = model.config.id2label[label.item()] 1334 1335 # Voxel51 requires relative coordinates between 0 and 1 1336 top_left_x = box[0] / image_width 1337 top_left_y = box[1] / image_height 1338 box_width = (box[2] - box[0]) / image_width 1339 box_height = (box[3] - box[1]) / image_height 1340 detection = fo.Detection( 1341 label=text_label, 1342 bounding_box=[ 1343 top_left_x, 1344 top_left_y, 1345 box_width, 1346 box_height, 1347 ], 1348 confidence=score.item(), 1349 ) 1350 detections.append(detection) 1351 1352 sample[pred_key] = fo.Detections(detections=detections) 1353 1354 if inference_settings["do_eval"] is True: 1355 eval_key = re.sub( 1356 r"[\W-]+", "_", "eval_" + self.model_name + "_" + self.dataset_name 1357 ) 1358 1359 if inference_settings["inference_on_test"] is True: 1360 dataset_view = self.dataset.match_tags(["test"]) 1361 else: 1362 dataset_view = self.dataset 1363 1364 dataset_view.evaluate_detections( 1365 pred_key, 1366 gt_field=gt_field, 1367 eval_key=eval_key, 1368 compute_mAP=True, 1369 )
Object detection using HuggingFace models with support for training and inference.
1062 def __init__( 1063 self, 1064 dataset, 1065 config, 1066 output_model_path="./output/models/object_detection_hf", 1067 output_detections_path="./output/detections/", 1068 gt_field="ground_truth", 1069 ): 1070 """Initialize with dataset, config, and optional output paths.""" 1071 self.dataset = dataset 1072 self.config = config 1073 self.model_name = config["model_name"] 1074 self.model_name_key = re.sub(r"[\W-]+", "_", self.model_name) 1075 self.dataset_name = config["v51_dataset_name"] 1076 self.do_convert_annotations = True # HF can convert (top_left_x, top_left_y, bottom_right_x, bottom_right_y) in abs. coordinates to (x_min, y_min, width, height) in rel. coordinates https://github.com/huggingface/transformers/blob/v4.48.2/src/transformers/models/conditional_detr/image_processing_conditional_detr.py#L1497 1077 1078 self.detections_root = os.path.join( 1079 output_detections_path, self.dataset_name, self.model_name_key 1080 ) 1081 1082 self.model_root = os.path.join( 1083 output_model_path, self.dataset_name, self.model_name_key 1084 ) 1085 1086 self.hf_hub_model_id = ( 1087 f"{HF_ROOT}/" + f"{self.dataset_name}_{self.model_name}".replace("/", "_") 1088 ) 1089 1090 self.categories = dataset.distinct(f"{gt_field}.detections.label") 1091 self.id2label = {index: x for index, x in enumerate(self.categories, start=0)} 1092 self.label2id = {v: k for k, v in self.id2label.items()}
Initialize with dataset, config, and optional output paths.
1094 def collate_fn(self, batch): 1095 """Collate function for batching data during training and inference.""" 1096 data = {} 1097 data["pixel_values"] = torch.stack([x["pixel_values"] for x in batch]) 1098 data["labels"] = [x["labels"] for x in batch] 1099 if "pixel_mask" in batch[0]: 1100 data["pixel_mask"] = torch.stack([x["pixel_mask"] for x in batch]) 1101 return data
Collate function for batching data during training and inference.
1103 def train(self, hf_dataset, overwrite_output=True): 1104 """Train models for object detection tasks with support for custom image sizes and transformations.""" 1105 torch.cuda.empty_cache() 1106 img_size_target = self.config.get("image_size", None) 1107 if img_size_target is None: 1108 image_processor = AutoProcessor.from_pretrained( 1109 self.model_name, 1110 do_resize=False, 1111 do_pad=True, 1112 use_fast=True, 1113 do_convert_annotations=self.do_convert_annotations, 1114 ) 1115 else: 1116 logging.warning(f"Resizing images to target size {img_size_target}.") 1117 image_processor = AutoProcessor.from_pretrained( 1118 self.model_name, 1119 do_resize=True, 1120 size={ 1121 "max_height": img_size_target[1], 1122 "max_width": img_size_target[0], 1123 }, 1124 do_pad=True, 1125 pad_size={"height": img_size_target[1], "width": img_size_target[0]}, 1126 use_fast=True, 1127 do_convert_annotations=self.do_convert_annotations, 1128 ) 1129 1130 train_transform_batch = partial( 1131 transform_batch_standalone, 1132 image_processor=image_processor, 1133 do_convert_annotations=self.do_convert_annotations, 1134 ) 1135 val_test_transform_batch = partial( 1136 transform_batch_standalone, 1137 image_processor=image_processor, 1138 do_convert_annotations=self.do_convert_annotations, 1139 ) 1140 1141 hf_dataset[Split.TRAIN] = hf_dataset[Split.TRAIN].with_transform( 1142 train_transform_batch 1143 ) 1144 hf_dataset[Split.VALIDATION] = hf_dataset[Split.VALIDATION].with_transform( 1145 val_test_transform_batch 1146 ) 1147 hf_dataset[Split.TEST] = hf_dataset[Split.TEST].with_transform( 1148 val_test_transform_batch 1149 ) 1150 1151 hf_model_config = AutoConfig.from_pretrained(self.model_name) 1152 hf_model_config_name = type(hf_model_config).__name__ 1153 1154 if type(hf_model_config) in AutoModelForObjectDetection._model_mapping: 1155 model = AutoModelForObjectDetection.from_pretrained( 1156 self.model_name, 1157 id2label=self.id2label, 1158 label2id=self.label2id, 1159 ignore_mismatched_sizes=True, 1160 ) 1161 else: 1162 model = None 1163 logging.error( 1164 "Hugging Face AutoModel does not support " + str(type(hf_model_config)) 1165 ) 1166 1167 if ( 1168 overwrite_output == True 1169 and os.path.exists(self.model_root) 1170 and os.listdir(self.model_root) 1171 ): 1172 logging.warning( 1173 f"Training will overwrite existing results in {self.model_root}" 1174 ) 1175 1176 training_args = TrainingArguments( 1177 run_name=self.model_name, 1178 output_dir=self.model_root, 1179 overwrite_output_dir=overwrite_output, 1180 num_train_epochs=self.config["epochs"], 1181 fp16=True, 1182 per_device_train_batch_size=self.config["batch_size"], 1183 auto_find_batch_size=True, 1184 dataloader_num_workers=min(self.config["n_worker_dataloader"], NUM_WORKERS), 1185 learning_rate=self.config["learning_rate"], 1186 lr_scheduler_type="cosine", 1187 weight_decay=self.config["weight_decay"], 1188 max_grad_norm=self.config["max_grad_norm"], 1189 metric_for_best_model="eval_loss", 1190 greater_is_better=False, 1191 load_best_model_at_end=True, 1192 eval_strategy="epoch", 1193 save_strategy="best", 1194 save_total_limit=1, 1195 remove_unused_columns=False, 1196 eval_do_concat_batches=False, 1197 save_safetensors=False, # Does not work with all models 1198 hub_model_id=self.hf_hub_model_id, 1199 hub_private_repo=True, 1200 push_to_hub=HF_DO_UPLOAD, 1201 seed=GLOBAL_SEED, 1202 data_seed=GLOBAL_SEED, 1203 ) 1204 1205 early_stopping_callback = EarlyStoppingCallback( 1206 early_stopping_patience=self.config["early_stop_patience"], 1207 early_stopping_threshold=self.config["early_stop_threshold"], 1208 ) 1209 1210 trainer = Trainer( 1211 model=model, 1212 args=training_args, 1213 train_dataset=hf_dataset[Split.TRAIN], 1214 eval_dataset=hf_dataset[Split.VALIDATION], 1215 tokenizer=image_processor, 1216 data_collator=self.collate_fn, 1217 callbacks=[early_stopping_callback], 1218 # compute_metrics=eval_compute_metrics_fn, 1219 ) 1220 1221 logging.info(f"Starting training of model {self.model_name}.") 1222 trainer.train() 1223 if HF_DO_UPLOAD: 1224 trainer.push_to_hub() 1225 1226 metrics = trainer.evaluate(eval_dataset=hf_dataset[Split.TEST]) 1227 logging.info(f"Model training completed. Evaluation results: {metrics}")
Train models for object detection tasks with support for custom image sizes and transformations.
1229 def inference(self, inference_settings, load_from_hf=True, gt_field="ground_truth"): 1230 """Performs model inference on a dataset, loading from Hugging Face or disk, and optionally evaluates detection results.""" 1231 1232 model_hf = inference_settings["model_hf"] 1233 dataset_name = None 1234 if model_hf is not None: 1235 self.hf_hub_model_id = model_hf 1236 dataset_name, model_name = get_dataset_and_model_from_hf_id(model_hf) 1237 else: 1238 dataset_name = self.dataset_name 1239 torch.cuda.empty_cache() 1240 # Load trained model from Hugging Face 1241 load_from_hf_successful = None 1242 if load_from_hf: 1243 try: 1244 logging.info(f"Loading model from Hugging Face: {self.hf_hub_model_id}") 1245 image_processor = AutoProcessor.from_pretrained(self.hf_hub_model_id) 1246 model = AutoModelForObjectDetection.from_pretrained( 1247 self.hf_hub_model_id 1248 ) 1249 load_from_hf_successful = True 1250 except Exception as e: 1251 load_from_hf_successful = False 1252 logging.warning( 1253 f"Model {self.model_name} could not be loaded from Hugging Face {self.hf_hub_model_id}. Attempting loading from disk." 1254 ) 1255 if load_from_hf == False or load_from_hf_successful == False: 1256 try: 1257 # Select folder in self.model_root that include 'checkpoint-' 1258 checkpoint_dirs = [ 1259 d 1260 for d in os.listdir(self.model_root) 1261 if "checkpoint-" in d 1262 and os.path.isdir(os.path.join(self.model_root, d)) 1263 ] 1264 1265 if not checkpoint_dirs: 1266 logging.error( 1267 f"No checkpoint directory found in {self.model_root}!" 1268 ) 1269 model_path = None 1270 else: 1271 # Sort by modification time (latest first) 1272 checkpoint_dirs.sort( 1273 key=lambda d: os.path.getmtime( 1274 os.path.join(self.model_root, d) 1275 ), 1276 reverse=True, 1277 ) 1278 1279 if len(checkpoint_dirs) > 1: 1280 logging.warning( 1281 f"Multiple checkpoint directories found: {checkpoint_dirs}. Selecting the latest one: {checkpoint_dirs[0]}." 1282 ) 1283 1284 selected_checkpoint = checkpoint_dirs[0] 1285 logging.info( 1286 f"Loading model from disk: {self.model_root}/{selected_checkpoint}" 1287 ) 1288 model_path = os.path.join(self.model_root, selected_checkpoint) 1289 1290 image_processor = AutoProcessor.from_pretrained(model_path) 1291 model = AutoModelForObjectDetection.from_pretrained(model_path) 1292 except Exception as e: 1293 logging.error( 1294 f"Model {self.model_name} could not be loaded from folder {self.model_root}/{selected_checkpoint}. Inference not possible." 1295 ) 1296 1297 device, _, _ = get_backend() 1298 logging.info(f"Using device {device} for inference.") 1299 model = model.to(device) 1300 model.eval() 1301 1302 pred_key = f"pred_od_{self.model_name_key}-{dataset_name}" 1303 1304 if inference_settings["inference_on_test"] is True: 1305 INFERENCE_SPLITS = ["test"] 1306 dataset_eval_view = self.dataset.match_tags(INFERENCE_SPLITS) 1307 else: 1308 dataset_eval_view = self.dataset 1309 1310 detection_threshold = inference_settings["detection_threshold"] 1311 1312 with torch.amp.autocast("cuda"), torch.inference_mode(): 1313 for sample in dataset_eval_view.iter_samples(progress=True, autosave=True): 1314 image_width = sample.metadata.width 1315 image_height = sample.metadata.height 1316 img_filepath = sample.filepath 1317 1318 image = Image.open(img_filepath) 1319 inputs = image_processor(images=[image], return_tensors="pt") 1320 outputs = model(**inputs.to(device)) 1321 target_sizes = torch.tensor([[image.size[1], image.size[0]]]) 1322 1323 results = image_processor.post_process_object_detection( 1324 outputs, threshold=detection_threshold, target_sizes=target_sizes 1325 )[0] 1326 1327 detections = [] 1328 for score, label, box in zip( 1329 results["scores"], results["labels"], results["boxes"] 1330 ): 1331 # Bbox is in absolute coordinates x, y, x2, y2 1332 box = box.tolist() 1333 text_label = model.config.id2label[label.item()] 1334 1335 # Voxel51 requires relative coordinates between 0 and 1 1336 top_left_x = box[0] / image_width 1337 top_left_y = box[1] / image_height 1338 box_width = (box[2] - box[0]) / image_width 1339 box_height = (box[3] - box[1]) / image_height 1340 detection = fo.Detection( 1341 label=text_label, 1342 bounding_box=[ 1343 top_left_x, 1344 top_left_y, 1345 box_width, 1346 box_height, 1347 ], 1348 confidence=score.item(), 1349 ) 1350 detections.append(detection) 1351 1352 sample[pred_key] = fo.Detections(detections=detections) 1353 1354 if inference_settings["do_eval"] is True: 1355 eval_key = re.sub( 1356 r"[\W-]+", "_", "eval_" + self.model_name + "_" + self.dataset_name 1357 ) 1358 1359 if inference_settings["inference_on_test"] is True: 1360 dataset_view = self.dataset.match_tags(["test"]) 1361 else: 1362 dataset_view = self.dataset 1363 1364 dataset_view.evaluate_detections( 1365 pred_key, 1366 gt_field=gt_field, 1367 eval_key=eval_key, 1368 compute_mAP=True, 1369 )
Performs model inference on a dataset, loading from Hugging Face or disk, and optionally evaluates detection results.
1372class CustomCoDETRObjectDetection: 1373 """Interface for running Co-DETR object detection model training and inference in containers""" 1374 1375 def __init__(self, dataset, dataset_info, run_config): 1376 """Initialize Co-DETR interface with dataset and configuration""" 1377 self.root_codetr = "./custom_models/CoDETR/Co-DETR" 1378 self.root_codetr_models = "output/models/codetr" 1379 self.dataset = dataset 1380 self.dataset_name = dataset_info["name"] 1381 self.export_dir_root = run_config["export_dataset_root"] 1382 self.config_key = os.path.splitext(os.path.basename(run_config["config"]))[0] 1383 self.hf_repo_name = f"{HF_ROOT}/{self.dataset_name}_{self.config_key}" 1384 1385 def convert_data(self): 1386 """Convert dataset to COCO format required by Co-DETR""" 1387 1388 export_dir = os.path.join(self.export_dir_root, self.dataset_name, "coco") 1389 1390 # Check if folder already exists 1391 if not os.path.exists(export_dir): 1392 # Make directory 1393 os.makedirs(export_dir, exist_ok=True) 1394 logging.info(f"Exporting data to {export_dir}") 1395 splits = [ 1396 "train", 1397 "val", 1398 "test", 1399 ] # CoDETR expects data in 'train' and 'val' folder 1400 for split in splits: 1401 split_view = self.dataset.match_tags(split) 1402 split_view.export( 1403 dataset_type=fo.types.COCODetectionDataset, 1404 data_path=os.path.join(export_dir, f"{split}2017"), 1405 labels_path=os.path.join( 1406 export_dir, "annotations", f"instances_{split}2017.json" 1407 ), 1408 label_field="ground_truth", 1409 ) 1410 else: 1411 logging.warning( 1412 f"Folder {export_dir} already exists, skipping data export." 1413 ) 1414 1415 def update_config_file(self, dataset_name, config_file, max_epochs): 1416 """Update Co-DETR config file with dataset-specific parameters""" 1417 1418 config_path = os.path.join(self.root_codetr, config_file) 1419 1420 # Get classes from exported data 1421 annotations_json = os.path.join( 1422 self.export_dir_root, 1423 dataset_name, 1424 "coco/annotations/instances_train2017.json", 1425 ) 1426 # Read the JSON file 1427 with open(annotations_json, "r") as file: 1428 data = json.load(file) 1429 1430 # Extract the value associated with the key "categories" 1431 categories = data.get("categories") 1432 class_names = tuple(category["name"] for category in categories) 1433 num_classes = len(class_names) 1434 1435 # Update configuration file 1436 # This assumes that 'classes = '('a','b',...)' are already defined and will be overwritten. 1437 with open(config_path, "r") as file: 1438 content = file.read() 1439 1440 # Update the classes tuple 1441 content = re.sub(r"classes\s*=\s*\(.*?\)", f"classes = {class_names}", content) 1442 1443 # Update all instances of num_classes 1444 content = re.sub(r"num_classes=\d+", f"num_classes={num_classes}", content) 1445 1446 # Update all instances of max_epochs 1447 content = re.sub(r"max_epochs=\d+", f"max_epochs={max_epochs}", content) 1448 1449 with open(config_path, "w") as file: 1450 file.write(content) 1451 1452 logging.warning( 1453 f"Updated {config_path} with classes={class_names} and num_classes={num_classes} and max_epochs={max_epochs}" 1454 ) 1455 1456 def train(self, param_config, param_n_gpus, container_tool, param_function="train"): 1457 """Train Co-DETR model using containerized environment""" 1458 1459 # Check if model already exists 1460 output_folder_codetr = os.path.join(self.root_codetr, "output") 1461 os.makedirs(output_folder_codetr, exist_ok=True) 1462 param_config_name = os.path.splitext(os.path.basename(param_config))[0] 1463 best_models_dir = os.path.join(output_folder_codetr, "best") 1464 os.makedirs(best_models_dir, exist_ok=True) 1465 # Best model files follow the naming scheme "config_dataset.pth" 1466 pth_model_files = ( 1467 [f for f in os.listdir(best_models_dir) if f.endswith(".pth")] 1468 if os.path.exists(best_models_dir) and os.path.isdir(best_models_dir) 1469 else [] 1470 ) 1471 1472 # Best model files are stored in the format "config_dataset.pth" 1473 matching_files = [ 1474 f 1475 for f in pth_model_files 1476 if f.startswith(param_config_name) 1477 and self.dataset_name in f 1478 and f.endswith(".pth") 1479 ] 1480 if len(matching_files) > 0: 1481 logging.warning( 1482 f"Model {param_config_name} already trained on dataset {self.dataset_name}. Skipping training." 1483 ) 1484 if len(matching_files) > 1: 1485 logging.warning(f"Multiple weights found: {matching_files}") 1486 else: 1487 logging.info( 1488 f"Launching training for Co-DETR config {param_config} and dataset {self.dataset_name}." 1489 ) 1490 volume_data = os.path.join(self.export_dir_root, self.dataset_name) 1491 1492 # Train model, store checkpoints in 'output_folder_codetr' 1493 train_result = self._run_container( 1494 volume_data=volume_data, 1495 param_function=param_function, 1496 param_config=param_config, 1497 param_n_gpus=param_n_gpus, 1498 container_tool=container_tool, 1499 ) 1500 1501 # Find the best_bbox checkpoint file 1502 checkpoint_files = [ 1503 f 1504 for f in os.listdir(output_folder_codetr) 1505 if "best_bbox" in f and f.endswith(".pth") 1506 ] 1507 if not checkpoint_files: 1508 logging.error( 1509 "Co-DETR was not trained, model pth file missing. No checkpoint file with 'best_bbox' found." 1510 ) 1511 else: 1512 if len(checkpoint_files) > 1: 1513 logging.warning( 1514 f"Found {len(checkpoint_files)} checkpoint files. Selecting {checkpoint_files[0]}." 1515 ) 1516 checkpoint = checkpoint_files[0] 1517 checkpoint_path = os.path.join(output_folder_codetr, checkpoint) 1518 logging.info("Co-DETR was trained successfully.") 1519 1520 # Upload best model to Hugging Face 1521 if HF_DO_UPLOAD == True: 1522 logging.info("Uploading Co-DETR model to Hugging Face.") 1523 api = HfApi() 1524 api.create_repo( 1525 self.hf_repo_name, 1526 private=True, 1527 repo_type="model", 1528 exist_ok=True, 1529 ) 1530 api.upload_file( 1531 path_or_fileobj=checkpoint_path, 1532 path_in_repo="model.pth", 1533 repo_id=self.hf_repo_name, 1534 repo_type="model", 1535 ) 1536 1537 # Move best model file and clear output folder 1538 self._run_container( 1539 volume_data=volume_data, 1540 param_function="clear-output", 1541 param_config=param_config, 1542 param_dataset_name=self.dataset_name, 1543 container_tool=container_tool, 1544 ) 1545 1546 @staticmethod 1547 def _find_file_iteratively(start_path, filename): 1548 """Direct access or recursively search for a file in a directory structure.""" 1549 # Convert start_path to a Path object 1550 start_path = Path(start_path) 1551 1552 # Check if the file exists in the start_path directly (very fast) 1553 file_path = start_path / filename 1554 if file_path.exists(): 1555 return str(file_path) 1556 1557 # Start with the highest directory and go up iteratively 1558 current_dir = start_path 1559 checked_dirs = set() 1560 1561 while current_dir != current_dir.root: 1562 # Check if the file is in the current directory 1563 file_path = current_dir / filename 1564 if file_path.exists(): 1565 return str(file_path) 1566 1567 # If we haven't checked the sibling directories, check them as well 1568 parent_dir = current_dir.parent 1569 if parent_dir not in checked_dirs: 1570 # Check sibling directories 1571 for sibling in parent_dir.iterdir(): 1572 if sibling != current_dir and sibling.is_dir(): 1573 sibling_file_path = sibling / filename 1574 if sibling_file_path.exists(): 1575 return str(sibling_file_path) 1576 checked_dirs.add(parent_dir) 1577 1578 # Otherwise, go one level up 1579 current_dir = current_dir.parent 1580 1581 # If file is not found after traversing all levels, return None 1582 logging.error(f"File {filename} could not be found.") 1583 return None 1584 1585 def run_inference( 1586 self, 1587 dataset, 1588 param_config, 1589 param_n_gpus, 1590 container_tool, 1591 inference_settings, 1592 param_function="inference", 1593 inference_output_folder="custom_models/CoDETR/Co-DETR/output/inference/", 1594 gt_field="ground_truth", 1595 ): 1596 """Run inference using trained Co-DETR model and convert results to FiftyOne format""" 1597 1598 logging.info(f"Launching inference for Co-DETR config {param_config}.") 1599 volume_data = os.path.join(self.export_dir_root, self.dataset_name) 1600 1601 if inference_settings["inference_on_test"] is True: 1602 folder_inference = os.path.join("coco", "test2017") 1603 else: 1604 folder_inference = os.path.join("coco") 1605 1606 # Get model from Hugging Face 1607 dataset_name = None 1608 config_key = None 1609 try: 1610 if inference_settings["model_hf"] is None: 1611 hf_path = self.hf_repo_name 1612 else: 1613 hf_path = inference_settings["model_hf"] 1614 1615 dataset_name, config_key = get_dataset_and_model_from_hf_id(hf_path) 1616 1617 download_folder = os.path.join( 1618 self.root_codetr_models, dataset_name, config_key 1619 ) 1620 1621 logging.info( 1622 f"Downloading model {hf_path} from Hugging Face into {download_folder}" 1623 ) 1624 os.makedirs(download_folder, exist_ok=True) 1625 1626 file_path = hf_hub_download( 1627 repo_id=hf_path, 1628 filename="model.pth", 1629 local_dir=download_folder, 1630 ) 1631 except Exception as e: 1632 logging.error(f"An error occured during model download: {e}") 1633 1634 model_path = os.path.join(dataset_name, config_key, "model.pth") 1635 logging.info(f"Starting inference for model {model_path}") 1636 1637 inference_result = self._run_container( 1638 volume_data=volume_data, 1639 param_function=param_function, 1640 param_config=param_config, 1641 param_n_gpus=param_n_gpus, 1642 container_tool=container_tool, 1643 param_inference_dataset_folder=folder_inference, 1644 param_inference_model_checkpoint=model_path, 1645 ) 1646 1647 # Convert results from JSON output into V51 dataset 1648 # Files follow format inference_results_{timestamp}.json (run_inference.py) 1649 os.makedirs(inference_output_folder, exist_ok=True) 1650 output_files = [ 1651 f 1652 for f in os.listdir(inference_output_folder) 1653 if f.startswith("inference_results_") and f.endswith(".json") 1654 ] 1655 logging.debug(f"Found files with inference content: {output_files}") 1656 1657 if not output_files: 1658 logging.error( 1659 f"No inference result files found in {inference_output_folder}" 1660 ) 1661 1662 # Get full path for each file 1663 file_paths = [os.path.join(inference_output_folder, f) for f in output_files] 1664 1665 # Extract timestamp from the filename and sort based on the timestamp 1666 file_paths_sorted = sorted( 1667 file_paths, 1668 key=lambda f: datetime.datetime.strptime( 1669 f.split("_")[-2] + "_" + f.split("_")[-1].replace(".json", ""), 1670 "%Y%m%d_%H%M%S", 1671 ), 1672 reverse=True, 1673 ) 1674 1675 # Use the most recent file based on timestamp 1676 latest_file = file_paths_sorted[0] 1677 logging.info(f"Using inference results from: {latest_file}") 1678 with open(latest_file, "r") as file: 1679 data = json.load(file) 1680 1681 # Get conversion for annotated classes 1682 annotations_path = os.path.join( 1683 volume_data, "coco", "annotations", "instances_train2017.json" 1684 ) 1685 1686 with open(annotations_path, "r") as file: 1687 data_annotations = json.load(file) 1688 1689 class_ids_and_names = [ 1690 (category["id"], category["name"]) 1691 for category in data_annotations["categories"] 1692 ] 1693 1694 # Match sample filepaths (from exported Co-DETR COCO format) to V51 filepaths 1695 sample = dataset.first() 1696 root_dir_samples = sample.filepath 1697 1698 # Convert results into V51 file format 1699 detection_threshold = inference_settings["detection_threshold"] 1700 pred_key = f"pred_od_{config_key}-{dataset_name}" 1701 for key, value in tqdm(data.items(), desc="Processing Co-DETR detection"): 1702 try: 1703 # Get filename 1704 filepath = CustomCoDETRObjectDetection._find_file_iteratively( 1705 root_dir_samples, os.path.basename(key) 1706 ) 1707 sample = dataset[filepath] 1708 1709 img_width = sample.metadata.width 1710 img_height = sample.metadata.height 1711 1712 detections_v51 = [] 1713 for class_id, class_detections in enumerate(data[key]): # Starts with 0 1714 if len(class_detections) > 0: 1715 objects_class = class_ids_and_names[class_id] 1716 for detection in class_detections: 1717 confidence = detection[4] 1718 detection_v51 = fo.Detection( 1719 label=objects_class[1], 1720 bounding_box=[ 1721 detection[0] / img_width, 1722 detection[1] / img_height, 1723 (detection[2] - detection[0]) / img_width, 1724 (detection[3] - detection[1]) / img_height, 1725 ], 1726 confidence=confidence, 1727 ) 1728 if confidence >= detection_threshold: 1729 detections_v51.append(detection_v51) 1730 1731 sample[pred_key] = fo.Detections(detections=detections_v51) 1732 sample.save() 1733 except Exception as e: 1734 logging.error( 1735 f"An error occured during the conversion of Co-DETR inference results to the V51 dataset: {e}" 1736 ) 1737 1738 # Run V51 evaluation 1739 if inference_settings["do_eval"] is True: 1740 eval_key = pred_key.replace("pred_", "eval_").replace("-", "_") 1741 1742 if inference_settings["inference_on_test"] is True: 1743 dataset_view = dataset.match_tags(["test"]) 1744 else: 1745 dataset_view = dataset 1746 1747 logging.info( 1748 f"Starting evaluation for {pred_key} in evaluation key {eval_key}." 1749 ) 1750 dataset_view.evaluate_detections( 1751 pred_key, 1752 gt_field=gt_field, 1753 eval_key=eval_key, 1754 compute_mAP=True, 1755 ) 1756 1757 def _run_container( 1758 self, 1759 volume_data, 1760 param_function, 1761 param_config="", 1762 param_n_gpus="1", 1763 param_dataset_name="", 1764 param_inference_dataset_folder="", 1765 param_inference_model_checkpoint="", 1766 image="dbogdollresearch/codetr", 1767 workdir="/launch", 1768 container_tool="docker", 1769 ): 1770 """Execute Co-DETR container with specified parameters using Docker or Singularity""" 1771 1772 try: 1773 # Convert relative paths to absolute paths (necessary under WSL2) 1774 root_codetr_abs = os.path.abspath(self.root_codetr) 1775 volume_data_abs = os.path.abspath(volume_data) 1776 root_codetr_models_abs = os.path.abspath(self.root_codetr_models) 1777 1778 # Check if using Docker or Singularity and define the appropriate command 1779 if container_tool == "docker": 1780 command = [ 1781 "docker", 1782 "run", 1783 "--gpus", 1784 "all", 1785 "--workdir", 1786 workdir, 1787 "--volume", 1788 f"{root_codetr_abs}:{workdir}", 1789 "--volume", 1790 f"{volume_data_abs}:{workdir}/data:ro", 1791 "--volume", 1792 f"{root_codetr_models_abs}:{workdir}/hf_models:ro", 1793 "--shm-size=8g", 1794 image, 1795 param_function, 1796 param_config, 1797 param_n_gpus, 1798 param_dataset_name, 1799 param_inference_dataset_folder, 1800 param_inference_model_checkpoint, 1801 ] 1802 elif container_tool == "singularity": 1803 command = [ 1804 "singularity", 1805 "run", 1806 "--nv", 1807 "--pwd", 1808 workdir, 1809 "--bind", 1810 f"{self.root_codetr}:{workdir}", 1811 "--bind", 1812 f"{volume_data}:{workdir}/data:ro", 1813 "--bind", 1814 f"{self.root_codetr_models}:{workdir}/hf_models:ro", 1815 f"docker://{image}", 1816 param_function, 1817 param_config, 1818 param_n_gpus, 1819 param_dataset_name, 1820 param_inference_dataset_folder, 1821 param_inference_model_checkpoint, 1822 ] 1823 else: 1824 raise ValueError( 1825 f"Invalid container tool specified: {container_tool}. Choose 'docker' or 'singularity'." 1826 ) 1827 1828 # Start the process and stream outputs to the console 1829 logging.info(f"Launching terminal command {command}") 1830 with subprocess.Popen( 1831 command, stdout=sys.stdout, stderr=sys.stderr, text=True 1832 ) as proc: 1833 proc.wait() # Wait for the process to complete 1834 return True 1835 except Exception as e: 1836 logging.error(f"Error during Co-DETR container run: {e}") 1837 return False
Interface for running Co-DETR object detection model training and inference in containers
1375 def __init__(self, dataset, dataset_info, run_config): 1376 """Initialize Co-DETR interface with dataset and configuration""" 1377 self.root_codetr = "./custom_models/CoDETR/Co-DETR" 1378 self.root_codetr_models = "output/models/codetr" 1379 self.dataset = dataset 1380 self.dataset_name = dataset_info["name"] 1381 self.export_dir_root = run_config["export_dataset_root"] 1382 self.config_key = os.path.splitext(os.path.basename(run_config["config"]))[0] 1383 self.hf_repo_name = f"{HF_ROOT}/{self.dataset_name}_{self.config_key}"
Initialize Co-DETR interface with dataset and configuration
1385 def convert_data(self): 1386 """Convert dataset to COCO format required by Co-DETR""" 1387 1388 export_dir = os.path.join(self.export_dir_root, self.dataset_name, "coco") 1389 1390 # Check if folder already exists 1391 if not os.path.exists(export_dir): 1392 # Make directory 1393 os.makedirs(export_dir, exist_ok=True) 1394 logging.info(f"Exporting data to {export_dir}") 1395 splits = [ 1396 "train", 1397 "val", 1398 "test", 1399 ] # CoDETR expects data in 'train' and 'val' folder 1400 for split in splits: 1401 split_view = self.dataset.match_tags(split) 1402 split_view.export( 1403 dataset_type=fo.types.COCODetectionDataset, 1404 data_path=os.path.join(export_dir, f"{split}2017"), 1405 labels_path=os.path.join( 1406 export_dir, "annotations", f"instances_{split}2017.json" 1407 ), 1408 label_field="ground_truth", 1409 ) 1410 else: 1411 logging.warning( 1412 f"Folder {export_dir} already exists, skipping data export." 1413 )
Convert dataset to COCO format required by Co-DETR
1415 def update_config_file(self, dataset_name, config_file, max_epochs): 1416 """Update Co-DETR config file with dataset-specific parameters""" 1417 1418 config_path = os.path.join(self.root_codetr, config_file) 1419 1420 # Get classes from exported data 1421 annotations_json = os.path.join( 1422 self.export_dir_root, 1423 dataset_name, 1424 "coco/annotations/instances_train2017.json", 1425 ) 1426 # Read the JSON file 1427 with open(annotations_json, "r") as file: 1428 data = json.load(file) 1429 1430 # Extract the value associated with the key "categories" 1431 categories = data.get("categories") 1432 class_names = tuple(category["name"] for category in categories) 1433 num_classes = len(class_names) 1434 1435 # Update configuration file 1436 # This assumes that 'classes = '('a','b',...)' are already defined and will be overwritten. 1437 with open(config_path, "r") as file: 1438 content = file.read() 1439 1440 # Update the classes tuple 1441 content = re.sub(r"classes\s*=\s*\(.*?\)", f"classes = {class_names}", content) 1442 1443 # Update all instances of num_classes 1444 content = re.sub(r"num_classes=\d+", f"num_classes={num_classes}", content) 1445 1446 # Update all instances of max_epochs 1447 content = re.sub(r"max_epochs=\d+", f"max_epochs={max_epochs}", content) 1448 1449 with open(config_path, "w") as file: 1450 file.write(content) 1451 1452 logging.warning( 1453 f"Updated {config_path} with classes={class_names} and num_classes={num_classes} and max_epochs={max_epochs}" 1454 )
Update Co-DETR config file with dataset-specific parameters
1456 def train(self, param_config, param_n_gpus, container_tool, param_function="train"): 1457 """Train Co-DETR model using containerized environment""" 1458 1459 # Check if model already exists 1460 output_folder_codetr = os.path.join(self.root_codetr, "output") 1461 os.makedirs(output_folder_codetr, exist_ok=True) 1462 param_config_name = os.path.splitext(os.path.basename(param_config))[0] 1463 best_models_dir = os.path.join(output_folder_codetr, "best") 1464 os.makedirs(best_models_dir, exist_ok=True) 1465 # Best model files follow the naming scheme "config_dataset.pth" 1466 pth_model_files = ( 1467 [f for f in os.listdir(best_models_dir) if f.endswith(".pth")] 1468 if os.path.exists(best_models_dir) and os.path.isdir(best_models_dir) 1469 else [] 1470 ) 1471 1472 # Best model files are stored in the format "config_dataset.pth" 1473 matching_files = [ 1474 f 1475 for f in pth_model_files 1476 if f.startswith(param_config_name) 1477 and self.dataset_name in f 1478 and f.endswith(".pth") 1479 ] 1480 if len(matching_files) > 0: 1481 logging.warning( 1482 f"Model {param_config_name} already trained on dataset {self.dataset_name}. Skipping training." 1483 ) 1484 if len(matching_files) > 1: 1485 logging.warning(f"Multiple weights found: {matching_files}") 1486 else: 1487 logging.info( 1488 f"Launching training for Co-DETR config {param_config} and dataset {self.dataset_name}." 1489 ) 1490 volume_data = os.path.join(self.export_dir_root, self.dataset_name) 1491 1492 # Train model, store checkpoints in 'output_folder_codetr' 1493 train_result = self._run_container( 1494 volume_data=volume_data, 1495 param_function=param_function, 1496 param_config=param_config, 1497 param_n_gpus=param_n_gpus, 1498 container_tool=container_tool, 1499 ) 1500 1501 # Find the best_bbox checkpoint file 1502 checkpoint_files = [ 1503 f 1504 for f in os.listdir(output_folder_codetr) 1505 if "best_bbox" in f and f.endswith(".pth") 1506 ] 1507 if not checkpoint_files: 1508 logging.error( 1509 "Co-DETR was not trained, model pth file missing. No checkpoint file with 'best_bbox' found." 1510 ) 1511 else: 1512 if len(checkpoint_files) > 1: 1513 logging.warning( 1514 f"Found {len(checkpoint_files)} checkpoint files. Selecting {checkpoint_files[0]}." 1515 ) 1516 checkpoint = checkpoint_files[0] 1517 checkpoint_path = os.path.join(output_folder_codetr, checkpoint) 1518 logging.info("Co-DETR was trained successfully.") 1519 1520 # Upload best model to Hugging Face 1521 if HF_DO_UPLOAD == True: 1522 logging.info("Uploading Co-DETR model to Hugging Face.") 1523 api = HfApi() 1524 api.create_repo( 1525 self.hf_repo_name, 1526 private=True, 1527 repo_type="model", 1528 exist_ok=True, 1529 ) 1530 api.upload_file( 1531 path_or_fileobj=checkpoint_path, 1532 path_in_repo="model.pth", 1533 repo_id=self.hf_repo_name, 1534 repo_type="model", 1535 ) 1536 1537 # Move best model file and clear output folder 1538 self._run_container( 1539 volume_data=volume_data, 1540 param_function="clear-output", 1541 param_config=param_config, 1542 param_dataset_name=self.dataset_name, 1543 container_tool=container_tool, 1544 )
Train Co-DETR model using containerized environment
1585 def run_inference( 1586 self, 1587 dataset, 1588 param_config, 1589 param_n_gpus, 1590 container_tool, 1591 inference_settings, 1592 param_function="inference", 1593 inference_output_folder="custom_models/CoDETR/Co-DETR/output/inference/", 1594 gt_field="ground_truth", 1595 ): 1596 """Run inference using trained Co-DETR model and convert results to FiftyOne format""" 1597 1598 logging.info(f"Launching inference for Co-DETR config {param_config}.") 1599 volume_data = os.path.join(self.export_dir_root, self.dataset_name) 1600 1601 if inference_settings["inference_on_test"] is True: 1602 folder_inference = os.path.join("coco", "test2017") 1603 else: 1604 folder_inference = os.path.join("coco") 1605 1606 # Get model from Hugging Face 1607 dataset_name = None 1608 config_key = None 1609 try: 1610 if inference_settings["model_hf"] is None: 1611 hf_path = self.hf_repo_name 1612 else: 1613 hf_path = inference_settings["model_hf"] 1614 1615 dataset_name, config_key = get_dataset_and_model_from_hf_id(hf_path) 1616 1617 download_folder = os.path.join( 1618 self.root_codetr_models, dataset_name, config_key 1619 ) 1620 1621 logging.info( 1622 f"Downloading model {hf_path} from Hugging Face into {download_folder}" 1623 ) 1624 os.makedirs(download_folder, exist_ok=True) 1625 1626 file_path = hf_hub_download( 1627 repo_id=hf_path, 1628 filename="model.pth", 1629 local_dir=download_folder, 1630 ) 1631 except Exception as e: 1632 logging.error(f"An error occured during model download: {e}") 1633 1634 model_path = os.path.join(dataset_name, config_key, "model.pth") 1635 logging.info(f"Starting inference for model {model_path}") 1636 1637 inference_result = self._run_container( 1638 volume_data=volume_data, 1639 param_function=param_function, 1640 param_config=param_config, 1641 param_n_gpus=param_n_gpus, 1642 container_tool=container_tool, 1643 param_inference_dataset_folder=folder_inference, 1644 param_inference_model_checkpoint=model_path, 1645 ) 1646 1647 # Convert results from JSON output into V51 dataset 1648 # Files follow format inference_results_{timestamp}.json (run_inference.py) 1649 os.makedirs(inference_output_folder, exist_ok=True) 1650 output_files = [ 1651 f 1652 for f in os.listdir(inference_output_folder) 1653 if f.startswith("inference_results_") and f.endswith(".json") 1654 ] 1655 logging.debug(f"Found files with inference content: {output_files}") 1656 1657 if not output_files: 1658 logging.error( 1659 f"No inference result files found in {inference_output_folder}" 1660 ) 1661 1662 # Get full path for each file 1663 file_paths = [os.path.join(inference_output_folder, f) for f in output_files] 1664 1665 # Extract timestamp from the filename and sort based on the timestamp 1666 file_paths_sorted = sorted( 1667 file_paths, 1668 key=lambda f: datetime.datetime.strptime( 1669 f.split("_")[-2] + "_" + f.split("_")[-1].replace(".json", ""), 1670 "%Y%m%d_%H%M%S", 1671 ), 1672 reverse=True, 1673 ) 1674 1675 # Use the most recent file based on timestamp 1676 latest_file = file_paths_sorted[0] 1677 logging.info(f"Using inference results from: {latest_file}") 1678 with open(latest_file, "r") as file: 1679 data = json.load(file) 1680 1681 # Get conversion for annotated classes 1682 annotations_path = os.path.join( 1683 volume_data, "coco", "annotations", "instances_train2017.json" 1684 ) 1685 1686 with open(annotations_path, "r") as file: 1687 data_annotations = json.load(file) 1688 1689 class_ids_and_names = [ 1690 (category["id"], category["name"]) 1691 for category in data_annotations["categories"] 1692 ] 1693 1694 # Match sample filepaths (from exported Co-DETR COCO format) to V51 filepaths 1695 sample = dataset.first() 1696 root_dir_samples = sample.filepath 1697 1698 # Convert results into V51 file format 1699 detection_threshold = inference_settings["detection_threshold"] 1700 pred_key = f"pred_od_{config_key}-{dataset_name}" 1701 for key, value in tqdm(data.items(), desc="Processing Co-DETR detection"): 1702 try: 1703 # Get filename 1704 filepath = CustomCoDETRObjectDetection._find_file_iteratively( 1705 root_dir_samples, os.path.basename(key) 1706 ) 1707 sample = dataset[filepath] 1708 1709 img_width = sample.metadata.width 1710 img_height = sample.metadata.height 1711 1712 detections_v51 = [] 1713 for class_id, class_detections in enumerate(data[key]): # Starts with 0 1714 if len(class_detections) > 0: 1715 objects_class = class_ids_and_names[class_id] 1716 for detection in class_detections: 1717 confidence = detection[4] 1718 detection_v51 = fo.Detection( 1719 label=objects_class[1], 1720 bounding_box=[ 1721 detection[0] / img_width, 1722 detection[1] / img_height, 1723 (detection[2] - detection[0]) / img_width, 1724 (detection[3] - detection[1]) / img_height, 1725 ], 1726 confidence=confidence, 1727 ) 1728 if confidence >= detection_threshold: 1729 detections_v51.append(detection_v51) 1730 1731 sample[pred_key] = fo.Detections(detections=detections_v51) 1732 sample.save() 1733 except Exception as e: 1734 logging.error( 1735 f"An error occured during the conversion of Co-DETR inference results to the V51 dataset: {e}" 1736 ) 1737 1738 # Run V51 evaluation 1739 if inference_settings["do_eval"] is True: 1740 eval_key = pred_key.replace("pred_", "eval_").replace("-", "_") 1741 1742 if inference_settings["inference_on_test"] is True: 1743 dataset_view = dataset.match_tags(["test"]) 1744 else: 1745 dataset_view = dataset 1746 1747 logging.info( 1748 f"Starting evaluation for {pred_key} in evaluation key {eval_key}." 1749 ) 1750 dataset_view.evaluate_detections( 1751 pred_key, 1752 gt_field=gt_field, 1753 eval_key=eval_key, 1754 compute_mAP=True, 1755 )
Run inference using trained Co-DETR model and convert results to FiftyOne format