workflows.aws_download
import base64
import datetime
import json
import logging
import os
import re
import time
from importlib.metadata import version as get_version
from queue import Empty

import boto3
import fiftyone as fo
import pytz
import torch.multiprocessing as mp
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

from config.config import NUM_WORKERS


class AwsDownloader:
    """Downloads and decodes data from an AWS S3 bucket, with support for multiprocessing and Voxel51 dataset creation."""

    def __init__(self, bucket, prefix, download_path, test_run):
        """Initialize the S3 downloader with bucket, prefix, download path, and test flag, setting up AWS credentials from the .secret file."""
        try:
            with open(".secret", "r") as file:
                for line in file:
                    key, value = line.strip().split("=")
                    os.environ[key] = value

            # S3 client
            self.s3 = boto3.client(
                "s3",
                aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", None),
                aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", None),
            )

            self.bucket = bucket
            self.prefix = prefix
            self.test_run = test_run

            self.download_path = download_path
            os.makedirs(download_path, exist_ok=True)
        except Exception as e:
            logging.error(f"Connection to S3 client failed: {e}")

    def _list_files(self, bucket, prefix):
        """Return list of files and total size in TB from S3 bucket with given prefix."""
        files = []
        download_size_bytes = 0
        paginator = self.s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            if "Contents" in page:
                for obj in page["Contents"]:
                    files.append(obj["Key"])
                    download_size_bytes += obj["Size"]

        total_size_tb = download_size_bytes / (1024**4)

        return files, total_size_tb

    def _set_v51_metadata(self, output_folder_root):
        """Sets and saves FiftyOne metadata for dataset import by creating a metadata.json file with sample fields configuration."""
        sample_fields = [
            {
                "name": "filepath",
                "ftype": "fiftyone.core.fields.StringField",
                "embedded_doc_type": None,
                "subfield": None,
                "fields": [],
                "db_field": "filepath",
                "description": None,
                "info": None,
                "read_only": True,
                "created_at": {
                    "$date": "2024-11-14T15:24:21.719Z"
                },  # FIXME Replace with actual date of function call
            },
            {
                "name": "sensor",
                "ftype": "fiftyone.core.fields.StringField",
                "embedded_doc_type": None,
                "subfield": None,
                "fields": [],
                "db_field": "sensor",
                "description": None,
                "info": None,
                "read_only": True,
                "created_at": {"$date": "2024-11-14T15:24:21.719Z"},
            },
            {
                "name": "timestamp",
                "ftype": "fiftyone.core.fields.DateTimeField",
                "embedded_doc_type": None,
                "subfield": None,
                "fields": [],
                "db_field": "timestamp",
                "description": None,
                "info": None,
                "read_only": True,
                "created_at": {"$date": "2024-11-14T15:24:21.719Z"},
            },
        ]

        # Get version of current V51 package
        package_name = "fiftyone"
        version = get_version(package_name)
        version_str = str(version)

        v51_metadata = {}
        v51_metadata["version"] = version_str
        v51_metadata["sample_fields"] = sample_fields

        # Store metadata.json
        file_path = os.path.join(output_folder_root, "metadata.json")
        with open(file_path, "w") as json_file:
            json.dump(v51_metadata, json_file)

    def _process_file(self, file_path, output_folder_data):
        """Process a JSON file containing image data and timestamps, converting them to a format compatible with V51 and saving images to disk."""
        v51_samples = []

        # Prepare ISO check for time format
        iso8601_regex = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{3})?Z$")

        with open(file_path, "r") as file:
            for line in file:
                try:
                    data = json.loads(line)
                    if "time" in data and "data" in data:
                        # Get data
                        timestamp = data.get("time")
                        image_base64 = data.get("data")

                        # Get timestamp
                        try:
                            # Time with ms (default)
                            time_obj = datetime.datetime.strptime(
                                timestamp, "%Y-%m-%d %H:%M:%S.%f"
                            )
                        except ValueError:
                            time_obj = datetime.datetime.strptime(
                                timestamp, "%Y-%m-%d %H:%M:%S"
                            )
                        formatted_time = (
                            time_obj.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
                        )
                        sensor_name = data.get("sensor_name")

                    elif (
                        "image" in data
                        and "sensor_name" in data
                        and "event_timestamp" in data
                    ):
                        # Get data
                        sensor_name = data.get("sensor_name")
                        timestamp = data.get("event_timestamp")
                        image_base64 = data.get("image")

                        # Get timestamps in UTC and Michigan time
                        # TODO Ensure that this conversion is correct, not clear in which time zone timestamps were written
                        utc_time = datetime.datetime.fromtimestamp(
                            timestamp, tz=datetime.timezone.utc
                        )
                        michigan_tz = pytz.timezone("America/Detroit")
                        michigan_time = utc_time.astimezone(michigan_tz)
                        formatted_time = (
                            michigan_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
                        )
                    else:
                        logging.error(f"Format cannot be processed: {data}")
                        continue

                    if image_base64 and formatted_time:
                        if (
                            sensor_name is None
                        ):  # FIXME Can be derived from S3 source if not stored in data itself
                            sensor_name = "Unknown"

                        # File paths
                        image_filename = f"{sensor_name}_{formatted_time}.jpg"

                        # Ensure correct timestamp format
                        milliseconds = formatted_time.split(".")[1][:3].ljust(3, "0")
                        formatted_time = (
                            formatted_time.split(".")[0] + "." + milliseconds + "Z"
                        )

                        iso8601_conform = bool(iso8601_regex.match(formatted_time))
                        if not iso8601_conform:
                            logging.error(
                                f"Timestamp does not conform to ISO8601: {formatted_time}"
                            )

                        # Store image to disk
                        output_path = os.path.join(output_folder_data, image_filename)

                        if os.path.exists(output_path):
                            logging.debug(f"File already exists: {output_path}")
                        else:
                            image_data = base64.b64decode(image_base64)
                            with open(output_path, "wb") as image_file:
                                image_file.write(image_data)

                        # Prepare import with V51
                        v51_sample = {
                            "filepath": output_path,
                            "sensor": sensor_name,
                            "timestamp": {"$date": formatted_time},
                        }
                        v51_samples.append(v51_sample)

                    else:
                        logging.error(
                            f"There was an issue during file processing of {file_path}. Issues with image_base64: {image_base64 is None}, formatted_time: {formatted_time is None}, sensor_name: {sensor_name is None}"
                        )
                        continue

                except json.JSONDecodeError as e:
                    logging.error(
                        f"File {os.path.basename(file_path)} - Error decoding JSON: {e}"
                    )

        return v51_samples

    def download_files(self, log_dir, MAX_SIZE_TB=1.5):
        """Downloads files from the AWS S3 bucket and verifies download success, returning download details and status."""
        files_to_be_downloaded, total_size_tb = self._list_files(
            self.bucket, self.prefix
        )

        writer = SummaryWriter(log_dir=log_dir)

        n_downloaded_files, n_skipped_files = 0, 0
        if total_size_tb <= MAX_SIZE_TB:
            for file in tqdm(files_to_be_downloaded, desc="Downloading files from AWS"):
                time_start = time.time()
                file_path = os.path.join(self.download_path, file)

                # Ensure directory exists
                os.makedirs(os.path.dirname(file_path), exist_ok=True)

                if not os.path.exists(file_path):
                    if not self.test_run:
                        self.s3.download_file(self.bucket, file, file_path)
                        n_downloaded_files += 1

                        time_end = time.time()
                        duration = time_end - time_start
                        files_per_second = 1 / duration

                        writer.add_scalar(
                            "download/files_per_second",
                            files_per_second,
                            n_downloaded_files,
                        )
                else:
                    logging.warning(f"Skipping {file}, already exists.")
                    n_skipped_files += 1
        else:
            logging.error(
                f"Total size of {total_size_tb:.2f} TB exceeds limit of {MAX_SIZE_TB} TB. Skipping download."
            )

        logging.info(
            f"Collected {n_downloaded_files} files of total size {total_size_tb:.2f} TB, skipped {n_skipped_files} files."
        )

        if self.test_run:
            logging.error(
                "Test run: No data downloaded. Set 'test_run': False in the config. Aborting."
            )

        # Check if all files were downloaded properly
        DOWNLOAD_NUMBER_SUCCESS, DOWNLOAD_SIZE_SUCCESS = False, False

        subfolders = [
            f.path for f in os.scandir(self.download_path) if f.is_dir()
        ]  # Each subfolder stands for a requested sample rate. Defaults to "1"
        if len(subfolders) > 0:
            sub_folder = os.path.basename(subfolders[0])
            if len(subfolders) > 1:
                logging.warning(
                    f"More than one subfolder in download directory {self.download_path} found, selected {sub_folder}."
                )

            downloaded_files = []
            downloaded_size_bytes = 0

            folder_path = os.path.join(self.download_path, sub_folder)
            files = [
                f
                for f in os.listdir(folder_path)
                if os.path.isfile(os.path.join(folder_path, f))
            ]

            with tqdm(
                total=len(files), desc="Checking downloaded files", unit="file"
            ) as pbar:
                for file in files:
                    file_path = os.path.join(folder_path, file)
                    downloaded_files.append(file_path)
                    downloaded_size_bytes += os.path.getsize(file_path)
                    pbar.update(1)

            downloaded_size_tb = downloaded_size_bytes / (1024**4)
            logging.info(
                f"Downloaded {len(downloaded_files)} files. Total size: {downloaded_size_tb:.2f} TB"
            )

            if len(files_to_be_downloaded) == len(downloaded_files):
                logging.info("All files downloaded successfully.")
                DOWNLOAD_NUMBER_SUCCESS = True
            else:
                logging.error(
                    f"Only {len(downloaded_files)} of {len(files_to_be_downloaded)} planned files downloaded."
                )
                DOWNLOAD_NUMBER_SUCCESS = False

            if total_size_tb == downloaded_size_tb:
                logging.info("The downloaded size equals the planned download size.")
                DOWNLOAD_SIZE_SUCCESS = True
            else:
                logging.error(
                    f"The downloaded size of {downloaded_size_tb} TB varies from the planned download size of {total_size_tb} TB."
                )
                DOWNLOAD_SIZE_SUCCESS = False
        else:
            sub_folder = None
            files = []  # Nothing on disk to verify
            logging.error(
                f"No subfolder found in download directory {self.download_path}."
            )

        writer.close()

        return (
            sub_folder,
            files,
            files_to_be_downloaded,
            DOWNLOAD_NUMBER_SUCCESS,
            DOWNLOAD_SIZE_SUCCESS,
        )

    # Decode data with multiple workers
    def decode_data(
        self,
        sub_folder,
        files,
        log_dir,
        dataset_name="annarbor_rolling",
        output_folder="decoded",
        dataset_persistance=True,
    ):
        """Decodes downloaded data files into a Voxel51 dataset using multiprocessing workers for parallel processing."""
        output_folder_root = os.path.join(self.download_path, sub_folder, output_folder)
        output_folder_data = os.path.join(output_folder_root, "data")
        os.makedirs(output_folder_data, exist_ok=True)

        # Save metadata to import dataset as Voxel51 dataset
        self._set_v51_metadata(output_folder_root)

        # Extract file content
        json_file_path = os.path.join(output_folder_root, "samples.json")

        # Queues for multiprocessing tasks and results
        result_queue = mp.Queue()
        task_queue = mp.Queue()

        # Add files to the task queue
        for file_path in files:
            absolute_file_path = os.path.join(self.download_path, sub_folder, file_path)
            task_queue.put(absolute_file_path)

        logging.info(
            f"Added {len(files)} files to task queue. Ready for multiprocessing."
        )

        # Gather events for extraction workers
        worker_done_events = []
        n_extraction_workers = NUM_WORKERS
        for _ in range(n_extraction_workers):
            done_event = mp.Event()
            worker_done_events.append(done_event)

        # Start the result worker
        result_worker_process = mp.Process(
            target=self.result_json_worker,
            args=(result_queue, json_file_path, worker_done_events, log_dir),
        )
        result_worker_process.start()

        # Start the data extraction workers
        n_files_per_worker = int(len(files) / n_extraction_workers)
        workers = []
        for done_event in worker_done_events:
            p = mp.Process(
                target=self.data_extraction_worker,
                args=(
                    task_queue,
                    result_queue,
                    done_event,
                    output_folder_data,
                    n_files_per_worker,
                ),
            )
            p.start()
            workers.append(p)

        # Wait for the data extraction workers
        for p in workers:
            p.join()
        logging.info("All data processing workers finished processing.")

        # Wait for the result worker
        result_worker_process.join()
        logging.info("JSON worker finished processing.")

        task_queue.close()
        result_queue.close()

        # Load V51 dataset
        dataset = fo.Dataset(name=dataset_name)
        dataset.add_dir(
            dataset_dir=output_folder_root,
            dataset_type=fo.types.FiftyOneDataset,
            progress=True,
        )

        dataset.compute_metadata(num_workers=NUM_WORKERS, progress=True)
        dataset.persistent = dataset_persistance

        return dataset

    # Worker functions
    def data_extraction_worker(
        self,
        task_queue,
        result_queue,
        done_event,
        output_folder_data,
        n_files_per_worker,
    ):
        """Worker process that extracts data from files in the task queue and puts results in the result queue, processing until queue is empty."""
        logging.info(f"Process ID: {os.getpid()}. Data extraction process started.")

        n_files_processed = 0
        n_samples_processed = 0
        while True:
            if task_queue.empty():  # If queue is empty, break out
                break
            else:
                try:
                    # Get a file path from the task queue
                    file_path = task_queue.get(timeout=0.1)
                    v51_samples = self._process_file(file_path, output_folder_data)
                    if len(v51_samples) > 0:
                        result_queue.put(v51_samples)
                    if n_files_processed % 100 == 0:
                        logging.info(
                            f"Worker {os.getpid()} finished {n_samples_processed} samples. {n_files_processed} / ~{n_files_per_worker} files done."
                        )
                    n_files_processed += 1
                    n_samples_processed += len(v51_samples)

                except Empty:
                    # Another worker drained the queue between empty() and get()
                    continue

                except Exception as e:
                    logging.error(
                        f"Error occurred during processing of file {os.path.basename(file_path)}: {e}"
                    )
                    continue

        # Once all tasks are processed, set the done_event
        done_event.set()
        logging.info(f"Data Extraction worker {os.getpid()} shutting down.")
        return True

    def result_json_worker(
        self, result_queue, json_file_path, worker_done_events, log_dir
    ):
        """Process results from the worker queue, aggregate them into a JSON file, and log metrics."""
        logging.info(f"Process ID: {os.getpid()}. Results processing process started.")

        writer = SummaryWriter(log_dir=log_dir)
        n_files_processed = 0
        n_images_extracted = 0

        v51_samples_dict = {"samples": []}

        # Collect incoming results as long as workers are running
        while len(worker_done_events) > 0 or not result_queue.empty():
            try:
                v51_samples = result_queue.get(timeout=1)
                v51_samples_dict["samples"].extend(v51_samples)

                n_images_extracted += len(v51_samples)
                writer.add_scalar(
                    "decode/samples", n_images_extracted, n_files_processed
                )
                n_files_processed += 1

                # Log progress every 1,000 processed result batches
                if n_files_processed % 1_000 == 0:
                    logging.info(
                        f"{n_images_extracted} samples added to dict. Items in queue: {result_queue.qsize()}. Active workers: {len(worker_done_events)}"
                    )

            except Empty:
                # Empty queue is expected sometimes
                pass

            except Exception as e:
                logging.error(f"JSON worker error: {e}")
                continue

            # Check if any worker is done
            for event in list(worker_done_events):
                if event.is_set():
                    worker_done_events.remove(event)

        # Store data to JSON
        logging.info("Storing data to samples.json")
        with open(json_file_path, "w") as json_file:
            json.dump(v51_samples_dict, json_file)

        writer.close()
        logging.info(f"JSON worker {os.getpid()} shutting down.")
        return True
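A minimal end-to-end usage sketch, assuming a valid .secret file in the working directory; the bucket name, prefix, and paths below are placeholders:

from workflows.aws_download import AwsDownloader

downloader = AwsDownloader(
    bucket="my-bucket",                 # placeholder bucket name
    prefix="rolling/2024/",             # placeholder S3 key prefix
    download_path="output/aws_raw",     # local target directory (created if missing)
    test_run=False,                     # set True to skip the actual S3 downloads
)

# Download the raw files and verify the result
sub_folder, files, planned_files, number_ok, size_ok = downloader.download_files(
    log_dir="logs/tensorboard/aws_download"
)

# Decode the raw files into a FiftyOne (Voxel51) dataset
if sub_folder is not None and number_ok and size_ok:
    dataset = downloader.decode_data(
        sub_folder, files, log_dir="logs/tensorboard/aws_decode"
    )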
class AwsDownloader:
Downloads and decodes data from an AWS S3 bucket, with support for multiprocessing and Voxel51 dataset creation.
AwsDownloader(bucket, prefix, download_path, test_run)
Initialize the S3 downloader with bucket, prefix, download path, and test flag, setting up AWS credentials from the .secret file.
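The constructor expects a plain-text .secret file in the working directory with one KEY=VALUE pair per line; each pair is exported as an environment variable before the boto3 client is created. A hypothetical example with placeholder values:

AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-secret-access-key>

Since each line is split on "=", keys and values must not themselves contain an "=" character, or parsing fails and the error is logged as a failed S3 connection.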
def download_files(self, log_dir, MAX_SIZE_TB=1.5):
Downloads files from the AWS S3 bucket and verifies download success, returning download details and status.
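A short sketch of how the returned tuple might be unpacked and checked; the downloader instance and the log_dir path are assumptions carried over from the earlier example:

sub_folder, files, planned_files, number_ok, size_ok = downloader.download_files(
    log_dir="logs/tensorboard/aws_download", MAX_SIZE_TB=1.5
)

print(f"Planned {len(planned_files)} files, found {len(files)} on disk in '{sub_folder}'")
print(f"File-count check passed: {number_ok}, size check passed: {size_ok}")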
def decode_data(self, sub_folder, files, log_dir, dataset_name='annarbor_rolling', output_folder='decoded', dataset_persistance=True):
Decodes downloaded data files into a Voxel51 dataset using multiprocessing workers for parallel processing.
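The return value is an ordinary FiftyOne dataset, so it can be inspected or browsed in the FiftyOne App; a brief sketch, assuming downloader, sub_folder, and files come from a preceding download_files call:

import fiftyone as fo

dataset = downloader.decode_data(
    sub_folder=sub_folder,
    files=files,
    log_dir="logs/tensorboard/aws_decode",
    dataset_name="annarbor_rolling",
)

print(dataset)                       # field schema and number of samples
print(dataset.first())               # filepath, sensor, and timestamp of one decoded image

session = fo.launch_app(dataset)     # open the dataset in the FiftyOne App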
def data_extraction_worker(self, task_queue, result_queue, done_event, output_folder_data, n_files_per_worker):
Worker process that extracts data from files in the task queue and puts results in the result queue, processing until queue is empty.
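The worker polls task_queue.empty() and otherwise pulls paths with a short timeout, signaling completion through its Event so the JSON worker can stop waiting. A stripped-down, self-contained sketch of that drain-and-signal pattern (hypothetical names; it breaks on the Empty timeout instead of polling empty()):

import torch.multiprocessing as mp
from queue import Empty

def drain_worker(task_queue, done_event):
    while True:
        try:
            item = task_queue.get(timeout=0.1)
        except Empty:
            break                          # queue drained, stop working
        print(f"processing {item}")        # stand-in for _process_file(...)
    done_event.set()                       # lets the collector stop waiting

if __name__ == "__main__":
    tasks, done = mp.Queue(), mp.Event()
    for i in range(5):
        tasks.put(i)
    p = mp.Process(target=drain_worker, args=(tasks, done))
    p.start()
    p.join()
    print("worker finished:", done.is_set())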
def result_json_worker(self, result_queue, json_file_path, worker_done_events, log_dir):
Process results from the worker queue, aggregate them into a JSON file, and log metrics.
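Together with the metadata.json written by _set_v51_metadata, the aggregated samples.json is what the fo.types.FiftyOneDataset import in decode_data consumes. Its shape looks roughly like this; paths and values are placeholders:

{
  "samples": [
    {
      "filepath": "<download_path>/<sub_folder>/decoded/data/<sensor>_<timestamp>.jpg",
      "sensor": "<sensor_name>",
      "timestamp": {"$date": "2024-11-14T15:24:21.719Z"}
    }
  ]
}

Each entry corresponds to one decoded image on disk, with the timestamp stored in the millisecond-precision ISO 8601 form produced by _process_file.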