"""Download and sample Mcity gridsmart camera data from AWS S3.

Downloads recordings for a date range from the gridsmart S3 buckets,
samples them down to 1 Hz via ``SampleTimestamps``, re-uploads the
sampled files, and writes JSON logs of the download and sampling runs.
"""

import datetime
import json
import os
import re
import time
import traceback
from contextlib import suppress

import boto3
import wandb
from aws_stream_filter_framerate import SampleTimestamps
from dotenv import load_dotenv
from tqdm import tqdm

# Toggle Weights & Biases tracking of download/sampling/upload metrics.
WANDB_ACTIVE = False

load_dotenv()

# Known Mcity gridsmart cameras (immutable, safe as a default argument).
DEFAULT_CAMERAS = frozenset(
    {
        "Geddes_Huron_1",
        "Geddes_Huron_2",
        "Huron_Plymouth_1",
        "Huron_Plymouth_2",
        "Main_stadium_1",
        "Main_stadium_2",
        "Plymouth_Beal",
        "Plymouth_Bishop",
        "Plymouth_EPA",
        "Plymouth_Georgetown",
        "State_Ellsworth_NE",
        "State_Ellsworth_NW",
        "State_Ellsworth_SE",
        "State_Ellsworth_SW",
        "Fuller_Fuller_CT",
        "Fuller_Glazier_1",
        "Fuller_Glazier_2",
        "Fuller_Glen",
        "Dexter_Maple_1",
        "Dexter_Maple_2",
        "Hubbard_Huron_1",
        "Hubbard_Huron_2",
        "Maple_Miller_1",
        "Maple_Miller_2",
    }
)

# S3 buckets to scan and the folder prefixes to inspect in each.
DEFAULT_AWS_SOURCES = {
    "sip-sensor-data": [""],
    "sip-sensor-data2": ["wheeler1/", "wheeler2/"],
}


class AwsDownloader:
    """AWS data downloader for handling, downloading, and processing camera data from AWS S3 buckets."""

    def __init__(
        self,
        start_date: datetime.datetime,
        end_date: datetime.datetime,
        sample_rate_hz: float,
        log_time: datetime.datetime,
        source: str = "mcity_gridsmart",
        storage_target_root: str = ".",
        subfolder_data: str = "data",
        subfolder_logs: str = "logs",
        test_run: bool = False,
        delete_old_data: bool = False,
    ):
        """Initialize the class with parameters for date range, logging configurations, and storage settings.

        Args:
            start_date: Inclusive start of the selection window.
            end_date: Inclusive end of the selection window.
            sample_rate_hz: Target sampling rate (recorded in the log only).
            log_time: Timestamp used in log file names (datetime or string).
            source: Data source identifier, recorded in the log.
            storage_target_root: Root folder for data and log subfolders.
            subfolder_data: Name of the data subfolder under the root.
            subfolder_logs: Name of the log subfolder under the root.
            test_run: If True, fetch only one sample file per AWS source,
                ignoring the date range.
            delete_old_data: Recorded in the log; deletion itself is not
                performed here.
        """
        self.start_date = start_date
        self.end_date = end_date
        self.sample_rate_hz = sample_rate_hz
        self.source = source
        self.storage_target_root = storage_target_root
        self.test_run = test_run
        self.delete_old_data = delete_old_data
        self.log_time = log_time
        self.file_names = []

        self.log_download = {}
        self.log_sampling = {}

        # Record run configuration in the download log.
        self.log_download["source"] = source
        self.log_download["sample_rate_hz"] = sample_rate_hz
        self.log_download["storage_target_root"] = storage_target_root
        self.log_download["selection_start_date"] = start_date.strftime("%Y-%m-%d")
        self.log_download["selection_end_date"] = end_date.strftime("%Y-%m-%d")
        self.log_download["delete_old_data"] = delete_old_data
        self.log_download["test_run"] = test_run

        # Run name is derived from the selection window.
        formatted_start = self.start_date.strftime("%Y-%m-%d")
        formatted_end = self.end_date.strftime("%Y-%m-%d")
        self.run_name = f"data_engine_rolling_{formatted_start}_to_{formatted_end}"

        # Set up storage folders for downloaded data and logs.
        self.data_target = os.path.join(
            storage_target_root, subfolder_data, self.run_name
        )
        os.makedirs(self.data_target, exist_ok=True)

        self.log_target = os.path.join(
            storage_target_root, subfolder_logs, self.run_name
        )
        os.makedirs(self.log_target, exist_ok=True)

        # Connect to AWS S3; credentials come from the environment (.env).
        self.s3 = boto3.client(
            "s3",
            aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", None),
            aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", None),
            region_name="us-east-1",
        )

    def process_data(self):
        """Downloads files from AWS S3, samples them at 1Hz, and generates download and sampling logs."""
        cameras_dict = self._mcity_init_cameras()
        self._mcity_process_aws_buckets(cameras_dict)
        self.file_names, n_files_to_download = self._mcity_select_data(cameras_dict)

        # Tracking
        if WANDB_ACTIVE:
            wandb.init(
                name=self.run_name,
                job_type="download",
                project="Pre-Processing in AWS",
            )

        targets = []
        step = 0
        with tqdm(desc="Processing data", total=n_files_to_download) as pbar:
            for camera in cameras_dict:
                for aws_source in cameras_dict[camera]["aws-sources"]:
                    bucket = aws_source.split("/", 1)[0]
                    for date in cameras_dict[camera]["aws-sources"][aws_source]:
                        for file in cameras_dict[camera]["aws-sources"][aws_source][
                            date
                        ]:
                            try:
                                log_run = {}
                                file_entry = cameras_dict[camera]["aws-sources"][
                                    aws_source
                                ][date]

                                # AWS S3 download
                                time_start = time.time()
                                file_name = os.path.basename(file)
                                key = file_entry[file_name]["key"]
                                target = os.path.join(self.data_target, file_name)
                                targets.append(target)
                                self.s3.download_file(bucket, key, target)

                                # Download throughput metrics
                                file_size_mb = file_entry[file_name]["size"] / (
                                    1024**2
                                )
                                duration = time.time() - time_start
                                # Guard against a zero duration on very fast
                                # downloads (previously a ZeroDivisionError
                                # swallowed by the broad except).
                                if WANDB_ACTIVE and duration > 0:
                                    mb_per_s = file_size_mb / duration
                                    wandb.log({"download/mb_per_s": mb_per_s}, step)
                                    wandb.log({"download/s": duration}, step)

                                # Sample data at 1 Hz
                                time_start = time.time()
                                sampler = SampleTimestamps(
                                    file_path=target, target_framerate_hz=1
                                )
                                timestamps = sampler.get_timestamps()

                                # We need at least 2 timestamps to calculate a framerate
                                if len(timestamps) >= 2:
                                    framerate_hz, timestamps, upper_bound_threshold = (
                                        sampler.get_framerate(timestamps, log_run)
                                    )
                                    valid_target_framerate = (
                                        sampler.check_target_framerate(
                                            framerate_hz, log_run
                                        )
                                    )
                                    # We need a target framerate lower than the original framerate
                                    if valid_target_framerate:
                                        (
                                            selected_indices,
                                            selected_timestamps,
                                            target_timestamps,
                                            selected_target_timestamps,
                                        ) = sampler.sample_timestamps(
                                            timestamps, upper_bound_threshold, log_run
                                        )

                                        duration = time.time() - time_start
                                        if WANDB_ACTIVE and duration > 0:
                                            timestamps_per_s = (
                                                len(timestamps) / duration
                                            )
                                            wandb.log(
                                                {
                                                    "sampling/timestamps_per_s": timestamps_per_s
                                                },
                                                step,
                                            )
                                            wandb.log({"sampling/s": duration}, step)

                                        # Upload sampled data
                                        time_start = time.time()
                                        print(camera)
                                        file_size_mb = sampler.update_upload_file(
                                            target, selected_indices, camera
                                        )

                                        duration = time.time() - time_start
                                        if WANDB_ACTIVE and duration > 0:
                                            mb_per_s = file_size_mb / duration
                                            wandb.log(
                                                {"upload/mb_per_s": mb_per_s}, step
                                            )
                                            wandb.log({"upload/s": duration}, step)

                                        # Update log
                                        self.log_sampling[file] = log_run

                                else:
                                    print(
                                        f"Not enough timestamps to calculate framerate. Skipping {file}"
                                    )

                                # Delete local data. The sampled file does not
                                # exist when sampling was skipped, so tolerate
                                # a missing file instead of tripping the broad
                                # except below.
                                with suppress(FileNotFoundError):
                                    os.remove(target)
                                with suppress(FileNotFoundError):
                                    os.remove(target + "_sampled_1Hz")

                                # Update progress bar
                                step += 1
                                pbar.update(1)

                            except Exception as e:
                                print(f"Error in mcity_gridsmart_loader: {e}")
                                print(traceback.format_exc())

            # Finish tracking; wandb.finish() may raise when no run is active.
            with suppress(Exception):
                wandb.finish()

        # Store download log
        self.log_download["data"] = cameras_dict
        self._store_log("FileDownload", self.log_download)

        # Store sampling log
        self._store_log("FileSampling", self.log_sampling)

    def _store_log(self, name, payload):
        """Write *payload* as indented JSON to a timestamped file in the log folder.

        The f-string stringifies ``log_time`` so both ``datetime`` objects (the
        annotated type) and pre-formatted strings work; spaces and colons are
        replaced to keep the file name filesystem-safe.
        """
        stem = f"{self.log_time}_{name}".replace(" ", "_").replace(":", "_")
        log_file_path = os.path.join(self.log_target, stem + ".json")
        with open(log_file_path, "w") as json_file:
            json.dump(payload, json_file, indent=4)

    def _mcity_init_cameras(self, cameras=DEFAULT_CAMERAS):
        """Initialize a dictionary of Mcity traffic cameras with IDs and empty AWS source mappings."""
        cameras_dict = {camera.lower(): {} for camera in cameras}
        for camera_id, camera in enumerate(cameras_dict):
            cameras_dict[camera]["id"] = camera_id
            cameras_dict[camera]["aws-sources"] = {}

        print(f"Processed {len(cameras_dict)} cameras")
        return cameras_dict

    def _mcity_process_aws_buckets(self, cameras_dict, aws_sources=DEFAULT_AWS_SOURCES):
        """Processes AWS buckets to map camera names to their corresponding AWS sources."""
        for bucket in tqdm(aws_sources, desc="Processing AWS sources"):
            for folder in aws_sources[bucket]:
                # Get and pre-process AWS data
                result = self.s3.list_objects_v2(
                    Bucket=bucket, Prefix=folder, Delimiter="/"
                )
                folders = self._process_aws_result(result)

                # Guard BEFORE normalizing: previously the comprehensions ran
                # on a possibly-None result and raised TypeError.
                if not folders:
                    print(f"AWS did not return a list of folders for {bucket}/{folder}")
                    continue

                # Align varying AWS folder names with camera names:
                # insert "_" before digits, then fix known naming quirks.
                folders_aligned = [
                    re.sub(r"(?<!_)(\d)", r"_\1", name.lower().rstrip("/"))
                    for name in folders
                ]
                # Replace "fullerct" with "fuller_ct" to align with camera names
                folders_aligned = [
                    name.replace("fullerct", "fuller_ct") for name in folders_aligned
                ]
                # Fix "fullser" typo in AWS
                folders_aligned = [
                    name.replace("fullser", "fuller") for name in folders_aligned
                ]

                # Check cameras for AWS sources
                for camera_name in cameras_dict:
                    for folder_name, folder_name_aligned in zip(
                        folders, folders_aligned
                    ):
                        # gs_ is the prefix used in AWS
                        if (
                            camera_name in folder_name_aligned
                            and "gs_" in folder_name_aligned
                        ):
                            aws_source = f"{bucket}/{folder_name}"
                            cameras_dict[camera_name]["aws-sources"].setdefault(
                                aws_source, {}
                            )

    def _mcity_select_data(self, cameras_dict):
        """Processes S3 camera data within specified date range, organizing files by camera, source, and date while tracking metrics."""
        n_cameras = 0
        n_aws_sources = 0
        n_files_to_download = 0
        download_size_bytes = 0
        for camera in tqdm(cameras_dict, desc="Looking for data entries in range"):
            n_cameras += 1
            for aws_source in cameras_dict[camera]["aws-sources"]:
                file_downloaded_test = False
                n_aws_sources += 1
                bucket = aws_source.split("/")[0]
                prefix_camera = "/".join(aws_source.split("/")[1:])
                result = self.s3.list_objects_v2(
                    Bucket=bucket, Prefix=prefix_camera, Delimiter="/"
                )
                # Each folder represents a day; _process_aws_result returns []
                # when the listing is empty, so iteration is safe.
                for folder_day in self._process_aws_result(result):
                    date = folder_day.split("/")[-2]
                    if self.test_run:
                        # Choose a sample irrespective of the date range to get
                        # data from all AWS sources
                        in_range = True
                    else:
                        # Only collect data within the date range
                        timestamp = datetime.datetime.strptime(date, "%Y-%m-%d")
                        in_range = self.start_date <= timestamp <= self.end_date

                    if not in_range:
                        continue

                    cameras_dict[camera]["aws-sources"][aws_source][date] = {}
                    day_entry = cameras_dict[camera]["aws-sources"][aws_source][date]
                    result = self.s3.list_objects_v2(
                        Bucket=bucket, Prefix=folder_day, Delimiter="/"
                    )
                    # Each folder represents an hour
                    for folder_hour in self._process_aws_result(result):
                        result = self.s3.list_objects_v2(
                            Bucket=bucket, Prefix=folder_hour, Delimiter="/"
                        )
                        for file in result["Contents"]:
                            n_files_to_download += 1
                            download_size_bytes += file["Size"]
                            file_name = os.path.basename(file["Key"])
                            day_entry[file_name] = {
                                "key": file["Key"],
                                "size": file["Size"],
                                "date": file["LastModified"].strftime(
                                    "%Y-%m-%d %H:%M:%S"
                                ),
                            }
                            self.file_names.append(file["Key"])
                            if self.test_run:
                                file_downloaded_test = True
                                print(f"{aws_source} : {file_name}")
                                break  # escape for file in files
                        if self.test_run and file_downloaded_test:
                            break  # escape for folder_hour in folders_hour
                    if self.test_run and file_downloaded_test:
                        break  # escape for folder_day in folders_day

        self.log_download["n_cameras"] = n_cameras
        self.log_download["n_aws_sources"] = n_aws_sources
        self.log_download["n_files_to_process"] = n_files_to_download
        self.log_download["selection_size_tb"] = download_size_bytes / (1024**4)

        return self.file_names, n_files_to_download

    def _mcity_download_data(self, cameras_dict, n_files_to_download, passed_checks):
        """Downloads data from AWS S3 buckets based on camera dictionary, returns bool indicating success."""
        if not passed_checks:
            print("Safety checks failed. Not downloading data")
            return False

        download_started = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.log_download["download_started"] = download_started
        with tqdm(desc="Downloading data", total=n_files_to_download) as pbar:
            for camera in cameras_dict:
                for aws_source in cameras_dict[camera]["aws-sources"]:
                    bucket = aws_source.split("/", 1)[0]
                    for date in cameras_dict[camera]["aws-sources"][aws_source]:
                        for file in cameras_dict[camera]["aws-sources"][aws_source][
                            date
                        ]:
                            # AWS S3 Download
                            file_name = os.path.basename(file)
                            key = cameras_dict[camera]["aws-sources"][aws_source][
                                date
                            ][file_name]["key"]
                            target = os.path.join(self.data_target, file_name)
                            self.s3.download_file(bucket, key, target)
                            # Fix: the progress bar previously never advanced.
                            pbar.update(1)

        download_ended = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.log_download["download_ended"] = download_ended
        return True

    def _process_aws_result(self, result):
        """Extracts folder prefixes from AWS S3 list operation result; returns a (possibly empty) folder list.

        Returns [] instead of None when no CommonPrefixes are present so that
        callers can iterate the result unconditionally.
        """
        return [prefix["Prefix"] for prefix in result.get("CommonPrefixes", [])]