aws_stream_filter_framerate

  1import datetime
  2import json
  3import os
  4import shutil
  5
  6import boto3
  7import numpy as np
  8from dotenv import load_dotenv
  9from tqdm import tqdm
 10
 11load_dotenv()
 12
 13S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "mcity-data-engine")
 14aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
 15aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
 16region_name = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
 17
 18session = boto3.Session(
 19    aws_access_key_id=aws_access_key_id,
 20    aws_secret_access_key=aws_secret_access_key,
 21    region_name=region_name,
 22)
 23s3 = session.client("s3")
 24
 25
 26class SampleTimestamps:
 27
 28    def __init__(
 29        self, file_path: str, target_framerate_hz=1, aws_bucket=None, aws_prefix=None
 30    ):
 31        self.file_path = file_path
 32        self.aws_bucket = aws_bucket
 33        self.aws_prefix = aws_prefix
 34        self.target_framerate_hz = target_framerate_hz
 35        self.valid_target_framerate = True
 36        self.current_framerate_hz = None
 37        self.timestamps = []
 38
 39        if file_path:
 40            self.execution_mode = "local"
 41        elif aws_bucket and aws_prefix:
 42            self.execution_mode = "aws"
 43        else:
 44            raise ValueError(
 45                "Either file_path or aws_bucket and aws_prefix must be provided"
 46            )
 47
 48    def get_timestamps(self):
 49        timestamps = []
 50
 51        with open(self.file_path, "r") as file:
 52            total_lines = sum(1 for _ in file)
 53
 54        with open(self.file_path, "r") as file:
 55            # Collecting data
 56            for index, line in enumerate(file):
 57                data = json.loads(line)
 58                if "time" in data and "data" in data:
 59                    # Get time data
 60                    timestamp_raw = data.get("time")
 61                    try:
 62                        # Most timestamps have milliseconds
 63                        timestamp = datetime.datetime.strptime(
 64                            timestamp_raw, "%Y-%m-%d %H:%M:%S.%f"
 65                        )
 66                    except:
 67                        # Timestamps without milliseconds
 68                        timestamp = datetime.datetime.strptime(
 69                            timestamp_raw, "%Y-%m-%d %H:%M:%S"
 70                        )
 71
 72                elif (
 73                    "image" in data
 74                    and "sensor_name" in data
 75                    and "event_timestamp" in data
 76                ):
 77                    # Get time data
 78                    timestamp_raw = data.get("event_timestamp")
 79                    timestamp = datetime.datetime.fromtimestamp(
 80                        timestamp_raw, tz=datetime.timezone.utc
 81                    )
 82
 83                timestamps.append((index, timestamp))
 84
 85        return timestamps
 86
 87    def get_framerate(self, timestamps, log):
 88
 89        # Calculate time differences (s) and current framerate (Hz)
 90        time_differences = []
 91        timestamps = sorted(timestamps, key=lambda x: x[1])
 92
 93        previous_time = None
 94        for index, timestamp in timestamps:
 95            if previous_time is not None:
 96                time_difference = (timestamp - previous_time).total_seconds()
 97                time_differences.append(time_difference)
 98            previous_time = timestamp
 99
100        # Statistics about timestamp distribution
101        average_time_diff = np.mean(time_differences)
102        median_time_diff = np.median(time_differences)
103        std_time_diff = np.std(time_differences)
104        min_time_diff = np.min(time_differences)
105        max_time_diff = np.max(time_differences)
106        range_time_diff = max_time_diff - min_time_diff
107        q1_time_diff = np.percentile(time_differences, 25)
108        q3_time_diff = np.percentile(time_differences, 75)
109        current_framerate_hz = 1 / median_time_diff  # Median is more robust to outliers
110
111        log["time_s_avg_between_timestamps"] = average_time_diff
112        log["time_s_median_between_timestamps"] = median_time_diff
113        log["time_s_std_between_timestamps"] = std_time_diff
114        log["time_s_min_between_timestamps"] = min_time_diff
115        log["time_s_max_between_timestamps"] = max_time_diff
116        log["time_s_range_between_timestamps"] = range_time_diff
117        log["time_s_25_percentile_between_timestamps"] = q1_time_diff
118        log["time_s_75_percentile_between_timestamps"] = q3_time_diff
119        log["framerate_hz_original"] = current_framerate_hz
120        log["framerate_hz_target"] = self.target_framerate_hz
121
122        # Compute threshold
123        interquartile_range = q3_time_diff - q1_time_diff  # Interquartile Range
124        upper_bound_threshold = (
125            q3_time_diff + 1.5 * interquartile_range
126        )  # (1.5 * IQR rule)
127        log["upper_bound_threshold"] = upper_bound_threshold
128
129        return current_framerate_hz, timestamps, upper_bound_threshold
130
131    def check_target_framerate(self, current_framerate_hz, log):
132        # Check if target framerate is valid
133        if self.target_framerate_hz > current_framerate_hz:
134            print(
135                f"Target framerate of {self.target_framerate_hz} Hz cannot exceed original framerate of {current_framerate_hz} Hz"
136            )
137            log["framerate_target_ok"] = False
138            return False
139        else:
140            log["framerate_target_ok"] = True
141            return True
142
143    def sample_timestamps(self, timestamps, threshold_to_target, log):
144        # Generate target timestamps
145        start_time = timestamps[0][1]
146        end_time = timestamps[-1][1]
147        target_timestamps = []
148        current_time = start_time
149        while current_time <= end_time:
150            target_timestamps.append(current_time)
151            current_time += datetime.timedelta(seconds=1 / self.target_framerate_hz)
152
153        # Find nearest original timestamps to target_timestamps_seconds
154        selected_indices = []
155        selected_timestamps = []
156        selected_target_timestamps = []
157
158        for target in target_timestamps:
159            # Compute the time difference with each original timestamp
160            time_diffs = [(target - t).total_seconds() for i, t in timestamps]
161            time_diffs = np.abs(time_diffs)  # Take absolute differences
162
163            # Find the index of the nearest timestamp
164            nearest_index = np.argmin(time_diffs)
165
166            # Ensure no duplicates are selected
167            if (
168                time_diffs[nearest_index] <= threshold_to_target
169                and nearest_index not in selected_indices
170            ):
171                selected_target_timestamps.append(target)
172                selected_indices.append(nearest_index)
173                selected_timestamps.append(timestamps[nearest_index])
174
175        # Log statistics
176        log["n_original_timestamps"] = len(timestamps)
177        log["n_target_timestamps"] = len(target_timestamps)
178        log["n_selected_timestamps"] = len(selected_timestamps)
179
180        if len(selected_timestamps) >=2:
181            # Compute new framerate
182            time_differences_new = []
183            timestamps_new = sorted(selected_timestamps, key=lambda x: x[1])
184
185            previous_time = None
186            for index, timestamp in timestamps_new:
187                if previous_time is not None:
188                    time_difference = (timestamp - previous_time).total_seconds()
189                    time_differences_new.append(time_difference)
190                previous_time = timestamp
191            median_time_diff_new = np.median(time_differences_new)
192            new_framerate_hz = 1 / median_time_diff_new
193            log["framerate_hz_sampled"] = new_framerate_hz
194
195        else:
196            print(f"Not enough selected timestamps ({len(selected_timestamps)}) to compute new framerate. Original number of timestamps: {len(timestamps)}")
197
198        return (
199            selected_indices,
200            selected_timestamps,
201            target_timestamps,
202            selected_target_timestamps,
203        )
204
205    def update_upload_file(self, file_name, selected_indices, aws_source_path):
206        output_file_path = file_name + f"_sampled_{self.target_framerate_hz}Hz"
207        lines = []
208        with open(file_name, "r") as file:
209            for index, line in enumerate(file):
210                if index in selected_indices:
211                    lines.append(line)
212
213        with open(output_file_path, "w") as output_file:
214            output_file.writelines(lines)
215
216        try:
217            head, tail = os.path.split(output_file.name)
218            file_size_mb = os.path.getsize(output_file.name) / (1024 * 1024)
219            s3.upload_file(
220                output_file.name,
221                S3_BUCKET_NAME,
222                str(self.target_framerate_hz) + "/" + aws_source_path +"_"+ tail,
223            )
224            return file_size_mb
225
226        except Exception as e:
227            print(
228                "S3 upload failed for file "
229                + str(str(self.target_framerate_hz) + "/" + tail)
230                + " - "
231                + str(e)
232            )
233            return None
S3_BUCKET_NAME = 'mcity-data-engine'
aws_access_key_id = None
aws_secret_access_key = None
region_name = 'us-east-1'
session = Session(region_name='us-east-1')
s3 = <botocore.client.S3 object>
class SampleTimestamps:
    """Downsample a JSON-lines sensor recording to a target frame rate.

    Intended call order, inferred from what each method consumes and returns:
    :meth:`get_timestamps` -> :meth:`get_framerate` ->
    :meth:`check_target_framerate` -> :meth:`sample_timestamps` ->
    :meth:`update_upload_file`.
    """

    def __init__(
        self, file_path: str, target_framerate_hz=1, aws_bucket=None, aws_prefix=None
    ):
        """Store the input location and the target frame rate.

        Args:
            file_path: Local JSON-lines file. When truthy, ``execution_mode``
                is ``"local"``.
            target_framerate_hz: Desired frame rate (Hz) after sampling.
            aws_bucket: S3 bucket; with ``aws_prefix`` selects
                ``execution_mode = "aws"`` when ``file_path`` is falsy.
            aws_prefix: S3 key prefix paired with ``aws_bucket``.

        Raises:
            ValueError: If neither ``file_path`` nor both ``aws_bucket`` and
                ``aws_prefix`` are provided.
        """
        self.file_path = file_path
        self.aws_bucket = aws_bucket
        self.aws_prefix = aws_prefix
        self.target_framerate_hz = target_framerate_hz
        # Initial state; none of the methods below update these attributes.
        self.valid_target_framerate = True
        self.current_framerate_hz = None
        self.timestamps = []

        # A local file takes precedence over an S3 location.
        if file_path:
            self.execution_mode = "local"
        elif aws_bucket and aws_prefix:
            self.execution_mode = "aws"
        else:
            raise ValueError(
                "Either file_path or aws_bucket and aws_prefix must be provided"
            )

    def get_timestamps(self):
        """Parse the timestamp of every record in ``self.file_path``.

        Two JSON record layouts are recognised:

        * ``{"time": ..., "data": ...}``: ``time`` is a string formatted as
          ``%Y-%m-%d %H:%M:%S.%f`` or ``%Y-%m-%d %H:%M:%S``. It is parsed into a
          naive ``datetime``.
        * ``{"image": ..., "sensor_name": ..., "event_timestamp": ...}``:
          ``event_timestamp`` is passed to ``fromtimestamp`` with UTC. It is
          parsed into a timezone-aware ``datetime``.

        Blank lines and records matching neither layout are skipped, so no
        record is paired with a timestamp taken from a different line.

        NOTE(review): only ``self.file_path`` is read. In ``"aws"`` execution
        mode it is ``None`` and ``open`` fails; confirm the AWS path is handled
        by the caller.

        Returns:
            list[tuple[int, datetime.datetime]]: ``(line_index, timestamp)``
            pairs in file order. ``line_index`` is the 0-based line number.

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON.
            ValueError: If a ``time`` string matches neither format.
        """
        timestamps = []

        with open(self.file_path, "r") as file:
            for index, line in enumerate(file):
                if not line.strip():
                    continue
                data = json.loads(line)

                if "time" in data and "data" in data:
                    timestamp_raw = data.get("time")
                    try:
                        # Most timestamps have fractional seconds
                        timestamp = datetime.datetime.strptime(
                            timestamp_raw, "%Y-%m-%d %H:%M:%S.%f"
                        )
                    except ValueError:
                        # Timestamps without fractional seconds
                        timestamp = datetime.datetime.strptime(
                            timestamp_raw, "%Y-%m-%d %H:%M:%S"
                        )
                elif (
                    "image" in data
                    and "sensor_name" in data
                    and "event_timestamp" in data
                ):
                    timestamp_raw = data.get("event_timestamp")
                    timestamp = datetime.datetime.fromtimestamp(
                        timestamp_raw, tz=datetime.timezone.utc
                    )
                else:
                    # No recognised timestamp on this line: skip it rather than
                    # reusing the timestamp parsed for an earlier line.
                    continue

                # NOTE(review): a file mixing both layouts yields naive and aware
                # datetimes, which cannot be sorted together later.
                timestamps.append((index, timestamp))

        return timestamps

    def get_framerate(self, timestamps, log):
        """Estimate the original frame rate and an outlier threshold for gaps.

        Args:
            timestamps: Iterable of ``(line_index, datetime)`` pairs, e.g. from
                :meth:`get_timestamps`. The datetimes must be mutually
                comparable (all naive or all aware).
            log: Dict that receives the gap statistics plus
                ``framerate_hz_original``, ``framerate_hz_target`` and
                ``upper_bound_threshold``.

        Returns:
            tuple: ``(current_framerate_hz, sorted_timestamps,
            upper_bound_threshold)``.

            * ``current_framerate_hz`` is ``1 / median gap``.
            * ``sorted_timestamps`` is the input sorted by time.
            * ``upper_bound_threshold`` is ``Q3 + 1.5 * IQR`` of the gaps, in
              seconds.

        Raises:
            ValueError: If fewer than two timestamps are given, since no gap can
                be measured.
        """
        timestamps = sorted(timestamps, key=lambda pair: pair[1])
        if len(timestamps) < 2:
            raise ValueError(
                "At least two timestamps are needed to estimate a framerate, "
                f"got {len(timestamps)}"
            )

        # Gaps (s) between chronologically consecutive timestamps
        time_differences = [
            (later - earlier).total_seconds()
            for (_, earlier), (_, later) in zip(timestamps, timestamps[1:])
        ]

        # Statistics about timestamp distribution
        average_time_diff = np.mean(time_differences)
        median_time_diff = np.median(time_differences)
        std_time_diff = np.std(time_differences)
        min_time_diff = np.min(time_differences)
        max_time_diff = np.max(time_differences)
        range_time_diff = max_time_diff - min_time_diff
        q1_time_diff = np.percentile(time_differences, 25)
        q3_time_diff = np.percentile(time_differences, 75)
        # The median is more robust to dropped frames than the mean.
        # NOTE(review): a median gap of 0 (duplicate timestamps) yields inf.
        current_framerate_hz = 1 / median_time_diff

        log["time_s_avg_between_timestamps"] = average_time_diff
        log["time_s_median_between_timestamps"] = median_time_diff
        log["time_s_std_between_timestamps"] = std_time_diff
        log["time_s_min_between_timestamps"] = min_time_diff
        log["time_s_max_between_timestamps"] = max_time_diff
        log["time_s_range_between_timestamps"] = range_time_diff
        log["time_s_25_percentile_between_timestamps"] = q1_time_diff
        log["time_s_75_percentile_between_timestamps"] = q3_time_diff
        log["framerate_hz_original"] = current_framerate_hz
        log["framerate_hz_target"] = self.target_framerate_hz

        # Tukey's 1.5 * IQR rule for the largest "normal" gap
        interquartile_range = q3_time_diff - q1_time_diff
        upper_bound_threshold = q3_time_diff + 1.5 * interquartile_range
        log["upper_bound_threshold"] = upper_bound_threshold

        return current_framerate_hz, timestamps, upper_bound_threshold

    def check_target_framerate(self, current_framerate_hz, log):
        """Return whether the target frame rate is reachable by dropping frames.

        A target above ``current_framerate_hz`` is rejected and a message is
        printed. The verdict is stored in ``log["framerate_target_ok"]``.
        """
        if self.target_framerate_hz > current_framerate_hz:
            print(
                f"Target framerate of {self.target_framerate_hz} Hz cannot exceed original framerate of {current_framerate_hz} Hz"
            )
            log["framerate_target_ok"] = False
            return False
        log["framerate_target_ok"] = True
        return True

    def sample_timestamps(self, timestamps, threshold_to_target, log):
        """Select, for each target period, the nearest original timestamp.

        A regular grid of target times is laid from the first to the last
        timestamp, spaced ``1 / self.target_framerate_hz`` seconds apart. Each
        target is matched to the closest original timestamp. The match is kept
        only if it is at most ``threshold_to_target`` seconds away and that
        timestamp was not already matched by an earlier target.

        Args:
            timestamps: List of ``(line_index, datetime)`` pairs. The grid spans
                ``timestamps[0]`` to ``timestamps[-1]``, so the list should be
                sorted by time (as returned by :meth:`get_framerate`).
            threshold_to_target: Maximum distance in seconds between a target
                time and its matched timestamp.
            log: Dict that receives ``n_original_timestamps``,
                ``n_target_timestamps``, ``n_selected_timestamps`` and, when at
                least two timestamps are selected, ``framerate_hz_sampled``.

        Returns:
            tuple: ``(selected_indices, selected_timestamps, target_timestamps,
            selected_target_timestamps)``. ``selected_indices`` are positions in
            ``timestamps``. The file line index of each match is the first
            element of the corresponding ``selected_timestamps`` pair.

        Raises:
            ValueError: If ``self.target_framerate_hz`` is not positive. Such a
                grid would divide by zero or never terminate.
        """
        if not self.target_framerate_hz > 0:
            raise ValueError(
                f"target_framerate_hz must be positive, got {self.target_framerate_hz}"
            )

        target_timestamps = []
        selected_indices = []
        selected_timestamps = []
        selected_target_timestamps = []

        if timestamps:
            start_time = timestamps[0][1]
            end_time = timestamps[-1][1]

            # Offset each target from start_time instead of adding a step
            # repeatedly. timedelta rounds to whole microseconds, so
            # accumulating e.g. 1/3 s steps would drift off the true grid.
            step_count = 0
            current_time = start_time
            while current_time <= end_time:
                target_timestamps.append(current_time)
                step_count += 1
                current_time = start_time + datetime.timedelta(
                    seconds=step_count / self.target_framerate_hz
                )

            # Integer microsecond offsets reproduce timedelta.total_seconds()
            # distances exactly, but let numpy compare all timestamps at once.
            one_microsecond = datetime.timedelta(microseconds=1)
            offsets_us = np.array(
                [(t - start_time) // one_microsecond for _, t in timestamps],
                dtype=np.int64,
            )
            already_selected = set()  # O(1) duplicate check

            for target in target_timestamps:
                target_us = (target - start_time) // one_microsecond
                time_diffs = np.abs(offsets_us - target_us) / 1e6
                # argmin returns the first minimum, so ties favour the earlier entry
                nearest_index = np.argmin(time_diffs)

                if (
                    time_diffs[nearest_index] <= threshold_to_target
                    and nearest_index not in already_selected
                ):
                    already_selected.add(nearest_index)
                    selected_target_timestamps.append(target)
                    selected_indices.append(nearest_index)
                    selected_timestamps.append(timestamps[nearest_index])

        # Log statistics
        log["n_original_timestamps"] = len(timestamps)
        log["n_target_timestamps"] = len(target_timestamps)
        log["n_selected_timestamps"] = len(selected_timestamps)

        if len(selected_timestamps) >= 2:
            # Achieved frame rate: 1 / median gap between the selected timestamps
            selected_times = sorted(t for _, t in selected_timestamps)
            gaps = [
                (later - earlier).total_seconds()
                for earlier, later in zip(selected_times, selected_times[1:])
            ]
            log["framerate_hz_sampled"] = 1 / np.median(gaps)
        else:
            print(f"Not enough selected timestamps ({len(selected_timestamps)}) to compute new framerate. Original number of timestamps: {len(timestamps)}")

        return (
            selected_indices,
            selected_timestamps,
            target_timestamps,
            selected_target_timestamps,
        )

    def update_upload_file(self, file_name, selected_indices, aws_source_path):
        """Write the selected lines of ``file_name`` to a new file and upload it to S3.

        Lines whose 0-based index is in ``selected_indices`` are copied in file
        order to ``<file_name>_sampled_<target_framerate_hz>Hz``. That file is
        kept on disk. It is uploaded to bucket ``S3_BUCKET_NAME`` under the key
        ``<target_framerate_hz>/<aws_source_path>_<output file name>``.

        Args:
            file_name: Path of the source JSON-lines file.
            selected_indices: Line indices to keep (any iterable of ints).
            aws_source_path: String inserted into the S3 object key.

        Returns:
            float | None: Size of the written file in MiB, or ``None`` if sizing
            or uploading failed. Failures are printed, not raised.
        """
        output_file_path = file_name + f"_sampled_{self.target_framerate_hz}Hz"
        # A set makes each membership test O(1) instead of O(len(selected_indices)).
        keep = set(selected_indices)

        with open(file_name, "r") as file, open(output_file_path, "w") as output_file:
            output_file.writelines(
                line for index, line in enumerate(file) if index in keep
            )

        tail = os.path.basename(output_file_path)
        try:
            file_size_mb = os.path.getsize(output_file_path) / (1024 * 1024)
            s3.upload_file(
                output_file_path,
                S3_BUCKET_NAME,
                str(self.target_framerate_hz) + "/" + aws_source_path + "_" + tail,
            )
            return file_size_mb

        except Exception as e:
            # Best effort: report the full intended key and let the caller continue.
            print(
                "S3 upload failed for file "
                + str(self.target_framerate_hz) + "/" + str(aws_source_path) + "_" + tail
                + " - "
                + str(e)
            )
            return None
def __init__(
    self, file_path: str, target_framerate_hz=1, aws_bucket=None, aws_prefix=None
):
    """Record where the recording lives and the frame rate to sample it at.

    Args:
        file_path: Local JSON-lines file. A truthy value selects local mode and
            takes precedence over any S3 location.
        target_framerate_hz: Frame rate (Hz) the recording should be reduced to.
        aws_bucket: S3 bucket, used only when ``file_path`` is falsy.
        aws_prefix: S3 key prefix; required together with ``aws_bucket``.

    Raises:
        ValueError: When neither a file path nor a bucket/prefix pair is given.
    """
    # Where the data comes from, and the rate to produce.
    self.file_path, self.aws_bucket, self.aws_prefix = (
        file_path,
        aws_bucket,
        aws_prefix,
    )
    self.target_framerate_hz = target_framerate_hz

    # Starting values for bookkeeping attributes.
    self.valid_target_framerate = True
    self.current_framerate_hz = None
    self.timestamps = []

    if not file_path and not (aws_bucket and aws_prefix):
        raise ValueError(
            "Either file_path or aws_bucket and aws_prefix must be provided"
        )
    self.execution_mode = "local" if file_path else "aws"
file_path
aws_bucket
aws_prefix
target_framerate_hz
valid_target_framerate
current_framerate_hz
timestamps
def get_timestamps(self):
    """Parse the timestamp of every record in ``self.file_path``.

    Two JSON record layouts are recognised:

    * ``{"time": ..., "data": ...}``: ``time`` is a string formatted as
      ``%Y-%m-%d %H:%M:%S.%f`` or ``%Y-%m-%d %H:%M:%S``. It is parsed into a
      naive ``datetime``.
    * ``{"image": ..., "sensor_name": ..., "event_timestamp": ...}``:
      ``event_timestamp`` is passed to ``fromtimestamp`` with UTC. It is parsed
      into a timezone-aware ``datetime``.

    Blank lines and records matching neither layout are skipped, so no record is
    paired with a timestamp taken from a different line.

    NOTE(review): only ``self.file_path`` is read. In ``"aws"`` execution mode it
    is ``None`` and ``open`` fails; confirm the AWS path is handled by the caller.

    Returns:
        list[tuple[int, datetime.datetime]]: ``(line_index, timestamp)`` pairs
        in file order. ``line_index`` is the 0-based line number.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        ValueError: If a ``time`` string matches neither format.
    """
    timestamps = []

    with open(self.file_path, "r") as file:
        for index, line in enumerate(file):
            if not line.strip():
                continue
            data = json.loads(line)

            if "time" in data and "data" in data:
                timestamp_raw = data.get("time")
                try:
                    # Most timestamps have fractional seconds
                    timestamp = datetime.datetime.strptime(
                        timestamp_raw, "%Y-%m-%d %H:%M:%S.%f"
                    )
                except ValueError:
                    # Timestamps without fractional seconds
                    timestamp = datetime.datetime.strptime(
                        timestamp_raw, "%Y-%m-%d %H:%M:%S"
                    )
            elif (
                "image" in data
                and "sensor_name" in data
                and "event_timestamp" in data
            ):
                timestamp_raw = data.get("event_timestamp")
                timestamp = datetime.datetime.fromtimestamp(
                    timestamp_raw, tz=datetime.timezone.utc
                )
            else:
                # No recognised timestamp on this line: skip it rather than
                # reusing the timestamp parsed for an earlier line.
                continue

            # NOTE(review): a file mixing both layouts yields naive and aware
            # datetimes, which cannot be sorted together later.
            timestamps.append((index, timestamp))

    return timestamps
def get_framerate(self, timestamps, log):
    """Estimate the original frame rate and an outlier threshold for gaps.

    Args:
        timestamps: Iterable of ``(line_index, datetime)`` pairs, e.g. from
            ``get_timestamps``. The datetimes must be mutually comparable (all
            naive or all aware).
        log: Dict that receives the gap statistics plus
            ``framerate_hz_original``, ``framerate_hz_target`` and
            ``upper_bound_threshold``.

    Returns:
        tuple: ``(current_framerate_hz, sorted_timestamps,
        upper_bound_threshold)``.

        * ``current_framerate_hz`` is ``1 / median gap``.
        * ``sorted_timestamps`` is the input sorted by time.
        * ``upper_bound_threshold`` is ``Q3 + 1.5 * IQR`` of the gaps, in
          seconds.

    Raises:
        ValueError: If fewer than two timestamps are given, since no gap can be
            measured.
    """
    timestamps = sorted(timestamps, key=lambda pair: pair[1])
    if len(timestamps) < 2:
        raise ValueError(
            "At least two timestamps are needed to estimate a framerate, "
            f"got {len(timestamps)}"
        )

    # Gaps (s) between chronologically consecutive timestamps
    time_differences = [
        (later - earlier).total_seconds()
        for (_, earlier), (_, later) in zip(timestamps, timestamps[1:])
    ]

    # Statistics about timestamp distribution
    average_time_diff = np.mean(time_differences)
    median_time_diff = np.median(time_differences)
    std_time_diff = np.std(time_differences)
    min_time_diff = np.min(time_differences)
    max_time_diff = np.max(time_differences)
    range_time_diff = max_time_diff - min_time_diff
    q1_time_diff = np.percentile(time_differences, 25)
    q3_time_diff = np.percentile(time_differences, 75)
    # The median is more robust to dropped frames than the mean.
    # NOTE(review): a median gap of 0 (duplicate timestamps) yields inf.
    current_framerate_hz = 1 / median_time_diff

    log["time_s_avg_between_timestamps"] = average_time_diff
    log["time_s_median_between_timestamps"] = median_time_diff
    log["time_s_std_between_timestamps"] = std_time_diff
    log["time_s_min_between_timestamps"] = min_time_diff
    log["time_s_max_between_timestamps"] = max_time_diff
    log["time_s_range_between_timestamps"] = range_time_diff
    log["time_s_25_percentile_between_timestamps"] = q1_time_diff
    log["time_s_75_percentile_between_timestamps"] = q3_time_diff
    log["framerate_hz_original"] = current_framerate_hz
    log["framerate_hz_target"] = self.target_framerate_hz

    # Tukey's 1.5 * IQR rule for the largest "normal" gap
    interquartile_range = q3_time_diff - q1_time_diff
    upper_bound_threshold = q3_time_diff + 1.5 * interquartile_range
    log["upper_bound_threshold"] = upper_bound_threshold

    return current_framerate_hz, timestamps, upper_bound_threshold
def check_target_framerate(self, current_framerate_hz, log):
    """Decide whether the target frame rate can be reached by dropping frames.

    Downsampling cannot create frames, so a target strictly above
    ``current_framerate_hz`` is rejected with a printed message. The outcome is
    written to ``log["framerate_target_ok"]`` and returned.
    """
    exceeds_original = self.target_framerate_hz > current_framerate_hz
    if exceeds_original:
        print(
            f"Target framerate of {self.target_framerate_hz} Hz cannot exceed original framerate of {current_framerate_hz} Hz"
        )
    target_ok = not exceeds_original
    log["framerate_target_ok"] = target_ok
    return target_ok
def sample_timestamps(self, timestamps, threshold_to_target, log):
    """Select, for each target period, the nearest original timestamp.

    A regular grid of target times is laid from the first to the last timestamp,
    spaced ``1 / self.target_framerate_hz`` seconds apart. Each target is
    matched to the closest original timestamp. The match is kept only if it is
    at most ``threshold_to_target`` seconds away and that timestamp was not
    already matched by an earlier target.

    Args:
        timestamps: List of ``(line_index, datetime)`` pairs. The grid spans
            ``timestamps[0]`` to ``timestamps[-1]``, so the list should be
            sorted by time (as returned by ``get_framerate``).
        threshold_to_target: Maximum distance in seconds between a target time
            and its matched timestamp.
        log: Dict that receives ``n_original_timestamps``,
            ``n_target_timestamps``, ``n_selected_timestamps`` and, when at
            least two timestamps are selected, ``framerate_hz_sampled``.

    Returns:
        tuple: ``(selected_indices, selected_timestamps, target_timestamps,
        selected_target_timestamps)``. ``selected_indices`` are positions in
        ``timestamps``. The file line index of each match is the first element
        of the corresponding ``selected_timestamps`` pair.

    Raises:
        ValueError: If ``self.target_framerate_hz`` is not positive. Such a grid
            would divide by zero or never terminate.
    """
    if not self.target_framerate_hz > 0:
        raise ValueError(
            f"target_framerate_hz must be positive, got {self.target_framerate_hz}"
        )

    target_timestamps = []
    selected_indices = []
    selected_timestamps = []
    selected_target_timestamps = []

    if timestamps:
        start_time = timestamps[0][1]
        end_time = timestamps[-1][1]

        # Offset each target from start_time instead of adding a step
        # repeatedly. timedelta rounds to whole microseconds, so accumulating
        # e.g. 1/3 s steps would drift off the true grid.
        step_count = 0
        current_time = start_time
        while current_time <= end_time:
            target_timestamps.append(current_time)
            step_count += 1
            current_time = start_time + datetime.timedelta(
                seconds=step_count / self.target_framerate_hz
            )

        # Integer microsecond offsets reproduce timedelta.total_seconds()
        # distances exactly, but let numpy compare all timestamps at once.
        one_microsecond = datetime.timedelta(microseconds=1)
        offsets_us = np.array(
            [(t - start_time) // one_microsecond for _, t in timestamps],
            dtype=np.int64,
        )
        already_selected = set()  # O(1) duplicate check

        for target in target_timestamps:
            target_us = (target - start_time) // one_microsecond
            time_diffs = np.abs(offsets_us - target_us) / 1e6
            # argmin returns the first minimum, so ties favour the earlier entry
            nearest_index = np.argmin(time_diffs)

            if (
                time_diffs[nearest_index] <= threshold_to_target
                and nearest_index not in already_selected
            ):
                already_selected.add(nearest_index)
                selected_target_timestamps.append(target)
                selected_indices.append(nearest_index)
                selected_timestamps.append(timestamps[nearest_index])

    # Log statistics
    log["n_original_timestamps"] = len(timestamps)
    log["n_target_timestamps"] = len(target_timestamps)
    log["n_selected_timestamps"] = len(selected_timestamps)

    if len(selected_timestamps) >= 2:
        # Achieved frame rate: 1 / median gap between the selected timestamps
        selected_times = sorted(t for _, t in selected_timestamps)
        gaps = [
            (later - earlier).total_seconds()
            for earlier, later in zip(selected_times, selected_times[1:])
        ]
        log["framerate_hz_sampled"] = 1 / np.median(gaps)
    else:
        print(f"Not enough selected timestamps ({len(selected_timestamps)}) to compute new framerate. Original number of timestamps: {len(timestamps)}")

    return (
        selected_indices,
        selected_timestamps,
        target_timestamps,
        selected_target_timestamps,
    )
def update_upload_file(self, file_name, selected_indices, aws_source_path):
    """Write the selected lines of ``file_name`` to a new file and upload it to S3.

    Lines whose 0-based index is in ``selected_indices`` are copied in file
    order to ``<file_name>_sampled_<target_framerate_hz>Hz``. That file is kept
    on disk. It is uploaded to bucket ``S3_BUCKET_NAME`` under the key
    ``<target_framerate_hz>/<aws_source_path>_<output file name>``.

    Args:
        file_name: Path of the source JSON-lines file.
        selected_indices: Line indices to keep (any iterable of ints).
        aws_source_path: String inserted into the S3 object key.

    Returns:
        float | None: Size of the written file in MiB, or ``None`` if sizing or
        uploading failed. Failures are printed, not raised.
    """
    output_file_path = file_name + f"_sampled_{self.target_framerate_hz}Hz"
    # A set makes each membership test O(1) instead of O(len(selected_indices)).
    keep = set(selected_indices)

    with open(file_name, "r") as file, open(output_file_path, "w") as output_file:
        output_file.writelines(
            line for index, line in enumerate(file) if index in keep
        )

    tail = os.path.basename(output_file_path)
    try:
        file_size_mb = os.path.getsize(output_file_path) / (1024 * 1024)
        s3.upload_file(
            output_file_path,
            S3_BUCKET_NAME,
            str(self.target_framerate_hz) + "/" + aws_source_path + "_" + tail,
        )
        return file_size_mb

    except Exception as e:
        # Best effort: report the full intended key and let the caller continue.
        print(
            "S3 upload failed for file "
            + str(self.target_framerate_hz) + "/" + str(aws_source_path) + "_" + tail
            + " - "
            + str(e)
        )
        return None