workflows.data_ingest

import os
import fiftyone as fo
import fiftyone.types as fot
import random
import shutil
import fiftyone.utils.yolo as fouy
import fiftyone.utils.video as fouv
import logging
from config.config import WORKFLOWS
from pathlib import Path
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import DoubleQuotedScalarString as dqs
import yaml

DATASETS_YAML = Path(__file__).resolve().parent.parent / "config" / "datasets.yaml"

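# Register a newly ingested dataset in config/datasets.yaml so that downstream
# workflows can load it by name via the configured loader function.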
def append_dataset_entry(dataset_name: str, loader_fct: str = "load_custom_dataset"):
    yaml_path = DATASETS_YAML
    yaml = YAML()
    yaml.preserve_quotes = True
    yaml.indent(sequence=4, offset=2)

    with open(yaml_path, "r") as f:
        data = yaml.load(f)

    # Check for duplicate names
    if any(d.get("name") == dataset_name for d in data["datasets"]):
        logging.info(f"Dataset '{dataset_name}' already exists in datasets.yaml. Skipping append.")
        return

    new_entry = {
        "name": dqs(dataset_name),
        "loader_fct": dqs(loader_fct),
        "v51_type": dqs("FiftyOneDataset"),
    }

    data["datasets"].append(new_entry)

    with open(yaml_path, "w") as f:
        yaml.dump(data, f)

    logging.info(f"Appended new dataset entry to datasets.yaml: {dataset_name}")
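
# For reference, the appended entry has this shape (values are double-quoted via
# dqs), illustrated here for a hypothetical dataset named "my_dataset1":
#
#   - name: "my_dataset1"
#     loader_fct: "load_custom_dataset"
#     v51_type: "FiftyOneDataset"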

def detect_format(dataset_dir):
    # Recursively collect all file paths (lowercased) for extension/name checks
    all_files = []
    for root, dirs, fs in os.walk(dataset_dir):
        for f in fs:
            all_files.append(os.path.join(root, f).lower())

    # Format detection, in order of precedence: CVAT, COCO, VOC, YOLO, video, image-only
    if any("annotations.xml" in f for f in all_files):
        return "cvat"
    elif any(f.endswith(".json") and ("instances" in f or "coco" in f) for f in all_files):
        return "coco"
    elif any(f.endswith(".xml") for f in all_files):
        return "voc"
    elif any("labels" in f for f in all_files) or any(f.endswith(".txt") for f in all_files):
        return "yolo"
    elif any(f.endswith((".mp4", ".avi", ".mov")) for f in all_files):
        return "video"
    elif any(f.endswith((".jpg", ".jpeg", ".png")) for f in all_files):
        return "image_only"
    else:
        raise ValueError("Unable to auto-detect dataset format.")


def get_dataset_type(fmt):
    """Map format string to FiftyOne dataset type or string indicators."""
    if fmt == "coco":
        return fot.COCODetectionDataset
    elif fmt == "voc":
        return fot.VOCDetectionDataset
    elif fmt == "yolo":
        return fot.YOLOv5Dataset
    elif fmt == "video":
        return "video"
    elif fmt == "image_only":
        return "image_only"
    elif fmt == "cvat":
        return fot.CVATImageDataset
    else:
        raise ValueError(f"Unsupported annotation_format: {fmt}")


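# End-to-end ingestion workflow:
#   1. Read the "data_ingest" entry from WORKFLOWS (dataset name, directory,
#      annotation format, split percentages, FPS).
#   2. Auto-detect the annotation format if requested.
#   3. Create a uniquely named FiftyOne dataset and load it with the
#      format-specific logic below.
#   4. Normalize detection labels into a "ground_truth" field.
#   5. Assign (or keep) train/val/test tags and persist the dataset.
#   6. Register the new dataset in config/datasets.yaml.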
def run_data_ingest():
    config = WORKFLOWS["data_ingest"]
    base_name = config["dataset_name"]
    dataset_dir = config["dataset_dir"]
    split = config.get("split_percentages", [0.7, 0.15, 0.15])
    fmt = config["annotation_format"]
    fps = config["fps"]

    if fmt == "auto":
        fmt = detect_format(dataset_dir)

    dataset_type = get_dataset_type(fmt)

    existing = fo.list_datasets()

    # Append an incrementing numeric suffix until the dataset name is unique
    i = 1
    while f"{base_name}{i}" in existing:
        i += 1
    dataset_name = f"{base_name}{i}"

    logging.info(f"Ingesting dataset: {dataset_name}")
    logging.info(f"Detected format: {fmt}")
    logging.info(f"Loading from: {dataset_dir}")

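    # Format-specific loading: video input is converted to frames first; all
    # other formats are loaded directly into a FiftyOne dataset.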
    if dataset_type == "video":
        logging.info(f"Converting videos in {dataset_dir} to frames at {fps} FPS...")

        # Create temp dataset from videos
        video_dataset = fo.Dataset.from_videos_dir(dataset_dir, name=f"{dataset_name}_video_temp")

        # Define frame sampling output directory
        frames_dir = os.path.join(dataset_dir, "extracted_frames")
        os.makedirs(frames_dir, exist_ok=True)

        # Sample frames at the configured FPS using FFmpeg
        fouv.sample_videos(
            video_dataset,
            fps=fps,
            output_dir=frames_dir,
            original_frame_numbers=False,
            force_sample=True,
            verbose=False,
            progress=True,
        )

        logging.info(f"Sampled frames stored at {frames_dir}")

        # Now load the extracted frames as an image-only dataset
        dataset = fo.Dataset.from_images_dir(frames_dir, name=dataset_name)

        temp_name = f"{dataset_name}_video_temp"
        if temp_name in fo.list_datasets():
            fo.delete_dataset(temp_name)
            logging.info(f"Deleted temporary dataset: {temp_name}")

    elif dataset_type == "image_only":
        dataset = fo.Dataset.from_images_dir(dataset_dir, name=dataset_name)
    elif fmt == "coco":
        # Fast path: single JSON + single image folder (any name)
        flat_jsons = [f for f in os.listdir(dataset_dir) if f.endswith(".json")]
        image_dirs = [
            os.path.join(dataset_dir, d)
            for d in os.listdir(dataset_dir)
            if os.path.isdir(os.path.join(dataset_dir, d)) and d.lower() != "annotations"
        ]

        if len(flat_jsons) == 1 and len(image_dirs) == 1:
            image_dir = image_dirs[0]
            logging.info(f"Using {image_dir} for all splits (defaulted to 'train')")

            dataset = fo.Dataset.from_dir(
                dataset_type=fot.COCODetectionDataset,
                data_path=image_dir,
                labels_path=os.path.join(dataset_dir, flat_jsons[0]),
                name=dataset_name,
            )

            dataset.tag_samples("train")
            logging.info(f"Loaded {len(dataset)} samples from flat COCO directory")
            dataset.persistent = True

        else:
            annotations_dir = os.path.join(dataset_dir, "annotations")
            json_search_dir = annotations_dir if os.path.exists(annotations_dir) else dataset_dir

            # Find all COCO-style annotation JSON files
            possible_jsons = [
                f for f in os.listdir(json_search_dir)
                if f.endswith(".json")
            ]

            if not possible_jsons:
                raise ValueError(f"No COCO-style annotation JSON files found in '{json_search_dir}'")

            # Find available image directories with common split names
            available_dirs = {
                split: os.path.join(dataset_dir, d)
                for d in os.listdir(dataset_dir)
                if os.path.isdir(os.path.join(dataset_dir, d))
                for split in ["train", "val", "test"]
                if split in d.lower()
            }

            dataset = fo.Dataset(name=dataset_name)

            # Load each annotation file into its matching split, inferred from the filename
            for json_file in possible_jsons:
                json_lower = json_file.lower()
                if "train" in json_lower:
                    split_name = "train"
                elif "val" in json_lower:
                    split_name = "val"
                elif "test" in json_lower:
                    split_name = "test"
                else:
                    split_name = "unlabeled"

                # Match to image folder
                data_path = available_dirs.get(split_name)
                if not data_path:
                    logging.info(f"Skipping '{split_name}' — No image folder found for '{json_file}'")
                    continue

                dataset_split = fo.Dataset.from_dir(
                    dataset_type=fot.COCODetectionDataset,
                    data_path=data_path,
                    labels_path=os.path.join(json_search_dir, json_file),
                    name=f"{dataset_name}_{split_name}",
                )

                sample_ids = dataset.add_samples(dataset_split)
                dataset.select(sample_ids).tag_samples(split_name)
                logging.info(f"Loaded {len(sample_ids)} samples for split '{split_name}'")

    elif fmt == "cvat":
        dataset = fo.Dataset.from_dir(
            dataset_type=fot.CVATImageDataset,
            data_path=os.path.join(dataset_dir, "data"),
            labels_path=os.path.join(dataset_dir, "annotations.xml"),
            name=dataset_name,
        )
    else:
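        # YOLO ingestion: import each split declared in dataset.yaml; if val or
        # test is missing, it is reconstructed from the available splits below.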
        if fmt == "yolo":
            # Check for dataset.yaml
            yaml_path = os.path.join(dataset_dir, "dataset.yaml")
            if os.path.exists(yaml_path):
                dataset = fo.Dataset(name=dataset_name)
                split_names = ["train", "val", "test"]
                loaded_splits = {}

                with open(yaml_path) as f:
                    yaml_content = f.read()

                for split_name in split_names:
                    if f"{split_name}:" not in yaml_content:
                        logging.info(f"Skipping split '{split_name}' — not found in dataset.yaml")
                        continue

                    try:
                        importer = fouy.YOLOv5DatasetImporter(
                            dataset_dir=dataset_dir,
                            yaml_path=yaml_path,
                            split=split_name,
                        )

                        sample_ids = dataset.add_importer(importer, label_field="ground_truth")

                        if not sample_ids:
                            logging.info(f"No samples loaded for split '{split_name}'")
                            continue

                        dataset.select(sample_ids).tag_samples(split_name)
                        loaded_splits[split_name] = sample_ids

                        logging.info(f"Loaded {len(sample_ids)} samples for split '{split_name}'")
                    except Exception as e:
                        logging.warning(f"Failed to load split '{split_name}': {e}")

                # Fallback logic if val or test is missing
                if "val" not in loaded_splits and "test" in loaded_splits:
                    test_samples = loaded_splits["test"]
                    midpoint = len(test_samples) // 2

                    dataset.select(test_samples).untag_samples("test")

                    dataset.select(test_samples[:midpoint]).tag_samples("val")
                    dataset.select(test_samples[midpoint:]).tag_samples("test")
                    logging.info("Split test → val/test 50-50")

                elif "test" not in loaded_splits and "val" in loaded_splits:
                    val_samples = loaded_splits["val"]
                    midpoint = len(val_samples) // 2

                    dataset.select(val_samples).untag_samples("val")

                    dataset.select(val_samples[:midpoint]).tag_samples("val")
                    dataset.select(val_samples[midpoint:]).tag_samples("test")
                    logging.info("Split val → val/test 50-50")

                elif "val" not in loaded_splits and "test" not in loaded_splits and "train" in loaded_splits:
                    all_train_samples = loaded_splits["train"]
                    dataset.select(all_train_samples).untag_samples("train")  # clear tag first
                    random.seed(51)
                    random.shuffle(all_train_samples)

                    n = len(all_train_samples)
                    s_train, s_val, s_test = split
                    n_train = int(s_train * n)
                    n_val = int(s_val * n)

                    dataset.select(all_train_samples[:n_train]).tag_samples("train")
                    dataset.select(all_train_samples[n_train : n_train + n_val]).tag_samples("val")
                    dataset.select(all_train_samples[n_train + n_val :]).tag_samples("test")

                    logging.info(f"Split train → train/val/test with {split} proportion")

                logging.info(f"Final split counts: {dataset.count_sample_tags()}")
            else:
                raise ValueError(f"YOLO format detected but no dataset.yaml found in '{dataset_dir}'")

        else:
            dataset = fo.Dataset.from_dir(
                dataset_dir=dataset_dir,
                dataset_type=dataset_type,
                name=dataset_name,
            )

    dataset.persistent = True

    # Move 'detections' to 'ground_truth' and remove 'detections'
    for sample in dataset:
        if sample.has_field("detections"):
            sample["ground_truth"] = sample["detections"]
            sample.clear_field("detections")
            sample.save()

    # Also remove 'detections' from schema if it exists
    if "detections" in dataset.get_field_schema():
        dataset.delete_sample_field("detections")

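    # YOLO datasets that came with a dataset.yaml keep the split tags assigned
    # above; for all other formats the split is (re)assigned from split_percentages.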
    if not (fmt == "yolo" and os.path.exists(os.path.join(dataset_dir, "dataset.yaml"))):
        # Clear all existing split tags before reassigning
        for tag in ["train", "val", "test"]:
            dataset.match_tags(tag).untag_samples(tag)

        # shuffle() returns a view; keep it so the split below uses the shuffled order
        shuffled = dataset.shuffle(seed=51)

        n = len(dataset)
        s_train, s_val, s_test = split

        n_train = int(s_train * n)
        remaining = n - n_train
        n_val = int(remaining * (s_val / (s_val + s_test)))
        n_test = remaining - n_val

        # Apply tag-based split
        shuffled[:n_train].tag_samples("train")
        shuffled[n_train:n_train + n_val].tag_samples("val")
        shuffled[n_train + n_val:].tag_samples("test")

        logging.info(f"Split applied: train {n_train}, val {n_val}, test {n_test}")

    logging.info(f"Dataset '{dataset_name}' ingested with {len(dataset)} samples")

    append_dataset_entry(dataset_name)
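
Usage sketch (not part of the module source): run_data_ingest() takes no arguments and reads everything from WORKFLOWS["data_ingest"] in config/config.py. A minimal entry could look like the following; the keys mirror the ones read by run_data_ingest, while the concrete values shown are illustrative placeholders.

    # Hypothetical WORKFLOWS entry in config/config.py (values are placeholders)
    WORKFLOWS = {
        "data_ingest": {
            "dataset_name": "my_dataset",            # base name; a numeric suffix is appended
            "dataset_dir": "/path/to/raw_dataset",   # root folder of the raw data
            "annotation_format": "auto",             # or "coco", "voc", "yolo", "cvat", "video", "image_only"
            "split_percentages": [0.7, 0.15, 0.15],  # train/val/test fractions
            "fps": 1,                                # frame sampling rate for video input
        },
    }

    # Then, from the project root:
    from workflows.data_ingest import run_data_ingest

    run_data_ingest()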