workflows.aws_download

import base64
import datetime
import json
import logging
import os
import re
import time
from importlib.metadata import version as get_version
from queue import Empty

import boto3
import fiftyone as fo
import pytz
import torch.multiprocessing as mp
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

from config.config import NUM_WORKERS

class AwsDownloader:
    """Downloads and decodes data from an AWS S3 bucket, with support for multiprocessing and Voxel51 dataset creation."""

    def __init__(self, bucket, prefix, download_path, test_run):
        """Initialize the S3 downloader with bucket, prefix, download path, and test flag, loading AWS credentials from a .secret file."""
        try:
            # Load AWS credentials (KEY=VALUE lines) into the environment
            with open(".secret", "r") as file:
                for line in file:
                    key, value = line.strip().split("=", 1)
                    os.environ[key] = value

            # S3 Client
            self.s3 = boto3.client(
                "s3",
                aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", None),
                aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", None),
            )

            self.bucket = bucket
            self.prefix = prefix
            self.test_run = test_run

            self.download_path = download_path
            os.makedirs(download_path, exist_ok=True)
        except Exception as e:
            logging.error(f"Connection to S3 client failed: {e}")

    def _list_files(self, bucket, prefix):
        """Return list of files and total size in TB from S3 bucket with given prefix."""
        files = []
        download_size_bytes = 0
        paginator = self.s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            if "Contents" in page:
                for obj in page["Contents"]:
                    files.append(obj["Key"])
                    download_size_bytes += obj["Size"]

        total_size_tb = download_size_bytes / (1024**4)

        return files, total_size_tb

    def _set_v51_metadata(self, output_folder_root):
        """Sets and saves FiftyOne metadata for dataset import by creating a metadata.json file with sample fields configuration."""
        sample_fields = [
            {
                "name": "filepath",
                "ftype": "fiftyone.core.fields.StringField",
                "embedded_doc_type": None,
                "subfield": None,
                "fields": [],
                "db_field": "filepath",
                "description": None,
                "info": None,
                "read_only": True,
                "created_at": {
                    "$date": "2024-11-14T15:24:21.719Z"
                },  # FIXME Replace with actual date of function call
            },
            {
                "name": "sensor",
                "ftype": "fiftyone.core.fields.StringField",
                "embedded_doc_type": None,
                "subfield": None,
                "fields": [],
                "db_field": "sensor",
                "description": None,
                "info": None,
                "read_only": True,
                "created_at": {"$date": "2024-11-14T15:24:21.719Z"},
            },
            {
                "name": "timestamp",
                "ftype": "fiftyone.core.fields.DateTimeField",
                "embedded_doc_type": None,
                "subfield": None,
                "fields": [],
                "db_field": "timestamp",
                "description": None,
                "info": None,
                "read_only": True,
                "created_at": {"$date": "2024-11-14T15:24:21.719Z"},
            },
        ]

        # Get version of current V51 package
        package_name = "fiftyone"
        version = get_version(package_name)
        version_str = str(version)

        v51_metadata = {}
        v51_metadata["version"] = version_str
        v51_metadata["sample_fields"] = sample_fields

        # Store metadata.json
        file_path = os.path.join(output_folder_root, "metadata.json")
        with open(file_path, "w") as json_file:
            json.dump(v51_metadata, json_file)

    def _process_file(self, file_path, output_folder_data):
        """Process a JSON file containing image data and timestamps, converting them to a format compatible with V51 and saving images to disk."""
        v51_samples = []

        # Prepare ISO check for time format
        iso8601_regex = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{3})?Z$")

        with open(file_path, "r") as file:
            for line in file:
                try:
                    data = json.loads(line)
                    if "time" in data and "data" in data:
                        # Get data
                        timestamp = data.get("time")
                        image_base64 = data.get("data")

                        # Get timestamp
                        try:
                            # Time with ms (default)
                            time_obj = datetime.datetime.strptime(
                                timestamp, "%Y-%m-%d %H:%M:%S.%f"
                            )
                        except ValueError:
                            time_obj = datetime.datetime.strptime(
                                timestamp, "%Y-%m-%d %H:%M:%S"
                            )
                        formatted_time = (
                            time_obj.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
                        )
                        sensor_name = data.get("sensor_name")

                    elif (
                        "image" in data
                        and "sensor_name" in data
                        and "event_timestamp" in data
                    ):
                        # Get data
                        sensor_name = data.get("sensor_name")
                        timestamp = data.get("event_timestamp")
                        image_base64 = data.get("image")

                        # Interpret the epoch timestamp as UTC and convert to Michigan time
                        # TODO Ensure that this conversion is correct; it is unclear in which timezone the timestamps were written
                        utc_time = datetime.datetime.fromtimestamp(timestamp, tz=pytz.utc)
                        michigan_tz = pytz.timezone("America/Detroit")
                        michigan_time = utc_time.astimezone(michigan_tz)
                        formatted_time = (
                            michigan_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
                        )
                    else:
                        logging.error(f"Format cannot be processed: {data}")
                        continue

                    if image_base64 and formatted_time:

                        if (
                            sensor_name is None
                        ):  # FIXME Can be derived from S3 source if not stored in data itself
                            sensor_name = "Unknown"

                        # File paths
                        image_filename = f"{sensor_name}_{formatted_time}.jpg"

                        # Ensure correct timestamp format
                        milliseconds = formatted_time.split(".")[1][:3].ljust(3, "0")
                        formatted_time = (
                            formatted_time.split(".")[0] + "." + milliseconds + "Z"
                        )

                        iso8601_conform = bool(iso8601_regex.match(formatted_time))
                        if not iso8601_conform:
                            logging.error(
                                f"Timestamp does not conform to ISO8601: {formatted_time}"
                            )

                        # Store image to disk
                        output_path = os.path.join(output_folder_data, image_filename)

                        if os.path.exists(output_path):
                            logging.debug(f"File already exists: {output_path}")
                        else:
                            image_data = base64.b64decode(image_base64)
                            with open(output_path, "wb") as image_file:
                                image_file.write(image_data)

                        # Prepare import with V51
                        v51_sample = {
                            "filepath": output_path,
                            "sensor": sensor_name,
                            "timestamp": {"$date": formatted_time},
                        }
                        v51_samples.append(v51_sample)

                    else:
                        logging.error(
                            f"There was an issue during file processing of {file_path}. Issues with image_base64: {image_base64 is None}, formatted_time: {formatted_time is None}, sensor_name: {sensor_name is None}"
                        )
                        continue

                except json.JSONDecodeError as e:
                    logging.error(
                        f"File {os.path.basename(file_path)} - Error decoding JSON: {e}"
                    )

        return v51_samples

    def download_files(self, log_dir, MAX_SIZE_TB=1.5):
        """Downloads files from an AWS S3 bucket and verifies download success, returning download details and status flags."""
        files_to_be_downloaded, total_size_tb = self._list_files(
            self.bucket, self.prefix
        )

        writer = SummaryWriter(log_dir=log_dir)

        n_downloaded_files, n_skipped_files = 0, 0
        if total_size_tb <= MAX_SIZE_TB:
            for file in tqdm(files_to_be_downloaded, desc="Downloading files from AWS"):
                time_start = time.time()
                file_path = os.path.join(self.download_path, file)

                # Ensure directory exists
                os.makedirs(os.path.dirname(file_path), exist_ok=True)

                if not os.path.exists(file_path):
                    if not self.test_run:
                        self.s3.download_file(self.bucket, file, file_path)
                    n_downloaded_files += 1

                    time_end = time.time()
                    duration = time_end - time_start
                    files_per_second = 1 / duration

                    writer.add_scalar(
                        "download/files_per_second",
                        files_per_second,
                        n_downloaded_files,
                    )
                else:
                    logging.warning(f"Skipping {file}, already exists.")
                    n_skipped_files += 1
        else:
            logging.error(
                f"Total size of {total_size_tb:.2f} TB exceeds limit of {MAX_SIZE_TB} TB. Skipping download."
            )

        logging.info(
            f"Collected {n_downloaded_files} files of total size {total_size_tb:.2f} TB, skipped {n_skipped_files} files."
        )

        if self.test_run:
            logging.error(
                "Test run: No data downloaded. Set 'test_run': False in the config. Aborting."
            )

        # Check if all files were downloaded properly
        DOWNLOAD_NUMBER_SUCCESS, DOWNLOAD_SIZE_SUCCESS = False, False
        files = []  # Files found on disk; stays empty if no subfolder exists

        subfolders = [
            f.path for f in os.scandir(self.download_path) if f.is_dir()
        ]  # Each subfolder stands for a requested sample rate. Defaults to "1"
        if len(subfolders) > 0:
            sub_folder = os.path.basename(subfolders[0])
            if len(subfolders) > 1:
                logging.warning(
                    f"More than one subfolder found in download directory {self.download_path}, selected {sub_folder}."
                )

            downloaded_files = []
            downloaded_size_bytes = 0

            folder_path = os.path.join(self.download_path, sub_folder)
            files = [
                f
                for f in os.listdir(folder_path)
                if os.path.isfile(os.path.join(folder_path, f))
            ]

            with tqdm(
                total=len(files), desc="Checking downloaded files", unit="file"
            ) as pbar:
                for file in files:
                    file_path = os.path.join(folder_path, file)
                    downloaded_files.append(file_path)
                    downloaded_size_bytes += os.path.getsize(file_path)
                    pbar.update(1)

            downloaded_size_tb = downloaded_size_bytes / (1024**4)
            logging.info(
                f"Downloaded {len(downloaded_files)} files. Total size: {downloaded_size_tb:.2f} TB"
            )

            if len(files_to_be_downloaded) == len(downloaded_files):
                logging.info("All files downloaded successfully.")
                DOWNLOAD_NUMBER_SUCCESS = True
            else:
                logging.error(
                    f"Only {len(downloaded_files)} of {len(files_to_be_downloaded)} planned files downloaded."
                )
                DOWNLOAD_NUMBER_SUCCESS = False

            if total_size_tb == downloaded_size_tb:
                logging.info("The downloaded size equals the planned download size.")
                DOWNLOAD_SIZE_SUCCESS = True
            else:
                logging.error(
                    f"The downloaded size of {downloaded_size_tb} TB differs from the planned download size of {total_size_tb} TB."
                )
                DOWNLOAD_SIZE_SUCCESS = False
        else:
            sub_folder = None
            logging.error(
                f"No subfolder found in download directory {self.download_path}."
            )

        writer.close()

        return (
            sub_folder,
            files,
            files_to_be_downloaded,
            DOWNLOAD_NUMBER_SUCCESS,
            DOWNLOAD_SIZE_SUCCESS,
        )

    # Decode data with multiple workers
    def decode_data(
        self,
        sub_folder,
        files,
        log_dir,
        dataset_name="annarbor_rolling",
        output_folder="decoded",
        dataset_persistance=True,
    ):
        """Decodes downloaded data files into a Voxel51 dataset using multiprocessing workers for parallel processing."""
        output_folder_root = os.path.join(self.download_path, sub_folder, output_folder)
        output_folder_data = os.path.join(output_folder_root, "data")
        os.makedirs(output_folder_data, exist_ok=True)

        # Save metadata to import dataset as Voxel51 dataset
        self._set_v51_metadata(output_folder_root)

        # Extract file content
        json_file_path = os.path.join(output_folder_root, "samples.json")

        # Queues for multiprocessing tasks and results
        result_queue = mp.Queue()
        task_queue = mp.Queue()

        # Add files to the task queue
        for file_path in files:
            absolute_file_path = os.path.join(self.download_path, sub_folder, file_path)
            task_queue.put(absolute_file_path)

        logging.info(
            f"Added {len(files)} files to task queue. Ready for multiprocessing."
        )

        # Gather events for extraction workers
        worker_done_events = []
        n_extraction_workers = NUM_WORKERS
        for _ in range(n_extraction_workers):
            done_event = mp.Event()
            worker_done_events.append(done_event)

        # Start the result worker
        result_worker_process = mp.Process(
            target=self.result_json_worker,
            args=(result_queue, json_file_path, worker_done_events, log_dir),
        )
        result_worker_process.start()

        # Start the data extraction workers
        n_files_per_worker = int(len(files) / n_extraction_workers)
        workers = []
        for done_event in worker_done_events:
            p = mp.Process(
                target=self.data_extraction_worker,
                args=(
                    task_queue,
                    result_queue,
                    done_event,
                    output_folder_data,
                    n_files_per_worker,
                ),
            )
            p.start()
            workers.append(p)

        # Wait for the data extraction workers
        for p in workers:
            p.join()
        logging.info("All data processing workers finished processing.")

        # Wait for the result processing worker
        result_worker_process.join()
        logging.info("JSON worker finished processing.")

        task_queue.close()
        result_queue.close()

        # Load V51 dataset
        dataset = fo.Dataset(name=dataset_name)
        dataset.add_dir(
            dataset_dir=output_folder_root,
            dataset_type=fo.types.FiftyOneDataset,
            progress=True,
        )

        dataset.compute_metadata(num_workers=NUM_WORKERS, progress=True)
        dataset.persistent = dataset_persistance

        return dataset

    # Worker functions
    def data_extraction_worker(
        self,
        task_queue,
        result_queue,
        done_event,
        output_folder_data,
        n_files_per_worker,
    ):
        """Worker process that extracts data from files in the task queue and puts results in the result queue, processing until the task queue is empty."""
        logging.info(f"Process ID: {os.getpid()}. Data extraction process started.")

        n_files_processed = 0
        n_samples_processed = 0
        while True:
            if task_queue.empty():  # If queue is empty, break out
                break
            else:
                try:
                    file_path = task_queue.get(
                        timeout=0.1
                    )  # Get a file path from the task queue
                except Empty:
                    # Another worker drained the queue between the empty() check and get()
                    break
                try:
                    v51_samples = self._process_file(file_path, output_folder_data)
                    if len(v51_samples) > 0:
                        result_queue.put(v51_samples)
                        if n_files_processed % 100 == 0:
                            logging.info(
                                f"Worker {os.getpid()} finished {n_samples_processed} samples. {n_files_processed} / ~{n_files_per_worker} files done."
                            )
                        n_files_processed += 1
                        n_samples_processed += len(v51_samples)

                except Exception as e:
                    logging.error(
                        f"Error occurred during processing of file {os.path.basename(file_path)}: {e}"
                    )
                    continue

        # Once all tasks are processed, set the done_event
        done_event.set()
        logging.info(f"Data Extraction worker {os.getpid()} shutting down.")
        return True

    def result_json_worker(
        self, result_queue, json_file_path, worker_done_events, log_dir
    ):
        """Process results from the worker queue, aggregate them into a JSON file, and log metrics."""
        # Aggregates sample dicts from the extraction workers and writes them to samples.json at the end
        logging.info(f"Process ID: {os.getpid()}. Results processing process started.")

        writer = SummaryWriter(log_dir=log_dir)
        n_files_processed = 0
        n_images_extracted = 0

        v51_samples_dict = {"samples": []}

        # Update the dict with incoming results as long as workers are running
        while len(worker_done_events) > 0 or not result_queue.empty():
            try:
                v51_samples = result_queue.get(timeout=1)
                v51_samples_dict["samples"].extend(v51_samples)

                n_images_extracted += len(v51_samples)
                writer.add_scalar(
                    "decode/samples", n_images_extracted, n_files_processed
                )
                n_files_processed += 1

                # Log progress every 1,000 processed result batches
                if n_files_processed % 1_000 == 0:
                    logging.info(
                        f"{n_images_extracted} samples added to dict. Items in queue: {result_queue.qsize()}. Active workers: {len(worker_done_events)}"
                    )

            except Empty:
                # An empty queue is expected while workers are still processing
                pass

            except Exception as e:
                logging.error(f"JSON worker error: {e}")
                continue

            # Check if any worker is done
            for event in list(worker_done_events):
                if event.is_set():
                    worker_done_events.remove(event)

        # Store data to JSON
        logging.info("Storing data to samples.json")
        with open(json_file_path, "w") as json_file:
            json.dump(v51_samples_dict, json_file)

        writer.close()
        logging.info(f"JSON worker {os.getpid()} shutting down.")
        return True
class AwsDownloader:

Downloads and decodes data from an AWS S3 bucket, with support for multiprocessing and Voxel51 dataset creation.
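
A minimal end-to-end usage sketch, assuming the default workflow of downloading first and decoding afterwards; the bucket, prefix, paths, and log directories below are placeholder values, not taken from this module:

    downloader = AwsDownloader(
        bucket="my-bucket",          # placeholder
        prefix="rolling/2024/",      # placeholder
        download_path="output/aws",  # placeholder
        test_run=False,
    )
    sub_folder, files, planned, number_ok, size_ok = downloader.download_files(log_dir="logs/aws")
    dataset = downloader.decode_data(sub_folder=sub_folder, files=files, log_dir="logs/decode")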

AwsDownloader(bucket, prefix, download_path, test_run)

Initialize the S3 downloader with bucket, prefix, download path, and test flag, loading AWS credentials from a .secret file.
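
The constructor reads AWS credentials from a plain-text .secret file in the working directory, one KEY=VALUE pair per line, and exports them into the environment before creating the boto3 client. A sketch of the expected layout (the values are placeholders):

    AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID
    AWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY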

def download_files(self, log_dir, MAX_SIZE_TB=1.5):

Downloads files from an AWS S3 bucket and verifies download success, returning download details and status flags.
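
The method returns a 5-tuple of (sub_folder, files on disk, planned files, number-check flag, size-check flag). A small sketch of how a caller might unpack it and guard on the flags before decoding; the variable names and log_dir value are illustrative:

    (
        sub_folder,
        files_on_disk,
        planned_files,
        number_ok,
        size_ok,
    ) = downloader.download_files(log_dir="logs/aws")  # log_dir is a placeholder
    if not (number_ok and size_ok):
        logging.warning("Download verification failed; decoding may be incomplete.")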

def decode_data(self, sub_folder, files, log_dir, dataset_name='annarbor_rolling', output_folder='decoded', dataset_persistance=True):

Decodes downloaded data files into a Voxel51 dataset using multiprocessing workers for parallel processing.
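
Before importing into FiftyOne, the method lays out a FiftyOneDataset-style directory under <download_path>/<sub_folder>/<output_folder>. Roughly, with the default output_folder="decoded" (file names taken from the code, contents abbreviated):

    decoded/
        metadata.json    # written by _set_v51_metadata
        samples.json     # written by result_json_worker
        data/            # decoded JPEG frames, one per sample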

def data_extraction_worker(self, task_queue, result_queue, done_event, output_folder_data, n_files_per_worker):

Worker process that extracts data from files in the task queue and puts results in the result queue, processing until the task queue is empty.
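
Each extraction worker drains the shared task queue, pushes lists of sample dicts onto the result queue, and sets its done_event when the queue is empty so that result_json_worker knows when to stop. A minimal single-process invocation for local testing, assuming a hypothetical input file and output directory (normally decode_data launches this via mp.Process):

    task_queue, result_queue = mp.Queue(), mp.Queue()
    done_event = mp.Event()
    task_queue.put("downloads/1/frames_00001.json")  # hypothetical input file
    downloader.data_extraction_worker(
        task_queue, result_queue, done_event, "downloads/1/decoded/data", n_files_per_worker=1
    )
    assert done_event.is_set()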

def result_json_worker(self, result_queue, json_file_path, worker_done_events, log_dir):

Process results from the worker queue, aggregate them into a JSON file, and log metrics.
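
The aggregated samples.json follows the shape produced by _process_file and expected by the FiftyOneDataset importer; schematically (the path and sensor values are illustrative):

    {
        "samples": [
            {
                "filepath": "downloads/1/decoded/data/Unknown_2024-11-14T15:24:21.719Z.jpg",
                "sensor": "Unknown",
                "timestamp": {"$date": "2024-11-14T15:24:21.719Z"}
            }
        ]
    }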