s3_file_list

  1import datetime
  2import json
  3import os
  4import re
  5import time
  6import traceback
  7
  8import boto3
  9import wandb
 10from aws_stream_filter_framerate import SampleTimestamps
 11from dotenv import load_dotenv
 12from tqdm import tqdm
 13
# Switch for Weights & Biases tracking: wandb.init() and wandb.log() are only
# called when this flag is truthy.
WANDB_ACTIVE=False


# Presumably loads variables from a local .env file into os.environ
# (python-dotenv); the S3 client below reads the AWS credentials from there.
load_dotenv()
 18
 19
 20class AwsDownloader:
 21    """AWS data downloader for handling, downloading, and processing camera data from AWS S3 buckets."""
 22
 23    def __init__(
 24        self,
 25        start_date: datetime.datetime,
 26        end_date: datetime.datetime,
 27        sample_rate_hz: float,
 28        log_time: datetime.datetime,
 29        source: str = "mcity_gridsmart",
 30        storage_target_root: str = ".",
 31        subfolder_data: str = "data",
 32        subfolder_logs: str = "logs",
 33        test_run: bool = False,
 34        delete_old_data: bool = False,
 35    ):
 36        """Initialize the class with parameters for date range, logging configurations, and storage settings."""
 37        self.start_date = start_date
 38        self.end_date = end_date
 39        self.sample_rate_hz = sample_rate_hz
 40        self.source = source
 41        self.storage_target_root = storage_target_root
 42        self.test_run = test_run
 43        self.delete_old_data = delete_old_data
 44        self.log_time = log_time
 45        self.file_names = []
 46
 47        self.log_download = {}
 48        self.log_sampling = {}
 49
 50        # Fill log
 51        self.log_download["source"] = source
 52        self.log_download["sample_rate_hz"] = sample_rate_hz
 53        self.log_download["storage_target_root"] = storage_target_root
 54        self.log_download["selection_start_date"] = start_date.strftime("%Y-%m-%d")
 55        self.log_download["selection_end_date"] = end_date.strftime("%Y-%m-%d")
 56        self.log_download["delete_old_data"] = delete_old_data
 57        self.log_download["test_run"] = test_run
 58
 59        # Run name
 60        formatted_start = self.start_date.strftime("%Y-%m-%d")
 61        formatted_end = self.end_date.strftime("%Y-%m-%d")
 62        self.run_name = f"data_engine_rolling_{formatted_start}_to_{formatted_end}"
 63
 64        # Setup storage folders
 65        self.data_target = os.path.join(
 66            storage_target_root, subfolder_data, self.run_name
 67        )
 68        os.makedirs(self.data_target, exist_ok=True)
 69
 70        self.log_target = os.path.join(
 71            storage_target_root, subfolder_logs, self.run_name
 72        )
 73        os.makedirs(self.log_target, exist_ok=True)
 74
 75        # Connect to AWS S3
 76        self.s3 = boto3.client(
 77            "s3",
 78            aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", None),
 79            aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", None),
 80            region_name="us-east-1",
 81        )
 82
 83    def process_data(self):
 84        """Downloads files from AWS S3, samples them at 1Hz, and generates download and sampling logs."""
 85        cameras_dict = self._mcity_init_cameras()
 86        self._mcity_process_aws_buckets(cameras_dict)
 87        self.file_names, n_files_to_download = self._mcity_select_data(cameras_dict)
 88
 89        # Tracking
 90        if WANDB_ACTIVE == True:
 91            wandb.init(
 92                name=self.run_name,
 93                job_type="download",
 94                project="Pre-Processing in AWS",
 95            )
 96
 97        targets = []
 98        step = 0
 99        with tqdm(desc="Processing data", total=n_files_to_download) as pbar:
100            for camera in cameras_dict:
101                for aws_source in cameras_dict[camera]["aws-sources"]:
102                    bucket = aws_source.split("/", 1)[0]
103                    for date in cameras_dict[camera]["aws-sources"][aws_source]:
104                        for file in cameras_dict[camera]["aws-sources"][aws_source][
105                            date
106                        ]:
107                            try:
108                                log_run = {}
109
110                                # AWS S3 Download
111                                time_start = time.time()
112                                file_name = os.path.basename(file)
113                                key = cameras_dict[camera]["aws-sources"][aws_source][
114                                    date
115                                ][file_name]["key"]
116                                target = os.path.join(self.data_target, file_name)
117                                targets.append(target)
118                                self.s3.download_file(bucket, key, target)
119
120                                # Logging
121                                file_size_mb = cameras_dict[camera]["aws-sources"][
122                                    aws_source
123                                ][date][file_name]["size"] / (1024**2)
124                                time_end = time.time()
125                                duration = time_end - time_start
126                                mb_per_s = file_size_mb / duration
127                                if WANDB_ACTIVE:
128                                    wandb.log({"download/mb_per_s": mb_per_s}, step)
129                                    wandb.log({"download/s": duration}, step)
130
131                                # Sample data
132                                time_start = time.time()
133                                sampler = SampleTimestamps(
134                                    file_path=target, target_framerate_hz=1
135                                )
136                                timestamps = sampler.get_timestamps()
137
138                                # We need at least 2 timestamps to calculate a framerate
139                                if len(timestamps) >= 2:
140                                    # Get framerate
141                                    framerate_hz, timestamps, upper_bound_threshold = (
142                                        sampler.get_framerate(timestamps, log_run)
143                                    )
144                                    valid_target_framerate = (
145                                        sampler.check_target_framerate(
146                                            framerate_hz, log_run
147                                        )
148                                    )
149                                    # We need a target framerate lower than the oririinal framerate
150                                    if valid_target_framerate:
151                                        # Sample data
152                                        (
153                                            selected_indices,
154                                            selected_timestamps,
155                                            target_timestamps,
156                                            selected_target_timestamps,
157                                        ) = sampler.sample_timestamps(
158                                            timestamps, upper_bound_threshold, log_run
159                                        )
160
161                                        time_end = time.time()
162                                        duration = time_end - time_start
163                                        timestamps_per_s = len(timestamps) / duration
164                                        if WANDB_ACTIVE:
165                                            wandb.log(
166                                                {
167                                                    "sampling/timestamps_per_s": timestamps_per_s
168                                                },
169                                                step,
170                                            )
171                                            wandb.log({"sampling/s": duration}, step)
172
173                                        # Upload data
174                                        time_start = time.time()
175                                        print(camera)
176                                        file_size_mb = sampler.update_upload_file(
177                                            target, selected_indices, camera
178                                        )
179
180                                        time_end = time.time()
181                                        duration = time_end - time_start
182                                        mb_per_s = file_size_mb / duration
183                                        if WANDB_ACTIVE:
184                                            wandb.log(
185                                                {"upload/mb_per_s": mb_per_s}, step
186                                            )
187                                            wandb.log({"upload/s": duration}, step)
188
189                                    # Update log
190                                    self.log_sampling[file] = log_run
191
192                                else:
193                                    print(
194                                        f"Not enough timestamps to calculate framerate. Skipping {file}"
195                                    )
196
197                                # Delete local data
198                                os.remove(target)
199                                os.remove(target + "_sampled_1Hz")
200
201                                # Update progress bar
202                                step += 1
203                                pbar.update(1)
204
205                            except Exception as e:
206                                print(f"Error in mcity_gridsmart_loader: {e}")
207                                print(traceback.format_exc())
208
209        # Finish tracking
210        try:
211            wandb.finish()
212        except:
213            pass
214        pbar.close()
215
216        # Store download log
217        name_log_download = "FileDownload"
218        self.log_download["data"] = cameras_dict
219        log_name = (self.log_time + "_" + name_log_download).replace(" ", "_").replace(
220            ":", "_"
221        ) + ".json"
222        log_file_path = os.path.join(self.log_target, log_name)
223        with open(log_file_path, "w") as json_file:
224            json.dump(self.log_download, json_file, indent=4)
225
226        # Store sampling log
227        name_log_sampling = "FileSampling"
228        log_name = (self.log_time + "_" + name_log_sampling).replace(" ", "_").replace(
229            ":", "_"
230        ) + ".json"
231        log_file_path = os.path.join(self.log_target, log_name)
232        with open(log_file_path, "w") as json_file:
233            json.dump(self.log_sampling, json_file, indent=4)
234
235    def _mcity_init_cameras(
236        self,
237        cameras={
238            "Geddes_Huron_1",
239            "Geddes_Huron_2",
240            "Huron_Plymouth_1",
241            "Huron_Plymouth_2",
242            "Main_stadium_1",
243            "Main_stadium_2",
244            "Plymouth_Beal",
245            "Plymouth_Bishop",
246            "Plymouth_EPA",
247            "Plymouth_Georgetown",
248            "State_Ellsworth_NE",
249            "State_Ellsworth_NW",
250            "State_Ellsworth_SE",
251            "State_Ellsworth_SW",
252            "Fuller_Fuller_CT",
253            "Fuller_Glazier_1",
254            "Fuller_Glazier_2",
255            "Fuller_Glen",
256            "Dexter_Maple_1",
257            "Dexter_Maple_2",
258            "Hubbard_Huron_1",
259            "Hubbard_Huron_2",
260            "Maple_Miller_1",
261            "Maple_Miller_2",
262        },
263    ):
264        """Initialize a dictionary of Mcity traffic cameras with IDs and empty AWS source mappings."""
265        cameras_dict = {camera.lower(): {} for camera in cameras}
266        for id, camera in enumerate(cameras_dict):
267            cameras_dict[camera]["id"] = id
268            cameras_dict[camera]["aws-sources"] = {}
269
270        print(f"Processed {len(cameras_dict)} cameras")
271        return cameras_dict
272
273    def _mcity_process_aws_buckets(
274        self,
275        cameras_dict,
276        aws_sources={
277            "sip-sensor-data": [""],
278            "sip-sensor-data2": ["wheeler1/", "wheeler2/"],
279        },
280    ):
281        """Processes AWS buckets to map camera names to their corresponding AWS sources."""
282        for bucket in tqdm(aws_sources, desc="Processing AWS sources"):
283            for folder in aws_sources[bucket]:
284                # Get and pre-process AWS data
285                result = self.s3.list_objects_v2(
286                    Bucket=bucket, Prefix=folder, Delimiter="/"
287                )
288                folders = self._process_aws_result(result)
289
290                # Align folder names with camera names
291                folders_aligned = [
292                    re.sub(r"(?<!_)(\d)", r"_\1", folder.lower().rstrip("/"))
293                    for folder in folders
294                ]  # Align varying AWS folder names with camera names
295                folders_aligned = [
296                    folder.replace("fullerct", "fuller_ct")
297                    for folder in folders_aligned
298                ]  # Replace "fullerct" with "fuller_ct" to align with camera names
299                folders_aligned = [
300                    folder.replace("fullser", "fuller") for folder in folders_aligned
301                ]  # Fix "fullser" typo in AWS
302
303                # Check cameras for AWS sources
304                if folders:
305                    for camera_name in cameras_dict:
306                        for folder_name, folder_name_aligned in zip(
307                            folders, folders_aligned
308                        ):
309                            if (
310                                camera_name in folder_name_aligned
311                                and "gs_" in folder_name_aligned
312                            ):  # gs_ is the prefix used in AWS
313                                aws_source = f"{bucket}/{folder_name}"
314                                if (
315                                    aws_source
316                                    not in cameras_dict[camera_name]["aws-sources"]
317                                ):
318                                    cameras_dict[camera_name]["aws-sources"][
319                                        aws_source
320                                    ] = {}
321                else:
322                    print(f"AWS did not return a list of folders for {bucket}/{folder}")
323
324    def _mcity_select_data(self, cameras_dict):
325        """Processes S3 camera data within specified date range, organizing files by camera, source, and date while tracking metrics."""
326        n_cameras = 0
327        n_aws_sources = 0
328        n_files_to_download = 0
329        download_size_bytes = 0
330        for camera in tqdm(cameras_dict, desc="Looking for data entries in range"):
331            n_cameras += 1
332            for aws_source in cameras_dict[camera]["aws-sources"]:
333                file_downloaded_test = False
334                n_aws_sources += 1
335                bucket = aws_source.split("/")[0]
336                prefix_camera = "/".join(aws_source.split("/")[1:])
337                result = self.s3.list_objects_v2(
338                    Bucket=bucket, Prefix=prefix_camera, Delimiter="/"
339                )
340                # Each folder represents a day
341                folders_day = self._process_aws_result(result)
342                for folder_day in folders_day:
343                    date = folder_day.split("/")[-2]
344                    if self.test_run:
345                        # Choose a sample irrespective of the data range to get data from all AWS sources
346                        in_range = True
347                    else:
348                        # Only collect data within the date range
349                        timestamp = datetime.datetime.strptime(date, "%Y-%m-%d")
350                        in_range = self.start_date <= timestamp <= self.end_date
351
352                    if in_range:
353                        cameras_dict[camera]["aws-sources"][aws_source][date] = {}
354                        result = self.s3.list_objects_v2(
355                            Bucket=bucket, Prefix=folder_day, Delimiter="/"
356                        )
357                        # Each folder represents an hour
358                        folders_hour = self._process_aws_result(result)
359                        for folder_hour in folders_hour:
360                            result = self.s3.list_objects_v2(
361                                Bucket=bucket, Prefix=folder_hour, Delimiter="/"
362                            )
363                            files = result["Contents"]
364                            for file in files:
365                                n_files_to_download += 1
366                                download_size_bytes += file["Size"]
367                                file_name = os.path.basename(file["Key"])
368                                cameras_dict[camera]["aws-sources"][aws_source][date][
369                                    file_name
370                                ] = {}
371                                cameras_dict[camera]["aws-sources"][aws_source][date][
372                                    file_name
373                                ]["key"] = file["Key"]
374                                self.file_names.append(file["Key"])
375                                cameras_dict[camera]["aws-sources"][aws_source][date][
376                                    file_name
377                                ]["size"] = file["Size"]
378                                cameras_dict[camera]["aws-sources"][aws_source][date][
379                                    file_name
380                                ]["date"] = file["LastModified"].strftime(
381                                    "%Y-%m-%d %H:%M:%S"
382                                )
383                                if self.test_run:
384                                    file_downloaded_test = True
385                                    print(f"{aws_source} : {file_name}")
386                                    break  # escape for file in files
387                            if self.test_run and file_downloaded_test:
388                                break  # escape for folder_hour in folders_hour
389                        if self.test_run and file_downloaded_test:
390                            break  # escape for folder_day in folders_day
391
392        self.log_download["n_cameras"] = n_cameras
393        self.log_download["n_aws_sources"] = n_aws_sources
394        self.log_download["n_files_to_process"] = n_files_to_download
395        self.log_download["selection_size_tb"] = download_size_bytes / (1024**4)
396
397        return self.file_names, n_files_to_download
398
399    def _mcity_download_data(self, cameras_dict, n_files_to_download, passed_checks):
400        """Downloads data from AWS S3 buckets based on camera dictionary, returns bool indicating success."""
401        mb_per_s_list = []
402
403        if passed_checks:
404            download_successful = True
405
406            # if self.delete_old_data:
407            #     try:
408            #         shutil.rmtree(self.data_target)
409            #     except:
410            #         pass
411            #     os.makedirs(self.data_target)
412
413            step = 0
414            download_started = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
415            self.log_download["download_started"] = download_started
416            with tqdm(desc="Downloading data", total=n_files_to_download) as pbar:
417                for camera in cameras_dict:
418                    for aws_source in cameras_dict[camera]["aws-sources"]:
419                        bucket = aws_source.split("/", 1)[0]
420                        for date in cameras_dict[camera]["aws-sources"][aws_source]:
421                            for file in cameras_dict[camera]["aws-sources"][aws_source][
422                                date
423                            ]:
424                                time_start = time.time()
425
426                                # AWS S3 Download
427                                file_name = os.path.basename(file)
428                                key = cameras_dict[camera]["aws-sources"][aws_source][
429                                    date
430                                ][file_name]["key"]
431                                target = os.path.join(self.data_target, file_name)
432                                self.s3.download_file(bucket, key, target)
433
434            download_ended = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
435            self.log_download["download_ended"] = download_ended
436
437        else:
438            download_successful = False
439            print("Safety checks failed. Not downloading data")
440
441        return download_successful
442
443    def _process_aws_result(self, result):
444        """Extracts folder prefixes from AWS S3 list operation result; returns folder list or None if no prefixes found."""
445        # Get list of folders from AWS response
446        if "CommonPrefixes" in result:
447            folders = [prefix["Prefix"] for prefix in result["CommonPrefixes"]]
448            return folders
449        else:
450            return None
WANDB_ACTIVE = False
class AwsDownloader:
 21class AwsDownloader:
 22    """AWS data downloader for handling, downloading, and processing camera data from AWS S3 buckets."""
 23
 24    def __init__(
 25        self,
 26        start_date: datetime.datetime,
 27        end_date: datetime.datetime,
 28        sample_rate_hz: float,
 29        log_time: datetime.datetime,
 30        source: str = "mcity_gridsmart",
 31        storage_target_root: str = ".",
 32        subfolder_data: str = "data",
 33        subfolder_logs: str = "logs",
 34        test_run: bool = False,
 35        delete_old_data: bool = False,
 36    ):
 37        """Initialize the class with parameters for date range, logging configurations, and storage settings."""
 38        self.start_date = start_date
 39        self.end_date = end_date
 40        self.sample_rate_hz = sample_rate_hz
 41        self.source = source
 42        self.storage_target_root = storage_target_root
 43        self.test_run = test_run
 44        self.delete_old_data = delete_old_data
 45        self.log_time = log_time
 46        self.file_names = []
 47
 48        self.log_download = {}
 49        self.log_sampling = {}
 50
 51        # Fill log
 52        self.log_download["source"] = source
 53        self.log_download["sample_rate_hz"] = sample_rate_hz
 54        self.log_download["storage_target_root"] = storage_target_root
 55        self.log_download["selection_start_date"] = start_date.strftime("%Y-%m-%d")
 56        self.log_download["selection_end_date"] = end_date.strftime("%Y-%m-%d")
 57        self.log_download["delete_old_data"] = delete_old_data
 58        self.log_download["test_run"] = test_run
 59
 60        # Run name
 61        formatted_start = self.start_date.strftime("%Y-%m-%d")
 62        formatted_end = self.end_date.strftime("%Y-%m-%d")
 63        self.run_name = f"data_engine_rolling_{formatted_start}_to_{formatted_end}"
 64
 65        # Setup storage folders
 66        self.data_target = os.path.join(
 67            storage_target_root, subfolder_data, self.run_name
 68        )
 69        os.makedirs(self.data_target, exist_ok=True)
 70
 71        self.log_target = os.path.join(
 72            storage_target_root, subfolder_logs, self.run_name
 73        )
 74        os.makedirs(self.log_target, exist_ok=True)
 75
 76        # Connect to AWS S3
 77        self.s3 = boto3.client(
 78            "s3",
 79            aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", None),
 80            aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", None),
 81            region_name="us-east-1",
 82        )
 83
 84    def process_data(self):
 85        """Downloads files from AWS S3, samples them at 1Hz, and generates download and sampling logs."""
 86        cameras_dict = self._mcity_init_cameras()
 87        self._mcity_process_aws_buckets(cameras_dict)
 88        self.file_names, n_files_to_download = self._mcity_select_data(cameras_dict)
 89
 90        # Tracking
 91        if WANDB_ACTIVE == True:
 92            wandb.init(
 93                name=self.run_name,
 94                job_type="download",
 95                project="Pre-Processing in AWS",
 96            )
 97
 98        targets = []
 99        step = 0
100        with tqdm(desc="Processing data", total=n_files_to_download) as pbar:
101            for camera in cameras_dict:
102                for aws_source in cameras_dict[camera]["aws-sources"]:
103                    bucket = aws_source.split("/", 1)[0]
104                    for date in cameras_dict[camera]["aws-sources"][aws_source]:
105                        for file in cameras_dict[camera]["aws-sources"][aws_source][
106                            date
107                        ]:
108                            try:
109                                log_run = {}
110
111                                # AWS S3 Download
112                                time_start = time.time()
113                                file_name = os.path.basename(file)
114                                key = cameras_dict[camera]["aws-sources"][aws_source][
115                                    date
116                                ][file_name]["key"]
117                                target = os.path.join(self.data_target, file_name)
118                                targets.append(target)
119                                self.s3.download_file(bucket, key, target)
120
121                                # Logging
122                                file_size_mb = cameras_dict[camera]["aws-sources"][
123                                    aws_source
124                                ][date][file_name]["size"] / (1024**2)
125                                time_end = time.time()
126                                duration = time_end - time_start
127                                mb_per_s = file_size_mb / duration
128                                if WANDB_ACTIVE:
129                                    wandb.log({"download/mb_per_s": mb_per_s}, step)
130                                    wandb.log({"download/s": duration}, step)
131
132                                # Sample data
133                                time_start = time.time()
134                                sampler = SampleTimestamps(
135                                    file_path=target, target_framerate_hz=1
136                                )
137                                timestamps = sampler.get_timestamps()
138
139                                # We need at least 2 timestamps to calculate a framerate
140                                if len(timestamps) >= 2:
141                                    # Get framerate
142                                    framerate_hz, timestamps, upper_bound_threshold = (
143                                        sampler.get_framerate(timestamps, log_run)
144                                    )
145                                    valid_target_framerate = (
146                                        sampler.check_target_framerate(
147                                            framerate_hz, log_run
148                                        )
149                                    )
150                                    # We need a target framerate lower than the oririinal framerate
151                                    if valid_target_framerate:
152                                        # Sample data
153                                        (
154                                            selected_indices,
155                                            selected_timestamps,
156                                            target_timestamps,
157                                            selected_target_timestamps,
158                                        ) = sampler.sample_timestamps(
159                                            timestamps, upper_bound_threshold, log_run
160                                        )
161
162                                        time_end = time.time()
163                                        duration = time_end - time_start
164                                        timestamps_per_s = len(timestamps) / duration
165                                        if WANDB_ACTIVE:
166                                            wandb.log(
167                                                {
168                                                    "sampling/timestamps_per_s": timestamps_per_s
169                                                },
170                                                step,
171                                            )
172                                            wandb.log({"sampling/s": duration}, step)
173
174                                        # Upload data
175                                        time_start = time.time()
176                                        print(camera)
177                                        file_size_mb = sampler.update_upload_file(
178                                            target, selected_indices, camera
179                                        )
180
181                                        time_end = time.time()
182                                        duration = time_end - time_start
183                                        mb_per_s = file_size_mb / duration
184                                        if WANDB_ACTIVE:
185                                            wandb.log(
186                                                {"upload/mb_per_s": mb_per_s}, step
187                                            )
188                                            wandb.log({"upload/s": duration}, step)
189
190                                    # Update log
191                                    self.log_sampling[file] = log_run
192
193                                else:
194                                    print(
195                                        f"Not enough timestamps to calculate framerate. Skipping {file}"
196                                    )
197
198                                # Delete local data
199                                os.remove(target)
200                                os.remove(target + "_sampled_1Hz")
201
202                                # Update progress bar
203                                step += 1
204                                pbar.update(1)
205
206                            except Exception as e:
207                                print(f"Error in mcity_gridsmart_loader: {e}")
208                                print(traceback.format_exc())
209
210        # Finish tracking
211        try:
212            wandb.finish()
213        except:
214            pass
215        pbar.close()
216
217        # Store download log
218        name_log_download = "FileDownload"
219        self.log_download["data"] = cameras_dict
220        log_name = (self.log_time + "_" + name_log_download).replace(" ", "_").replace(
221            ":", "_"
222        ) + ".json"
223        log_file_path = os.path.join(self.log_target, log_name)
224        with open(log_file_path, "w") as json_file:
225            json.dump(self.log_download, json_file, indent=4)
226
227        # Store sampling log
228        name_log_sampling = "FileSampling"
229        log_name = (self.log_time + "_" + name_log_sampling).replace(" ", "_").replace(
230            ":", "_"
231        ) + ".json"
232        log_file_path = os.path.join(self.log_target, log_name)
233        with open(log_file_path, "w") as json_file:
234            json.dump(self.log_sampling, json_file, indent=4)
235
236    def _mcity_init_cameras(
237        self,
238        cameras={
239            "Geddes_Huron_1",
240            "Geddes_Huron_2",
241            "Huron_Plymouth_1",
242            "Huron_Plymouth_2",
243            "Main_stadium_1",
244            "Main_stadium_2",
245            "Plymouth_Beal",
246            "Plymouth_Bishop",
247            "Plymouth_EPA",
248            "Plymouth_Georgetown",
249            "State_Ellsworth_NE",
250            "State_Ellsworth_NW",
251            "State_Ellsworth_SE",
252            "State_Ellsworth_SW",
253            "Fuller_Fuller_CT",
254            "Fuller_Glazier_1",
255            "Fuller_Glazier_2",
256            "Fuller_Glen",
257            "Dexter_Maple_1",
258            "Dexter_Maple_2",
259            "Hubbard_Huron_1",
260            "Hubbard_Huron_2",
261            "Maple_Miller_1",
262            "Maple_Miller_2",
263        },
264    ):
265        """Initialize a dictionary of Mcity traffic cameras with IDs and empty AWS source mappings."""
266        cameras_dict = {camera.lower(): {} for camera in cameras}
267        for id, camera in enumerate(cameras_dict):
268            cameras_dict[camera]["id"] = id
269            cameras_dict[camera]["aws-sources"] = {}
270
271        print(f"Processed {len(cameras_dict)} cameras")
272        return cameras_dict
273
274    def _mcity_process_aws_buckets(
275        self,
276        cameras_dict,
277        aws_sources={
278            "sip-sensor-data": [""],
279            "sip-sensor-data2": ["wheeler1/", "wheeler2/"],
280        },
281    ):
282        """Processes AWS buckets to map camera names to their corresponding AWS sources."""
283        for bucket in tqdm(aws_sources, desc="Processing AWS sources"):
284            for folder in aws_sources[bucket]:
285                # Get and pre-process AWS data
286                result = self.s3.list_objects_v2(
287                    Bucket=bucket, Prefix=folder, Delimiter="/"
288                )
289                folders = self._process_aws_result(result)
290
291                # Align folder names with camera names
292                folders_aligned = [
293                    re.sub(r"(?<!_)(\d)", r"_\1", folder.lower().rstrip("/"))
294                    for folder in folders
295                ]  # Align varying AWS folder names with camera names
296                folders_aligned = [
297                    folder.replace("fullerct", "fuller_ct")
298                    for folder in folders_aligned
299                ]  # Replace "fullerct" with "fuller_ct" to align with camera names
300                folders_aligned = [
301                    folder.replace("fullser", "fuller") for folder in folders_aligned
302                ]  # Fix "fullser" typo in AWS
303
304                # Check cameras for AWS sources
305                if folders:
306                    for camera_name in cameras_dict:
307                        for folder_name, folder_name_aligned in zip(
308                            folders, folders_aligned
309                        ):
310                            if (
311                                camera_name in folder_name_aligned
312                                and "gs_" in folder_name_aligned
313                            ):  # gs_ is the prefix used in AWS
314                                aws_source = f"{bucket}/{folder_name}"
315                                if (
316                                    aws_source
317                                    not in cameras_dict[camera_name]["aws-sources"]
318                                ):
319                                    cameras_dict[camera_name]["aws-sources"][
320                                        aws_source
321                                    ] = {}
322                else:
323                    print(f"AWS did not return a list of folders for {bucket}/{folder}")
324
325    def _mcity_select_data(self, cameras_dict):
326        """Processes S3 camera data within specified date range, organizing files by camera, source, and date while tracking metrics."""
327        n_cameras = 0
328        n_aws_sources = 0
329        n_files_to_download = 0
330        download_size_bytes = 0
331        for camera in tqdm(cameras_dict, desc="Looking for data entries in range"):
332            n_cameras += 1
333            for aws_source in cameras_dict[camera]["aws-sources"]:
334                file_downloaded_test = False
335                n_aws_sources += 1
336                bucket = aws_source.split("/")[0]
337                prefix_camera = "/".join(aws_source.split("/")[1:])
338                result = self.s3.list_objects_v2(
339                    Bucket=bucket, Prefix=prefix_camera, Delimiter="/"
340                )
341                # Each folder represents a day
342                folders_day = self._process_aws_result(result)
343                for folder_day in folders_day:
344                    date = folder_day.split("/")[-2]
345                    if self.test_run:
346                        # Choose a sample irrespective of the data range to get data from all AWS sources
347                        in_range = True
348                    else:
349                        # Only collect data within the date range
350                        timestamp = datetime.datetime.strptime(date, "%Y-%m-%d")
351                        in_range = self.start_date <= timestamp <= self.end_date
352
353                    if in_range:
354                        cameras_dict[camera]["aws-sources"][aws_source][date] = {}
355                        result = self.s3.list_objects_v2(
356                            Bucket=bucket, Prefix=folder_day, Delimiter="/"
357                        )
358                        # Each folder represents an hour
359                        folders_hour = self._process_aws_result(result)
360                        for folder_hour in folders_hour:
361                            result = self.s3.list_objects_v2(
362                                Bucket=bucket, Prefix=folder_hour, Delimiter="/"
363                            )
364                            files = result["Contents"]
365                            for file in files:
366                                n_files_to_download += 1
367                                download_size_bytes += file["Size"]
368                                file_name = os.path.basename(file["Key"])
369                                cameras_dict[camera]["aws-sources"][aws_source][date][
370                                    file_name
371                                ] = {}
372                                cameras_dict[camera]["aws-sources"][aws_source][date][
373                                    file_name
374                                ]["key"] = file["Key"]
375                                self.file_names.append(file["Key"])
376                                cameras_dict[camera]["aws-sources"][aws_source][date][
377                                    file_name
378                                ]["size"] = file["Size"]
379                                cameras_dict[camera]["aws-sources"][aws_source][date][
380                                    file_name
381                                ]["date"] = file["LastModified"].strftime(
382                                    "%Y-%m-%d %H:%M:%S"
383                                )
384                                if self.test_run:
385                                    file_downloaded_test = True
386                                    print(f"{aws_source} : {file_name}")
387                                    break  # escape for file in files
388                            if self.test_run and file_downloaded_test:
389                                break  # escape for folder_hour in folders_hour
390                        if self.test_run and file_downloaded_test:
391                            break  # escape for folder_day in folders_day
392
393        self.log_download["n_cameras"] = n_cameras
394        self.log_download["n_aws_sources"] = n_aws_sources
395        self.log_download["n_files_to_process"] = n_files_to_download
396        self.log_download["selection_size_tb"] = download_size_bytes / (1024**4)
397
398        return self.file_names, n_files_to_download
399
400    def _mcity_download_data(self, cameras_dict, n_files_to_download, passed_checks):
401        """Downloads data from AWS S3 buckets based on camera dictionary, returns bool indicating success."""
402        mb_per_s_list = []
403
404        if passed_checks:
405            download_successful = True
406
407            # if self.delete_old_data:
408            #     try:
409            #         shutil.rmtree(self.data_target)
410            #     except:
411            #         pass
412            #     os.makedirs(self.data_target)
413
414            step = 0
415            download_started = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
416            self.log_download["download_started"] = download_started
417            with tqdm(desc="Downloading data", total=n_files_to_download) as pbar:
418                for camera in cameras_dict:
419                    for aws_source in cameras_dict[camera]["aws-sources"]:
420                        bucket = aws_source.split("/", 1)[0]
421                        for date in cameras_dict[camera]["aws-sources"][aws_source]:
422                            for file in cameras_dict[camera]["aws-sources"][aws_source][
423                                date
424                            ]:
425                                time_start = time.time()
426
427                                # AWS S3 Download
428                                file_name = os.path.basename(file)
429                                key = cameras_dict[camera]["aws-sources"][aws_source][
430                                    date
431                                ][file_name]["key"]
432                                target = os.path.join(self.data_target, file_name)
433                                self.s3.download_file(bucket, key, target)
434
435            download_ended = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
436            self.log_download["download_ended"] = download_ended
437
438        else:
439            download_successful = False
440            print("Safety checks failed. Not downloading data")
441
442        return download_successful
443
444    def _process_aws_result(self, result):
445        """Extracts folder prefixes from AWS S3 list operation result; returns folder list or None if no prefixes found."""
446        # Get list of folders from AWS response
447        if "CommonPrefixes" in result:
448            folders = [prefix["Prefix"] for prefix in result["CommonPrefixes"]]
449            return folders
450        else:
451            return None

AWS data downloader for handling, downloading, and processing camera data from AWS S3 buckets.

AwsDownloader( start_date: datetime.datetime, end_date: datetime.datetime, sample_rate_hz: float, log_time: datetime.datetime, source: str = 'mcity_gridsmart', storage_target_root: str = '.', subfolder_data: str = 'data', subfolder_logs: str = 'logs', test_run: bool = False, delete_old_data: bool = False)
24    def __init__(
25        self,
26        start_date: datetime.datetime,
27        end_date: datetime.datetime,
28        sample_rate_hz: float,
29        log_time: datetime.datetime,
30        source: str = "mcity_gridsmart",
31        storage_target_root: str = ".",
32        subfolder_data: str = "data",
33        subfolder_logs: str = "logs",
34        test_run: bool = False,
35        delete_old_data: bool = False,
36    ):
37        """Initialize the class with parameters for date range, logging configurations, and storage settings."""
38        self.start_date = start_date
39        self.end_date = end_date
40        self.sample_rate_hz = sample_rate_hz
41        self.source = source
42        self.storage_target_root = storage_target_root
43        self.test_run = test_run
44        self.delete_old_data = delete_old_data
45        self.log_time = log_time
46        self.file_names = []
47
48        self.log_download = {}
49        self.log_sampling = {}
50
51        # Fill log
52        self.log_download["source"] = source
53        self.log_download["sample_rate_hz"] = sample_rate_hz
54        self.log_download["storage_target_root"] = storage_target_root
55        self.log_download["selection_start_date"] = start_date.strftime("%Y-%m-%d")
56        self.log_download["selection_end_date"] = end_date.strftime("%Y-%m-%d")
57        self.log_download["delete_old_data"] = delete_old_data
58        self.log_download["test_run"] = test_run
59
60        # Run name
61        formatted_start = self.start_date.strftime("%Y-%m-%d")
62        formatted_end = self.end_date.strftime("%Y-%m-%d")
63        self.run_name = f"data_engine_rolling_{formatted_start}_to_{formatted_end}"
64
65        # Setup storage folders
66        self.data_target = os.path.join(
67            storage_target_root, subfolder_data, self.run_name
68        )
69        os.makedirs(self.data_target, exist_ok=True)
70
71        self.log_target = os.path.join(
72            storage_target_root, subfolder_logs, self.run_name
73        )
74        os.makedirs(self.log_target, exist_ok=True)
75
76        # Connect to AWS S3
77        self.s3 = boto3.client(
78            "s3",
79            aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", None),
80            aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", None),
81            region_name="us-east-1",
82        )

Initialize the class with parameters for date range, logging configurations, and storage settings.

start_date
end_date
sample_rate_hz
source
storage_target_root
test_run
delete_old_data
log_time
file_names
log_download
log_sampling
run_name
data_target
log_target
s3
def process_data(self):
 84    def process_data(self):
 85        """Downloads files from AWS S3, samples them at 1Hz, and generates download and sampling logs."""
 86        cameras_dict = self._mcity_init_cameras()
 87        self._mcity_process_aws_buckets(cameras_dict)
 88        self.file_names, n_files_to_download = self._mcity_select_data(cameras_dict)
 89
 90        # Tracking
 91        if WANDB_ACTIVE == True:
 92            wandb.init(
 93                name=self.run_name,
 94                job_type="download",
 95                project="Pre-Processing in AWS",
 96            )
 97
 98        targets = []
 99        step = 0
100        with tqdm(desc="Processing data", total=n_files_to_download) as pbar:
101            for camera in cameras_dict:
102                for aws_source in cameras_dict[camera]["aws-sources"]:
103                    bucket = aws_source.split("/", 1)[0]
104                    for date in cameras_dict[camera]["aws-sources"][aws_source]:
105                        for file in cameras_dict[camera]["aws-sources"][aws_source][
106                            date
107                        ]:
108                            try:
109                                log_run = {}
110
111                                # AWS S3 Download
112                                time_start = time.time()
113                                file_name = os.path.basename(file)
114                                key = cameras_dict[camera]["aws-sources"][aws_source][
115                                    date
116                                ][file_name]["key"]
117                                target = os.path.join(self.data_target, file_name)
118                                targets.append(target)
119                                self.s3.download_file(bucket, key, target)
120
121                                # Logging
122                                file_size_mb = cameras_dict[camera]["aws-sources"][
123                                    aws_source
124                                ][date][file_name]["size"] / (1024**2)
125                                time_end = time.time()
126                                duration = time_end - time_start
127                                mb_per_s = file_size_mb / duration
128                                if WANDB_ACTIVE:
129                                    wandb.log({"download/mb_per_s": mb_per_s}, step)
130                                    wandb.log({"download/s": duration}, step)
131
132                                # Sample data
133                                time_start = time.time()
134                                sampler = SampleTimestamps(
135                                    file_path=target, target_framerate_hz=1
136                                )
137                                timestamps = sampler.get_timestamps()
138
139                                # We need at least 2 timestamps to calculate a framerate
140                                if len(timestamps) >= 2:
141                                    # Get framerate
142                                    framerate_hz, timestamps, upper_bound_threshold = (
143                                        sampler.get_framerate(timestamps, log_run)
144                                    )
145                                    valid_target_framerate = (
146                                        sampler.check_target_framerate(
147                                            framerate_hz, log_run
148                                        )
149                                    )
150                                    # We need a target framerate lower than the oririinal framerate
151                                    if valid_target_framerate:
152                                        # Sample data
153                                        (
154                                            selected_indices,
155                                            selected_timestamps,
156                                            target_timestamps,
157                                            selected_target_timestamps,
158                                        ) = sampler.sample_timestamps(
159                                            timestamps, upper_bound_threshold, log_run
160                                        )
161
162                                        time_end = time.time()
163                                        duration = time_end - time_start
164                                        timestamps_per_s = len(timestamps) / duration
165                                        if WANDB_ACTIVE:
166                                            wandb.log(
167                                                {
168                                                    "sampling/timestamps_per_s": timestamps_per_s
169                                                },
170                                                step,
171                                            )
172                                            wandb.log({"sampling/s": duration}, step)
173
174                                        # Upload data
175                                        time_start = time.time()
176                                        print(camera)
177                                        file_size_mb = sampler.update_upload_file(
178                                            target, selected_indices, camera
179                                        )
180
181                                        time_end = time.time()
182                                        duration = time_end - time_start
183                                        mb_per_s = file_size_mb / duration
184                                        if WANDB_ACTIVE:
185                                            wandb.log(
186                                                {"upload/mb_per_s": mb_per_s}, step
187                                            )
188                                            wandb.log({"upload/s": duration}, step)
189
190                                    # Update log
191                                    self.log_sampling[file] = log_run
192
193                                else:
194                                    print(
195                                        f"Not enough timestamps to calculate framerate. Skipping {file}"
196                                    )
197
198                                # Delete local data
199                                os.remove(target)
200                                os.remove(target + "_sampled_1Hz")
201
202                                # Update progress bar
203                                step += 1
204                                pbar.update(1)
205
206                            except Exception as e:
207                                print(f"Error in mcity_gridsmart_loader: {e}")
208                                print(traceback.format_exc())
209
210        # Finish tracking
211        try:
212            wandb.finish()
213        except:
214            pass
215        pbar.close()
216
217        # Store download log
218        name_log_download = "FileDownload"
219        self.log_download["data"] = cameras_dict
220        log_name = (self.log_time + "_" + name_log_download).replace(" ", "_").replace(
221            ":", "_"
222        ) + ".json"
223        log_file_path = os.path.join(self.log_target, log_name)
224        with open(log_file_path, "w") as json_file:
225            json.dump(self.log_download, json_file, indent=4)
226
227        # Store sampling log
228        name_log_sampling = "FileSampling"
229        log_name = (self.log_time + "_" + name_log_sampling).replace(" ", "_").replace(
230            ":", "_"
231        ) + ".json"
232        log_file_path = os.path.join(self.log_target, log_name)
233        with open(log_file_path, "w") as json_file:
234            json.dump(self.log_sampling, json_file, indent=4)

Downloads files from AWS S3, samples them at 1Hz, and generates download and sampling logs.