workflows.data_ingest
import os
import fiftyone as fo
import fiftyone.types as fot
import random
import shutil
import fiftyone.utils.yolo as fouy
import fiftyone.utils.video as fouv
import logging
from config.config import WORKFLOWS
from pathlib import Path
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import DoubleQuotedScalarString as dqs
import yaml

DATASETS_YAML = Path(__file__).resolve().parent.parent / "config" / "datasets.yaml"


def append_dataset_entry(dataset_name: str, loader_fct: str = "load_custom_dataset"):
    yaml_path = Path(__file__).resolve().parent.parent / "config" / "datasets.yaml"
    yaml = YAML()
    yaml.preserve_quotes = True
    yaml.indent(sequence=4, offset=2)

    with open(yaml_path, "r") as f:
        data = yaml.load(f)

    # Check for duplicate names
    if any(d.get("name") == dataset_name for d in data["datasets"]):
        logging.info(f"Dataset '{dataset_name}' already exists in datasets.yaml. Skipping append.")
        return

    new_entry = {
        "name": dqs(dataset_name),
        "loader_fct": dqs(loader_fct),
        "v51_type": dqs("FiftyOneDataset"),
    }

    data["datasets"].append(new_entry)

    with open(yaml_path, "w") as f:
        yaml.dump(data, f)

    logging.info(f"Appended new dataset entry to datasets.yaml: {dataset_name}")


def detect_format(dataset_dir):
    # Collect all files recursively (lowercased paths) for format detection
    all_files = []
    for root, dirs, fs in os.walk(dataset_dir):
        for f in fs:
            all_files.append(os.path.join(root, f).lower())

    # Format detection
    if any("annotations.xml" in f for f in all_files):
        return "cvat"
    elif any(f.endswith(".json") and ("instances" in f or "coco" in f) for f in all_files):
        return "coco"
    elif any(f.endswith(".xml") for f in all_files):
        return "voc"
    elif any("labels" in f for f in all_files) or any(f.endswith(".txt") for f in all_files):
        return "yolo"
    elif any(f.endswith((".mp4", ".avi", ".mov")) for f in all_files):
        return "video"
    elif any(f.endswith((".jpg", ".jpeg", ".png")) for f in all_files):
        return "image_only"
    else:
        raise ValueError("Unable to auto-detect dataset format.")


def get_dataset_type(fmt):
    """Map format string to FiftyOne dataset type or string indicators."""
    if fmt == "coco":
        return fot.COCODetectionDataset
    elif fmt == "voc":
        return fot.VOCDetectionDataset
    elif fmt == "yolo":
        return fot.YOLOv5Dataset
    elif fmt == "video":
        return "video"
    elif fmt == "image_only":
        return "image_only"
    elif fmt == "cvat":
        return fot.CVATImageDataset
    else:
        raise ValueError(f"Unsupported annotation_format: {fmt}")


def run_data_ingest():
    config = WORKFLOWS["data_ingest"]
    base_name = config["dataset_name"]
    dataset_dir = config["dataset_dir"]
    split = config.get("split_percentages", [0.7, 0.15, 0.15])
    fmt = config["annotation_format"]
    fps = config["fps"]

    if fmt == "auto":
        fmt = detect_format(dataset_dir)

    dataset_type = get_dataset_type(fmt)

    existing = fo.list_datasets()

    i = 1
    while f"{base_name}{i}" in existing:
        i += 1
    dataset_name = f"{base_name}{i}"

    logging.info(f"Ingesting dataset: {dataset_name}")
    logging.info(f"Detected format: {fmt}")
    logging.info(f"Loading from: {dataset_dir}")

    if dataset_type == "video":
        logging.info(f"Converting videos in {dataset_dir} to frames at {fps} FPS...")

        # Create temporary dataset from videos
        video_dataset = fo.Dataset.from_videos_dir(dataset_dir, name=f"{dataset_name}_video_temp")

        # Define frame sampling output directory
        frames_dir = os.path.join(dataset_dir, "extracted_frames")
        os.makedirs(frames_dir, exist_ok=True)

        # Sample frames at the configured FPS using FFmpeg
        fouv.sample_videos(
            video_dataset,
            fps=fps,
            output_dir=frames_dir,
            original_frame_numbers=False,
            force_sample=True,
            verbose=False,
            progress=True,
        )

        logging.info(f"Sampled frames stored at {frames_dir}")

        # Now load as image-only dataset
        dataset = fo.Dataset.from_images_dir(frames_dir, name=dataset_name)

        temp_name = f"{dataset_name}_video_temp"
        if temp_name in fo.list_datasets():
            fo.delete_dataset(temp_name)
            logging.info(f"Deleted temporary dataset: {temp_name}")

    elif dataset_type == "image_only":
        dataset = fo.Dataset.from_images_dir(dataset_dir, name=dataset_name)

    elif fmt == "coco":
        # Fast path: single JSON + single image folder (any name)
        flat_jsons = [f for f in os.listdir(dataset_dir) if f.endswith(".json")]
        image_dirs = [
            os.path.join(dataset_dir, d)
            for d in os.listdir(dataset_dir)
            if os.path.isdir(os.path.join(dataset_dir, d)) and d.lower() != "annotations"
        ]

        if len(flat_jsons) == 1 and len(image_dirs) == 1:
            image_dir = image_dirs[0]
            logging.info(f"Using {image_dir} for all splits (defaulted to 'train')")

            dataset = fo.Dataset.from_dir(
                dataset_type=fot.COCODetectionDataset,
                data_path=image_dir,
                labels_path=os.path.join(dataset_dir, flat_jsons[0]),
                name=dataset_name,
            )

            dataset.tag_samples("train")
            logging.info(f"Loaded {len(dataset)} samples from flat COCO directory")
            dataset.persistent = True

        else:
            annotations_dir = os.path.join(dataset_dir, "annotations")
            json_search_dir = annotations_dir if os.path.exists(annotations_dir) else dataset_dir

            # Find all COCO-style annotation files
            possible_jsons = [
                f for f in os.listdir(json_search_dir)
                if f.endswith(".json") or ("instances" in f or "annotation" in f)
            ]

            if not possible_jsons:
                raise ValueError(f"No COCO-style annotation JSON files found in '{json_search_dir}'")

            # Find available image directories with common split names
            available_dirs = {
                split: os.path.join(dataset_dir, d)
                for d in os.listdir(dataset_dir)
                if os.path.isdir(os.path.join(dataset_dir, d))
                for split in ["train", "val", "test"]
                if split in d.lower()
            }

            dataset = fo.Dataset(name=dataset_name)

            for json_file in possible_jsons:
                json_lower = json_file.lower()
                if "train" in json_lower:
                    split_name = "train"
                elif "val" in json_lower:
                    split_name = "val"
                elif "test" in json_lower:
                    split_name = "test"
                else:
                    split_name = "unlabeled"

                # Match to image folder
                data_path = available_dirs.get(split_name)
                if not data_path:
                    logging.info(f"Skipping '{split_name}' — No image folder found for '{json_file}'")
                    continue

                dataset_split = fo.Dataset.from_dir(
                    dataset_type=fot.COCODetectionDataset,
                    data_path=data_path,
                    labels_path=os.path.join(json_search_dir, json_file),
                    name=f"{dataset_name}_{split_name}",
                )

                sample_ids = dataset.add_samples(dataset_split)
                dataset.select(sample_ids).tag_samples(split_name)
                logging.info(f"Loaded {len(sample_ids)} samples for split '{split_name}'")

    elif fmt == "cvat":
        dataset = fo.Dataset.from_dir(
            dataset_type=fot.CVATImageDataset,
            data_path=os.path.join(dataset_dir, "data"),
            labels_path=os.path.join(dataset_dir, "annotations.xml"),
            name=dataset_name,
        )

    else:
        yaml_path = os.path.join(dataset_dir, "dataset.yaml")
        if fmt == "yolo" and os.path.exists(yaml_path):
            # Load the YOLOv5 splits declared in dataset.yaml
            dataset = fo.Dataset(name=dataset_name)
            split_names = ["train", "val", "test"]
            loaded_splits = {}

            with open(yaml_path) as f:
                yaml_content = f.read()

            for split_name in split_names:
                if f"{split_name}:" not in yaml_content:
                    logging.info(f"Skipping split '{split_name}' — not found in dataset.yaml")
                    continue

                try:
                    importer = fouy.YOLOv5DatasetImporter(
                        dataset_dir=dataset_dir,
                        yaml_path=yaml_path,
                        split=split_name,
                    )

                    sample_ids = dataset.add_importer(importer, label_field="ground_truth")

                    if not sample_ids:
                        logging.info(f"No samples loaded for split '{split_name}'")
                        continue

                    dataset.select(sample_ids).tag_samples(split_name)
                    loaded_splits[split_name] = sample_ids

                    logging.info(f"Loaded {len(sample_ids)} samples for split '{split_name}'")
                except Exception as e:
                    logging.info(f"Warning: Failed to load split '{split_name}': {e}")

            # Fallback logic if val or test is missing
            if "val" not in loaded_splits and "test" in loaded_splits:
                test_samples = loaded_splits["test"]
                midpoint = len(test_samples) // 2

                dataset.select(test_samples).untag_samples("test")
                dataset.select(test_samples[:midpoint]).tag_samples("val")
                dataset.select(test_samples[midpoint:]).tag_samples("test")
                logging.info("Split test → val/test 50-50")

            elif "test" not in loaded_splits and "val" in loaded_splits:
                val_samples = loaded_splits["val"]
                midpoint = len(val_samples) // 2

                dataset.select(val_samples).untag_samples("val")
                dataset.select(val_samples[:midpoint]).tag_samples("val")
                dataset.select(val_samples[midpoint:]).tag_samples("test")
                logging.info("Split val → val/test 50-50")

            elif "val" not in loaded_splits and "test" not in loaded_splits and "train" in loaded_splits:
                all_train_samples = loaded_splits["train"]
                dataset.select(all_train_samples).untag_samples("train")  # clear tag first
                random.seed(51)
                random.shuffle(all_train_samples)

                n = len(all_train_samples)
                s_train, s_val, s_test = split
                n_train = int(s_train * n)
                n_val = int(s_val * n)

                # Tag the shuffled sample IDs by proportion
                dataset.select(all_train_samples[:n_train]).tag_samples("train")
                dataset.select(all_train_samples[n_train:n_train + n_val]).tag_samples("val")
                dataset.select(all_train_samples[n_train + n_val:]).tag_samples("test")

                logging.info(f"Split train → train/val/test with {split} proportion")

            logging.info(f"Final split counts: {dataset.count_sample_tags()}")

        else:
            # Generic directory import (VOC, or YOLO without a dataset.yaml)
            dataset = fo.Dataset.from_dir(
                dataset_dir=dataset_dir,
                dataset_type=dataset_type,
                name=dataset_name,
            )

    dataset.persistent = True

    # Move 'detections' to 'ground_truth' and remove 'detections'
    for sample in dataset:
        if sample.has_field("detections"):
            sample["ground_truth"] = sample["detections"]
            sample.clear_field("detections")
            sample.save()

    # Also remove 'detections' from the dataset schema if it exists
    if "detections" in dataset.get_field_schema():
        dataset.delete_sample_field("detections")

    if fmt == "yolo" and os.path.exists(os.path.join(dataset_dir, "dataset.yaml")):
        # YOLO datasets with a dataset.yaml keep the splits declared there
        pass
    else:
        # Clear all existing split tags before reassigning
        for tag in ["train", "val", "test"]:
            dataset.match_tags(tag).untag_samples(tag)

        # shuffle() returns a randomized view; slice that view to assign splits
        shuffled = dataset.shuffle(seed=51)

        n = len(shuffled)
        s_train, s_val, s_test = split

        n_train = int(s_train * n)
        remaining = n - n_train
        n_val = int(remaining * (s_val / (s_val + s_test)))
        n_test = remaining - n_val

        # Apply tag-based split
        shuffled[:n_train].tag_samples("train")
        shuffled[n_train:n_train + n_val].tag_samples("val")
        shuffled[n_train + n_val:].tag_samples("test")

        logging.info(f"Split applied: train {n_train}, val {n_val}, test {n_test}")

    logging.info(f"Dataset '{dataset_name}' ingested with {len(dataset)} samples")

    append_dataset_entry(dataset_name)
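The workflow is driven entirely by the `data_ingest` entry of `WORKFLOWS` in `config/config.py`. The sketch below is a hypothetical excerpt, limited to the keys that `run_data_ingest()` actually reads; the concrete values are placeholders, not part of this module.

# Hypothetical excerpt of config/config.py; only the keys read by run_data_ingest()
WORKFLOWS = {
    "data_ingest": {
        "dataset_name": "my_dataset",            # base name; a numeric suffix is appended
        "dataset_dir": "/data/my_dataset",       # root folder of the raw data
        "split_percentages": [0.7, 0.15, 0.15],  # train/val/test fractions
        "annotation_format": "auto",             # or "coco", "voc", "yolo", "cvat", "video", "image_only"
        "fps": 1,                                # frame sampling rate for video input
    },
}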
DATASETS_YAML = PosixPath('/home/runner/work/mcity_data_engine/mcity_data_engine/config/datasets.yaml')
def append_dataset_entry(dataset_name: str, loader_fct: str = 'load_custom_dataset'):
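A short usage sketch, assuming `config/datasets.yaml` already contains a top-level `datasets` list (which the function expects):

from workflows.data_ingest import append_dataset_entry

# Registers the dataset once; calling it again with the same name is a no-op.
append_dataset_entry("my_dataset1")

# Entry appended to config/datasets.yaml (values are written double-quoted):
#   - name: "my_dataset1"
#     loader_fct: "load_custom_dataset"
#     v51_type: "FiftyOneDataset"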
def detect_format(dataset_dir):
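A usage sketch with a hypothetical directory layout; the checks run in the priority order shown in the source (CVAT, then COCO, VOC, YOLO, video, image-only):

from workflows.data_ingest import detect_format

# Hypothetical layout:
#   /data/my_dataset/
#     images/0001.jpg
#     labels/0001.txt
fmt = detect_format("/data/my_dataset")
print(fmt)  # -> "yolo" (a "labels" folder / .txt files take precedence over plain images)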
def get_dataset_type(fmt):
Map format string to FiftyOne dataset type or string indicators.
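A small illustration of the mapping; the "video" and "image_only" strings are sentinel values handled by `run_data_ingest()` rather than FiftyOne dataset types:

import fiftyone.types as fot
from workflows.data_ingest import get_dataset_type

assert get_dataset_type("coco") is fot.COCODetectionDataset
assert get_dataset_type("video") == "video"  # string marker; frames are extracted instead

try:
    get_dataset_type("kitti")
except ValueError as e:
    print(e)  # Unsupported annotation_format: kitti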
def run_data_ingest():
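An end-to-end sketch, assuming `WORKFLOWS["data_ingest"]` is configured as in the example near the top of this page and a FiftyOne database is available; the dataset name shown is a placeholder derived from the base name plus counter:

import fiftyone as fo
from workflows.data_ingest import run_data_ingest

run_data_ingest()

# The ingested dataset is persistent and named "<dataset_name><counter>",
# e.g. "my_dataset1" on the first run; it is also registered in datasets.yaml.
dataset = fo.load_dataset("my_dataset1")
print(dataset.count_sample_tags())  # e.g. {'train': 700, 'val': 150, 'test': 150}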